From d08eeb5b46e40b080ecf57c05edeb817598c5e64 Mon Sep 17 00:00:00 2001 From: Sebastian Huber Date: Sat, 13 Mar 2021 19:22:07 +0100 Subject: validation: Add TransitionMap --- rtemsspec/validation.py | 392 +++++++++++++++++++++++++----------------------- 1 file changed, 207 insertions(+), 185 deletions(-) diff --git a/rtemsspec/validation.py b/rtemsspec/validation.py index 358d7f37..082da455 100644 --- a/rtemsspec/validation.py +++ b/rtemsspec/validation.py @@ -467,28 +467,19 @@ class _Transition(NamedTuple): map_entry_index: int -_ConditionIndexToEnum = Tuple[Tuple[str, ...], ...] +_IdxToX = Tuple[Tuple[str, ...], ...] _TransitionMap = List[List[_Transition]] -def _state_to_index(conditions: List[Any]) -> Tuple[Dict[str, int], ...]: +def _to_st_idx(conditions: List[Any]) -> Tuple[Dict[str, int], ...]: return tuple( - dict((state["name"], index) - for index, state in enumerate(condition["states"] + [{ - "name": "N/A" - }])) for condition in conditions) + dict((state["name"], st_idx) for st_idx, state in enumerate( + itertools.chain(condition["states"], [{ + "name": "N/A" + }]))) for condition in conditions) -def _condition_index_to_enum(prefix: str, - conditions: List[Any]) -> _ConditionIndexToEnum: - return tuple( - tuple([f"{prefix}_{condition['name']}"] + [ - f"{prefix}_{condition['name']}_{state['name']}" - for state in condition["states"] - ] + [f"{prefix}_{condition['name']}_NA"]) for condition in conditions) - - -def _condition_index_to_name(conditions: List[Any]) -> _ConditionIndexToEnum: +def _to_st_name(conditions: List[Any]) -> _IdxToX: return tuple( tuple( itertools.chain((state["name"] @@ -496,209 +487,165 @@ def _condition_index_to_name(conditions: List[Any]) -> _ConditionIndexToEnum: for condition in conditions) -def _add_condition_enum(content: CContent, - index_to_enum: _ConditionIndexToEnum) -> None: - for enum in index_to_enum: - content.add("typedef enum {") - with content.indent(): - content.add(",\n".join(enum[1:])) - content.add(f"}} {enum[0]};") - - -class _ActionRequirementTestItem(_TestItem): - """ An action requirement test item. """ +class TransitionMap: + """ Representation of an action requirement transition map. """ # pylint: disable=too-many-instance-attributes def __init__(self, item: Item): - super().__init__(item) - self._pre_condition_count = len(item["pre-conditions"]) - self._post_condition_count = len(item["post-conditions"]) - self._pre_index_to_enum = _condition_index_to_enum( - f"{self.ident}_Pre", item["pre-conditions"]) - self._post_index_to_enum = _condition_index_to_enum( - f"{self.ident}_Post", item["post-conditions"]) - self._post_index_to_state = _condition_index_to_name( + self._item = item + self._pre_co_count = len(item["pre-conditions"]) + self._post_co_count = len(item["post-conditions"]) + self._post_co_idx_st_idx_to_st_name = _to_st_name( + item["post-conditions"]) + self._pre_co_idx_st_name_to_st_idx = _to_st_idx(item["pre-conditions"]) + self._post_co_idx_st_name_to_st_idx = _to_st_idx( item["post-conditions"]) - self._pre_state_to_index = _state_to_index(item["pre-conditions"]) - self._post_state_to_index = _state_to_index(item["post-conditions"]) - self._pre_index_to_condition = dict( - (index, condition) - for index, condition in enumerate(item["pre-conditions"])) - self._post_index_to_name = dict( - (index, condition["name"]) - for index, condition in enumerate(item["post-conditions"])) + self._pre_co_idx_to_cond = dict( + (co_idx, condition) + for co_idx, condition in enumerate(item["pre-conditions"])) + self._post_co_idx_to_co_name = dict( + (co_idx, condition["name"]) + for co_idx, condition in enumerate(item["post-conditions"])) + self._map = self._build_map() + self._check_completeness() - def _add_pre_condition_descriptions(self, content: CContent) -> None: - for condition in self["pre-conditions"]: - content.add("static const char * const " - f"{self.ident}_PreDesc_{condition['name']}[] = {{") - with content.indent(): - content.add(",\n".join( - itertools.chain((f"\"{state['name']}\"" - for state in condition["states"]), - ["\"NA\""]))) - content.add("};") - content.add("static const char * const * const " - f"{self.ident}_PreDesc[] = {{") - with content.indent(): - content.add(",\n".join([ - f"{self.ident}_PreDesc_{condition['name']}" - for condition in self["pre-conditions"] - ] + ["NULL"])) - content.add("};") + def __getitem__(self, key: str): + return self._item[key] - def add_default_context_members(self, content: CContent) -> None: - super().add_default_context_members(content) - content.add_description_block( - "This member defines the pre-condition states " - "for the next action.", None) - content.add(f"size_t pcs[ {self._pre_condition_count} ];") - content.add_description_block( - "This member indicates if the test action loop " - "is currently executed.", None) - content.add("bool in_action_loop;") + def __iter__(self): + yield from self._map - def _add_fixture_scope(self, content: CContent) -> None: - params = ["void *arg", "char *buf", "size_t n"] - with content.function("static size_t", f"{self.ident}_Scope", params): - content.add([f"{self.context} *ctx;", "", "ctx = arg;"]) - with content.condition("ctx->in_action_loop"): - content.call_function( - "return", "T_get_scope", - [f"{self.ident}_PreDesc", "buf", "n", "ctx->pcs"]) - content.add("return 0;") + def _check_completeness(self) -> None: + for map_idx, transistions in enumerate(self): + if not transistions or transistions[0].enabled_by != "1": + raise ValueError( + f"transition map of {self._item.spec} contains no default " + "entry for pre-condition set " + f"{{{self._map_index_to_pre_conditions(map_idx)}}}") - def _map_index_to_pre_conditions(self, map_index: int) -> str: + def _map_index_to_pre_conditions(self, map_idx: int) -> str: conditions = [] - for condition in reversed(self.item["pre-conditions"]): + for condition in reversed(self._item["pre-conditions"]): states = condition["states"] count = len(states) - index = int(map_index % count) - conditions.append(f"{condition['name']}={states[index]['name']}") - map_index //= count + st_idx = int(map_idx % count) + conditions.append(f"{condition['name']}={states[st_idx]['name']}") + map_idx //= count return ", ".join(reversed(conditions)) - def _add_transitions(self, trans_index: int, condition_index: int, - map_index: int, transition: Dict[str, Any], - transition_map: _TransitionMap, info: List[str], - post_cond: Tuple[int, ...]) -> None: + def _add_transitions(self, trans_idx: int, co_idx: int, map_idx: int, + transition: Dict[str, + Any], transition_map: _TransitionMap, + info: List[str], post_cond: Tuple[int, ...]) -> None: # pylint: disable=too-many-arguments # pylint: disable=too-many-locals - if condition_index < self._pre_condition_count: - condition = self._pre_index_to_condition[condition_index] + if co_idx < self._pre_co_count: + condition = self._pre_co_idx_to_cond[co_idx] state_count = len(condition["states"]) - map_index *= state_count + map_idx *= state_count states = transition["pre-conditions"][condition["name"]] if isinstance(states, str): assert states in ["all", "N/A"] - for index in range(state_count): - self._add_transitions(trans_index, condition_index + 1, - map_index + index, transition, + for st_idx in range(state_count): + self._add_transitions(trans_idx, co_idx + 1, + map_idx + st_idx, transition, transition_map, info + [str(int(states == "N/A"))], post_cond) else: - for state in states: + for st_name in states: try: - index = self._pre_state_to_index[condition_index][ - state] + st_idx = self._pre_co_idx_st_name_to_st_idx[co_idx][ + st_name] except KeyError as err: - msg = (f"transition map entry {trans_index} of " - f"{self.item.spec} refers to non-existent " + msg = (f"transition map entry {trans_idx} of " + f"{self._item.spec} refers to non-existent " f"state {err} of pre-condition " f"'{condition['name']}'") raise ValueError(msg) from err - self._add_transitions(trans_index, condition_index + 1, - map_index + index, transition, + self._add_transitions(trans_idx, co_idx + 1, + map_idx + st_idx, transition, transition_map, info + ["0"], post_cond) else: enabled_by = enabled_by_to_exp(transition["enabled-by"], ExpressionMapper()) - if enabled_by == "1" and transition_map[map_index]: + if enabled_by == "1" and transition_map[map_idx]: raise ValueError( - f"transition map entry {trans_index} of " - f"{self.item.spec} duplicates pre-condition set " - f"{{{self._map_index_to_pre_conditions(map_index)}}} " + f"transition map entry {trans_idx} of " + f"{self._item.spec} duplicates pre-condition set " + f"{{{self._map_index_to_pre_conditions(map_idx)}}} " "defined by transition map entry " - f"{transition_map[map_index][0].map_entry_index}") - transition_map[map_index].append( - _Transition(enabled_by, post_cond, ", ".join(info), - trans_index)) + f"{transition_map[map_idx][0].map_entry_index}") + transition_map[map_idx].append( + _Transition(enabled_by, post_cond, ", ".join(info), trans_idx)) - def _add_default(self, trans_index: int, transition_map: _TransitionMap, + def _add_default(self, trans_idx: int, transition_map: _TransitionMap, info: List[str], post_cond: Tuple[int, ...]) -> None: for transition in transition_map: if not transition: transition.append( - _Transition( - "1", post_cond, - ", ".join(info + ["0"] * self._pre_condition_count), - trans_index)) + _Transition("1", post_cond, + ", ".join(info + ["0"] * self._pre_co_count), + trans_idx)) - def _get_transition_map(self) -> _TransitionMap: + def _build_map(self) -> _TransitionMap: transition_count = 1 for condition in self["pre-conditions"]: state_count = len(condition["states"]) if state_count == 0: raise ValueError(f"pre-condition '{condition['name']}' of " - f"{self.item.spec} has no states") + f"{self._item.spec} has no states") transition_count *= state_count transition_map = [list() for _ in range(transition_count) ] # type: _TransitionMap - for trans_index, transition in enumerate(self["transition-map"]): + for trans_idx, transition in enumerate(self["transition-map"]): if isinstance(transition["post-conditions"], dict): try: info = ["0"] post_cond = tuple( - self._post_state_to_index[index][ + self._post_co_idx_st_name_to_st_idx[co_idx][ transition["post-conditions"][ - self._post_index_to_name[index]]] - for index in range(self._post_condition_count)) + self._post_co_idx_to_co_name[co_idx]]] + for co_idx in range(self._post_co_count)) except KeyError as err: - msg = (f"transition map entry {trans_index} of " - f"{self.item.spec} refers to non-existent " + msg = (f"transition map entry {trans_idx} of " + f"{self._item.spec} refers to non-existent " f"post-condition state {err}") raise ValueError(msg) from err else: info = ["1"] post_cond = tuple( - len(self._post_state_to_index[index]) - 1 - for index in range(self._post_condition_count)) + self._post_co_idx_st_name_to_st_idx[co_idx]["N/A"] + for co_idx in range(self._post_co_count)) if isinstance(transition["pre-conditions"], dict): - self._add_transitions(trans_index, 0, 0, transition, + self._add_transitions(trans_idx, 0, 0, transition, transition_map, info, post_cond) else: assert transition["pre-conditions"] == "default" - self._add_default(trans_index, transition_map, info, post_cond) + self._add_default(trans_idx, transition_map, info, post_cond) return transition_map def _get_entry(self, variant: _Transition) -> str: entry = f"E( {variant.info}, " + ", ".join( - self._post_index_to_state[cond_index][state_index] - for cond_index, state_index in enumerate( - variant.post_conditions)) + " )," + self._post_co_idx_st_idx_to_st_name[co_idx][st_idx] + for co_idx, st_idx in enumerate(variant.post_conditions)) wrapper = textwrap.TextWrapper() wrapper.initial_indent = " " wrapper.subsequent_indent = " " wrapper.width = 75 - return "\n".join(wrapper.wrap(entry)) + return "\n".join(wrapper.wrap(entry)) + " )," def _get_entry_bits(self) -> int: - bits = self._pre_condition_count + 1 - for enum in self._post_index_to_enum: - bits += math.ceil(math.log2(len(enum))) + bits = self._pre_co_count + 1 + for st_idx_to_st_name in self._post_co_idx_st_idx_to_st_name: + bits += math.ceil(math.log2(len(st_idx_to_st_name))) return 2**max(math.ceil(math.log2(bits)), 3) - def _add_transition_map(self, content: CContent) -> None: - transition_map = self._get_transition_map() + def add_map(self, content: CContent, ident: str) -> None: + """ Adds the transition map definitions to the content. """ entries = [] - for map_index, transistions in enumerate(transition_map): - if not transistions or transistions[0].enabled_by != "1": - raise ValueError( - f"transition map of {self.item.spec} contains no default " - "entry for pre-condition set " - f"{{{self._map_index_to_pre_conditions(map_index)}}}") + for transistions in self._map: if len(transistions) == 1: entries.append(self._get_entry(transistions[0])) else: @@ -722,76 +669,151 @@ class _ActionRequirementTestItem(_TestItem): state_bits = math.ceil(math.log2(len(condition["states"]) + 1)) content.append( f"uint{bits}_t Post_{condition['name']} : {state_bits};") - content.add(f"}} {self.ident}_Entry;") - pre_count = 1 + self._pre_condition_count - entry = ("#define E( " + ", ".join( - f"x{index}" - for index in range(pre_count + self._post_condition_count) - ) + ") { " + ", ".join( - itertools.chain((f"x{index}" for index in range(pre_count)), ( - f"{self.ident}_Post_{condition['name']}_##x{pre_count + index}" - for index, condition in enumerate(self["post-conditions"])))) + - " }") + content.add(f"}} {ident}_Entry;") + pre_count = 1 + self._pre_co_count + entry = "#define E( " + entry += ", ".join(f"x{index}" + for index in range(pre_count + self._post_co_count)) + entry += ") { " + entry += ", ".join( + itertools.chain( + (f"x{index}" for index in range(pre_count)), + (f"{ident}_Post_{condition['name']}_##x{pre_count + co_idx}" + for co_idx, condition in enumerate(self["post-conditions"])))) wrapper = textwrap.TextWrapper() wrapper.initial_indent = "" wrapper.subsequent_indent = " " wrapper.width = 77 - content.add(" \\\n".join(wrapper.wrap(entry))) - content.add( - [f"static const {self.ident}_Entry", f"{self.ident}_Map[] = {{"]) + content.add(" \\\n".join(wrapper.wrap(entry)) + " }") + content.add([f"static const {ident}_Entry", f"{ident}_Map[] = {{"]) entries[-1] = entries[-1].replace("),", ")") content.append(entries) content.append(["};", "", "#undef E"]) + def get_post_entry_member(self, co_idx: int) -> str: + """ + Gets the post-condition entry member name for the post-condition index. + """ + return f"Post_{self._post_co_idx_to_co_name[co_idx]}" + + +def _to_enum(prefix: str, conditions: List[Any]) -> _IdxToX: + return tuple( + tuple([f"{prefix}_{condition['name']}"] + [ + f"{prefix}_{condition['name']}_{state['name']}" + for state in condition["states"] + ] + [f"{prefix}_{condition['name']}_NA"]) for condition in conditions) + + +def _add_condition_enum(content: CContent, co_idx_to_enum: _IdxToX) -> None: + for enum in co_idx_to_enum: + content.add("typedef enum {") + with content.indent(): + content.add(",\n".join(enum[1:])) + content.add(f"}} {enum[0]};") + + +class _ActionRequirementTestItem(_TestItem): + """ An action requirement test item. """ + def __init__(self, item: Item): + super().__init__(item) + self._pre_co_count = len(item["pre-conditions"]) + self._pre_co_idx_to_enum = _to_enum(f"{self.ident}_Pre", + item["pre-conditions"]) + self._post_co_idx_to_enum = _to_enum(f"{self.ident}_Post", + item["post-conditions"]) + + def _add_pre_condition_descriptions(self, content: CContent) -> None: + for condition in self["pre-conditions"]: + content.add("static const char * const " + f"{self.ident}_PreDesc_{condition['name']}[] = {{") + with content.indent(): + content.add(",\n".join( + itertools.chain((f"\"{state['name']}\"" + for state in condition["states"]), + ["\"NA\""]))) + content.add("};") + content.add("static const char * const * const " + f"{self.ident}_PreDesc[] = {{") + with content.indent(): + content.add(",\n".join([ + f"{self.ident}_PreDesc_{condition['name']}" + for condition in self["pre-conditions"] + ] + ["NULL"])) + content.add("};") + + def add_default_context_members(self, content: CContent) -> None: + super().add_default_context_members(content) + content.add_description_block( + "This member defines the pre-condition states " + "for the next action.", None) + content.add(f"size_t pcs[ {self._pre_co_count} ];") + content.add_description_block( + "This member indicates if the test action loop " + "is currently executed.", None) + content.add("bool in_action_loop;") + + def _add_fixture_scope(self, content: CContent) -> None: + params = ["void *arg", "char *buf", "size_t n"] + with content.function("static size_t", f"{self.ident}_Scope", params): + content.add([f"{self.context} *ctx;", "", "ctx = arg;"]) + with content.condition("ctx->in_action_loop"): + content.call_function( + "return", "T_get_scope", + [f"{self.ident}_PreDesc", "buf", "n", "ctx->pcs"]) + content.add("return 0;") + def _add_call(self, content: CContent, key: str, name: str) -> None: if self[key] is not None: content.gap = False content.call_function(None, f"{self.ident}_{name}", ["ctx"]) - def _add_loop_body(self, content: CContent) -> None: + def _add_loop_body(self, content: CContent, + transition_map: TransitionMap) -> None: with content.condition(f"{self.ident}_Map[ index ].Skip"): content.append(["++index;", "continue;"]) content.add_blank_line() self._add_call(content, "test-prepare", "Prepare") - for index, enum in enumerate(self._pre_index_to_enum): + for index, enum in enumerate(self._pre_co_idx_to_enum): content.gap = False content.call_function(None, f"{enum[0]}_Prepare", ["ctx", f"ctx->pcs[ {index} ]"]) self._add_call(content, "test-action", "Action") content.append(f"entry = {self.ident}_Map[ index ];") - for index, enum in enumerate(self._post_index_to_enum): + for index, enum in enumerate(self._post_co_idx_to_enum): content.gap = False - content.call_function( - None, f"{enum[0]}_Check", - ["ctx", f"entry.Post_{self._post_index_to_name[index]}"]) + content.call_function(None, f"{enum[0]}_Check", [ + "ctx", f"entry.{transition_map.get_post_entry_member(index)}" + ]) self._add_call(content, "test-cleanup", "Cleanup") content.append("++index;") - def _add_for_loops(self, content: CContent, index: int) -> None: - if index < self._pre_condition_count: + def _add_for_loops(self, content: CContent, transition_map: TransitionMap, + index: int) -> None: + if index < self._pre_co_count: var = f"ctx->pcs[ {index} ]" - begin = self._pre_index_to_enum[index][1] - end = self._pre_index_to_enum[index][-1] + begin = self._pre_co_idx_to_enum[index][1] + end = self._pre_co_idx_to_enum[index][-1] with content.for_loop(f"{var} = {begin}", f"{var} < {end}", f"++{var}"): - if index + 1 == self._pre_cond_count: + if index + 1 == self._pre_co_count: content.add(f"{self.ident}_Entry entry;") name = self._item['pre-conditions'][index]["name"] pre_na = f"{self.ident}_Map[ index ].Pre_{name}_NA" with content.condition(pre_na): content.append(f"{var} = {end};") content.append(f"index += ( {end} - 1 )") - for index_2 in range(index + 1, self._pre_condition_count): + for index_2 in range(index + 1, self._pre_co_count): with content.indent(): content.append( - f"* {self._pre_index_to_enum[index_2][-1]}") + f"* {self._pre_co_idx_to_enum[index_2][-1]}") content.lines[-1] += ";" - self._add_for_loops(content, index + 1) + self._add_for_loops(content, transition_map, index + 1) else: - self._add_loop_body(content) + self._add_loop_body(content, transition_map) - def _add_test_case(self, content: CContent, header: Dict[str, - Any]) -> None: + def _add_test_case(self, content: CContent, transition_map: TransitionMap, + header: Dict[str, Any]) -> None: fixture = f"{self.ident}_Fixture" prologue = CContent() epilogue = CContent() @@ -825,14 +847,13 @@ class _ActionRequirementTestItem(_TestItem): align = False with content.function(ret, name, params, align=align): content.add(prologue) - self._add_for_loops(content, 0) + self._add_for_loops(content, transition_map, 0) content.add(epilogue) def _add_handler(self, content: CContent, conditions: List[Any], - index_to_enum: _ConditionIndexToEnum, - action: str) -> None: - for condition_index, condition in enumerate(conditions): - enum = index_to_enum[condition_index] + co_idx_to_enum: _IdxToX, action: str) -> None: + for co_idx, condition in enumerate(conditions): + enum = co_idx_to_enum[co_idx] handler = f"{enum[0]}_{action}" params = [f"{self.context} *ctx", f"{enum[0]} state"] with content.function("static void", handler, params): @@ -860,8 +881,8 @@ class _ActionRequirementTestItem(_TestItem): def add_header_body(self, content: CContent, header: Dict[str, Any]) -> None: - _add_condition_enum(content, self._pre_index_to_enum) - _add_condition_enum(content, self._post_index_to_enum) + _add_condition_enum(content, self._pre_co_idx_to_enum) + _add_condition_enum(content, self._post_co_idx_to_enum) super().add_header_body(content, header) def generate(self, content: CContent, base_directory: str, @@ -871,15 +892,15 @@ class _ActionRequirementTestItem(_TestItem): if header: self.generate_header(base_directory, header) else: - _add_condition_enum(content, self._pre_index_to_enum) - _add_condition_enum(content, self._post_index_to_enum) + _add_condition_enum(content, self._pre_co_idx_to_enum) + _add_condition_enum(content, self._post_co_idx_to_enum) instance = self.add_context(content) self._add_pre_condition_descriptions(content) content.add(self.substitute_code(self["test-support"])) self._add_handler(content, self["pre-conditions"], - self._pre_index_to_enum, "Prepare") + self._pre_co_idx_to_enum, "Prepare") self._add_handler(content, self["post-conditions"], - self._post_index_to_enum, "Check") + self._post_co_idx_to_enum, "Check") optional_code = "ctx->in_action_loop = false;" setup = self.add_support_method(content, "test-setup", @@ -900,11 +921,12 @@ class _ActionRequirementTestItem(_TestItem): f" .teardown = {teardown},", f" .scope = {self.ident}_Scope,", f" .initial_context = {instance}", "};" ]) - self._add_transition_map(content) + transition_map = TransitionMap(self.item) + transition_map.add_map(content, self.ident) self.add_function(content, "test-prepare", "Prepare") self.add_function(content, "test-action", "Action") self.add_function(content, "test-cleanup", "Cleanup") - self._add_test_case(content, header) + self._add_test_case(content, transition_map, header) content.add("/** @} */") -- cgit v1.2.3