# SPDX-License-Identifier: BSD-2-Clause
""" This module provides specification items and an item cache. """
# Copyright (C) 2019, 2022 embedded brains GmbH & Co. KG
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# pylint: disable=too-many-lines
from contextlib import contextmanager
import base64
import hashlib
import os
import pickle
import re
import string
import stat
from typing import Any, Callable, Dict, Iterable, Iterator, List, Match, \
NamedTuple, Optional, Set, TextIO, Tuple, Union
import json
import yaml
try:
from yaml import CSafeLoader as SafeLoader
except ImportError: # pragma: no cover
from yaml import SafeLoader # type: ignore
class ItemGetValueContext(NamedTuple):
""" Context used to get an item value. """
item: "Item"
path: str
value: Any
key: str
index: Any # should be int, but this triggers a mypy error
args: Optional[str]
def arg(self, name: str, value: Optional[str] = None) -> str:
""" Get argument value by name. """
args = dict(
kv.split("=") # type: ignore
for kv in self.args.split(",")) # type: ignore
if value:
return args.get(name, value)
return args[name]
ItemMap = Dict[str, "Item"]
ItemGetValue = Callable[[ItemGetValueContext], Any]
ItemGetValueMap = Dict[str, Tuple[ItemGetValue, Any]]
def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
for next_enabled_by in enabled_by:
if not is_enabled(enabled, next_enabled_by):
return False
return True
def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
return not is_enabled(enabled, enabled_by)
def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
for next_enabled_by in enabled_by:
if is_enabled(enabled, next_enabled_by):
return True
return False
_IS_ENABLED_OP = {
"and": _is_enabled_op_and,
"not": _is_enabled_op_not,
"or": _is_enabled_op_or
}
def is_enabled(enabled: List[str], enabled_by: Any) -> bool:
""" Verifies if the given parameter is enabled by specific enables. """
if isinstance(enabled_by, bool):
return enabled_by
if isinstance(enabled_by, list):
return _is_enabled_op_or(enabled, enabled_by)
if isinstance(enabled_by, dict):
key, value = next(iter(enabled_by.items()))
return _IS_ENABLED_OP[key](enabled, value)
return enabled_by in enabled
def _str_representer(dumper, data):
return dumper.represent_scalar("tag:yaml.org,2002:str",
data,
style="|" if "\n" in data else "")
yaml.add_representer(str, _str_representer)
class Link:
""" A link to an item. """
def __init__(self, item: "Item", data: Any):
self._item = item
self._data = data
@classmethod
def create(cls, link: "Link", item: "Item") -> "Link":
""" Creates a link using an existing link with a new target item. """
return cls(item, link._data) # pylint: disable=protected-access
def __getitem__(self, name: str) -> Any:
return self._data[name]
def __setitem__(self, key: str, value: Any) -> None:
self._data[key] = value
def __lt__(self, other: Any) -> bool:
if not isinstance(other, Link):
return NotImplemented
# pylint: disable=protected-access
return self._item.uid < other._item.uid
@property
def item(self) -> "Item":
""" The item referenced by this link. """
return self._item
@property
def role(self) -> str:
""" The link role. """
return self._data["role"]
def _get_value(ctx: ItemGetValueContext) -> Any:
value = ctx.value[ctx.key]
if ctx.index >= 0:
return value[ctx.index]
return value
def normalize_key_path(key_path: str, prefix: str = "") -> str:
""" Normalizes the key path with an optional prefix path. """
if not os.path.isabs(key_path):
key_path = os.path.join(prefix, key_path)
return os.path.normpath(key_path)
_TYPES = {
type(True): "B".encode("utf-8"),
type(1.0): "F".encode("utf-8"),
type(1): "I".encode("utf-8"),
type(None): "N".encode("utf-8"),
type(""): "S".encode("utf-8"),
}
def _hash_data(data, state) -> None:
if isinstance(data, list):
for value in data:
_hash_data(value, state)
elif isinstance(data, dict):
for key, value in sorted(data.items()):
if not key.startswith("_"):
state.update(key.encode("utf-8"))
_hash_data(value, state)
else:
state.update(_TYPES[type(data)])
state.update(str(data).encode("utf-8"))
def data_digest(data: Any) -> str:
""" Returns a digest of the data. """
state = hashlib.sha256()
_hash_data(data, state)
return base64.urlsafe_b64encode(state.digest()).decode("ascii")
_UID_TO_UPPER = re.compile(r"[/_-]+(.)")
def _match_to_upper(match: Match) -> str:
return match.group(1).upper()
def _is_link_enabled(link: Link) -> bool:
return link.item._data["_enabled"] # pylint: disable=protected-access
def link_is_enabled(_link: Link) -> bool:
""" Returns true. """
return True
class Item:
""" Objects of this class represent a specification item. """
# pylint: disable=too-many-public-methods
def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
data["_type"] = ""
self._cache = item_cache
self._ident = _UID_TO_UPPER.sub(_match_to_upper, uid)
self._uid = uid
self._data = data
self._links_to_parents: List[Link] = []
self._links_to_children: List[Link] = []
self._resolved_proxy = False
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Item):
return NotImplemented
return self._uid == other._uid # pylint: disable=protected-access
def __lt__(self, other: Any) -> bool:
if not isinstance(other, Item):
return NotImplemented
return self._uid < other._uid # pylint: disable=protected-access
def __hash__(self) -> int:
return hash(self._uid)
def __contains__(self, key: str) -> bool:
return key in self._data
def __getitem__(self, key: str) -> Any:
return self._data[key]
def __setitem__(self, key: str, value: Any) -> None:
self._data[key] = value
@property
def cache(self) -> "ItemCache":
""" Returns the cache of the item. """
return self._cache
@property
def digest(self) -> str:
""" Returns the digest of the item data. """
return data_digest(self._data)
def get(self, key: str, default: Any) -> Any:
"""
Gets the attribute value if the attribute exists, otherwise the
specified default value is returned.
"""
return self._data.get(key, default)
@property
def uid(self) -> str:
""" Returns the UID of the item. """
return self._uid
@property
def ident(self) -> str:
""" Returns the identifier of the item. """
return self._ident
@property
def spec(self) -> str:
""" Returns the UID of the item with an URL-like format. """
return f"spec:{self._uid}"
@property
def spec_2(self) -> str:
"""
Returns the UID of the item with an URL-like format with invisible
white space to allow line breaks.
"""
uid = self._uid.replace("/", "/\u200b")
return f"spec:{uid}"
def to_abs_uid(self, abs_or_rel_uid: str) -> str:
"""
Returns the absolute UID of an absolute UID or an UID relative to this
item.
"""
if abs_or_rel_uid == ".":
return self._uid
if os.path.isabs(abs_or_rel_uid):
return abs_or_rel_uid
return os.path.normpath(
os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))
def map(self, abs_or_rel_uid: str) -> "Item":
"""
Maps the absolute UID or the UID relative to this item to the
corresponding item.
"""
return self._cache[self.to_abs_uid(abs_or_rel_uid)]
def links_to_parents(
self,
role: Optional[Union[str, Iterable[str]]] = None,
is_link_enabled: Callable[[Link], bool] = _is_link_enabled
) -> Iterator[Link]:
""" Yields the links to the parents of this items. """
if role is None:
for link in self._links_to_parents:
if is_link_enabled(link):
yield link
elif isinstance(role, str):
for link in self._links_to_parents:
if link.role == role and is_link_enabled(link):
yield link
else:
for link in self._links_to_parents:
if link.role in role and is_link_enabled(link):
yield link
def parents(
self,
role: Optional[Union[str, Iterable[str]]] = None,
is_link_enabled: Callable[[Link], bool] = _is_link_enabled
) -> Iterator["Item"]:
""" Yields the parents of this items. """
for link in self.links_to_parents(role, is_link_enabled):
yield link.item
def parent(
self,
role: Optional[Union[str, Iterable[str]]] = None,
index: Optional[int] = 0,
is_link_enabled: Callable[[Link],
bool] = _is_link_enabled) -> "Item":
""" Returns the parent with the specified role and index. """
for item_index, item in enumerate(self.parents(role, is_link_enabled)):
if item_index == index:
return item
raise IndexError
def parent_link(
self,
role: Optional[Union[str, Iterable[str]]] = None,
index: Optional[int] = 0,
is_link_enabled: Callable[[Link],
bool] = _is_link_enabled) -> Link:
""" Returns the parent link with the specified role and index. """
for link_index, link in enumerate(
self.links_to_parents(role, is_link_enabled)):
if link_index == index:
return link
raise IndexError
def links_to_children(
self,
role: Optional[Union[str, Iterable[str]]] = None,
is_link_enabled: Callable[[Link], bool] = _is_link_enabled
) -> Iterator[Link]:
""" Yields the links to the children of this items. """
if role is None:
for link in self._links_to_children:
if is_link_enabled(link):
yield link
elif isinstance(role, str):
for link in self._links_to_children:
if link.role == role and is_link_enabled(link):
yield link
else:
for link in self._links_to_children:
if link.role in role and is_link_enabled(link):
yield link
def children(
self,
role: Optional[Union[str, Iterable[str]]] = None,
is_link_enabled: Callable[[Link], bool] = _is_link_enabled
) -> Iterator["Item"]:
""" Yields the children of this items. """
for link in self.links_to_children(role, is_link_enabled):
yield link.item
def child(
self,
role: Optional[Union[str, Iterable[str]]] = None,
index: Optional[int] = 0,
is_link_enabled: Callable[[Link],
bool] = _is_link_enabled) -> "Item":
""" Returns the child with the specified role and index. """
for item_index, item in enumerate(self.children(role,
is_link_enabled)):
if item_index == index:
return item
raise IndexError
def child_link(
self,
role: Optional[Union[str, Iterable[str]]] = None,
index: Optional[int] = 0,
is_link_enabled: Callable[[Link],
bool] = _is_link_enabled) -> Link:
""" Returns the child link with the specified role and index. """
for link_index, link in enumerate(
self.links_to_children(role, is_link_enabled)):
if link_index == index:
return link
raise IndexError
def init_parents(self, item_cache: "ItemCache") -> None:
""" Initializes the list of links to parents of this items. """
for data in self._data["links"]:
try:
link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
self._links_to_parents.append(link)
except KeyError as err:
msg = (f"item '{self.uid}' links "
f"to non-existing item '{data['uid']}'")
raise KeyError(msg) from err
def init_children(self) -> None:
""" Initializes the list of links to children of this items. """
for link in self._links_to_parents:
link.item.add_link_to_child(Link.create(link, self))
def add_link_to_parent(self, link: Link):
""" Adds the link as a parent item link to this item. """
self._links_to_parents.append(link)
def add_link_to_child(self, link: Link):
""" Adds the link as a child item link to this item. """
self._links_to_children.append(link)
def is_enabled(self, enabled: List[str]):
"""
Returns true if the item is enabled by the enabled set, otherwise
returns false.
"""
return is_enabled(enabled, self._data["enabled-by"])
@property
def enabled(self) -> bool:
""" Returns true if the item is enabled, otherwise returns false. """
return self._data["_enabled"]
@property
def resolved_proxy(self) -> bool:
""" Is true if the item is a resolved proxy, otherwise false. """
return self._resolved_proxy
@property
def data(self) -> Any:
""" The item data. """
return self._data
@property
def file(self) -> str:
""" Returns the file of the item. """
return self._data["_file"]
@file.setter
def file(self, value: str):
""" Sets the file of the item. """
self._data["_file"] = value
@property
def type(self) -> str:
""" Returns the type of the item. """
return self._data["_type"]
def save(self):
""" Saves the item to the corresponding file. """
self._cache.save_data(self.file, self._data)
def load(self):
""" Loads the item from the corresponding file. """
self._data = self._cache.load_data(self.file, self._uid)
def create_unique_link(child: Item, parent: Item, data: Any) -> None:
"""
Creates a unique link from the child to the parent item and vice versa
using the data for the link.
"""
for item in parent.children(data["role"]):
if item.uid == child.uid:
break
else:
parent.add_link_to_child(Link(child, data))
child.add_link_to_parent(Link(parent, data))
class ItemTemplate(string.Template):
""" String template for item mapper identifiers. """
idpattern = "[a-zA-Z0-9._/-]+:[\\[\\]a-zA-Z0-9._/-]+(:[^${}]*)?"
class _ItemMapperContext(dict):
""" Context to map identifiers to items and attribute values. """
def __init__(self, mapper: "ItemMapper", item: Optional[Item],
prefix: Optional[str], recursive: bool):
super().__init__()
self._mapper = mapper
self._item = item
self._prefix = prefix
self._recursive = recursive
def __getitem__(self, identifier):
item, key_path, value = self._mapper.map(identifier, self._item,
self._prefix)
if self._recursive:
return self._mapper.substitute(value, item,
os.path.dirname(key_path))
return value
class _GetValueDictionary(dict):
def __init__(self, get_value: ItemGetValue):
super().__init__()
self._get_value = get_value
def get(self, _key, _default):
return (self._get_value, {})
class ItemMapper:
""" Maps identifiers to items and attribute values. """
def __init__(self, item: Item, recursive: bool = False):
self._item = item
self._recursive = recursive
self._prefix = [""]
self._get_value_map: Dict[str, ItemGetValueMap] = {}
self.copyrights_by_license: Dict[str, Set[str]] = {}
@property
def item(self) -> Item:
""" The item of the mapper. """
return self._item
@item.setter
def item(self, item: Item) -> None:
""" Sets the item of the mapper. """
self._item = item
def _add_get_value_map(
self, type_path_key: str, new_get_value_map: Tuple[ItemGetValue,
Dict]) -> None:
type_name, path_key = type_path_key.split(":")
keys = path_key.strip("/").split("/")
get_value_map = self._get_value_map.setdefault(type_name, {})
for key in keys[:-1]:
_, get_value_map = get_value_map.setdefault(key, (_get_value, {}))
get_value_map[keys[-1]] = new_get_value_map
def add_get_value(self, type_path_key: str,
get_value: ItemGetValue) -> None:
"""
Adds a get value for the specified type and key path.
"""
self._add_get_value_map(type_path_key, (get_value, {}))
def add_get_value_dictionary(self, type_path_key: str,
get_value: ItemGetValue) -> None:
"""
Adds a get value dictionary for the specified type and key path.
"""
self._add_get_value_map(type_path_key,
(_get_value, _GetValueDictionary(get_value)))
def push_prefix(self, prefix: str) -> None:
""" Pushes a key path prefix. """
self._prefix.append(prefix)
def pop_prefix(self) -> None:
""" Pops a key path prefix. """
self._prefix.pop()
@contextmanager
def prefix(self, prefix: str) -> Iterator[None]:
""" Opens a key path prefix context. """
self.push_prefix(prefix)
yield
self.pop_prefix()
@contextmanager
def scope(self, item: Item) -> Iterator[None]:
""" Opens an item scope context. """
previous = self._item
self._item = item
yield
self._item = previous
def get_value_map(self, item: Item) -> ItemGetValueMap:
""" Returns the get value map for the item. """
return self._get_value_map.get(item.type, {})
def _get_by_normalized_key_path(self, item: Item, normalized_key_path: str,
args: Optional[str]) -> Any:
"""
Gets the attribute value associated with the normalized key path.
"""
get_value_map = self.get_value_map(item)
path = "/"
value = item.data
for key in normalized_key_path.strip("/").split("/"):
parts = key.split("[")
try:
index = int(parts[1].split("]")[0])
except IndexError:
index = -1
ctx = ItemGetValueContext(item, path, value, parts[0], index, args)
get_value, get_value_map = get_value_map.get(
parts[0], (_get_value, {}))
value = get_value(ctx)
path = os.path.join(path, key)
self.copyrights_by_license.setdefault(item["SPDX-License-Identifier"],
set()).update(item["copyrights"])
return value
def map(self,
identifier: str,
item: Optional[Item] = None,
prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
"""
Maps the identifier with item and prefix to the associated item, key
path, and attribute value.
"""
colon = identifier.find(":")
uid = identifier[:colon]
more = identifier[colon + 1:]
colon = more.find(":")
if colon < 0:
key_path = more
args = None
else:
key_path = more[:colon]
args = more[colon + 1:]
if item is None:
item = self._item
if uid == ".":
if prefix is None:
prefix = "/".join(self._prefix)
else:
prefix = ""
try:
item = item.map(uid)
except KeyError as err:
msg = (f"item '{uid}' relative to {item.spec} "
f"specified by '{identifier}' does not exist")
raise ValueError(msg) from err
key_path = normalize_key_path(key_path, prefix)
try:
value = self._get_by_normalized_key_path(item, key_path, args)
except Exception as err:
msg = (f"cannot get value for '{key_path}' of {item.spec} "
f"specified by '{identifier}'")
raise ValueError(msg) from err
return item, key_path, value
def __getitem__(self, identifier):
item, key_path, value = self.map(identifier)
if self._recursive:
return self.substitute(value, item, os.path.dirname(key_path))
return value
def substitute(self,
text: Optional[str],
item: Optional[Item] = None,
prefix: Optional[str] = None) -> str:
"""
Performs a variable substitution using the item mapper with the item
and prefix.
"""
if not text:
return ""
try:
context = _ItemMapperContext(self, item, prefix, self._recursive)
return ItemTemplate(text).substitute(context)
except Exception as err:
spec = self._item.spec if item is None else item.spec
if prefix is None:
prefix = "/".join(self._prefix)
msg = (f"substitution for {spec} using prefix '{prefix}' "
f"failed for text: {text}")
raise ValueError(msg) from err
class _SpecType(NamedTuple):
key: str
refinements: Dict[str, Any]
def _gather_spec_refinements(item: Item) -> Optional[_SpecType]:
new_type: Optional[_SpecType] = None
for link in item._links_to_children: # pylint: disable=protected-access
if link.role == "spec-refinement":
key = link["spec-key"]
if new_type is None:
new_type = _SpecType(key, {})
assert new_type.key == key
new_type.refinements[
link["spec-value"]] = _gather_spec_refinements(link.item)
return new_type
def _load_yaml_data(path: str, uid: str) -> Any:
with open(path, "r", encoding="utf-8") as src:
try:
data = yaml.load(src.read(), Loader=SafeLoader)
except yaml.YAMLError as err:
msg = ("YAML error while loading specification item file "
f"'{path}': {str(err)}")
raise IOError(msg) from err
data["_file"] = os.path.abspath(path)
data["_uid"] = uid
return data
def _load_json_data(path: str, uid: str) -> Any:
with open(path, "r", encoding="utf-8") as src:
try:
data = json.load(src)
except json.JSONDecodeError as err:
msg = ("JSON error while loading specification item file "
f"'{path}': {str(err)}")
raise IOError(msg) from err
data["_file"] = os.path.abspath(path)
data["_uid"] = uid
return data
def _is_item_enabled(enabled: List[str], item: Item) -> bool:
return is_enabled(enabled, item["enabled-by"])
def item_is_enabled(_enabled: List[str], _item: Item) -> bool:
""" Returns true. """
return True
def _resolve_proxy(proxy: Item, is_link_enabled: Callable[[Link],
bool]) -> None:
# pylint: disable=protected-access
try:
member = proxy.child("proxy-member", is_link_enabled=is_link_enabled)
except IndexError:
pass
else:
member._links_to_parents.extend(proxy._links_to_parents)
member._links_to_children.extend(proxy._links_to_children)
proxy._data = member._data
proxy._ident = member._ident
proxy._resolved_proxy = True
proxy._uid = member._uid
for link in proxy._links_to_parents:
for link_2 in link.item._links_to_children:
if link_2.item == proxy:
link_2._item = member
for link in proxy._links_to_children:
for link_2 in link.item._links_to_parents:
if link_2.item == proxy:
link_2._item = member
proxy._links_to_children = member._links_to_children
proxy._links_to_parents = member._links_to_parents
class ItemCache(dict):
""" This class provides a cache of specification items. """
# pylint: disable=too-many-instance-attributes
def __init__(self,
config: Any,
post_process_load: Optional[Callable[[ItemMap], None]] = None,
is_item_enabled: Callable[[List[str], Item],
bool] = _is_item_enabled):
super().__init__()
self._cache_index: int = 0
self._cache_directory: str = os.path.abspath(
config.get("cache-directory", "cache"))
self._types: Set[str] = set()
self.items_by_type: Dict[str, List[Item]] = {}
self._updates = 0
for path in config["paths"]:
self.load_items(path)
if post_process_load:
post_process_load(self)
if config.get("initialize-links", True):
self.initialize_links()
spec_root = config["spec-type-root-uid"]
if spec_root:
self._root_type = _gather_spec_refinements(self[spec_root])
else:
self._root_type = None
self._enabled = config.get("enabled", [])
self._is_enabled = is_item_enabled
for item in self.values():
self.set_type(item)
item["_enabled"] = is_item_enabled(self._enabled, item)
if config.get("resolve-proxies", False):
self.resolve_proxies()
@property
def updates(self) -> bool:
"""
Returns true if the item cache updates occurred due to new, modified,
or removed files.
"""
return self._updates > 0
@property
def types(self) -> Set[str]:
""" Returns the types of the items. """
return self._types
@property
def enabled(self) -> List[str]:
""" Returns the enabled set. """
return self._enabled
def set_enabled(self,
enabled: List[str],
is_item_enabled: Callable[[List[str], Item],
bool] = _is_item_enabled):
"""
Sets the enabled status of all items according to the enabled set using
the is item enabled function.
"""
self._enabled = enabled
self._is_enabled = is_item_enabled
for item in self.values():
item["_enabled"] = is_item_enabled(enabled, item)
def resolve_proxies(
self,
is_link_enabled: Callable[[Link],
bool] = _is_link_enabled) -> None:
""" Resolves each proxy item to the its first enabled member. """
for item in self.items_by_type.get("proxy", []):
_resolve_proxy(item, is_link_enabled)
def add_volatile_item(self, uid: str, data: Any) -> Item:
"""
Adds an item with the specified data to the cache and returns it.
The item is not added to the persistent cache storage.
"""
item = self.create_item(uid, data)
item.init_parents(self)
item.init_children()
self.set_type(item)
item["_enabled"] = self._is_enabled(self._enabled, item)
return item
def add_volatile_item_from_file(self, uid: str, path: str) -> Item:
"""
Adds an item stored in the specified file to the cache and returns it.
The item is not added to the persistent cache storage.
"""
return self.add_volatile_item(uid, self.load_data(path, uid))
def create_item(self, uid: str, data: Any) -> Item:
"""
Creates an item for the UID with the data and adds it to the cache.
"""
item = Item(self, uid, data)
self[uid] = item
return item
def _load_items_in_dir(self, base: str, path: str, cache_file: str,
update_cache: bool) -> Set[str]:
data_by_uid: Dict[str, Any] = {}
if update_cache:
self._updates += 1
for name in os.listdir(path):
path2 = os.path.join(path, name)
if name.endswith(".yml") and not name.startswith("."):
uid = "/" + os.path.relpath(path2, base).replace(
".yml", "")
data_by_uid[uid] = _load_yaml_data(path2, uid)
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
with open(cache_file, "wb") as out:
pickle.dump(data_by_uid, out)
else:
with open(cache_file, "rb") as pickle_src:
data_by_uid = pickle.load(pickle_src)
for uid, data in iter(data_by_uid.items()):
self.create_item(uid, data)
return set(data_by_uid.keys())
def _load_items_recursive(self, index: str, base: str, path: str,
cache_dir: str) -> Set[str]:
uids: Set[str] = set()
mid = os.path.abspath(path)
mid = mid.replace(os.path.commonpath([cache_dir, mid]), "").strip("/")
cache_file = os.path.join(cache_dir, index, mid, "spec.pickle")
try:
mtime = os.path.getmtime(cache_file)
update_cache = False
except FileNotFoundError:
update_cache = True
else:
update_cache = mtime <= os.path.getmtime(path)
for name in os.listdir(path):
path2 = os.path.join(path, name)
if name.endswith(".yml") and not name.startswith("."):
if not update_cache:
update_cache = mtime <= os.path.getmtime(path2)
elif stat.S_ISDIR(os.lstat(path2).st_mode):
uids.update(
self._load_items_recursive(index, base, path2, cache_dir))
uids.update(
self._load_items_in_dir(base, path, cache_file, update_cache))
return uids
def load_items(self, path: str) -> Set[str]:
""" Recursively loads the items in the directory path. """
index = self._cache_index
self._cache_index = index + 1
return self._load_items_recursive(str(index), path, path,
self._cache_directory)
def load_data(self, path: str, uid: str) -> Any:
""" Loads the item data from the file specified by path. """
return _load_yaml_data(path, uid)
def _save_data(self, file: TextIO, data: Any) -> None:
file.write(
yaml.dump(data, default_flow_style=False, allow_unicode=True))
def save_data(self, path: str, data: Any) -> None:
""" Saves the item data to the file specified by path. """
with open(path, "w", encoding="utf-8") as file:
data2 = {}
for key, value in data.items():
if not key.startswith("_"):
data2[key] = value
self._save_data(file, data2)
def initialize_links(self) -> None:
""" Initializes the links to parents and children. """
for item in self.values():
item.init_parents(self)
for item in sorted(self.values()):
item.init_children()
def set_type(self, item: Item) -> None:
""" Sets the type of the item. """
spec_type = self._root_type
value = item.data
path: List[str] = []
while spec_type is not None:
type_name = value[spec_type.key]
path.append(type_name)
spec_type = spec_type.refinements[type_name]
the_type = "/".join(path)
item["_type"] = the_type
self._types.add(the_type)
self.items_by_type.setdefault(the_type, []).append(item)
def set_types(self, root_type_uid: str) -> None:
"""
Sets the root type of the cache and then sets the type of all items.
"""
self._root_type = _gather_spec_refinements(self[root_type_uid])
for item in self.values():
self.set_type(item)
class EmptyItemCache(ItemCache):
""" This class provides a empty cache of specification items. """
def __init__(self):
super().__init__({
"cache-directory": ".",
"paths": [],
"spec-type-root-uid": None
})
class JSONItemCache(ItemCache):
""" This class provides a cache of specification items using JSON. """
def _load_json_items(self, base: str, path: str) -> Set[str]:
uids: Set[str] = set()
for name in os.listdir(path):
path2 = os.path.join(path, name)
if name.endswith(".json") and not name.startswith("."):
uid = "/" + os.path.relpath(path2, base).replace(".json", "")
self.create_item(uid, _load_json_data(path2, uid))
elif stat.S_ISDIR(os.lstat(path2).st_mode):
uids.update(self._load_json_items(base, path2))
return uids
def load_items(self, path: str) -> Set[str]:
return self._load_json_items(path, path)
def load_data(self, path: str, uid: str) -> Any:
return _load_json_data(path, uid)
def _save_data(self, file: TextIO, data: Any) -> None:
json.dump(data, file, sort_keys=True, indent=2)
class EmptyItem(Item):
""" Objects of this class represent empty items. """
def __init__(self):
super().__init__(EmptyItemCache(), "", {})