# SPDX-License-Identifier: BSD-2-Clause
""" This module provides functions for specification item verification. """
# Copyright (C) 2020 embedded brains GmbH (http://www.embedded-brains.de)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from contextlib import contextmanager
import logging
import re
from typing import Any, Dict, Iterator, List, NamedTuple, Set
from rtemsspec.items import Item, ItemCache
# Maps a specification type name to the verifier registered under that name.
_VerifierMap = Dict[str, "_Verifier"]
class VerifyStatus(NamedTuple):
    """ This tuple provides the verify message counts by category. """
    # Each field is the number of messages of the logging level with the
    # same name.
    critical: int
    error: int
    warning: int
    info: int
    debug: int
class _Filter(logging.Filter):
    """ Counts the log records it sees by logging level. """

    def __init__(self):
        super().__init__()
        self._counts = {}  # type: Dict[int, int]

    def filter(self, record: logging.LogRecord) -> bool:
        """ Accounts for the record and always lets it pass. """
        level = record.levelno
        self._counts[level] = self._counts.get(level, 0) + 1
        return True

    def get_verify_info(self) -> VerifyStatus:
        """ Returns the gathered verify information. """
        # The order of the levels has to match the VerifyStatus fields
        levels = (logging.CRITICAL, logging.ERROR, logging.WARNING,
                  logging.INFO, logging.DEBUG)
        return VerifyStatus(*(self._counts.get(level, 0) for level in levels))
def _type_name(value: Any):
    """
    Returns the name of the type of the value, where the type of None is
    reported as "none".
    """
    name = type(value).__name__
    return "none" if name == "NoneType" else name
class _Path(NamedTuple):
    """ This tuple identifies a value under verification within an item. """
    # The item which contains the value
    item: Item
    # The textual location of the value, used as the log message prefix
    path: str
class _AssertContext(NamedTuple):
    """ This tuple provides the state used to evaluate assert expressions. """
    # The location of the value for log messages and UID resolution
    path: _Path
    # The value checked by the assert operations
    value: Any
    # The type information which contains the "assert" expression
    type_info: Dict[str, Any]
    # Maps an operator name to the function which implements the operator
    ops: Dict[str, Any]
def _assert_op_and(ctx: _AssertContext, assert_info: Any) -> bool:
    """
    Returns true, if all expressions of the assert info hold for the context
    value.  The evaluation stops at the first expression which fails.
    """
    return all(_assert(ctx, element) for element in assert_info)
def _assert_op_not(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Returns true, if the expression of the assert info does not hold. """
    if _assert(ctx, assert_info):
        return False
    return True
def _assert_op_or(ctx: _AssertContext, assert_info: Any) -> bool:
    """
    Returns true, if at least one expression of the assert info holds for the
    context value.  The evaluation stops at the first expression which holds.
    """
    return any(_assert(ctx, element) for element in assert_info)
def _assert_op_eq(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is equal to the assert info. """
    actual = ctx.value
    return actual == assert_info
def _assert_op_ne(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is not equal to the assert info. """
    actual = ctx.value
    return actual != assert_info
def _assert_op_le(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is at most the assert info. """
    actual = ctx.value
    return actual <= assert_info
def _assert_op_lt(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is less than the assert info. """
    actual = ctx.value
    return actual < assert_info
def _assert_op_ge(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is at least the assert info. """
    actual = ctx.value
    return actual >= assert_info
def _assert_op_gt(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is greater than the assert info. """
    actual = ctx.value
    return actual > assert_info
def _assert_op_uid(ctx: _AssertContext, _assert_info: Any) -> bool:
    """
    Checks that the map() method of the item accepts the context value.

    A warning is logged and false is returned, if map() raises a KeyError.
    The assert info is not used.
    """
    uid = ctx.value
    try:
        ctx.path.item.map(uid)
    except KeyError:
        logging.warning("%s cannot resolve UID: %s", _prefix(ctx.path), uid)
        resolved = False
    else:
        resolved = True
    return resolved
def _assert_op_re(ctx: _AssertContext, assert_info: Any) -> bool:
    """
    Checks that the regular expression given by the assert info matches
    somewhere in the context value.
    """
    match = re.search(assert_info, ctx.value)
    return match is not None
def _assert_op_in(ctx: _AssertContext, assert_info: Any) -> bool:
    """ Checks that the context value is contained in the assert info. """
    candidate = ctx.value
    return candidate in assert_info
# Matches a run of white space and hyphen characters.  It is used to split a
# text into words.
_WORD_SEPARATOR = re.compile(r"[ \t\n\r\f\v-]+")
def _assert_op_contains(ctx: _AssertContext, assert_info: Any) -> bool:
    """
    Checks that the context value contains at least one phrase of the assert
    info as a sequence of whole words.

    The context value is converted to lower case and split into words at runs
    of white space and hyphens.  The phrases are used as is.
    """
    words = _WORD_SEPARATOR.split(ctx.value.lower())
    # Surround the text with spaces so that a phrase can match at both ends
    haystack = " " + " ".join(words) + " "
    for phrase in assert_info:
        if f" {phrase} " in haystack:
            return True
    return False
def _assert(ctx: _AssertContext, assert_info: Any) -> bool:
    """
    Evaluates the assert expression for the context value.

    A list is an implicit "or" of its elements.  Otherwise, the first key of
    the assert info selects the operator from the context operators and the
    value of this key is the operand.
    """
    if not isinstance(assert_info, list):
        op_name = next(iter(assert_info))
        operation = ctx.ops[op_name]
        return operation(ctx, assert_info[op_name])
    return _assert_op_or(ctx, assert_info)
def _maybe_assert(path: _Path, value: Any, type_info: Any,
                  ops: Dict[str, Any]) -> bool:
    """
    Evaluates the "assert" expression of the type information for the value
    using the operators.  Returns true, if there is no such expression.
    """
    if "assert" not in type_info:
        return True
    ctx = _AssertContext(path, value, type_info, ops)
    return _assert(ctx, type_info["assert"])
# The assert operators available for integer and float values.  The keys are
# the operator names used in assert expressions.
_ASSERT_OPS_INT_OR_FLOAT = {
    "and": _assert_op_and,
    "not": _assert_op_not,
    "or": _assert_op_or,
    "eq": _assert_op_eq,
    "ne": _assert_op_ne,
    "le": _assert_op_le,
    "lt": _assert_op_lt,
    "ge": _assert_op_ge,
    "gt": _assert_op_gt,
}
def _assert_int_or_float(path: _Path, value: Any, type_info: Any) -> bool:
    """
    Evaluates the optional assert expression of the type information for an
    integer or float value.
    """
    ops = _ASSERT_OPS_INT_OR_FLOAT
    return _maybe_assert(path, value, type_info, ops)
# The assert operators available for string values: the logical and
# comparison operators shared with the numeric types followed by the string
# specific operators.
_ASSERT_OPS_STR = {
    **_ASSERT_OPS_INT_OR_FLOAT,
    "uid": _assert_op_uid,
    "re": _assert_op_re,
    "in": _assert_op_in,
    "contains": _assert_op_contains,
}
def _assert_str(path: _Path, value: Any, type_info: Any) -> bool:
    """
    Evaluates the optional assert expression of the type information for a
    string value.
    """
    ops = _ASSERT_OPS_STR
    return _maybe_assert(path, value, type_info, ops)
def _assert_type(path: _Path, value: Any, type_expected: str) -> bool:
    """
    Checks that the type name of the value is the expected type name.  An
    error is logged and false is returned in case of a mismatch.
    """
    type_actual = _type_name(value)
    if type_actual != type_expected:
        logging.error("%s expected type '%s', actual type '%s'",
                      _prefix(path), type_expected, type_actual)
        return False
    return True
# Matches a valid name: a lower case letter followed by lower case letters,
# digits, and hyphens, or the literal "SPDX-License-Identifier".  The pattern
# ends with \Z instead of $, since $ also matches right before a trailing
# newline and would accept a name such as "abc\n".
NAME = re.compile(r"^([a-z][a-z0-9-]*|SPDX-License-Identifier)\Z")
def _prefix(prefix: _Path) -> str:
    """
    Returns the path string for use as a log message prefix.  A colon is
    appended, if the path string does not already end with one.
    """
    location = prefix.path
    return location if location.endswith(":") else location + ":"
class _Verifier:
    """ Verifies that a value has the type named like the verifier. """

    def __init__(self, name: str, verifier_map: _VerifierMap):
        # Register the verifier so that it can be looked up by its name
        verifier_map[name] = self
        self._verifier_map = verifier_map
        self._name = name
        self.is_subtype = False

    def verify_info(self, path: _Path) -> None:
        """ Produces a verify logging information. """
        location = _prefix(path)
        logging.info("%s verify using type '%s'", location, self._name)

    def verify(self, path: _Path, value: Any) -> Set[str]:
        """ Verifies a value according to the type information. """
        self.verify_info(path)
        # A type mismatch is reported through the log only
        _assert_type(path, value, self._name)
        verified_keys = set()  # type: Set[str]
        return verified_keys

    def resolve_type_refinements(self) -> None:
        """ Resolves the type refinements for this type. """
class _NameVerifier(_Verifier):
    """ Verifies that a value is a string which matches the NAME pattern. """

    def verify(self, path: _Path, value: Any) -> Set[str]:
        """ Verifies a name. """
        self.verify_info(path)
        if _assert_type(path, value, "str"):
            if NAME.search(value) is None:
                logging.error("%s invalid name: %s", _prefix(path), value)
        return set()
class _UIDVerifier(_Verifier):
    """ Verifies that a value is a string which the item can map as a UID. """

    def verify(self, path: _Path, value: Any) -> Set[str]:
        """
        Verifies a UID.

        An error is logged, if the value is not a string or the map() method
        of the item raises a KeyError for it.
        """
        self.verify_info(path)
        if not _assert_type(path, value, "str"):
            return set()
        try:
            path.item.map(value)
        except KeyError:
            logging.error("%s cannot resolve UID: %s", _prefix(path), value)
        return set()
class _ItemVerifier(_Verifier):
    """
    Verifies values according to the type information provided by the
    "spec-info" attribute of a specification type item.
    """

    def __init__(self, name: str, verifier_map: _VerifierMap,
                 info_map: Dict[str, Any], item: Item):
        """
        Initializes the verifier and registers it in the verifier map.

        :param name: The name of the specification type.
        :param verifier_map: The map of all verifiers by type name.
        :param info_map: Maps the type name of a value (see _type_name()) to
            the type information used to verify values of this type.
        :param item: The item which defines the specification type.
        """
        super().__init__(name, verifier_map)
        self._info_map = info_map
        self._item = item
        # The subtype key and verifiers are set by resolve_type_refinements()
        self._subtype_key = ""
        self._subtype_verifiers = {}  # type: _VerifierMap

    def verify_bool(self, path: _Path, value: Any, type_info: Any) -> Set[str]:
        """
        Verifies a boolean value.

        An error is logged, if the type information has an "assert" entry
        which is not equal to the value.
        """
        # pylint: disable=no-self-use
        if type_info and "assert" in type_info:
            expected = type_info["assert"]
            if expected != value:
                logging.error("%s expected %r, actual %r", _prefix(path),
                              expected, value)
        return set()

    def _verify_key(self, path: _Path, value: Any, type_name: str,
                    key: str) -> None:
        """
        Verifies the value associated with the key using the verifier of the
        type name.  An error is logged, if there is no such verifier.
        """
        if type_name in self._verifier_map:
            self._verifier_map[type_name].verify(
                _Path(path.item, path.path + f"/{key}"), value[key])
        else:
            logging.error("%s unknown specification type: %s", _prefix(path),
                          type_name)

    def assert_keys_no_constraints(self, path: _Path, specified_keys: Set[str],
                                   keys: List[str]) -> None:
        """ Asserts nothing in particular. """

    def assert_keys_at_least_one(self, path: _Path, specified_keys: Set[str],
                                 keys: List[str]) -> None:
        """ Asserts that at least one specified key is present in the keys. """
        present_keys = specified_keys.intersection(keys)
        if not present_keys:
            logging.error(
                "%s not at least one key out of %s is present for type '%s'",
                _prefix(path), str(sorted(specified_keys)), self._name)

    def assert_keys_at_most_one(self, path: _Path, specified_keys: Set[str],
                                keys: List[str]) -> None:
        """ Asserts that at most one specified key is present in the keys. """
        present_keys = specified_keys.intersection(keys)
        if len(present_keys) > 1:
            logging.error(
                "%s not at most one key out of %s "
                "is present for type '%s': %s", _prefix(path),
                str(sorted(specified_keys)), self._name,
                str(sorted(present_keys)))

    def assert_keys_exactly_one(self, path: _Path, specified_keys: Set[str],
                                keys: List[str]) -> None:
        """ Asserts that exactly one specified key is present in the keys. """
        present_keys = specified_keys.intersection(keys)
        if len(present_keys) != 1:
            logging.error(
                "%s not exactly one key out of %s "
                "is present for type '%s': %s", _prefix(path),
                str(sorted(specified_keys)), self._name,
                str(sorted(present_keys)))

    def assert_keys_subset(self, path: _Path, specified_keys: Set[str],
                           keys: List[str]) -> None:
        """ Asserts that the specified keys are a subset of the keys. """
        missing_keys = specified_keys.difference(keys)
        if missing_keys:
            logging.error("%s missing mandatory keys for type '%s': %s",
                          _prefix(path), self._name, str(sorted(missing_keys)))

    def _assert_mandatory_keys(self, path: _Path, type_info: Any,
                               attr_info: Any, keys: List[str]) -> None:
        """
        Checks the keys against the "mandatory-attributes" of the type
        information.

        A string selects one of the constraints of _ASSERT_KEYS which is
        applied to all explicit attributes.  A list names the keys which
        have to be present.
        """
        mandatory_attr_info = type_info["mandatory-attributes"]
        if isinstance(mandatory_attr_info, str):
            _ASSERT_KEYS[mandatory_attr_info](self, path, set(attr_info), keys)
        else:
            assert isinstance(mandatory_attr_info, list)
            self.assert_keys_subset(path, set(mandatory_attr_info), keys)

    def verify_dict(self, path: _Path, value: Any, type_info: Any) -> Set[str]:
        """
        Verifies a dictionary value.

        Keys starting with an underscore are ignored.  Each other key is
        verified through the explicit attributes or, if present, the generic
        attributes of the type information.  If this type has subtypes, then
        the verification continues with the subtype selected by the value of
        the subtype key.  Keys which were verified neither by this type nor
        by its subtypes are reported, unless this type is itself a subtype.

        :returns: The set of keys verified by this type and its subtypes.
        """
        keys = sorted(key for key in value if not key.startswith("_"))
        attr_info = type_info["attributes"]
        self._assert_mandatory_keys(path, type_info, attr_info, keys)
        verified_keys = set()  # type: Set[str]
        for key in keys:
            if key in attr_info:
                self._verify_key(path, value, attr_info[key]["spec-type"], key)
                verified_keys.add(key)
            elif "generic-attributes" in type_info:
                generic_info = type_info["generic-attributes"]
                # The key itself is verified as if it were a value
                key_as_value = {key: key}
                self._verify_key(path, key_as_value,
                                 generic_info["key-spec-type"], key)
                self._verify_key(path, value, generic_info["value-spec-type"],
                                 key)
                verified_keys.add(key)
        if self._subtype_key:
            if self._subtype_key in keys:
                subtype_value = value[self._subtype_key]
                try:
                    subtype_verifier = self._subtype_verifiers.get(
                        subtype_value)
                except TypeError:
                    # An unhashable value such as a list or dictionary cannot
                    # select a subtype, report it like an unknown subtype
                    subtype_verifier = None
                if subtype_verifier is not None:
                    verified_keys.update(subtype_verifier.verify(path, value))
                else:
                    logging.error(
                        "%s unknown subtype for key '%s' for type '%s': %s",
                        _prefix(path), self._subtype_key, self._name,
                        subtype_value)
            else:
                logging.error("%s subtype key '%s' not present for type '%s'",
                              _prefix(path), self._subtype_key, self._name)
        if not self.is_subtype:
            unverified_keys = set(keys).difference(verified_keys)
            if unverified_keys:
                logging.error(
                    "%s has unverfied keys for type '%s' and its subtypes: %s",
                    _prefix(path), self._name, str(sorted(unverified_keys)))
        return verified_keys

    def verify_int_or_float(self, path: _Path, value: Any,
                            type_info: Any) -> Set[str]:
        """
        Verifies an integer or float value.  An error is logged, if the
        optional assert expression of the type information does not hold.
        """
        # pylint: disable=no-self-use
        if not _assert_int_or_float(path, value, type_info):
            logging.error("%s invalid value: %s", _prefix(path), str(value))
        return set()

    def verify_list(self, path: _Path, value: Any, type_info: Any) -> Set[str]:
        """
        Verifies a list value.

        Each element is verified using the verifier of the "spec-type" of the
        type information.  An error is logged and no element is verified, if
        there is no verifier for this type.
        """
        type_name = type_info["spec-type"]
        verifier = self._verifier_map.get(type_name)
        if verifier is None:
            # Report an unknown type like _verify_key() instead of raising a
            # KeyError which would abort the verification
            logging.error("%s unknown specification type: %s", _prefix(path),
                          type_name)
            return set()
        for index, element in enumerate(value):
            verifier.verify(_Path(path.item, path.path + f"[{index}]"),
                            element)
        return set()

    def verify_none(self, _path: _Path, _value: Any,
                    _type_info: Any) -> Set[str]:
        """ Verifies a none value. """
        # pylint: disable=no-self-use
        return set()

    def verify_str(self, path: _Path, value: Any, type_info: Any) -> Set[str]:
        """
        Verifies a string value.  An error is logged, if the optional assert
        expression of the type information does not hold.
        """
        # pylint: disable=no-self-use
        if not _assert_str(path, value, type_info):
            logging.error("%s invalid value: %s", _prefix(path), value)
        return set()

    def verify(self, path: _Path, value: Any) -> Set[str]:
        """
        Verifies a value according to the type information.

        The type name of the value selects the type information and the
        verify method.  An error is logged, if the type information of this
        type has no entry for the type name of the value.

        :returns: The set of verified keys for dictionary values, otherwise
            the empty set.
        """
        self.verify_info(path)
        type_name = _type_name(value)
        if type_name in self._info_map:
            return _VERIFY[type_name](self, path, value,
                                      self._info_map[type_name])
        logging.error(
            "%s expected value of types %s for type '%s', "
            "actual type '%s'", _prefix(path), str(sorted(self._info_map)),
            self._name, type_name)
        return set()

    def _add_subtype_verifier(self, subtype_key: str, subtype_value: str,
                              subtype_name: str) -> None:
        """
        Registers the verifier of the subtype name for dictionaries in which
        the subtype key has the subtype value.  All subtypes of a type have
        to use the same subtype key.
        """
        logging.info("add subtype '%s' to '%s'", subtype_name, self._name)
        assert not self._subtype_key or self._subtype_key == subtype_key
        assert subtype_value not in self._subtype_verifiers
        subtype_verifier = self._verifier_map[subtype_name]
        subtype_verifier.is_subtype = True
        self._subtype_key = subtype_key
        self._subtype_verifiers[subtype_value] = subtype_verifier

    def resolve_type_refinements(self) -> None:
        """
        Adds a subtype verifier for each child link of the type item which
        has the "spec-refinement" role.
        """
        for link in self._item.links_to_children():
            if link.role == "spec-refinement":
                self._add_subtype_verifier(link["spec-key"],
                                           link["spec-value"],
                                           link.item["spec-type"])
# Maps the type name of a value (see _type_name()) to the _ItemVerifier
# method which verifies values of this type.  The methods are called with an
# explicit verifier instance as the first argument.
_VERIFY = {
    "bool": _ItemVerifier.verify_bool,
    "dict": _ItemVerifier.verify_dict,
    "float": _ItemVerifier.verify_int_or_float,
    "int": _ItemVerifier.verify_int_or_float,
    "list": _ItemVerifier.verify_list,
    "none": _ItemVerifier.verify_none,
    "str": _ItemVerifier.verify_str,
}
# Maps the string form of the "mandatory-attributes" type information to the
# _ItemVerifier method which checks the corresponding key constraint.  The
# methods are called with an explicit verifier instance as the first argument.
_ASSERT_KEYS = {
    "all": _ItemVerifier.assert_keys_subset,
    "at-least-one": _ItemVerifier.assert_keys_at_least_one,
    "at-most-one": _ItemVerifier.assert_keys_at_most_one,
    "exactly-one": _ItemVerifier.assert_keys_exactly_one,
    "none": _ItemVerifier.assert_keys_no_constraints,
}
def _create_verifier(item: Item, verifier_map: _VerifierMap) -> _Verifier:
    """
    Creates the verifier for the specification type defined by the item and
    registers it in the verifier map.

    The "spec-type" of the item must not have a verifier yet and the
    "spec-info" of the item has to be a dictionary.
    """
    type_name = item["spec-type"]
    assert type_name not in verifier_map
    info_map = item["spec-info"]
    assert isinstance(info_map, dict)
    return _ItemVerifier(name=type_name,
                         verifier_map=verifier_map,
                         info_map=info_map,
                         item=item)
def _gather_item_verifiers(item: Item, verifier_map: _VerifierMap) -> None:
    """
    Creates a verifier for each child of the item which is linked with the
    "spec-member" role.
    """
    for link in item.links_to_children():
        if link.role != "spec-member":
            continue
        _create_verifier(link.item, verifier_map)
@contextmanager
def _add_filter() -> Iterator[_Filter]:
    """
    Adds a counting filter to the root logger for the duration of the with
    statement and yields this filter.

    The filter is removed when the with statement is left, including the case
    in which the body raises an exception.  Otherwise, a stale filter would
    stay attached to the root logger.
    """
    logger = logging.getLogger()
    log_filter = _Filter()
    logger.addFilter(log_filter)
    try:
        yield log_filter
    finally:
        logger.removeFilter(log_filter)
class SpecVerifier:
    """ Verifies items according to the specification of the specification. """

    # pylint: disable=too-few-public-methods

    def __init__(self, item_cache: ItemCache, root_uid: str):
        """
        Creates the verifiers for the built-in types and for the types
        defined by the root type item and its "spec-member" children.

        :param item_cache: The specification item cache.
        :param root_uid: The UID of the root type item.  If the item cache
            has no such item, then each verification reports an error.
        """
        verifier_map = {}  # type: _VerifierMap
        _NameVerifier("name", verifier_map)
        _UIDVerifier("uid", verifier_map)
        for builtin_type in ("bool", "float", "int", "none", "str"):
            _Verifier(builtin_type, verifier_map)
        try:
            root_item = item_cache[root_uid]
        except KeyError:
            self._root_verifier = None
            return
        self._root_verifier = _create_verifier(root_item, verifier_map)
        _gather_item_verifiers(root_item, verifier_map)
        for name in sorted(verifier_map):
            logging.info("type: %s", name)
            verifier_map[name].resolve_type_refinements()

    def _verify_item(self, item: Item) -> None:
        """ Verifies the data of the item using the root verifier. """
        self._root_verifier.verify(_Path(item, f"{item.uid}:"), item.data)

    def verify_all(self, item_cache: ItemCache) -> VerifyStatus:
        """ Verifies all items of the cache. """
        with _add_filter() as log_filter:
            if self._root_verifier is not None:
                logging.info("start specification item verification")
                for key in sorted(item_cache.all):
                    self._verify_item(item_cache[key])
                logging.info("finished specification item verification")
            else:
                logging.error("root type item does not exist in item cache")
        return log_filter.get_verify_info()

    def verify(self, item: Item) -> VerifyStatus:
        """ Verifies the item. """
        with _add_filter() as log_filter:
            if self._root_verifier is not None:
                self._verify_item(item)
            else:
                logging.error("root type item does not exist in item cache")
        return log_filter.get_verify_info()
def verify(config: dict, item_cache: ItemCache) -> VerifyStatus:
    """
    Verifies specification items according to the configuration.

    :param config: A dictionary with configuration entries.  The "root-type"
        entry provides the UID of the root type item.
    :param item_cache: The specification item cache.
    :returns: The verify message counts.  If the configuration has no root
        type, then exactly one error is reported.
    """
    if "root-type" not in config:
        # This message is not counted by a filter, so report the one error
        # directly
        logging.error("configuration has no root type")
        return VerifyStatus(0, 1, 0, 0, 0)
    spec_verifier = SpecVerifier(item_cache, config["root-type"])
    return spec_verifier.verify_all(item_cache)