# SPDX-License-Identifier: BSD-2-Clause
""" This module provides a test output parser. """

# Copyright (C) 2022 embedded brains GmbH (http://www.embedded-brains.de)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import base64
import hashlib
import re
from typing import Any, Dict, Iterable, Optional

# Lines framing a test program and its header.
_TEST_BEGIN = re.compile(r"\*\*\* BEGIN OF TEST ([^*]+) \*\*\*")
_TEST_VERSION = re.compile(r"\*\*\* TEST VERSION: (.+)")
_TEST_STATE = re.compile(r"\*\*\* TEST STATE: (.+)")
_TEST_BUILD = re.compile(r"\*\*\* TEST BUILD: ?(.*)")
_TEST_TOOLS = re.compile(r"\*\*\* TEST TOOLS: (.+)")
_TEST_END = re.compile(r"\*\*\* END OF TEST ([^*]+) \*\*\*")

# Lines of a test suite report: suite/case begin and end, header items, and
# the report hash.
_TS_SUITE_BEGIN = re.compile(r"A:(.+)")
_TS_SUITE_END = re.compile(r"Z:([^:]+):C:([^:]+):N:([^:]+):F:([^:]+):D:(.+)")
_TS_CASE_BEGIN = re.compile(r"B:(.+)")
_TS_CASE_END = re.compile(r"E:([^:]+):N:([^:]+):F:([^:]+):D:(.+)")
_TS_PLATFORM = re.compile(r"S:Platform:(.+)")
_TS_COMPILER = re.compile(r"S:Compiler:(.+)")
_TS_VERSION = re.compile(r"S:Version:(.+)")
_TS_BSP = re.compile(r"S:BSP:(.+)")
_TS_BUILD_LABEL = re.compile(r"S:BuildLabel:(.+)")
_TS_TARGET_HASH = re.compile(r"S:TargetHash:SHA256:(.*)")
_TS_RTEMS_DEBUG = re.compile(r"S:RTEMS_DEBUG:([01])$")
_TS_RTEMS_MULTIPROCESSING = re.compile(r"S:RTEMS_MULTIPROCESSING:([01])$")
_TS_RTEMS_POSIX_API = re.compile(r"S:RTEMS_POSIX_API:([01])$")
_TS_RTEMS_PROFILING = re.compile(r"S:RTEMS_PROFILING:([01])$")
_TS_RTEMS_SMP = re.compile(r"S:RTEMS_SMP:([01])$")
_TS_REPORT_HASH = re.compile(r"Y:ReportHash:SHA256:(.+)")

# Lines of a runtime measurement inside a test case.
_M_BEGIN = re.compile(r"M:B:(.+)")
_M_END = re.compile(r"M:E:([^:]+):D:(.+)")
_M_V = re.compile(r"M:V:(.+)")
_M_N = re.compile(r"M:N:(.+)")
_M_S = re.compile(r"M:S:([^:]+):(.+)")
_M_MI = re.compile(r"M:MI:(.+)")
_M_P1 = re.compile(r"M:P1:(.+)")
_M_Q1 = re.compile(r"M:Q1:(.+)")
_M_Q2 = re.compile(r"M:Q2:(.+)")
_M_Q3 = re.compile(r"M:Q3:(.+)")
_M_P99 = re.compile(r"M:P99:(.+)")
_M_MX = re.compile(r"M:MX:(.+)")
_M_MAD = re.compile(r"M:MAD:(.+)")
_M_D = re.compile(r"M:D:(.+)")

# Lines framing the extra data sections handled by the _extra() state.
_GCOV_BEGIN = re.compile(r"\*\*\* BEGIN OF GCOV INFO BASE64 \*\*\*")
_GCOV_END = re.compile(r"\*\*\* END OF GCOV INFO BASE64 \*\*\*")
_RECORDS_BEGIN = re.compile(r"\*\*\* BEGIN OF RECORDS BASE64 \*\*\*")
_RECORDS_END = re.compile(r"\*\*\* END OF RECORDS BASE64 \*\*\*")
_RECORDS_ZLIB_BEGIN = re.compile(r"\*\*\* BEGIN OF RECORDS BASE64 ZLIB \*\*\*")
_RECORDS_ZLIB_END = re.compile(r"\*\*\* END OF RECORDS BASE64 ZLIB \*\*\*")


def _to_int(text: str) -> Optional[int]:
    """ Returns the text as an integer, or None if it is malformed. """
    try:
        return int(text)
    except ValueError:
        return None


def _to_float(text: str) -> Optional[float]:
    """ Returns the text as a float, or None if it is malformed. """
    try:
        return float(text)
    except ValueError:
        return None


def _are_samples_valid(measurement: Dict[str, Any]) -> bool:
    """
    Returns true, if the samples of the measurement are consistent with its
    sample count, minimum, and maximum, otherwise false.

    The number of samples shall be equal to the sample count.  If there are
    samples, then the first one shall be equal to the minimum and the last
    one shall be equal to the maximum.
    """
    if len(measurement["samples"]) != measurement["sample-count"]:
        return False
    if not measurement["samples"]:
        return True
    if measurement["min"] != measurement["samples"][0]:
        return False
    return measurement["max"] == measurement["samples"][-1]


class TestOutputParser:
    """
    Provides a line by line parser of test output.

    The parser is a state machine.  The ``consume`` attribute refers to the
    handler of the next line.  The ``hash_line`` attribute refers to the
    method which is used to feed a line into the report hash; it ignores the
    line outside of a test suite report.  Each handler gets the line index
    and the line, stores what it recognized in the data dictionary, selects
    the next handler, and returns true, if it recognized the line, otherwise
    false.

    A line which is not acceptable in the current state is reported through
    the ``"line-parser-error"`` item of the data dictionary.  This includes
    lines which match the expected pattern, but contain a malformed number.
    """

    # pylint: disable=too-few-public-methods

    def __init__(self, data) -> None:
        """
        Initializes the parser to store its results in the data dictionary.

        The dictionary shall not yet have an ``"info"`` or ``"test-suite"``
        item.  The ``"info"`` and ``"data-ranges"`` items are created here.
        """
        self.data = data
        self.consume = self._test_begin
        self.hash_line = self._hash_none
        assert "info" not in data
        self.data["info"] = {}
        self.data["data-ranges"] = []
        assert "test-suite" not in data
        # Incremented by begin lines and decremented by end lines
        self.level = 0
        self._hash_state = hashlib.sha256()
        self._measurement: Dict[str, Any] = {}
        self._test_case: Dict[str, Any] = {}

    def _error(self, index: int) -> bool:
        """
        Records the line index as the parser error and returns false.

        Afterwards, only the extra data sections are recognized.  An error
        shall be recorded at most once.
        """
        assert "line-parser-error" not in self.data
        self.data["line-parser-error"] = index
        self.consume = self._extra
        return False

    def _hash_none(self, line: str) -> None:
        """ Ignores the line. """

    def _hash_sha256(self, line: str) -> None:
        """ Adds the line followed by a newline to the report hash. """
        # NOTE(review): A line with non-ASCII characters raises a
        # UnicodeEncodeError here.
        self._hash_state.update(f"{line}\n".encode("ascii"))

    def _test_begin(self, index: int, line: str) -> bool:
        """ Handles the begin of test line, otherwise extra data. """
        mobj = _TEST_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["info"]["line-begin-of-test"] = index
            self.data["info"]["name"] = mobj.group(1)
            self.consume = self._test_version
            return True
        return self._extra(index, line)

    def _test_version(self, index: int, line: str) -> bool:
        """
        Handles the test version line.

        If the line is not a test version line, then the test has no header
        and the line is handled as a test body line.
        """
        mobj = _TEST_VERSION.match(line)
        if mobj:
            self.data["info"]["version"] = mobj.group(1)
            self.data["info"]["line-version"] = index
            self.consume = self._test_state
            return True
        self.consume = self._test_body
        return self._test_body(index, line)

    def _test_state(self, index: int, line: str) -> bool:
        """ Handles the test state line, otherwise reports an error. """
        mobj = _TEST_STATE.match(line)
        if mobj:
            self.data["info"]["state"] = mobj.group(1)
            self.data["info"]["line-state"] = index
            self.consume = self._test_build
            return True
        return self._error(index)

    def _test_build(self, index: int, line: str) -> bool:
        """
        Handles the test build line, otherwise reports an error.

        The build options are stored as a list which is empty, if the line
        contains no options.
        """
        mobj = _TEST_BUILD.match(line)
        if mobj:
            build = mobj.group(1)
            if build:
                self.data["info"]["build"] = build.split(", ")
            else:
                self.data["info"]["build"] = []
            self.data["info"]["line-build"] = index
            self.consume = self._test_tools
            return True
        return self._error(index)

    def _test_tools(self, index: int, line: str) -> bool:
        """ Handles the test tools line, otherwise reports an error. """
        mobj = _TEST_TOOLS.match(line)
        if mobj:
            self.data["info"]["tools"] = mobj.group(1)
            self.data["info"]["line-tools"] = index
            self.consume = self._test_body
            return True
        return self._error(index)

    def _test_body(self, index: int, line: str) -> bool:
        """
        Handles a line of the test body.

        This is the begin of a test suite report, the begin of an extra data
        section, or the end of test line.  The name of the end of test line
        shall be equal to the name of the begin of test line, otherwise an
        error is reported.
        """
        if self._test_suite_begin(index, line):
            return True
        mobj = _TEST_END.match(line)
        if mobj:
            self.level -= 1
            if self.data["info"]["name"] == mobj.group(1):
                self.data["info"]["line-end-of-test"] = index
                self.consume = self._extra
                return True
            return self._error(index)
        return self._extra(index, line)

    def _test_suite_begin(self, index: int, line: str) -> bool:
        """
        Handles the begin of a test suite report, otherwise extra data.

        The test suite items which are known only at the end of the report
        are initialized to ``"?"``.  The report hash calculation starts with
        this line.
        """
        mobj = _TS_SUITE_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["test-suite"] = {
                "duration": "?",
                "failed-steps-count": "?",
                "line-begin": index,
                "line-duration": "?",
                "line-end": "?",
                "line-failed-steps-count": "?",
                "line-report-hash": "?",
                "line-step-count": "?",
                "name": mobj.group(1),
                "report-hash": "?",
                "report-hash-calculated": "?",
                "step-count": "?",
                "test-cases": []
            }
            self.consume = self._test_suite_platform
            self.hash_line = self._hash_sha256
            return True
        return self._extra(index, line)

    def _test_suite_platform(self, index: int, line: str) -> bool:
        """ Handles the platform line, otherwise reports an error. """
        mobj = _TS_PLATFORM.match(line)
        if mobj:
            self.data["test-suite"]["platform"] = mobj.group(1)
            self.data["test-suite"]["line-platform"] = index
            self.consume = self._test_suite_compiler
            return True
        return self._error(index)

    def _test_suite_compiler(self, index: int, line: str) -> bool:
        """ Handles the compiler line, otherwise reports an error. """
        mobj = _TS_COMPILER.match(line)
        if mobj:
            self.data["test-suite"]["compiler"] = mobj.group(1)
            self.data["test-suite"]["line-compiler"] = index
            self.consume = self._test_suite_version
            return True
        return self._error(index)

    def _test_suite_version(self, index: int, line: str) -> bool:
        """ Handles the version line, otherwise reports an error. """
        mobj = _TS_VERSION.match(line)
        if mobj:
            self.data["test-suite"]["version"] = mobj.group(1)
            self.data["test-suite"]["line-version"] = index
            self.consume = self._test_suite_bsp
            return True
        return self._error(index)

    def _test_suite_bsp(self, index: int, line: str) -> bool:
        """ Handles the BSP line, otherwise reports an error. """
        mobj = _TS_BSP.match(line)
        if mobj:
            self.data["test-suite"]["bsp"] = mobj.group(1)
            self.data["test-suite"]["line-bsp"] = index
            self.consume = self._test_suite_build_label
            return True
        return self._error(index)

    def _test_suite_build_label(self, index: int, line: str) -> bool:
        """ Handles the build label line, otherwise reports an error. """
        mobj = _TS_BUILD_LABEL.match(line)
        if mobj:
            self.data["test-suite"]["build-label"] = mobj.group(1)
            self.data["test-suite"]["line-build-label"] = index
            self.consume = self._test_suite_target_hash
            return True
        return self._error(index)

    def _test_suite_target_hash(self, index: int, line: str) -> bool:
        """ Handles the target hash line, otherwise reports an error. """
        mobj = _TS_TARGET_HASH.match(line)
        if mobj:
            self.data["test-suite"]["target-hash"] = mobj.group(1)
            self.data["test-suite"]["line-target-hash"] = index
            self.consume = self._test_suite_rtems_debug
            return True
        return self._error(index)

    def _test_suite_rtems_debug(self, index: int, line: str) -> bool:
        """ Handles the RTEMS_DEBUG line, otherwise reports an error. """
        mobj = _TS_RTEMS_DEBUG.match(line)
        if mobj:
            self.data["test-suite"]["rtems-debug"] = bool(int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-debug"] = index
            self.consume = self._test_suite_rtems_multiprocessing
            return True
        return self._error(index)

    def _test_suite_rtems_multiprocessing(self, index: int,
                                          line: str) -> bool:
        """
        Handles the RTEMS_MULTIPROCESSING line, otherwise reports an error.
        """
        mobj = _TS_RTEMS_MULTIPROCESSING.match(line)
        if mobj:
            self.data["test-suite"]["rtems-multiprocessing"] = bool(
                int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-multiprocessing"] = index
            self.consume = self._test_suite_rtems_posix_api
            return True
        return self._error(index)

    def _test_suite_rtems_posix_api(self, index: int, line: str) -> bool:
        """ Handles the RTEMS_POSIX_API line, otherwise reports an error. """
        mobj = _TS_RTEMS_POSIX_API.match(line)
        if mobj:
            self.data["test-suite"]["rtems-posix-api"] = bool(
                int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-posix-api"] = index
            self.consume = self._test_suite_rtems_profiling
            return True
        return self._error(index)

    def _test_suite_rtems_profiling(self, index: int, line: str) -> bool:
        """ Handles the RTEMS_PROFILING line, otherwise reports an error. """
        mobj = _TS_RTEMS_PROFILING.match(line)
        if mobj:
            self.data["test-suite"]["rtems-profiling"] = bool(
                int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-profiling"] = index
            self.consume = self._test_suite_rtems_smp
            return True
        return self._error(index)

    def _test_suite_rtems_smp(self, index: int, line: str) -> bool:
        """ Handles the RTEMS_SMP line, otherwise reports an error. """
        mobj = _TS_RTEMS_SMP.match(line)
        if mobj:
            self.data["test-suite"]["rtems-smp"] = bool(int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-smp"] = index
            self.consume = self._test_suite_body
            return True
        return self._error(index)

    def _test_suite_body(self, index: int, line: str) -> bool:
        """
        Handles a line of the test suite body.

        This is the begin of a test case, the begin of an extra data
        section, or the end of the test suite.  An error is reported for a
        test suite end line with a malformed number, a name which is not
        equal to the test suite name, or a test case count which is not equal
        to the count of parsed test cases.
        """
        if self._test_case_begin(index, line):
            return True
        mobj = _TS_SUITE_END.match(line)
        if mobj:
            self.level -= 1
            data = self.data["test-suite"]
            # Convert all numbers before the data is changed, so that a
            # malformed line leaves the test suite items untouched
            count = _to_int(mobj.group(2))
            step_count = _to_int(mobj.group(3))
            failed_steps_count = _to_int(mobj.group(4))
            duration = _to_float(mobj.group(5))
            if (count is not None and step_count is not None
                    and failed_steps_count is not None
                    and duration is not None
                    and data["name"] == mobj.group(1)
                    and len(data["test-cases"]) == count):
                data["line-end"] = index
                data["line-step-count"] = index
                data["line-failed-steps-count"] = index
                data["line-duration"] = index
                data["step-count"] = step_count
                data["failed-steps-count"] = failed_steps_count
                data["duration"] = duration
                self.consume = self._report_hash
                return True
            return self._error(index)
        return self._extra(index, line)

    def _test_case_begin(self, index: int, line: str) -> bool:
        """ Handles the begin of a test case, otherwise extra data. """
        mobj = _TS_CASE_BEGIN.match(line)
        if mobj:
            self.level += 1
            self._test_case = {
                "line-begin": index,
                "name": mobj.group(1),
                "runtime-measurements": []
            }
            self.consume = self._test_case_body
            return True
        return self._extra(index, line)

    def _test_case_body(self, index: int, line: str) -> bool:
        """
        Handles a line of the test case body.

        This is the begin of a runtime measurement, the begin of an extra
        data section, or the end of the test case.  An error is reported for
        a test case end line with a malformed number or a name which is not
        equal to the test case name.  A completed test case is appended to
        the test cases of the test suite.
        """
        if self._measurement_begin(index, line):
            return True
        mobj = _TS_CASE_END.match(line)
        if mobj:
            self.level -= 1
            step_count = _to_int(mobj.group(2))
            failed_steps_count = _to_int(mobj.group(3))
            duration = _to_float(mobj.group(4))
            if (step_count is not None and failed_steps_count is not None
                    and duration is not None
                    and self._test_case["name"] == mobj.group(1)):
                self._test_case["line-end"] = index
                self._test_case["line-step-count"] = index
                self._test_case["line-failed-steps-count"] = index
                self._test_case["line-duration"] = index
                self._test_case["step-count"] = step_count
                self._test_case["failed-steps-count"] = failed_steps_count
                self._test_case["duration"] = duration
                self.data["test-suite"]["test-cases"].append(self._test_case)
                self.consume = self._test_suite_body
                return True
            return self._error(index)
        return self._extra(index, line)

    def _measurement_begin(self, index: int, line: str) -> bool:
        """ Handles the begin of a measurement, otherwise extra data. """
        mobj = _M_BEGIN.match(line)
        if mobj:
            self.level += 1
            self._measurement = {
                "line-begin": index,
                "name": mobj.group(1),
                "samples": []
            }
            self.consume = self._measurement_variant
            return True
        return self._extra(index, line)

    def _measurement_variant(self, index: int, line: str) -> bool:
        """ Handles the variant line, otherwise reports an error. """
        mobj = _M_V.match(line)
        if mobj:
            self._measurement["variant"] = mobj.group(1)
            self.consume = self._measurement_count
            return True
        return self._error(index)

    def _measurement_count(self, index: int, line: str) -> bool:
        """ Handles the sample count line, otherwise reports an error. """
        mobj = _M_N.match(line)
        if mobj:
            count = _to_int(mobj.group(1))
            if count is not None:
                self._measurement["sample-count"] = count
                self.consume = self._measurement_samples
                return True
        return self._error(index)

    def _measurement_samples(self, index: int, line: str) -> bool:
        """
        Handles a samples line or the line which follows the samples.

        A samples line consists of a count and a value.  The value is added
        count times to the samples of the measurement.  The samples are
        followed by the minimum line or the begin of an extra data section.
        Everything else is reported as an error.
        """
        if self._measurement_min(index, line):
            return True
        mobj = _M_S.match(line)
        if mobj:
            count = _to_int(mobj.group(1))
            value = _to_float(mobj.group(2))
            if count is not None and value is not None:
                self._measurement["samples"].extend(  # type: ignore
                    count * [value])
                return True
        return self._error(index)

    def _measurement_min(self, index: int, line: str) -> bool:
        """
        Handles the minimum line, otherwise extra data.

        This handler is used by the samples handler only.  A minimum line
        with a malformed number is not accepted here, so that the samples
        handler reports the error exactly once.
        """
        mobj = _M_MI.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["min"] = value
                self.consume = self._measurement_p1
                return True
        return self._extra(index, line)

    def _measurement_p1(self, index: int, line: str) -> bool:
        """ Handles the P1 line, otherwise reports an error. """
        mobj = _M_P1.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["p1"] = value
                self.consume = self._measurement_q1
                return True
        return self._error(index)

    def _measurement_q1(self, index: int, line: str) -> bool:
        """ Handles the Q1 line, otherwise reports an error. """
        mobj = _M_Q1.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["q1"] = value
                self.consume = self._measurement_q2
                return True
        return self._error(index)

    def _measurement_q2(self, index: int, line: str) -> bool:
        """ Handles the Q2 line, otherwise reports an error. """
        mobj = _M_Q2.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["q2"] = value
                self.consume = self._measurement_q3
                return True
        return self._error(index)

    def _measurement_q3(self, index: int, line: str) -> bool:
        """ Handles the Q3 line, otherwise reports an error. """
        mobj = _M_Q3.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["q3"] = value
                self.consume = self._measurement_p99
                return True
        return self._error(index)

    def _measurement_p99(self, index: int, line: str) -> bool:
        """ Handles the P99 line, otherwise reports an error. """
        mobj = _M_P99.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["p99"] = value
                self.consume = self._measurement_max
                return True
        return self._error(index)

    def _measurement_max(self, index: int, line: str) -> bool:
        """ Handles the maximum line, otherwise reports an error. """
        mobj = _M_MX.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["max"] = value
                self.consume = self._measurement_mad
                return True
        return self._error(index)

    def _measurement_mad(self, index: int, line: str) -> bool:
        """ Handles the MAD line, otherwise reports an error. """
        mobj = _M_MAD.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["mad"] = value
                self.consume = self._measurement_duration
                return True
        return self._error(index)

    def _measurement_duration(self, index: int, line: str) -> bool:
        """ Handles the duration sum line, otherwise reports an error. """
        mobj = _M_D.match(line)
        if mobj:
            value = _to_float(mobj.group(1))
            if value is not None:
                self._measurement["duration-sum"] = value
                self.consume = self._measurement_end
                return True
        return self._error(index)

    def _measurement_end(self, index: int, line: str) -> bool:
        """
        Handles the end of a measurement, otherwise reports an error.

        The name shall be equal to the measurement name, the total duration
        shall be a number, and the samples shall be valid.  A completed
        measurement is appended to the runtime measurements of the test case.
        """
        mobj = _M_END.match(line)
        if mobj:
            self.level -= 1
            duration_total = _to_float(mobj.group(2))
            if (duration_total is not None
                    and self._measurement["name"] == mobj.group(1)
                    and _are_samples_valid(self._measurement)):
                self._measurement["line-end"] = index
                self._measurement["duration-total"] = duration_total
                self._test_case["runtime-measurements"].append(  # type: ignore
                    self._measurement)
                self.consume = self._test_case_body
                return True
        return self._error(index)

    def _report_hash(self, index: int, line: str) -> bool:
        """
        Handles the report hash line, otherwise extra data.

        The hash calculated from the lines of the test suite report is
        stored along with the hash of the report hash line.  The hash state
        is reset and the following lines are no longer hashed.
        """
        mobj = _TS_REPORT_HASH.match(line)
        if mobj:
            digest = base64.urlsafe_b64encode(
                self._hash_state.digest()).decode("ascii")
            self._hash_state = hashlib.sha256()
            self.data["test-suite"]["report-hash-calculated"] = digest
            self.data["test-suite"]["report-hash"] = mobj.group(1)
            self.data["test-suite"]["line-report-hash"] = index
            self.consume = self._test_body
            # The caller hashes the line after it was consumed, so the report
            # hash line itself is not part of the hash
            self.hash_line = self._hash_none
            return True
        return self._extra(index, line)

    def _gcov_begin(self, index: int, line: str) -> bool:
        """ Handles the begin of the gcov information section. """
        mobj = _GCOV_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["line-gcov-info-base64-begin"] = index
            self.consume = self._gcov_end
            return True
        return False

    def _gcov_end(self, index: int, line: str) -> bool:
        """
        Handles the end of the gcov information section.

        All other lines are skipped.  The data range starts with the line
        after the begin line and ends with the index of the end line.
        """
        mobj = _GCOV_END.match(line)
        if mobj:
            self.level -= 1
            self.data["line-gcov-info-base64-end"] = index
            self.data["data-ranges"].append(
                (self.data["line-gcov-info-base64-begin"] + 1, index))
            self.consume = self._extra
            return True
        return False

    def _records_begin(self, index: int, line: str) -> bool:
        """ Handles the begin of the records section. """
        mobj = _RECORDS_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["line-records-base64-begin"] = index
            self.consume = self._records_end
            return True
        return False

    def _records_end(self, index: int, line: str) -> bool:
        """
        Handles the end of the records section.

        All other lines are skipped.  The data range starts with the line
        after the begin line and ends with the index of the end line.
        """
        mobj = _RECORDS_END.match(line)
        if mobj:
            self.level -= 1
            self.data["line-records-base64-end"] = index
            self.data["data-ranges"].append(
                (self.data["line-records-base64-begin"] + 1, index))
            self.consume = self._extra
            return True
        return False

    def _records_zlib_begin(self, index: int, line: str) -> bool:
        """ Handles the begin of the zlib records section. """
        mobj = _RECORDS_ZLIB_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["line-records-base64-zlib-begin"] = index
            self.consume = self._records_zlib_end
            return True
        return False

    def _records_zlib_end(self, index: int, line: str) -> bool:
        """
        Handles the end of the zlib records section.

        All other lines are skipped.  The data range starts with the line
        after the begin line and ends with the index of the end line.
        """
        mobj = _RECORDS_ZLIB_END.match(line)
        if mobj:
            self.level -= 1
            self.data["line-records-base64-zlib-end"] = index
            self.data["data-ranges"].append(
                (self.data["line-records-base64-zlib-begin"] + 1, index))
            self.consume = self._extra
            return True
        return False

    def _extra(self, index: int, line: str) -> bool:
        """
        Handles the begin of an extra data section.

        Returns true, if the line begins a gcov information, records, or
        zlib records section, otherwise false.
        """
        if self._gcov_begin(index, line):
            return True
        if self._records_begin(index, line):
            return True
        if self._records_zlib_begin(index, line):
            return True
        return False


def augment_report(report: Dict[str, Any], output: Iterable[str]) -> None:
    """
    Augments the report with the results of the parsed output.

    Empty lines are skipped, however, they are accounted for in the line
    indices stored in the report.
    """
    test_parser = TestOutputParser(report)
    for index, line in enumerate(output):
        if not line:
            continue
        # The line has to be consumed first, since the handler may change the
        # hash method which is used for this line
        test_parser.consume(index, line)
        test_parser.hash_line(line)