From c2d23387595c8d4e5568a498b948f9dde2790c49 Mon Sep 17 00:00:00 2001 From: Chris Johns Date: Tue, 18 Dec 2018 15:08:43 +1100 Subject: sb/execute: Port the rtemstoolkit performance fixes for python3 Close #3664. --- source-builder/sb/execute.py | 332 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 278 insertions(+), 54 deletions(-) diff --git a/source-builder/sb/execute.py b/source-builder/sb/execute.py index 12d8114..0c25163 100755 --- a/source-builder/sb/execute.py +++ b/source-builder/sb/execute.py @@ -1,6 +1,6 @@ # # RTEMS Tools Project (http://www.rtems.org/) -# Copyright 2010-2016 Chris Johns (chrisj@rtems.org) +# Copyright 2010-2017 Chris Johns (chrisj@rtems.org) # All rights reserved. # # This file is part of the RTEMS Tools package in 'rtems-tools'. @@ -8,7 +8,7 @@ # Permission to use, copy, modify, and/or distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. -# + # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR @@ -16,6 +16,7 @@ # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# # # Execute commands or scripts. @@ -26,15 +27,21 @@ from __future__ import print_function import functools +import io import os import re import sys import subprocess import threading +import time +import traceback import error import log +# Trace exceptions +trace_threads = False + # Redefine the PIPE from subprocess PIPE = subprocess.PIPE @@ -86,75 +93,240 @@ def arg_subst_str(command, subst): def add(x, y): return x + ' ' + str(y) return functools.reduce(add, cmd, '') -class execute: - """Execute commands or scripts. The 'output' is a funtion - that handles the output from the process.""" - def __init__(self, output = None, error_prefix = '', verbose = False): +class execute(object): + """Execute commands or scripts. The 'output' is a funtion that handles the + output from the process. The 'input' is a function that blocks and returns + data to be written to stdin""" + def __init__(self, output = None, input = None, cleanup = None, + error_prefix = '', verbose = False): + self.lock = threading.Lock() self.output = output + self.input = input + self.cleanup = cleanup self.error_prefix = error_prefix self.verbose = verbose self.shell_exe = None self.shell_commands = False self.path = None self.environment = None - - def capture(self, proc, timeout = None): - """Create 2 threads to read stdout and stderr and send to the - output handler. Based on the 'communicate' code in the subprocess - module.""" - def _readthread(fh, out, prefix = ''): + self.outputting = False + self.timing_out = False + self.proc = None + + def capture(self, proc, command = 'pipe', timeout = None): + """Create 3 threads to read stdout and stderr and send to the output handler + and call an input handler is provided. Based on the 'communicate' code + in the subprocess module.""" + def _writethread(exe, fh, input): + """Call the input handler and write it to the stdin. The input handler should + block and return None or False if this thread is to exit and True if this + is a timeout check.""" + if trace_threads: + print('execute:_writethread: start') + encoding = True + try: + tmp = bytes('temp', sys.stdin.encoding) + except: + encoding = False + try: + while True: + if trace_threads: + print('execute:_writethread: call input', input) + lines = input() + if type(lines) == str or type(lines) == bytes: + try: + if encoding: + lines = bytes(lines, sys.stdin.encoding) + fh.write(lines) + fh.flush() + except: + break + if lines == None or \ + lines == False or \ + (lines == True and fh.closed): + break + except: + if trace_threads: + print('execute:_writethread: exception') + print(traceback.format_exc()) + pass + try: + fh.close() + except: + pass + if trace_threads: + print('execute:_writethread: finished') + + def _readthread(exe, fh, out, prefix = ''): """Read from a file handle and write to the output handler until the file closes.""" - count = 0 - while True: - line = fh.readline() - # str and bytes are the same type in Python2 - if type(line) is not str and type(line) is bytes: - line = line.decode(sys.stdout.encoding) - count += 1 - if len(line) == 0: - break + def _output_line(line, exe, prefix, out, count): + #exe.lock.acquire() + #exe.outputting = True + #exe.lock.release() if out: out(prefix + line) else: log.output(prefix + line) if count > 10: log.flush() - count = 0 - def _timerthread(proc, timer): - """Timer thread calls the timer handler if one - is present once a second. The user provides a handler - and returns False to kill the process or True continue.""" - while True: + if trace_threads: + print('execute:_readthread: start') + count = 0 + line = '' + try: + while True: + # + # The io module file handling return up to the size passed + # in to the read call. The io handle has the default + # buffering size. On any error assume the handle has gone + # and the process is shutting down. + # + try: + data = fh.read1(4096) + except: + data = '' + if len(data) == 0: + if len(line) > 0: + _output_line(line + '\n', exe, prefix, out, count) + break + # str and bytes are the same type in Python2 + if type(data) is not str and type(data) is bytes: + data = data.decode(sys.stdout.encoding) + last_ch = data[-1] + sd = (line + data).split('\n') + if last_ch != '\n': + line = sd[-1] + else: + line = '' + sd = sd[:-1] + if len(sd) > 0: + for l in sd: + _output_line(l + '\n', exe, prefix, out, count) + count += 1 + if count > 10: + count -= 10 + except: + raise + if trace_threads: + print('execute:_readthread: exception') + print(traceback.format_exc()) + pass + try: + fh.close() + except: + pass + if len(line): + _output_line(line, exe, prefix, out, 100) + if trace_threads: + print('execute:_readthread: finished') + + def _timerthread(exe, interval, function): + """Timer thread is used to timeout a process if no output is + produced for the timeout interval.""" + count = interval + while exe.timing_out: time.sleep(1) - if not timer(proc): - proc.stdout.close() - proc.stderr.close() + if count > 0: + count -= 1 + exe.lock.acquire() + if exe.outputting: + count = interval + exe.outputting = False + exe.lock.release() + if count == 0: + try: + proc.kill() + except: + pass + else: + function() + break + + name = os.path.basename(command[0]) + + stdin_thread = None + stdout_thread = None + stderr_thread = None + timeout_thread = None if proc.stdout: stdout_thread = threading.Thread(target = _readthread, - args = (proc.stdout, + name = '_stdout[%s]' % (name), + args = (self, + io.open(proc.stdout.fileno(), + mode = 'rb', + closefd = False), self.output, '')) - stdout_thread.setDaemon(True) + stdout_thread.daemon = True stdout_thread.start() if proc.stderr: stderr_thread = threading.Thread(target = _readthread, - args = (proc.stderr, + name = '_stderr[%s]' % (name), + args = (self, + io.open(proc.stderr.fileno(), + mode = 'rb', + closefd = False), self.output, self.error_prefix)) - stderr_thread.setDaemon(True) + stderr_thread.daemon = True stderr_thread.start() - if proc.stdout: - stdout_thread.join() - if proc.stderr: - stderr_thread.join() - return proc.wait() + if self.input and proc.stdin: + stdin_thread = threading.Thread(target = _writethread, + name = '_stdin[%s]' % (name), + args = (self, + proc.stdin, + self.input)) + stdin_thread.daemon = True + stdin_thread.start() + if timeout: + self.timing_out = True + timeout_thread = threading.Thread(target = _timerthread, + name = '_timeout[%s]' % (name), + args = (self, + timeout[0], + timeout[1])) + timeout_thread.daemon = True + timeout_thread.start() + try: + self.lock.acquire() + try: + self.proc = proc + except: + raise + finally: + self.lock.release() + exitcode = proc.wait() + except: + proc.kill() + raise + finally: + self.lock.acquire() + try: + self.proc = None + except: + raise + finally: + self.lock.release() + if self.cleanup: + self.cleanup(proc) + if timeout_thread: + self.timing_out = False + timeout_thread.join(10) + if stdin_thread: + stdin_thread.join(2) + if stdout_thread: + stdout_thread.join(2) + if stderr_thread: + stderr_thread.join(2) + return exitcode def open(self, command, capture = True, shell = False, cwd = None, env = None, - stdin = None, stdout = None, stderr = None): + stdin = None, stdout = None, stderr = None, + timeout = None): """Open a command with arguments. Provide the arguments as a list or a string.""" if self.verbose: @@ -166,9 +338,13 @@ class execute: if shell: what = 'shell' log.output(what + ': ' + s) + if self.output is None: + raise error.general('capture needs an output handler') if shell and self.shell_exe: command = arg_list(command) command[:0] = self.shell_exe + if not stdin and self.input: + stdin = subprocess.PIPE if not stdout: stdout = subprocess.PIPE if not stderr: @@ -191,10 +367,13 @@ class execute: proc = subprocess.Popen(command, shell = shell, cwd = cwd, env = env, stdin = stdin, stdout = stdout, - stderr = stderr) + stderr = stderr, + close_fds = False) if not capture: return (0, proc) - exit_code = self.capture(proc) + if self.output is None: + raise error.general('capture needs an output handler') + exit_code = self.capture(proc, command, timeout) if self.verbose: log.output('exit: ' + str(exit_code)) except OSError as ose: @@ -204,23 +383,26 @@ class execute: return (exit_code, proc) def spawn(self, command, capture = True, cwd = None, env = None, - stdin = None, stdout = None, stderr = None): + stdin = None, stdout = None, stderr = None, + timeout = None): """Spawn a command with arguments. Provide the arguments as a list or a string.""" return self.open(command, capture, False, cwd, env, - stdin, stdout, stderr) + stdin, stdout, stderr, timeout) def shell(self, command, capture = True, cwd = None, env = None, - stdin = None, stdout = None, stderr = None): + stdin = None, stdout = None, stderr = None, + timeout = None): """Execute a command within a shell context. The command can contain argumments. The shell is specific to the operating system. For example it is cmd.exe on Windows XP.""" return self.open(command, capture, True, cwd, env, - stdin, stdout, stderr) + stdin, stdout, stderr, timeout) def command(self, command, args = None, capture = True, shell = False, cwd = None, env = None, - stdin = None, stdout = None, stderr = None): + stdin = None, stdout = None, stderr = None, + timeout = None): """Run the command with the args. The args can be a list or a string.""" if args and not type(args) is list: @@ -230,18 +412,21 @@ class execute: cmd.extend(args) return self.open(cmd, capture = capture, shell = shell, cwd = cwd, env = env, - stdin = stdin, stdout = stdout, stderr = stderr) + stdin = stdin, stdout = stdout, stderr = stderr, + timeout = timeout) def command_subst(self, command, substs, capture = True, shell = False, cwd = None, env = None, - stdin = None, stdout = None, stderr = None): + stdin = None, stdout = None, stderr = None, + timeout = None): """Run the command from the config data with the option format string subsituted with the subst variables.""" args = arg_subst(command, substs) return self.command(args[0], args[1:], capture = capture, shell = shell or self.shell_commands, cwd = cwd, env = env, - stdin = stdin, stdout = stdout, stderr = stderr) + stdin = stdin, stdout = stdout, stderr = stderr, + timeout = timeout) def set_shell(self, execute): """Set the shell to execute when issuing a shell command.""" @@ -275,6 +460,37 @@ class execute: self.environment = environment return old_environment + def kill(self): + self.lock.acquire() + try: + if self.proc is not None: + self.proc.kill() + except: + raise + finally: + self.lock.release() + + def terminate(self): + self.lock.acquire() + try: + if self.proc is not None: + self.proc.terminate() + except: + raise + finally: + self.lock.release() + + def send_signal(self, signal): + self.lock.acquire() + try: + if self.proc is not None: + print("sending sig") + self.proc.send_signal(signal) + except: + raise + finally: + self.lock.release() + class capture_execution(execute): """Capture all output as a string and return it.""" @@ -303,13 +519,14 @@ class capture_execution(execute): verbose = verbose) def open(self, command, capture = True, shell = False, cwd = None, env = None, - stdin = None, stdout = None, stderr = None): + stdin = None, stdout = None, stderr = None, timeout = None): if not capture: raise error.general('output capture must be true; leave as default') #self.snapper.get_and_clear() exit_code, proc = execute.open(self, command, capture = True, shell = shell, cwd = cwd, env = env, - stdin = stdin, stdout = stdout, stderr = stderr) + stdin = stdin, stdout = stdout, stderr = stderr, + timeout = timeout) return (exit_code, proc, self.snapper.get_and_clear()) def set_output(self, output): @@ -333,11 +550,18 @@ if __name__ == "__main__": if ec == 0: print('piping input into ' + commands['pipe'][0] + ': ' + \ commands['pipe'][2]) - proc.stdin.write(bytes(commands['pipe'][2], sys.stdin.encoding)) + try: + out = bytes(commands['pipe'][2], sys.stdin.encoding) + except: + out = commands['pipe'][2] + proc.stdin.write(out) proc.stdin.close() e.capture(proc) del proc + def capture_output(text): + print(text, end = '') + cmd_shell_test = 'if "%OS%" == "Windows_NT" (echo It is WinNT) else echo Is is not WinNT' sh_shell_test = 'x="me"; if [ $x = "me" ]; then echo "It was me"; else "It was him"; fi' @@ -363,7 +587,7 @@ if __name__ == "__main__": print(arg_subst(['nothing', 'xx-%0-yyy', '%1', '%2-something'], ['subst0', 'subst1', 'subst2'])) - e = execute(error_prefix = 'ERR: ', verbose = True) + e = execute(error_prefix = 'ERR: ', output = capture_output, verbose = True) if sys.platform == "win32": run_tests(e, commands['windows'], False) if os.path.exists('c:\\msys\\1.0\\bin\\sh.exe'): -- cgit v1.2.3