# SPDX-License-Identifier: BSD-2-Clause
""" This module provides specification items and an item cache. """
# Copyright (C) 2019, 2020 embedded brains GmbH (http://www.embedded-brains.de)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from contextlib import contextmanager
import os
import pickle
import string
import stat
from typing import Any, Callable, Dict, Iterator, List, NamedTuple, Mapping, \
Optional, Tuple
import yaml
class ItemGetValueContext(NamedTuple):
""" Context used to get an item value. """
item: "Item"
path: str
value: Any
key: str
index: Any # should be int, but this triggers a mypy error
@property
def type_path_key(self) -> str:
""" Returns the item type followed the path to the key. """
return f"{self.item.type}:{os.path.join(self.path, self.key)}"
ItemMap = Dict[str, "Item"]
ItemGetValue = Callable[[ItemGetValueContext], Any]
def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
for next_enabled_by in enabled_by:
if not is_enabled(enabled, next_enabled_by):
return False
return True
def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
return not is_enabled(enabled, enabled_by)
def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
for next_enabled_by in enabled_by:
if is_enabled(enabled, next_enabled_by):
return True
return False
_IS_ENABLED_OP = {
"and": _is_enabled_op_and,
"not": _is_enabled_op_not,
"or": _is_enabled_op_or
}
def is_enabled(enabled: List[str], enabled_by: Any) -> bool:
""" Verifies if the given parameter is enabled by specific enables. """
if isinstance(enabled_by, bool):
return enabled_by
if isinstance(enabled_by, list):
return _is_enabled_op_or(enabled, enabled_by)
if isinstance(enabled_by, dict):
key, value = next(iter(enabled_by.items()))
return _IS_ENABLED_OP[key](enabled, value)
return enabled_by in enabled
def _str_representer(dumper, data):
return dumper.represent_scalar("tag:yaml.org,2002:str",
data,
style="|" if "\n" in data else "")
yaml.add_representer(str, _str_representer)
class Link:
""" A link to an item. """
def __init__(self, item: "Item", data: Any):
self._item = item
self._data = data
@classmethod
def create(cls, link: "Link", item: "Item") -> "Link":
""" Creates a link using an existing link with a new target item. """
return cls(item, link._data) # pylint: disable=protected-access
def __getitem__(self, name: str) -> Any:
return self._data[name]
@property
def item(self) -> "Item":
""" The item referenced by this link. """
return self._item
@property
def role(self) -> str:
""" The link role. """
return self._data["role"]
def _get_value(ctx: ItemGetValueContext) -> Any:
value = ctx.value[ctx.key]
if ctx.index >= 0:
return value[ctx.index]
return value
def normalize_key_path(key_path: str, prefix: str = "") -> str:
""" Normalizes the key path with an optional prefix path. """
if not os.path.isabs(key_path):
key_path = os.path.join(prefix, key_path)
return os.path.normpath(key_path)
class Item:
""" Objects of this class represent a specification item. """
def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
self._item_cache = item_cache
self._uid = uid
self._data = data
self._links_to_parents = [] # type: List[Link]
self._links_to_children = [] # type: List[Link]
def __contains__(self, key: str) -> bool:
return key in self._data
def __getitem__(self, key: str) -> Any:
return self._data[key]
def __setitem__(self, key: str, value: Any) -> None:
self._data[key] = value
def get(self, key: str, default: Any) -> Any:
"""
Gets the attribute value if the attribute exists, otherwise the
specified default value is returned.
"""
return self._data.get(key, default)
def get_by_normalized_key_path(
self,
normalized_key_path: str,
get_value: ItemGetValue = _get_value) -> Any:
"""
Gets the attribute value corresponding to the normalized key path.
"""
path = "/"
value = self._data
for key in normalized_key_path.strip("/").split("/"):
parts = key.split("[")
try:
index = int(parts[1].split("]")[0])
except IndexError:
index = -1
ctx = ItemGetValueContext(self, path, value, parts[0], index)
try:
value = get_value(ctx)
except KeyError:
value = _get_value(ctx)
path = os.path.join(path, key)
return value
def get_by_key_path(self,
key_path: str,
prefix: str = "",
get_value: ItemGetValue = _get_value) -> Any:
""" Gets the attribute value corresponding to the key path. """
return self.get_by_normalized_key_path(
normalize_key_path(key_path, prefix), get_value)
@property
def uid(self) -> str:
""" Returns the UID of the item. """
return self._uid
@property
def spec(self) -> str:
""" Returns the UID of the item with an URL-like format. """
return f"spec:{self._uid}"
def to_abs_uid(self, abs_or_rel_uid: str) -> str:
"""
Returns the absolute UID of an absolute UID or an UID relative to this
item.
"""
if abs_or_rel_uid == ".":
return self._uid
if os.path.isabs(abs_or_rel_uid):
return abs_or_rel_uid
return os.path.normpath(
os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))
def map(self, abs_or_rel_uid: str) -> "Item":
"""
Maps the absolute UID or the UID relative to this item to the
corresponding item.
"""
return self._item_cache[self.to_abs_uid(abs_or_rel_uid)]
def links_to_parents(self) -> Iterator[Link]:
""" Yields the links to the parents of this items. """
yield from self._links_to_parents
def parents(self, role: Optional[str] = None) -> Iterator["Item"]:
""" Yields the parents of this items. """
if role is None:
for link in self._links_to_parents:
yield link.item
else:
for link in self._links_to_parents:
if link.role == role:
yield link.item
def links_to_children(self) -> Iterator[Link]:
""" Yields the links to the children of this items. """
yield from self._links_to_children
def children(self, role: Optional[str] = None) -> Iterator["Item"]:
""" Yields the children of this items. """
if role is None:
for link in self._links_to_children:
yield link.item
else:
for link in self._links_to_children:
if link.role == role:
yield link.item
def init_parents(self, item_cache: "ItemCache"):
""" Initializes the list of links to parents of this items. """
for data in self._data["links"]:
try:
link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
self._links_to_parents.append(link)
except KeyError as err:
msg = (f"item '{self.uid}' links "
f"to non-existing item '{data['uid']}'")
raise KeyError(msg) from err
def add_link_to_child(self, link: Link):
""" Adds a link to a child item of this items. """
self._links_to_children.append(link)
def is_enabled(self, enabled: List[str]):
""" Returns true if the item is enabled by the specified enables. """
return is_enabled(enabled, self["enabled-by"])
@property
def data(self) -> Any:
""" The item data. """
return self._data
@property
def file(self) -> str:
""" Returns the file of the item. """
return self._data["_file"]
@file.setter
def file(self, value: str):
""" Sets the file of the item. """
self._data["_file"] = value
@property
def type(self) -> str:
""" Returns the type of the item. """
return self._data["_type"]
def save(self):
""" Saves the item to the corresponding file. """
with open(self.file, "w") as dst:
data = {}
for key, value in self._data.items():
if not key.startswith("_"):
data[key] = value
dst.write(
yaml.dump(data, default_flow_style=False, allow_unicode=True))
def load(self):
""" Loads the item from the corresponding file. """
filename = self.file
with open(filename, "r") as src:
self._data = yaml.safe_load(src.read())
self._data["_file"] = filename
class ItemTemplate(string.Template):
""" String template for item mapper identifiers. """
idpattern = "[a-zA-Z0-9._/-]+(:[][a-zA-Z0-9._/-]+)?(|[a-zA-Z0-9_]+)*"
class ItemMapper(Mapping[str, object]):
""" Maps identifiers to items and attribute values. """
def __init__(self, item: Item, recursive: bool = False):
self._item = item
self._recursive = recursive
self._prefix = [""]
self._get_value = {} # type: Dict[str, ItemGetValue]
@property
def item(self) -> Item:
""" The item of the mapper. """
return self._item
@item.setter
def item(self, item: Item) -> None:
""" Sets the item of the mapper. """
self._item = item
def add_get_value(self, type_path_key: str,
get_value: ItemGetValue) -> None:
"""
Adds a get value for the specified type and key path.
"""
self._get_value[type_path_key] = get_value
def push_prefix(self, prefix: str) -> None:
""" Pushes a key path prefix. """
self._prefix.append(prefix)
def pop_prefix(self) -> None:
""" Pops a key path prefix. """
self._prefix.pop()
@contextmanager
def prefix(self, prefix: str) -> Iterator[None]:
""" Opens a key path prefix context. """
self.push_prefix(prefix)
yield
self.pop_prefix()
def map(self, identifier: str) -> Tuple[Item, str, Any]:
"""
Maps an identifier to the corresponding item and attribute value.
"""
uid_key_path, *pipes = identifier.split("|")
colon = uid_key_path.find(":")
if colon >= 0:
uid, key_path = uid_key_path[:colon], uid_key_path[colon + 1:]
else:
uid, key_path = uid_key_path, "/_uid"
if uid == ".":
item = self._item
prefix = "/".join(self._prefix)
else:
item = self._item.map(uid)
prefix = ""
key_path = normalize_key_path(key_path, prefix)
value = item.get_by_normalized_key_path(key_path, self.get_value)
for func in pipes:
value = getattr(self, func)(value)
return item, key_path, value
@contextmanager
def _item_and_prefix(self, item: Item, prefix: str) -> Iterator[None]:
item_2 = self._item
prefix_2 = self._prefix
self._item = item
self._prefix = [prefix]
yield
self._item = item_2
self._prefix = prefix_2
def __getitem__(self, identifier):
item, key_path, value = self.map(identifier)
if self._recursive:
with self._item_and_prefix(item, os.path.dirname(key_path)):
return self.substitute(value)
return value
def __iter__(self):
raise StopIteration
def __len__(self):
raise AttributeError
def substitute(self, text: Optional[str]) -> str:
""" Performs a variable substitution using the item mapper. """
if not text:
return ""
return ItemTemplate(text).substitute(self)
def substitute_with_prefix(self, text: Optional[str], prefix: str) -> str:
"""
Performs a variable substitution using the item mapper with a prefix.
"""
if not text:
return ""
with self.prefix(prefix):
return ItemTemplate(text).substitute(self)
def get_value(self, ctx: ItemGetValueContext) -> Any:
""" Gets a value by key and optional index. """
return self._get_value[ctx.type_path_key](ctx)
class _SpecType(NamedTuple):
key: str
refinements: Dict[str, Any]
def _gather_spec_refinements(item: Item) -> Optional[_SpecType]:
new_type = None # type: Optional[_SpecType]
for link in item.links_to_children():
if link.role == "spec-refinement":
key = link["spec-key"]
if new_type is None:
new_type = _SpecType(key, {})
assert new_type.key == key
new_type.refinements[
link["spec-value"]] = _gather_spec_refinements(link.item)
return new_type
class ItemCache:
""" This class provides a cache of specification items. """
def __init__(self, config: Any):
self._items = {} # type: ItemMap
self._top_level = {} # type: ItemMap
self._load_items(config)
self._set_types(config)
def __getitem__(self, uid: str) -> Item:
return self._items[uid]
@property
def all(self) -> ItemMap:
""" Returns the map of all specification items. """
return self._items
@property
def top_level(self) -> ItemMap:
""" Returns the map of top-level specification items. """
return self._top_level
def _load_items_in_dir(self, base: str, path: str, cache_file: str,
update_cache: bool) -> None:
data_by_uid = {} # type: Dict[str, Any]
if update_cache:
for name in os.listdir(path):
path2 = os.path.join(path, name)
if name.endswith(".yml") and not name.startswith("."):
uid = "/" + os.path.relpath(path2, base).replace(
".yml", "")
with open(path2, "r") as yaml_src:
data = yaml.safe_load(yaml_src.read())
data["_file"] = os.path.abspath(path2)
data["_uid"] = uid
data_by_uid[uid] = data
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
with open(cache_file, "wb") as out:
pickle.dump(data_by_uid, out)
else:
with open(cache_file, "rb") as pickle_src:
data_by_uid = pickle.load(pickle_src)
for uid, data in iter(data_by_uid.items()):
item = Item(self, uid, data)
self._items[uid] = item
if not item["links"]:
self._top_level[uid] = item
def _load_items_recursive(self, base: str, path: str,
cache_dir: str) -> None:
mid = os.path.abspath(path)
mid = mid.replace(os.path.commonpath([cache_dir, mid]), "").strip("/")
cache_file = os.path.join(cache_dir, mid, "spec.pickle")
try:
mtime = os.path.getmtime(cache_file)
update_cache = False
except FileNotFoundError:
update_cache = True
for name in os.listdir(path):
path2 = os.path.join(path, name)
if name.endswith(".yml") and not name.startswith("."):
update_cache = update_cache or mtime <= os.path.getmtime(path2)
else:
if stat.S_ISDIR(os.lstat(path2).st_mode):
self._load_items_recursive(base, path2, cache_dir)
self._load_items_in_dir(base, path, cache_file, update_cache)
def _init_parents(self) -> None:
for item in self._items.values():
item.init_parents(self)
def _init_children(self) -> None:
for uid in sorted(self._items):
item = self._items[uid]
for link in item.links_to_parents():
link.item.add_link_to_child(Link.create(link, item))
def _load_items(self, config: Any) -> None:
cache_dir = os.path.abspath(config["cache-directory"])
for path in config["paths"]:
self._load_items_recursive(path, path, cache_dir)
self._init_parents()
self._init_children()
def _set_types(self, config: Any) -> None:
spec_root = config["spec-type-root-uid"]
if spec_root:
root_type = _gather_spec_refinements(self[spec_root])
else:
root_type = None
for item in self._items.values():
spec_type = root_type
value = item.data
path = [] # type: List[str]
while spec_type is not None:
type_name = value[spec_type.key]
path.append(type_name)
spec_type = spec_type.refinements[type_name]
item["_type"] = "/".join(path)
class EmptyItemCache(ItemCache):
""" This class provides a empty cache of specification items. """
def __init__(self):
super().__init__({
"cache-directory": ".",
"paths": [],
"spec-type-root-uid": None
})
class EmptyItem(Item):
""" Objects of this class represent empty items. """
def __init__(self):
super().__init__(EmptyItemCache(), "", {})