diff options
author | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2023-11-21 11:13:16 +0100 |
---|---|---|
committer | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2023-11-21 11:15:25 +0100 |
commit | cd6cbe8792f038fc7a27d37c0804afa0a244eca9 (patch) | |
tree | 57fc96cefdf5b814db447ede3f9b8972c50b3d67 /rtemsspec/runactions.py | |
parent | ecb305c6fdf43944a9082870474d95041df7dcf0 (diff) |
runactions: New
Diffstat (limited to 'rtemsspec/runactions.py')
-rw-r--r-- | rtemsspec/runactions.py | 314 |
1 files changed, 314 insertions, 0 deletions
diff --git a/rtemsspec/runactions.py b/rtemsspec/runactions.py new file mode 100644 index 00000000..1da507cd --- /dev/null +++ b/rtemsspec/runactions.py @@ -0,0 +1,314 @@ +# SPDX-License-Identifier: BSD-2-Clause +""" This module provides a build step to run actions. """ + +# Copyright (C) 2022, 2023 embedded brains GmbH (http://www.embedded-brains.de) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import copy +import os +import logging +from pathlib import Path +import shutil +import subprocess +from typing import Any, Dict, List, Optional, Union + +from rtemsspec.directorystate import DirectoryState +from rtemsspec.items import Item, ItemGetValueContext, is_enabled +from rtemsspec.packagebuild import BuildItem, PackageBuildDirector +from rtemsspec.util import copy_and_substitute, remove_empty_directories + + +def _env_clear(item: "RunActions", env: Dict, _action: Dict[str, str]) -> None: + logging.info("%s: env: clear", item.uid) + env.clear() + + +def _env_path_append(item: "RunActions", env: Dict, action: Dict[str, + str]) -> None: + name = action["name"] + value = action["value"] + logging.info("%s: env: append '%s' to %s", item.uid, value, name) + env[name] = f"{env[name]}:{value}" + + +def _env_path_prepend(item: "RunActions", env: Dict, + action: Dict[str, str]) -> None: + name = action["name"] + value = action["value"] + logging.info("%s: env: prepend '%s' to %s", item.uid, value, name) + env[name] = f"{value}:{env[name]}" + + +def _env_set(item: "RunActions", env: Dict, action: Dict[str, str]) -> None: + name = action["name"] + value = action["value"] + logging.info("%s: env: %s = '%s'", item.uid, name, value) + env[name] = value + + +def _env_unset(item: "RunActions", env: Dict, action: Dict[str, str]) -> None: + name = action["name"] + logging.info("%s: env: unset %s", item.uid, name) + del env[name] + + +_ENV_ACTIONS = { + "clear": _env_clear, + "path-append": _env_path_append, + "path-prepend": _env_path_prepend, + "set": _env_set, + "unset": _env_unset +} + + +def _get_host_processor_count(_ctx: ItemGetValueContext) -> str: + count = os.cpu_count() + return str(count if count is not None else 1) + + +class RunActions(BuildItem): + """ Runs actions. """ + + def __init__(self, director: PackageBuildDirector, item: Item): + super().__init__(director, item) + self.mapper.add_get_value(f"{self.item.type}:/host-processor-count", + _get_host_processor_count) + + def run(self): + for index, action in enumerate(self["actions"]): + action_type = action["action"] + logging.info("%s: run action %i: %s", self.uid, index, action_type) + if is_enabled(self.enabled_set, action["enabled-by"]): + output_name = action.get("output-name", None) + if output_name is None: + output = None + else: + try: + output = self.output(output_name) + except ValueError: + continue + RunActions._ACTIONS[action_type](self, action, output) + + def _copy_and_substitute(self, action: Dict, + output: Optional[DirectoryState]) -> None: + assert isinstance(output, DirectoryState) + input_state = self.input(action["input-name"]) + assert isinstance(input_state, DirectoryState) + source = action["source"] + source_base = input_state.directory + target_base = output.directory + if source is None: + prefix = action["target"] + if prefix is None: + prefix = "." + targets: List[str] = [] + for source_file in input_state: + tail = os.path.relpath(source_file, source_base) + target_file = os.path.join(target_base, prefix, tail) + targets.append(tail) + copy_and_substitute(source_file, target_file, self.mapper, + self.uid) + output.add_files(targets) + else: + source_file = os.path.join(source_base, source) + target = action["target"] + if target is None: + target_file = output.file + else: + output.add_files([target]) + target_file = os.path.join(target_base, target) + copy_and_substitute(source_file, target_file, self.mapper, + self.uid) + + def _create_ini_file(self, action: Dict, + output: Optional[DirectoryState]) -> None: + assert isinstance(output, DirectoryState) + target = action["target"] + if target is None: + target = output.file + else: + output.add_files([target]) + target = os.path.join(output.directory, target) + logging.info("%s: create: %s", self.uid, target) + os.makedirs(os.path.dirname(target), exist_ok=True) + with open(target, "w", encoding="utf-8") as dst: + for section in action["sections"]: + if not is_enabled(self.enabled_set, section["enabled-by"]): + continue + dst.write(f"[{section['name']}]\n") + for key_value in section["key-value-pairs"]: + if not is_enabled(self.enabled_set, + key_value["enabled-by"]): + continue + dst.write(f"{key_value['key']} = {key_value['value']}\n") + + def _directory_state_clear(self, _action: Dict, + output: Optional[DirectoryState]) -> None: + assert isinstance(output, DirectoryState) + output.clear() + + def _directory_state_add_files(self, action: Dict, + output: Optional[DirectoryState]) -> None: + assert isinstance(output, DirectoryState) + root = Path(action["path"]).absolute() + pattern = action["pattern"] + logging.info("%s: add files matching '%s' in: %s", self.uid, pattern, + root) + base = output.directory + output.add_files( + [os.path.relpath(path, base) for path in root.glob(pattern)]) + + def _directory_state_add_tarfile_members( + self, action: Dict, output: Optional[DirectoryState]) -> None: + assert isinstance(output, DirectoryState) + root = Path(action["search-path"]) + pattern = action["pattern"] + logging.info("%s: search for tarfiles matching '%s' in: %s", self.uid, + pattern, root) + for path in root.glob(pattern): + output.add_tarfile_members(path, action["prefix-path"], + action["extract"]) + + def _directory_state_tree_op(self, action: Dict, + output: Optional[DirectoryState], + tree_op: Any) -> None: + assert isinstance(output, DirectoryState) + root = Path(action["root"]).absolute() + prefix = action["prefix"] + if prefix is None: + prefix = "." + tree_op(output, root, prefix, action["excludes"]) + + def _directory_state_add_tree(self, action: Dict, + output: Optional[DirectoryState]) -> None: + self._directory_state_tree_op(action, output, DirectoryState.add_tree) + + def _directory_state_copy_tree(self, action: Dict, + output: Optional[DirectoryState]) -> None: + self._directory_state_tree_op(action, output, DirectoryState.copy_tree) + + def _directory_state_move_tree(self, action: Dict, + output: Optional[DirectoryState]) -> None: + self._directory_state_tree_op(action, output, DirectoryState.move_tree) + + def _process(self, action: Dict, + _output: Optional[DirectoryState]) -> None: + env: Union[Dict, None] = None + env_actions = action["env"] + if env_actions: + logging.info("%s: env: modify", self.uid) + env = copy.deepcopy(os.environ.copy()) + for env_action in env_actions: + _ENV_ACTIONS[env_action["action"]](self, env, env_action) + cmd = action["command"] + cwd = action["working-directory"] + logging.info("%s: run in '%s': %s", self.uid, cwd, + " ".join(f"'{i}'" for i in cmd)) + status = subprocess.run(cmd, env=env, check=False, cwd=cwd) + expected_return_code = action["expected-return-code"] + if expected_return_code is not None: + assert status.returncode == expected_return_code + + def _mkdir(self, action: Dict, _output: Optional[DirectoryState]) -> None: + path = Path(action["path"]) + logging.info("%s: make directory: %s", self.uid, path) + path.mkdir(parents=action["parents"], exist_ok=action["exist-ok"]) + + def _remove_path(self, path: Path) -> None: + if path.is_dir(): + logging.info("%s: remove directory: %s", self.uid, path) + path.rmdir() + else: + logging.info("%s: unlink file: %s", self.uid, path) + path.unlink() + + def _remove(self, action: Dict, _output: Optional[DirectoryState]) -> None: + path = Path(action["path"]) + if action["missing-ok"]: + try: + self._remove_path(path) + except FileNotFoundError: + pass + else: + self._remove_path(path) + + def _remove_empty_directories(self, action: Dict, + _output: Optional[DirectoryState]) -> None: + remove_empty_directories(self.uid, action["path"]) + + def _remove_glob(self, action: Dict, + _output: Optional[DirectoryState]) -> None: + root = Path(action["path"]) + for pattern in action["patterns"]: + logging.info( + "%s: remove files and directories matching with '%s' in: %s", + self.uid, pattern, root) + for path in root.glob(pattern): + if path.is_dir(): + if action["remove-tree"]: + logging.info("%s: remove directory tree: %s", self.uid, + path) + shutil.rmtree(path) + else: + logging.info("%s: remove directory: %s", self.uid, + path) + path.rmdir() + else: + logging.info("%s: remove file: %s", self.uid, path) + path.unlink() + + def _remove_tree(self, action: Dict, + _output: Optional[DirectoryState]) -> None: + path = action["path"] + logging.info("%s: remove directory tree: %s", self.uid, path) + if action["missing-ok"]: + try: + shutil.rmtree(path) + except FileNotFoundError: + pass + else: + shutil.rmtree(path) + + def _touch(self, action: Dict, _output: Optional[DirectoryState]) -> None: + path = Path(action["path"]) + logging.info("%s: touch file: %s", self.uid, path) + path.touch(exist_ok=action["exist-ok"]) + + _ACTIONS = { + "copy-and-substitute": _copy_and_substitute, + "create-ini-file": _create_ini_file, + "directory-state-add-files": _directory_state_add_files, + "directory-state-add-tarfile-members": + _directory_state_add_tarfile_members, + "directory-state-add-tree": _directory_state_add_tree, + "directory-state-clear": _directory_state_clear, + "directory-state-copy-tree": _directory_state_copy_tree, + "directory-state-move-tree": _directory_state_move_tree, + "mkdir": _mkdir, + "remove": _remove, + "remove-empty-directories": _remove_empty_directories, + "remove-glob": _remove_glob, + "remove-tree": _remove_tree, + "subprocess": _process, + "touch": _touch + } |