summaryrefslogtreecommitdiffstats
path: root/tester/rt/exe.py
diff options
context:
space:
mode:
Diffstat (limited to 'tester/rt/exe.py')
-rw-r--r--tester/rt/exe.py172
1 files changed, 172 insertions, 0 deletions
diff --git a/tester/rt/exe.py b/tester/rt/exe.py
new file mode 100644
index 0000000..5655073
--- /dev/null
+++ b/tester/rt/exe.py
@@ -0,0 +1,172 @@
+# SPDX-License-Identifier: BSD-2-Clause
+'''Executable test target.'''
+
+# Copyright (C) 2013-2020 Chris Johns (chrisj@rtems.org)
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+from __future__ import print_function
+
+import datetime
+import os
+import threading
+import time
+
+from rtemstoolkit import execute
+
+
+class exe(object):
+ '''RTEMS Testing EXE base.'''
+
+ # pylint: disable=useless-object-inheritance
+ # pylint: disable=too-many-instance-attributes
+
+ def __init__(self, bsp_arch, bsp, trace=False):
+ self.trace = trace
+ self.lock_trace = False
+ self.lock_locked = None
+ self.lock = threading.RLock()
+ self.bsp = bsp
+ self.bsp_arch = bsp_arch
+ self.output = None
+ self.output_length = 0
+ self.process = None
+ self.ecode = None
+ self.output = None
+ self.output_buffer = ''
+ self.console = None
+ self.timeout = None
+ self.test_too_long = None
+ self.kill_good = True
+
+ def _lock(self, msg):
+ if self.lock_trace:
+ print('|[ LOCK:%s ]|' % (msg))
+ self.lock_locked = datetime.datetime.now()
+ self.lock.acquire()
+
+ def _unlock(self, msg):
+ if self.lock_trace:
+ period = datetime.datetime.now() - self.lock_locked
+ print('|] UNLOCK:%s [| : %s' % (msg, period))
+ self.lock.release()
+
+ def _capture(self, text):
+ self._lock('_capture')
+ self.output_length += len(text)
+ self._unlock('_capture')
+ if self.output is not None:
+ self.output(text)
+
+ def _timeout(self):
+ self._kill()
+ if self.timeout is not None:
+ self.timeout()
+
+ def _test_too_long(self):
+ self._kill()
+ if self.test_too_long is not None:
+ self.test_too_long()
+
+ def _kill(self):
+ self._lock('_kill')
+ self.kill_good = True
+ self._unlock('_kill')
+ if self.process:
+ # pylint: disable=bare-except
+ try:
+ self.process.kill()
+ except:
+ pass
+ self.process = None
+
+ def _execute(self, args):
+ '''Thread to execute the test and to wait for it to finish.'''
+ # pylint: disable=unused-variable
+ cmds = args
+ if self.console is not None:
+ self.console('exe: %s' % (' '.join(cmds)))
+ ecode, proc = self.process.open(cmds)
+ if self.trace:
+ print('gdb done', ecode)
+ self._lock('_execute')
+ self.ecode = ecode
+ self.process = None
+ self._unlock('_execute')
+
+ def _monitor(self, timeout):
+ output_length = self.output_length
+ step = 0.25
+ period = timeout[0]
+ seconds = timeout[1]
+ while self.process and period > 0 and seconds > 0:
+ current_length = self.output_length
+ if output_length != current_length:
+ period = timeout[0]
+ output_length = current_length
+ if seconds < step:
+ seconds = 0
+ else:
+ seconds -= step
+ if period < step:
+ step = period
+ period = 0
+ else:
+ period -= step
+ self._unlock('_monitor')
+ time.sleep(step)
+ self._lock('_monitor')
+ if self.process is not None:
+ if period == 0:
+ self._timeout()
+ elif seconds == 0:
+ self._test_too_long()
+
+ def open(self, command, ignore_exit_code, output, console, timeout):
+ '''Open the execute test run'''
+ # pylint: disable=too-many-arguments
+ self._lock('_open')
+ self.timeout = timeout[2]
+ self.test_too_long = timeout[3]
+ try:
+ cmds = execute.arg_list(command)
+ self.output = output
+ self.console = console
+ self.process = execute.execute(output=self._capture)
+ exec_thread = threading.Thread(target=self._execute, args=[cmds])
+ exec_thread.start()
+ self._monitor(timeout)
+ if self.ecode is not None and \
+ not (self.kill_good or ignore_exit_code) and self.ecode > 0:
+ if self.output:
+ self.output('*** TARGET ERROR %d %s ***' %
+ (self.ecode, os.strerror(self.ecode)))
+ finally:
+ self._unlock('_open')
+
+ def kill(self):
+ '''Kill the test run.'''
+ self._lock('_kill')
+ try:
+ self._kill()
+ finally:
+ self._unlock('_kill')