summaryrefslogblamecommitdiffstats
path: root/rtemsspec/items.py
blob: 49e084159538832795a74302c15bc09d74caffbd (plain) (tree)
1
2
3
4


                                                                   
                                                        





















                                                                             

                                
                                     
             
              

             
         
             
           

                                                                          
           

           




                                               







                                                               
                       
 








                                                                 
 
                           
                                                   
                                                     

 

                                                                    
                                                    



                        
                                                                    
                                              



                                                                   
                                                










                              

                                                                           







                                                     

 








                                                                     

                              
 











                                                                             


                                                        





                                               




                                                 




                                 
 



                                                
                

 






                                                                 






















                                                 

                                         
                            
                           
                                                                   

 






                                         



                                                                          




                                         
           
                                                                 

                                             
                                                                     
                          
                                
                                                             

                         

                                                
                                    
 












                                                                          


                                             


                                           


                                                        

                                   
                                              

                          


                                                    
                                      
 





                                                                       





                                            
             




                                                   



                                                                    








                                                                          




                                                                              

                                 




                                                                    




                                                                     
                                                           
 
                         



                                                                  
                                                              

                                               

                                         
                                   
                                               
                                                               
                              

                                               
                                                               


                              



                                                                  
                                                 
                                                                 
                           
 





                                                                          
                                                                     
                                                                               



                                   





                                                                        
                                                                          

                                                              


                                   
 
                          



                                                                  
                                                               

                                                

                                         
                                   
                                                
                                                               
                              

                                                
                                                               


                              



                                                                  
                                                  
                                                                  
                           
 





                                                                          
                                                                    

                                                                          



                                   





                                                                        
                                                                         

                                                               



                                   
                                                            

                                                                       






                                                                           
 

                                                                        
                                           

                                                                



                                                                 
                                            
                                                                
                                            
 
                                             









                                                                             
 
             




                                                                         




                              








                                             




                                             

                                                         
                                                    
 

                                                           
                                                                
 
 












                                                                          

                                                        
                                                                    

 

                                                                     
 
















                                                                        
                                
 







                                                
                 
                                                           
 
                                                            
                         
                                   
                           
                                                            
                                                            
 









                                            


                                                                              




                                                                              















                                                                             















                                                    







                                                  



                                                           


















                                                                               

                                                                               

                    



                                                                   
           

                                                                            
           
                                    




                                     
                       


                                   

                             
                      

                                               
             
                       
                
                                    
                                   
                                                               

                                                                     
                                                       
            
                                                                          



                                                                       
                                    

                                      

                                                    
                                                                          
                    
 



                                                        
           

                                                                            
           

                     









                                                                             
 
 





                                                                
                                        
                                                                            









                                                                         
                                                
                                                  
            
                                                           

                                                                      

                                           




                                             
                                                
                                                  










                                                                      



                                                             




                                                              


























                                                                             
                      
                                                               
 
                                                  

                             


                                                                               
                          


                                                     

                                                      
                         

                                    
                             
                                   
                                                
                                   




                                                                       

                                                 
                                  
                               
                                                                   

                                                
 
             







                                                                             



                                               














                                                                               
                                  

                                                             







                                                                         
                                                             
           
                                                                         


                                                              
                                          

                               
                           
                                                                

                   





                                                                              
                                                                     
 



                                                                           
                                    
                        

                   
                                                                       
                                                           
                                        
                        
                              


                                                                      

                                                                     
                                                                  

                                                                   

                                             
                                                      
                                                     
                                                   
                                       
                                      
 
                                                                     

                                                          
                                   
                                                                              
                                                                       
            
                                                


                                 

                                                          


                                                                  

                                                                   
                                                       




                                                                              
 
                                                


                                                                  

                                                                 


                                                                    


                                                          




                                                                          
                                                       





                                           

                                                              
                                  
                                   

                                          
 

                                           

                                   
                            



                                                        


                                 
                                                                
 







                                                                            


                                                                     
 
                       




                                      

 

                                                                          
 

                                                                 



                                                                             
                                                                  
                                                       

                                                               
 

                                                

                                                    


                                                          


                                                       

                                                        
 

                                                  
# SPDX-License-Identifier: BSD-2-Clause
""" This module provides specification items and an item cache. """

# Copyright (C) 2019, 2022 embedded brains GmbH & Co. KG
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# pylint: disable=too-many-lines

from contextlib import contextmanager
import base64
import hashlib
import os
import pickle
import re
import string
import stat
from typing import Any, Callable, Dict, Iterable, Iterator, List, Match, \
    NamedTuple, Optional, Set, TextIO, Tuple, Union
import json
import yaml

try:
    from yaml import CSafeLoader as SafeLoader
except ImportError:  # pragma: no cover
    from yaml import SafeLoader  # type: ignore


class ItemGetValueContext(NamedTuple):
    """ Context used to get an item value. """
    item: "Item"
    path: str
    value: Any
    key: str
    index: Any  # should be int, but this triggers a mypy error
    args: Optional[str]

    def arg(self, name: str, value: Optional[str] = None) -> str:
        """ Get argument value by name. """
        args = dict(
            kv.split("=")  # type: ignore
            for kv in self.args.split(","))  # type: ignore
        if value:
            return args.get(name, value)
        return args[name]


ItemMap = Dict[str, "Item"]
ItemGetValue = Callable[[ItemGetValueContext], Any]
ItemGetValueMap = Dict[str, Tuple[ItemGetValue, Any]]


def _is_enabled_op_and(enabled: List[str], enabled_by: Any) -> bool:
    for next_enabled_by in enabled_by:
        if not is_enabled(enabled, next_enabled_by):
            return False
    return True


def _is_enabled_op_not(enabled: List[str], enabled_by: Any) -> bool:
    return not is_enabled(enabled, enabled_by)


def _is_enabled_op_or(enabled: List[str], enabled_by: Any) -> bool:
    for next_enabled_by in enabled_by:
        if is_enabled(enabled, next_enabled_by):
            return True
    return False


_IS_ENABLED_OP = {
    "and": _is_enabled_op_and,
    "not": _is_enabled_op_not,
    "or": _is_enabled_op_or
}


def is_enabled(enabled: List[str], enabled_by: Any) -> bool:
    """ Verifies if the given parameter is enabled by specific enables. """
    if isinstance(enabled_by, bool):
        return enabled_by
    if isinstance(enabled_by, list):
        return _is_enabled_op_or(enabled, enabled_by)
    if isinstance(enabled_by, dict):
        key, value = next(iter(enabled_by.items()))
        return _IS_ENABLED_OP[key](enabled, value)
    return enabled_by in enabled


def _str_representer(dumper, data):
    return dumper.represent_scalar("tag:yaml.org,2002:str",
                                   data,
                                   style="|" if "\n" in data else "")


yaml.add_representer(str, _str_representer)


class Link:
    """ A link to an item. """

    def __init__(self, item: "Item", data: Any):
        self._item = item
        self._data = data

    @classmethod
    def create(cls, link: "Link", item: "Item") -> "Link":
        """ Creates a link using an existing link with a new target item. """
        return cls(item, link._data)  # pylint: disable=protected-access

    def __getitem__(self, name: str) -> Any:
        return self._data[name]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Link):
            return NotImplemented
        # pylint: disable=protected-access
        return self._item.uid < other._item.uid

    @property
    def item(self) -> "Item":
        """ The item referenced by this link. """
        return self._item

    @property
    def role(self) -> str:
        """ The link role. """
        return self._data["role"]


def _get_value(ctx: ItemGetValueContext) -> Any:
    value = ctx.value[ctx.key]
    if ctx.index >= 0:
        return value[ctx.index]
    return value


def normalize_key_path(key_path: str, prefix: str = "") -> str:
    """ Normalizes the key path with an optional prefix path. """
    if not os.path.isabs(key_path):
        key_path = os.path.join(prefix, key_path)
    return os.path.normpath(key_path)


_TYPES = {
    type(True): "B".encode("utf-8"),
    type(1.0): "F".encode("utf-8"),
    type(1): "I".encode("utf-8"),
    type(None): "N".encode("utf-8"),
    type(""): "S".encode("utf-8"),
}


def _hash_data(data, state) -> None:
    if isinstance(data, list):
        for value in data:
            _hash_data(value, state)
    elif isinstance(data, dict):
        for key, value in sorted(data.items()):
            if not key.startswith("_"):
                state.update(key.encode("utf-8"))
                _hash_data(value, state)
    else:
        state.update(_TYPES[type(data)])
        state.update(str(data).encode("utf-8"))


def data_digest(data: Any) -> str:
    """ Returns a digest of the data. """
    state = hashlib.sha256()
    _hash_data(data, state)
    return base64.urlsafe_b64encode(state.digest()).decode("ascii")


_UID_TO_UPPER = re.compile(r"[/_-]+(.)")


def _match_to_upper(match: Match) -> str:
    return match.group(1).upper()


def _is_link_enabled(link: Link) -> bool:
    return link.item._data["_enabled"]  # pylint: disable=protected-access


def link_is_enabled(_link: Link) -> bool:
    """ Returns true. """
    return True


class Item:
    """ Objects of this class represent a specification item. """

    # pylint: disable=too-many-public-methods
    def __init__(self, item_cache: "ItemCache", uid: str, data: Any):
        data["_type"] = ""
        self._cache = item_cache
        self._ident = _UID_TO_UPPER.sub(_match_to_upper, uid)
        self._uid = uid
        self._data = data
        self._links_to_parents: List[Link] = []
        self._links_to_children: List[Link] = []
        self._resolved_proxy = False

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._uid == other._uid  # pylint: disable=protected-access

    def __lt__(self, other: Any) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._uid < other._uid  # pylint: disable=protected-access

    def __hash__(self) -> int:
        return hash(self._uid)

    def __contains__(self, key: str) -> bool:
        return key in self._data

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    @property
    def cache(self) -> "ItemCache":
        """ Returns the cache of the item. """
        return self._cache

    @property
    def digest(self) -> str:
        """ Returns the digest of the item data. """
        return data_digest(self._data)

    def get(self, key: str, default: Any) -> Any:
        """
        Gets the attribute value if the attribute exists, otherwise the
        specified default value is returned.
        """
        return self._data.get(key, default)

    @property
    def uid(self) -> str:
        """ Returns the UID of the item. """
        return self._uid

    @property
    def ident(self) -> str:
        """ Returns the identifier of the item. """
        return self._ident

    @property
    def spec(self) -> str:
        """ Returns the UID of the item with an URL-like format. """
        return f"spec:{self._uid}"

    @property
    def spec_2(self) -> str:
        """
        Returns the UID of the item with an URL-like format with invisible
        white space to allow line breaks.
        """
        uid = self._uid.replace("/", "/\u200b")
        return f"spec:{uid}"

    def to_abs_uid(self, abs_or_rel_uid: str) -> str:
        """
        Returns the absolute UID of an absolute UID or an UID relative to this
        item.
        """
        if abs_or_rel_uid == ".":
            return self._uid
        if os.path.isabs(abs_or_rel_uid):
            return abs_or_rel_uid
        return os.path.normpath(
            os.path.join(os.path.dirname(self.uid), abs_or_rel_uid))

    def map(self, abs_or_rel_uid: str) -> "Item":
        """
        Maps the absolute UID or the UID relative to this item to the
        corresponding item.
        """
        return self._cache[self.to_abs_uid(abs_or_rel_uid)]

    def links_to_parents(
        self,
        role: Optional[Union[str, Iterable[str]]] = None,
        is_link_enabled: Callable[[Link], bool] = _is_link_enabled
    ) -> Iterator[Link]:
        """ Yields the links to the parents of this items. """
        if role is None:
            for link in self._links_to_parents:
                if is_link_enabled(link):
                    yield link
        elif isinstance(role, str):
            for link in self._links_to_parents:
                if link.role == role and is_link_enabled(link):
                    yield link
        else:
            for link in self._links_to_parents:
                if link.role in role and is_link_enabled(link):
                    yield link

    def parents(
        self,
        role: Optional[Union[str, Iterable[str]]] = None,
        is_link_enabled: Callable[[Link], bool] = _is_link_enabled
    ) -> Iterator["Item"]:
        """ Yields the parents of this items. """
        for link in self.links_to_parents(role, is_link_enabled):
            yield link.item

    def parent(
            self,
            role: Optional[Union[str, Iterable[str]]] = None,
            index: Optional[int] = 0,
            is_link_enabled: Callable[[Link],
                                      bool] = _is_link_enabled) -> "Item":
        """ Returns the parent with the specified role and index. """
        for item_index, item in enumerate(self.parents(role, is_link_enabled)):
            if item_index == index:
                return item
        raise IndexError

    def parent_link(
            self,
            role: Optional[Union[str, Iterable[str]]] = None,
            index: Optional[int] = 0,
            is_link_enabled: Callable[[Link],
                                      bool] = _is_link_enabled) -> Link:
        """ Returns the parent link with the specified role and index. """
        for link_index, link in enumerate(
                self.links_to_parents(role, is_link_enabled)):
            if link_index == index:
                return link
        raise IndexError

    def links_to_children(
        self,
        role: Optional[Union[str, Iterable[str]]] = None,
        is_link_enabled: Callable[[Link], bool] = _is_link_enabled
    ) -> Iterator[Link]:
        """ Yields the links to the children of this items. """
        if role is None:
            for link in self._links_to_children:
                if is_link_enabled(link):
                    yield link
        elif isinstance(role, str):
            for link in self._links_to_children:
                if link.role == role and is_link_enabled(link):
                    yield link
        else:
            for link in self._links_to_children:
                if link.role in role and is_link_enabled(link):
                    yield link

    def children(
        self,
        role: Optional[Union[str, Iterable[str]]] = None,
        is_link_enabled: Callable[[Link], bool] = _is_link_enabled
    ) -> Iterator["Item"]:
        """ Yields the children of this items. """
        for link in self.links_to_children(role, is_link_enabled):
            yield link.item

    def child(
            self,
            role: Optional[Union[str, Iterable[str]]] = None,
            index: Optional[int] = 0,
            is_link_enabled: Callable[[Link],
                                      bool] = _is_link_enabled) -> "Item":
        """ Returns the child with the specified role and index. """
        for item_index, item in enumerate(self.children(role,
                                                        is_link_enabled)):
            if item_index == index:
                return item
        raise IndexError

    def child_link(
            self,
            role: Optional[Union[str, Iterable[str]]] = None,
            index: Optional[int] = 0,
            is_link_enabled: Callable[[Link],
                                      bool] = _is_link_enabled) -> Link:
        """ Returns the child link with the specified role and index. """
        for link_index, link in enumerate(
                self.links_to_children(role, is_link_enabled)):
            if link_index == index:
                return link
        raise IndexError

    def init_parents(self, item_cache: "ItemCache") -> None:
        """ Initializes the list of links to parents of this items. """
        for data in self._data["links"]:
            try:
                link = Link(item_cache[self.to_abs_uid(data["uid"])], data)
                self._links_to_parents.append(link)
            except KeyError as err:
                msg = (f"item '{self.uid}' links "
                       f"to non-existing item '{data['uid']}'")
                raise KeyError(msg) from err

    def init_children(self) -> None:
        """ Initializes the list of links to children of this items. """
        for link in self._links_to_parents:
            link.item.add_link_to_child(Link.create(link, self))

    def add_link_to_parent(self, link: Link):
        """ Adds the link as a parent item link to this item. """
        self._links_to_parents.append(link)

    def add_link_to_child(self, link: Link):
        """ Adds the link as a child item link to this item. """
        self._links_to_children.append(link)

    def is_enabled(self, enabled: List[str]):
        """
        Returns true if the item is enabled by the enabled set, otherwise
        returns false.
        """
        return is_enabled(enabled, self._data["enabled-by"])

    @property
    def enabled(self) -> bool:
        """ Returns true if the item is enabled, otherwise returns false. """
        return self._data["_enabled"]

    @property
    def resolved_proxy(self) -> bool:
        """ Is true if the item is a resolved proxy, otherwise false. """
        return self._resolved_proxy

    @property
    def data(self) -> Any:
        """ The item data. """
        return self._data

    @property
    def file(self) -> str:
        """ Returns the file of the item. """
        return self._data["_file"]

    @file.setter
    def file(self, value: str):
        """ Sets the file of the item. """
        self._data["_file"] = value

    @property
    def type(self) -> str:
        """ Returns the type of the item. """
        return self._data["_type"]

    def save(self):
        """ Saves the item to the corresponding file. """
        self._cache.save_data(self.file, self._data)

    def load(self):
        """ Loads the item from the corresponding file. """
        self._data = self._cache.load_data(self.file, self._uid)


def create_unique_link(child: Item, parent: Item, data: Any) -> None:
    """
    Creates a unique link from the child to the parent item and vice versa
    using the data for the link.
    """
    for item in parent.children(data["role"]):
        if item.uid == child.uid:
            break
    else:
        parent.add_link_to_child(Link(child, data))
        child.add_link_to_parent(Link(parent, data))


class ItemTemplate(string.Template):
    """ String template for item mapper identifiers. """
    idpattern = "[a-zA-Z0-9._/-]+:[\\[\\]a-zA-Z0-9._/-]+(:[^${}]*)?"


class _ItemMapperContext(dict):
    """ Context to map identifiers to items and attribute values. """

    def __init__(self, mapper: "ItemMapper", item: Optional[Item],
                 prefix: Optional[str], recursive: bool):
        super().__init__()
        self._mapper = mapper
        self._item = item
        self._prefix = prefix
        self._recursive = recursive

    def __getitem__(self, identifier):
        item, key_path, value = self._mapper.map(identifier, self._item,
                                                 self._prefix)
        if self._recursive:
            return self._mapper.substitute(value, item,
                                           os.path.dirname(key_path))
        return value


class _GetValueDictionary(dict):

    def __init__(self, get_value: ItemGetValue):
        super().__init__()
        self._get_value = get_value

    def get(self, _key, _default):
        return (self._get_value, {})


class ItemMapper:
    """ Maps identifiers to items and attribute values. """

    def __init__(self, item: Item, recursive: bool = False):
        self._item = item
        self._recursive = recursive
        self._prefix = [""]
        self._get_value_map: Dict[str, ItemGetValueMap] = {}
        self.copyrights_by_license: Dict[str, Set[str]] = {}

    @property
    def item(self) -> Item:
        """ The item of the mapper. """
        return self._item

    @item.setter
    def item(self, item: Item) -> None:
        """ Sets the item of the mapper. """
        self._item = item

    def _add_get_value_map(
            self, type_path_key: str, new_get_value_map: Tuple[ItemGetValue,
                                                               Dict]) -> None:
        type_name, path_key = type_path_key.split(":")
        keys = path_key.strip("/").split("/")
        get_value_map = self._get_value_map.setdefault(type_name, {})
        for key in keys[:-1]:
            _, get_value_map = get_value_map.setdefault(key, (_get_value, {}))
        get_value_map[keys[-1]] = new_get_value_map

    def add_get_value(self, type_path_key: str,
                      get_value: ItemGetValue) -> None:
        """
        Adds a get value for the specified type and key path.
        """
        self._add_get_value_map(type_path_key, (get_value, {}))

    def add_get_value_dictionary(self, type_path_key: str,
                                 get_value: ItemGetValue) -> None:
        """
        Adds a get value dictionary for the specified type and key path.
        """
        self._add_get_value_map(type_path_key,
                                (_get_value, _GetValueDictionary(get_value)))

    def push_prefix(self, prefix: str) -> None:
        """ Pushes a key path prefix. """
        self._prefix.append(prefix)

    def pop_prefix(self) -> None:
        """ Pops a key path prefix. """
        self._prefix.pop()

    @contextmanager
    def prefix(self, prefix: str) -> Iterator[None]:
        """ Opens a key path prefix context. """
        self.push_prefix(prefix)
        yield
        self.pop_prefix()

    @contextmanager
    def scope(self, item: Item) -> Iterator[None]:
        """ Opens an item scope context. """
        previous = self._item
        self._item = item
        yield
        self._item = previous

    def get_value_map(self, item: Item) -> ItemGetValueMap:
        """ Returns the get value map for the item. """
        return self._get_value_map.get(item.type, {})

    def _get_by_normalized_key_path(self, item: Item, normalized_key_path: str,
                                    args: Optional[str]) -> Any:
        """
        Gets the attribute value associated with the normalized key path.
        """
        get_value_map = self.get_value_map(item)
        path = "/"
        value = item.data
        for key in normalized_key_path.strip("/").split("/"):
            parts = key.split("[")
            try:
                index = int(parts[1].split("]")[0])
            except IndexError:
                index = -1
            ctx = ItemGetValueContext(item, path, value, parts[0], index, args)
            get_value, get_value_map = get_value_map.get(
                parts[0], (_get_value, {}))
            value = get_value(ctx)
            path = os.path.join(path, key)
        self.copyrights_by_license.setdefault(item["SPDX-License-Identifier"],
                                              set()).update(item["copyrights"])
        return value

    def map(self,
            identifier: str,
            item: Optional[Item] = None,
            prefix: Optional[str] = None) -> Tuple[Item, str, Any]:
        """
        Maps the identifier with item and prefix to the associated item, key
        path, and attribute value.
        """
        colon = identifier.find(":")
        uid = identifier[:colon]
        more = identifier[colon + 1:]
        colon = more.find(":")
        if colon < 0:
            key_path = more
            args = None
        else:
            key_path = more[:colon]
            args = more[colon + 1:]
        if item is None:
            item = self._item
        if uid == ".":
            if prefix is None:
                prefix = "/".join(self._prefix)
        else:
            prefix = ""
            try:
                item = item.map(uid)
            except KeyError as err:
                msg = (f"item '{uid}' relative to {item.spec} "
                       f"specified by '{identifier}' does not exist")
                raise ValueError(msg) from err
        key_path = normalize_key_path(key_path, prefix)
        try:
            value = self._get_by_normalized_key_path(item, key_path, args)
        except Exception as err:
            msg = (f"cannot get value for '{key_path}' of {item.spec} "
                   f"specified by '{identifier}'")
            raise ValueError(msg) from err
        return item, key_path, value

    def __getitem__(self, identifier):
        item, key_path, value = self.map(identifier)
        if self._recursive:
            return self.substitute(value, item, os.path.dirname(key_path))
        return value

    def substitute(self,
                   text: Optional[str],
                   item: Optional[Item] = None,
                   prefix: Optional[str] = None) -> str:
        """
        Performs a variable substitution using the item mapper with the item
        and prefix.
        """
        if not text:
            return ""
        try:
            context = _ItemMapperContext(self, item, prefix, self._recursive)
            return ItemTemplate(text).substitute(context)
        except Exception as err:
            spec = self._item.spec if item is None else item.spec
            if prefix is None:
                prefix = "/".join(self._prefix)
            msg = (f"substitution for {spec} using prefix '{prefix}' "
                   f"failed for text: {text}")
            raise ValueError(msg) from err


class _SpecType(NamedTuple):
    key: str
    refinements: Dict[str, Any]


def _gather_spec_refinements(item: Item) -> Optional[_SpecType]:
    new_type: Optional[_SpecType] = None
    for link in item._links_to_children:  # pylint: disable=protected-access
        if link.role == "spec-refinement":
            key = link["spec-key"]
            if new_type is None:
                new_type = _SpecType(key, {})
            assert new_type.key == key
            new_type.refinements[
                link["spec-value"]] = _gather_spec_refinements(link.item)
    return new_type


def _load_yaml_data(path: str, uid: str) -> Any:
    with open(path, "r", encoding="utf-8") as src:
        try:
            data = yaml.load(src.read(), Loader=SafeLoader)
        except yaml.YAMLError as err:
            msg = ("YAML error while loading specification item file "
                   f"'{path}': {str(err)}")
            raise IOError(msg) from err
        data["_file"] = os.path.abspath(path)
        data["_uid"] = uid
    return data


def _load_json_data(path: str, uid: str) -> Any:
    with open(path, "r", encoding="utf-8") as src:
        try:
            data = json.load(src)
        except json.JSONDecodeError as err:
            msg = ("JSON error while loading specification item file "
                   f"'{path}': {str(err)}")
            raise IOError(msg) from err
        data["_file"] = os.path.abspath(path)
        data["_uid"] = uid
    return data


def _is_item_enabled(enabled: List[str], item: Item) -> bool:
    return is_enabled(enabled, item["enabled-by"])


def item_is_enabled(_enabled: List[str], _item: Item) -> bool:
    """ Returns true. """
    return True


def _resolve_proxy(proxy: Item, is_link_enabled: Callable[[Link],
                                                          bool]) -> None:

    # pylint: disable=protected-access
    try:
        member = proxy.child("proxy-member", is_link_enabled=is_link_enabled)
    except IndexError:
        pass
    else:
        member._links_to_parents.extend(proxy._links_to_parents)
        member._links_to_children.extend(proxy._links_to_children)
        proxy._data = member._data
        proxy._ident = member._ident
        proxy._resolved_proxy = True
        proxy._uid = member._uid
        for link in proxy._links_to_parents:
            for link_2 in link.item._links_to_children:
                if link_2.item == proxy:
                    link_2._item = member
        for link in proxy._links_to_children:
            for link_2 in link.item._links_to_parents:
                if link_2.item == proxy:
                    link_2._item = member
        proxy._links_to_children = member._links_to_children
        proxy._links_to_parents = member._links_to_parents


class ItemCache(dict):
    """ This class provides a cache of specification items. """

    # pylint: disable=too-many-instance-attributes
    def __init__(self,
                 config: Any,
                 post_process_load: Optional[Callable[[ItemMap], None]] = None,
                 is_item_enabled: Callable[[List[str], Item],
                                           bool] = _is_item_enabled):
        super().__init__()
        self._cache_index: int = 0
        self._cache_directory: str = os.path.abspath(
            config.get("cache-directory", "cache"))
        self._types: Set[str] = set()
        self.items_by_type: Dict[str, List[Item]] = {}
        self._updates = 0
        for path in config["paths"]:
            self.load_items(path)
        if post_process_load:
            post_process_load(self)
        if config.get("initialize-links", True):
            self.initialize_links()
        spec_root = config["spec-type-root-uid"]
        if spec_root:
            self._root_type = _gather_spec_refinements(self[spec_root])
        else:
            self._root_type = None
        self._enabled = config.get("enabled", [])
        self._is_enabled = is_item_enabled
        for item in self.values():
            self.set_type(item)
            item["_enabled"] = is_item_enabled(self._enabled, item)
        if config.get("resolve-proxies", False):
            self.resolve_proxies()

    @property
    def updates(self) -> bool:
        """
        Returns true if the item cache updates occurred due to new, modified,
        or removed files.
        """
        return self._updates > 0

    @property
    def types(self) -> Set[str]:
        """ Returns the types of the items. """
        return self._types

    @property
    def enabled(self) -> List[str]:
        """ Returns the enabled set. """
        return self._enabled

    def set_enabled(self,
                    enabled: List[str],
                    is_item_enabled: Callable[[List[str], Item],
                                              bool] = _is_item_enabled):
        """
        Sets the enabled status of all items according to the enabled set using
        the is item enabled function.
        """
        self._enabled = enabled
        self._is_enabled = is_item_enabled
        for item in self.values():
            item["_enabled"] = is_item_enabled(enabled, item)

    def resolve_proxies(
            self,
            is_link_enabled: Callable[[Link],
                                      bool] = _is_link_enabled) -> None:
        """ Resolves each proxy item to the its first enabled member. """
        for item in self.items_by_type.get("proxy", []):
            _resolve_proxy(item, is_link_enabled)

    def add_volatile_item(self, uid: str, data: Any) -> Item:
        """
        Adds an item with the specified data to the cache and returns it.

        The item is not added to the persistent cache storage.
        """
        item = self.create_item(uid, data)
        item.init_parents(self)
        item.init_children()
        self.set_type(item)
        item["_enabled"] = self._is_enabled(self._enabled, item)
        return item

    def add_volatile_item_from_file(self, uid: str, path: str) -> Item:
        """
        Adds an item stored in the specified file to the cache and returns it.

        The item is not added to the persistent cache storage.
        """
        return self.add_volatile_item(uid, self.load_data(path, uid))

    def create_item(self, uid: str, data: Any) -> Item:
        """
        Creates an item for the UID with the data and adds it to the cache.
        """
        item = Item(self, uid, data)
        self[uid] = item
        return item

    def _load_items_in_dir(self, base: str, path: str, cache_file: str,
                           update_cache: bool) -> Set[str]:
        data_by_uid: Dict[str, Any] = {}
        if update_cache:
            self._updates += 1
            for name in os.listdir(path):
                path2 = os.path.join(path, name)
                if name.endswith(".yml") and not name.startswith("."):
                    uid = "/" + os.path.relpath(path2, base).replace(
                        ".yml", "")
                    data_by_uid[uid] = _load_yaml_data(path2, uid)
            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
            with open(cache_file, "wb") as out:
                pickle.dump(data_by_uid, out)
        else:
            with open(cache_file, "rb") as pickle_src:
                data_by_uid = pickle.load(pickle_src)
        for uid, data in iter(data_by_uid.items()):
            self.create_item(uid, data)
        return set(data_by_uid.keys())

    def _load_items_recursive(self, index: str, base: str, path: str,
                              cache_dir: str) -> Set[str]:
        uids: Set[str] = set()
        mid = os.path.abspath(path)
        mid = mid.replace(os.path.commonpath([cache_dir, mid]), "").strip("/")
        cache_file = os.path.join(cache_dir, index, mid, "spec.pickle")
        try:
            mtime = os.path.getmtime(cache_file)
            update_cache = False
        except FileNotFoundError:
            update_cache = True
        else:
            update_cache = mtime <= os.path.getmtime(path)
        for name in os.listdir(path):
            path2 = os.path.join(path, name)
            if name.endswith(".yml") and not name.startswith("."):
                if not update_cache:
                    update_cache = mtime <= os.path.getmtime(path2)
            elif stat.S_ISDIR(os.lstat(path2).st_mode):
                uids.update(
                    self._load_items_recursive(index, base, path2, cache_dir))
        uids.update(
            self._load_items_in_dir(base, path, cache_file, update_cache))
        return uids

    def load_items(self, path: str) -> Set[str]:
        """ Recursively loads the items in the directory path. """
        index = self._cache_index
        self._cache_index = index + 1
        return self._load_items_recursive(str(index), path, path,
                                          self._cache_directory)

    def load_data(self, path: str, uid: str) -> Any:
        """ Loads the item data from the file specified by path. """
        return _load_yaml_data(path, uid)

    def _save_data(self, file: TextIO, data: Any) -> None:
        file.write(
            yaml.dump(data, default_flow_style=False, allow_unicode=True))

    def save_data(self, path: str, data: Any) -> None:
        """ Saves the item data to the file specified by path. """
        with open(path, "w", encoding="utf-8") as file:
            data2 = {}
            for key, value in data.items():
                if not key.startswith("_"):
                    data2[key] = value
            self._save_data(file, data2)

    def initialize_links(self) -> None:
        """ Initializes the links to parents and children. """
        for item in self.values():
            item.init_parents(self)
        for item in sorted(self.values()):
            item.init_children()

    def set_type(self, item: Item) -> None:
        """ Sets the type of the item. """
        spec_type = self._root_type
        value = item.data
        path: List[str] = []
        while spec_type is not None:
            type_name = value[spec_type.key]
            path.append(type_name)
            spec_type = spec_type.refinements[type_name]
        the_type = "/".join(path)
        item["_type"] = the_type
        self._types.add(the_type)
        self.items_by_type.setdefault(the_type, []).append(item)

    def set_types(self, root_type_uid: str) -> None:
        """
        Sets the root type of the cache and then sets the type of all items.
        """
        self._root_type = _gather_spec_refinements(self[root_type_uid])
        for item in self.values():
            self.set_type(item)


class EmptyItemCache(ItemCache):
    """ This class provides a empty cache of specification items. """

    def __init__(self):
        super().__init__({
            "cache-directory": ".",
            "paths": [],
            "spec-type-root-uid": None
        })


class JSONItemCache(ItemCache):
    """ This class provides a cache of specification items using JSON. """

    def _load_json_items(self, base: str, path: str) -> Set[str]:
        uids: Set[str] = set()
        for name in os.listdir(path):
            path2 = os.path.join(path, name)
            if name.endswith(".json") and not name.startswith("."):
                uid = "/" + os.path.relpath(path2, base).replace(".json", "")
                self.create_item(uid, _load_json_data(path2, uid))
            elif stat.S_ISDIR(os.lstat(path2).st_mode):
                uids.update(self._load_json_items(base, path2))
        return uids

    def load_items(self, path: str) -> Set[str]:
        return self._load_json_items(path, path)

    def load_data(self, path: str, uid: str) -> Any:
        return _load_json_data(path, uid)

    def _save_data(self, file: TextIO, data: Any) -> None:
        json.dump(data, file, sort_keys=True, indent=2)


class EmptyItem(Item):
    """ Objects of this class represent empty items. """

    def __init__(self):
        super().__init__(EmptyItemCache(), "", {})