summaryrefslogblamecommitdiffstats
path: root/tester/rt/exe.py
blob: 5655073a783b6d6bbc986a95e16abdae4a9d8e96 (plain) (tree)











































































































































































                                                                             
# SPDX-License-Identifier: BSD-2-Clause
'''Executable test target.'''

# Copyright (C) 2013-2020 Chris Johns (chrisj@rtems.org)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from __future__ import print_function

import datetime
import os
import threading
import time

from rtemstoolkit import execute


class exe(object):
    '''RTEMS Testing EXE base.'''

    # pylint: disable=useless-object-inheritance
    # pylint: disable=too-many-instance-attributes

    def __init__(self, bsp_arch, bsp, trace=False):
        self.trace = trace
        self.lock_trace = False
        self.lock_locked = None
        self.lock = threading.RLock()
        self.bsp = bsp
        self.bsp_arch = bsp_arch
        self.output = None
        self.output_length = 0
        self.process = None
        self.ecode = None
        self.output = None
        self.output_buffer = ''
        self.console = None
        self.timeout = None
        self.test_too_long = None
        self.kill_good = True

    def _lock(self, msg):
        if self.lock_trace:
            print('|[   LOCK:%s ]|' % (msg))
            self.lock_locked = datetime.datetime.now()
        self.lock.acquire()

    def _unlock(self, msg):
        if self.lock_trace:
            period = datetime.datetime.now() - self.lock_locked
            print('|] UNLOCK:%s [| : %s' % (msg, period))
        self.lock.release()

    def _capture(self, text):
        self._lock('_capture')
        self.output_length += len(text)
        self._unlock('_capture')
        if self.output is not None:
            self.output(text)

    def _timeout(self):
        self._kill()
        if self.timeout is not None:
            self.timeout()

    def _test_too_long(self):
        self._kill()
        if self.test_too_long is not None:
            self.test_too_long()

    def _kill(self):
        self._lock('_kill')
        self.kill_good = True
        self._unlock('_kill')
        if self.process:
            # pylint: disable=bare-except
            try:
                self.process.kill()
            except:
                pass
        self.process = None

    def _execute(self, args):
        '''Thread to execute the test and to wait for it to finish.'''
        # pylint: disable=unused-variable
        cmds = args
        if self.console is not None:
            self.console('exe: %s' % (' '.join(cmds)))
        ecode, proc = self.process.open(cmds)
        if self.trace:
            print('gdb done', ecode)
        self._lock('_execute')
        self.ecode = ecode
        self.process = None
        self._unlock('_execute')

    def _monitor(self, timeout):
        output_length = self.output_length
        step = 0.25
        period = timeout[0]
        seconds = timeout[1]
        while self.process and period > 0 and seconds > 0:
            current_length = self.output_length
            if output_length != current_length:
                period = timeout[0]
            output_length = current_length
            if seconds < step:
                seconds = 0
            else:
                seconds -= step
            if period < step:
                step = period
                period = 0
            else:
                period -= step
            self._unlock('_monitor')
            time.sleep(step)
            self._lock('_monitor')
        if self.process is not None:
            if period == 0:
                self._timeout()
            elif seconds == 0:
                self._test_too_long()

    def open(self, command, ignore_exit_code, output, console, timeout):
        '''Open the execute test run'''
        # pylint: disable=too-many-arguments
        self._lock('_open')
        self.timeout = timeout[2]
        self.test_too_long = timeout[3]
        try:
            cmds = execute.arg_list(command)
            self.output = output
            self.console = console
            self.process = execute.execute(output=self._capture)
            exec_thread = threading.Thread(target=self._execute, args=[cmds])
            exec_thread.start()
            self._monitor(timeout)
            if self.ecode is not None and \
               not (self.kill_good or ignore_exit_code) and self.ecode > 0:
                if self.output:
                    self.output('*** TARGET ERROR %d %s ***' %
                                (self.ecode, os.strerror(self.ecode)))
        finally:
            self._unlock('_open')

    def kill(self):
        '''Kill the test run.'''
        self._lock('_kill')
        try:
            self._kill()
        finally:
            self._unlock('_kill')