diff options
Diffstat (limited to 'tester/rt/exe.py')
-rw-r--r-- | tester/rt/exe.py | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/tester/rt/exe.py b/tester/rt/exe.py new file mode 100644 index 0000000..5655073 --- /dev/null +++ b/tester/rt/exe.py @@ -0,0 +1,172 @@ +# SPDX-License-Identifier: BSD-2-Clause +'''Executable test target.''' + +# Copyright (C) 2013-2020 Chris Johns (chrisj@rtems.org) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +import datetime +import os +import threading +import time + +from rtemstoolkit import execute + + +class exe(object): + '''RTEMS Testing EXE base.''' + + # pylint: disable=useless-object-inheritance + # pylint: disable=too-many-instance-attributes + + def __init__(self, bsp_arch, bsp, trace=False): + self.trace = trace + self.lock_trace = False + self.lock_locked = None + self.lock = threading.RLock() + self.bsp = bsp + self.bsp_arch = bsp_arch + self.output = None + self.output_length = 0 + self.process = None + self.ecode = None + self.output = None + self.output_buffer = '' + self.console = None + self.timeout = None + self.test_too_long = None + self.kill_good = True + + def _lock(self, msg): + if self.lock_trace: + print('|[ LOCK:%s ]|' % (msg)) + self.lock_locked = datetime.datetime.now() + self.lock.acquire() + + def _unlock(self, msg): + if self.lock_trace: + period = datetime.datetime.now() - self.lock_locked + print('|] UNLOCK:%s [| : %s' % (msg, period)) + self.lock.release() + + def _capture(self, text): + self._lock('_capture') + self.output_length += len(text) + self._unlock('_capture') + if self.output is not None: + self.output(text) + + def _timeout(self): + self._kill() + if self.timeout is not None: + self.timeout() + + def _test_too_long(self): + self._kill() + if self.test_too_long is not None: + self.test_too_long() + + def _kill(self): + self._lock('_kill') + self.kill_good = True + self._unlock('_kill') + if self.process: + # pylint: disable=bare-except + try: + self.process.kill() + except: + pass + self.process = None + + def _execute(self, args): + '''Thread to execute the test and to wait for it to finish.''' + # pylint: disable=unused-variable + cmds = args + if self.console is not None: + self.console('exe: %s' % (' '.join(cmds))) + ecode, proc = self.process.open(cmds) + if self.trace: + print('gdb done', ecode) + self._lock('_execute') + self.ecode = ecode + self.process = None + self._unlock('_execute') + + def _monitor(self, timeout): + output_length = self.output_length + step = 0.25 + period = timeout[0] + seconds = timeout[1] + while self.process and period > 0 and seconds > 0: + current_length = self.output_length + if output_length != current_length: + period = timeout[0] + output_length = current_length + if seconds < step: + seconds = 0 + else: + seconds -= step + if period < step: + step = period + period = 0 + else: + period -= step + self._unlock('_monitor') + time.sleep(step) + self._lock('_monitor') + if self.process is not None: + if period == 0: + self._timeout() + elif seconds == 0: + self._test_too_long() + + def open(self, command, ignore_exit_code, output, console, timeout): + '''Open the execute test run''' + # pylint: disable=too-many-arguments + self._lock('_open') + self.timeout = timeout[2] + self.test_too_long = timeout[3] + try: + cmds = execute.arg_list(command) + self.output = output + self.console = console + self.process = execute.execute(output=self._capture) + exec_thread = threading.Thread(target=self._execute, args=[cmds]) + exec_thread.start() + self._monitor(timeout) + if self.ecode is not None and \ + not (self.kill_good or ignore_exit_code) and self.ecode > 0: + if self.output: + self.output('*** TARGET ERROR %d %s ***' % + (self.ecode, os.strerror(self.ecode))) + finally: + self._unlock('_open') + + def kill(self): + '''Kill the test run.''' + self._lock('_kill') + try: + self._kill() + finally: + self._unlock('_kill') |