Skip to content

kraken.common

kraken.common

AsciiTable

A very simple ASCII table formatter.

Supports correctly calculating the width of cells even if they are already ANSI formatted.

Source code in kraken/common/_asciitable.py
class AsciiTable:
    """
    A very simple ASCII table formatter.

    Supports correctly calculating the width of cells even if they are already ANSI formatted.
    """

    #: A list of the header text cells to display at the top of the table.
    headers: list[str]

    #: A list of rows to display in the table. Note that each row should have at least as many elements as
    #: :attr:`headers`, otherwise you will face an :class:`IndexError` in :meth:`print`.
    rows: list[Sequence[str]]

    def __init__(self) -> None:
        self.headers = []
        self.rows = []

    def __iter__(self) -> Iterator[Sequence[str]]:
        yield self.headers
        yield from self.rows

    def print(self, fp: "TextIO | None" = None) -> None:
        widths = [
            max(len(REGEX_ANSI_ESCAPE.sub("", row[col_idx])) for row in self) for col_idx in range(len(self.headers))
        ]
        for row_idx, row in enumerate(self):
            if row_idx == 0:
                row = [colored(x.ljust(widths[col_idx]), attrs=["bold"]) for col_idx, x in enumerate(row)]
            else:
                row = [x.ljust(widths[col_idx]) for col_idx, x in enumerate(row)]
            if row_idx == 1:
                print("  ".join("-" * widths[idx] for idx in range(len(row))), file=fp)
            print("  ".join(row[idx].ljust(widths[idx]) for idx in range(len(row))), file=fp)

BuildscriptMetadata dataclass

Metadata for a Kraken build and its runtime environment.

Source code in kraken/common/_buildscript.py
@dataclass
class BuildscriptMetadata:
    """
    Metadata for a Kraken build and its runtime environment.
    """

    index_url: "str | None" = None
    extra_index_urls: list[str] = field(default_factory=list)
    requirements: list[str] = field(default_factory=list)
    additional_sys_paths: list[str] = field(default_factory=list)
    interpreter_constraint: str | None = None

    def requires(self, requirement: str) -> None:
        self.requirements.append(requirement)

    def extra_index_url(self, url: str) -> None:
        self.extra_index_urls.append(url)

    def additional_sys_path(self, path: str) -> None:
        self.additional_sys_paths.append(path)

    @staticmethod
    @contextmanager
    def capture() -> Iterator["Future[BuildscriptMetadata]"]:
        """
        A context manager that will ensure calling :func:`buildscript` will raise a
        :class:`BuildscriptMetadataException` and catch that exception to return the metadata.

        This is used to retrieve the metadata in Kraken wrapper.
        """

        future: "Future[BuildscriptMetadata]" = Future()
        _global.mode = _Mode.RAISE
        try:
            yield future
        except BuildscriptMetadataException as exc:
            future.set_result(exc.metadata)
        else:
            exception = RuntimeError("No KrakenMetadataException was raised, did metadata() get called?")
            future.set_exception(exception)
            raise exception
        finally:
            _global.mode = _Mode.PASSTHROUGH

    @staticmethod
    @contextmanager
    def callback(func: Callable[["BuildscriptMetadata"], Any]) -> Iterator[None]:
        """
        A context manager that will ensure calling the given *func* after :func:`buildscript` is run.

        This is used to retrieve and react upon the metadata in the Kraken build system.
        """

        _global.mode = _Mode.CALLBACK
        _global.func = func
        try:
            yield
        finally:
            _global.mode = _Mode.PASSTHROUGH
            _global.func = None
callback staticmethod
callback(
    func: Callable[[BuildscriptMetadata], Any]
) -> Iterator[None]

A context manager that will ensure calling the given func after :func:buildscript is run.

This is used to retrieve and react upon the metadata in the Kraken build system.

Source code in kraken/common/_buildscript.py
@staticmethod
@contextmanager
def callback(func: Callable[["BuildscriptMetadata"], Any]) -> Iterator[None]:
    """
    A context manager that will ensure calling the given *func* after :func:`buildscript` is run.

    This is used to retrieve and react upon the metadata in the Kraken build system.
    """

    _global.mode = _Mode.CALLBACK
    _global.func = func
    try:
        yield
    finally:
        _global.mode = _Mode.PASSTHROUGH
        _global.func = None
capture staticmethod
capture() -> Iterator[Future[BuildscriptMetadata]]

A context manager that will ensure calling :func:buildscript will raise a :class:BuildscriptMetadataException and catch that exception to return the metadata.

This is used to retrieve the metadata in Kraken wrapper.

Source code in kraken/common/_buildscript.py
@staticmethod
@contextmanager
def capture() -> Iterator["Future[BuildscriptMetadata]"]:
    """
    A context manager that will ensure calling :func:`buildscript` will raise a
    :class:`BuildscriptMetadataException` and catch that exception to return the metadata.

    This is used to retrieve the metadata in Kraken wrapper.
    """

    future: "Future[BuildscriptMetadata]" = Future()
    _global.mode = _Mode.RAISE
    try:
        yield future
    except BuildscriptMetadataException as exc:
        future.set_result(exc.metadata)
    else:
        exception = RuntimeError("No KrakenMetadataException was raised, did metadata() get called?")
        future.set_exception(exception)
        raise exception
    finally:
        _global.mode = _Mode.PASSTHROUGH

BuildscriptMetadataException

Bases: BaseException

This exception is raised by the :func:metadata function.

Source code in kraken/common/_buildscript.py
class BuildscriptMetadataException(BaseException):
    """
    This exception is raised by the :func:`metadata` function.
    """

    def __init__(self, metadata: BuildscriptMetadata) -> None:
        self.metadata = metadata

    def __str__(self) -> str:
        return (
            "If you are seeing this message, something has gone wrong with catching the exception. This "
            "exception is used to abort and transfer Kraken metadata to the caller."
        )

ColorOptions dataclass

Adds a --no-color option to the argument parser. Use [init_color] to monkey-patch the [termcolor] module to force color output unless the --no-color option is set. This ensures we have colored output even in CI environments by default.

Source code in kraken/common/_option_sets.py
@dataclass
class ColorOptions:
    """
    Adds a `--no-color` option to the argument parser. Use [init_color] to monkey-patch the [termcolor] module
    to force color output unless the `--no-color` option is set. This ensures we have colored output even in CI
    environments by default.
    """

    no_color: bool

    _termcolor_monkeypatched: ClassVar[bool] = False

    @staticmethod
    def add_to_parser(parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--no-color",
            dest="no_color",
            action="store_true",
            help="disable colored output",
        )

    @staticmethod
    def collect(args: argparse.Namespace) -> "ColorOptions":
        return ColorOptions(
            no_color=args.no_color,
        )

    def init_color(self) -> None:
        from kraken.common import _colored

        if self.no_color:
            _colored.COLORS_ENABLED = False

CurrentDirectoryProjectFinder

Bases: ProjectFinder

Goes through a list of script finders and returns the first one matching.

Source code in kraken/common/_runner.py
class CurrentDirectoryProjectFinder(ProjectFinder):
    """
    Goes through a list of script finders and returns the first one matching.
    """

    def __init__(self, script_runners: Iterable[ScriptRunner]) -> None:
        self.script_runners = list(script_runners)

    def find_project(self, directory: Path) -> "ProjectInfo | None":
        for runner in self.script_runners:
            script = runner.find_script(directory)
            if script is not None:
                return ProjectInfo(script, runner)

        return None

    @classmethod
    def default(cls) -> "CurrentDirectoryProjectFinder":
        """
        Returns the default instance that contains the known :class:`ScriptRunner` implementations.
        """

        return cls([PythonScriptRunner()])
default classmethod

Returns the default instance that contains the known :class:ScriptRunner implementations.

Source code in kraken/common/_runner.py
@classmethod
def default(cls) -> "CurrentDirectoryProjectFinder":
    """
    Returns the default instance that contains the known :class:`ScriptRunner` implementations.
    """

    return cls([PythonScriptRunner()])

EnvironmentType

Bases: Enum

This enumeration describes the type of environment that is being used to run Kraken in.

Source code in kraken/common/_environment.py
class EnvironmentType(enum.Enum):
    """
    This enumeration describes the type of environment that is being used to run Kraken in.
    """

    #: This enum reflects that Kraken was run directly, without being invoked through Kraken wrapper.
    NATIVE = 0

    #: Wrapper, using a virtual environment.
    VENV = 1

    #: Use the new shiny `uv` package manager.
    UV = 2

    def is_wrapped(self) -> bool:
        """Whether the environment is managed by Kraken-wrapper."""
        return self != EnvironmentType.NATIVE

    @staticmethod
    def get(environ: Mapping[str, str]) -> "EnvironmentType":
        value = environ.get(KRAKEN_ENVIRONMENT_TYPE_VARIABLE, EnvironmentType.NATIVE.name)
        try:
            return EnvironmentType(value)
        except ValueError:
            raise RuntimeError(
                f"The value of environment variable {KRAKEN_ENVIRONMENT_TYPE_VARIABLE}={value!r} is invalid, "
                f"valid values are {', '.join(x.name for x in EnvironmentType)}. The most likely cause of this "
                "error is that the version of kraken-wrapper and kraken-build are incompatible."
            )

    def set(self, environ: MutableMapping[str, str]) -> None:
        environ[KRAKEN_ENVIRONMENT_TYPE_VARIABLE] = self.name
is_wrapped
is_wrapped() -> bool

Whether the environment is managed by Kraken-wrapper.

Source code in kraken/common/_environment.py
def is_wrapped(self) -> bool:
    """Whether the environment is managed by Kraken-wrapper."""
    return self != EnvironmentType.NATIVE

GitAwareProjectFinder

Bases: ProjectFinder

Finds the root of a project by picking the highest-up build script that does not cross a Git repository boundary or a home directory boundary. Starts from a directory and works it's way up until a stop condition is encountered.

If any build script contains the string # ::krakenw-root, then the directory containing that build script is considered the root of the project. This is useful for projects that have multiple build scripts in different directories, but they should not be considered part of the same project.

Source code in kraken/common/_runner.py
class GitAwareProjectFinder(ProjectFinder):
    """
    Finds the root of a project by picking the highest-up build script that does not cross a Git repository boundary
    or a home directory boundary. Starts from a directory and works it's way up until a stop condition is encountered.

    If any build script contains the string `# ::krakenw-root`, then the directory containing that build script is
    considered the root of the project. This is useful for projects that have multiple build scripts in different
    directories, but they should not be considered part of the same project.
    """

    def __init__(self, delegate: ProjectFinder, home_boundary: "Path | None | NotSet" = None) -> None:
        """
        :param delegate: The project finder to delegate to in any of the directories that this class
            looks through. The first project returned by this finder in any of the directories is used.
        :param home_boundary: A directory which contains a boundary that should not be crossed when
            searching for projects. For example, if this is `/home/foo`, then this class will stop searching
            for projects as soon as it reaches `/home/foo` or any sibling directory (such as `/home/bar`).
            If a path does not live within the home boundary or any of its siblings, the boundary is not
            taken into account.

            When the parameter is set to :const:`NotSet.Value`, it will default to the user's home directory.
            The value can be set to :const:`None` to disable the home boundary check all together.
        """
        self.delegate = delegate
        self.home_boundary = (
            Path("~").expanduser().parent.absolute() if home_boundary is NotSet.Value else home_boundary
        )

    def find_project(self, directory: Path) -> "ProjectInfo | None":
        highest_script: "ProjectInfo | None" = None
        directory = directory.absolute()
        while directory != Path(directory.root):
            # If we're in any directory that could be a home directory, we stop searching.
            if directory.parent == self.home_boundary:
                break

            script = self.delegate.find_project(directory)
            if script is not None:
                highest_script = script
                if script.script.read_text().find("# ::krakenw-root") != -1:
                    break

            # If in the next loop we would cross a Git repository boundary, we stop searching.
            if (directory / ".git").exists():
                break

            directory = directory.parent

        return highest_script

    @classmethod
    def default(cls) -> "GitAwareProjectFinder":
        """
        Returns the default instance that contains a default :class:`CurrentDirectoryProjectFinder`.
        """

        return cls(CurrentDirectoryProjectFinder.default())
__init__
__init__(
    delegate: ProjectFinder,
    home_boundary: Path | None | NotSet = None,
) -> None

:param delegate: The project finder to delegate to in any of the directories that this class looks through. The first project returned by this finder in any of the directories is used. :param home_boundary: A directory which contains a boundary that should not be crossed when searching for projects. For example, if this is /home/foo, then this class will stop searching for projects as soon as it reaches /home/foo or any sibling directory (such as /home/bar). If a path does not live within the home boundary or any of its siblings, the boundary is not taken into account.

When the parameter is set to :const:`NotSet.Value`, it will default to the user's home directory.
The value can be set to :const:`None` to disable the home boundary check all together.
Source code in kraken/common/_runner.py
def __init__(self, delegate: ProjectFinder, home_boundary: "Path | None | NotSet" = None) -> None:
    """
    :param delegate: The project finder to delegate to in any of the directories that this class
        looks through. The first project returned by this finder in any of the directories is used.
    :param home_boundary: A directory which contains a boundary that should not be crossed when
        searching for projects. For example, if this is `/home/foo`, then this class will stop searching
        for projects as soon as it reaches `/home/foo` or any sibling directory (such as `/home/bar`).
        If a path does not live within the home boundary or any of its siblings, the boundary is not
        taken into account.

        When the parameter is set to :const:`NotSet.Value`, it will default to the user's home directory.
        The value can be set to :const:`None` to disable the home boundary check all together.
    """
    self.delegate = delegate
    self.home_boundary = (
        Path("~").expanduser().parent.absolute() if home_boundary is NotSet.Value else home_boundary
    )
default classmethod
default() -> GitAwareProjectFinder

Returns the default instance that contains a default :class:CurrentDirectoryProjectFinder.

Source code in kraken/common/_runner.py
@classmethod
def default(cls) -> "GitAwareProjectFinder":
    """
    Returns the default instance that contains a default :class:`CurrentDirectoryProjectFinder`.
    """

    return cls(CurrentDirectoryProjectFinder.default())

LocalRequirement dataclass

Bases: Requirement

Represents a requirement on a local project on the filesystem.

The string format of a local requirement is name@path. The name must match the distribution name.

Source code in kraken/common/_requirements.py
@dataclasses.dataclass(frozen=True)
class LocalRequirement(Requirement):
    """Represents a requirement on a local project on the filesystem.

    The string format of a local requirement is `name@path`. The `name` must match the distribution name."""

    name: str
    path: Path

    def __post_init__(self) -> None:
        if not self.name:
            raise ValueError(f"invalid requirement: {self}")

    def __str__(self) -> str:
        return f"{self.name} @ {self.path}"

    def to_args(self, base_dir: Path) -> list[str]:
        return [str((base_dir / self.path if base_dir else self.path).absolute())]

PipRequirement dataclass

Bases: Requirement

Represents a Pip requriement.

Source code in kraken/common/_requirements.py
@dataclasses.dataclass(frozen=True)
class PipRequirement(Requirement):
    """Represents a Pip requriement."""

    name: str
    spec: str | None

    def __post_init__(self) -> None:
        if not self.name:
            raise ValueError(f"invalid URL requirement: {self}")

    def __str__(self) -> str:
        return f"{self.name}{self.spec or ''}"

    def to_args(self, base_dir: Path) -> list[str]:
        return [str(self)]

ProjectFinder

Bases: ABC

Base class for finding a Kraken project starting from any directory.

Source code in kraken/common/_runner.py
class ProjectFinder(ABC):
    """
    Base class for finding a Kraken project starting from any directory.
    """

    @abstractmethod
    def find_project(self, directory: Path) -> "ProjectInfo | None": ...

PythonScriptRunner

Bases: ScriptPicker

A finder and runner for Python based Kraken build scripts called .kraken.py.

Note

We can't call the script kraken.py (without the leading dot), as otherwise under most circumstances the script will try to import itself when doing import kraken or from kraken import ....

Source code in kraken/common/_runner.py
class PythonScriptRunner(ScriptPicker):
    """
    A finder and runner for Python based Kraken build scripts called `.kraken.py`.

    !!! note

        We can't call the script `kraken.py` (without the leading dot), as otherwise under most circumstances the
        script will try to import itself when doing `import kraken` or `from kraken import ...`.
    """

    def __init__(self, filenames: Sequence[str] = (".kraken.py",)) -> None:
        super().__init__(filenames)

    def execute_script(self, script: Path, scope: dict[str, Any]) -> None:
        module = types.ModuleType(str(script.parent))
        module.__file__ = str(script)

        code = compile(script.read_text(), script, "exec")
        exec(code, vars(module))

    def has_buildscript_call(self, script: Path) -> bool:
        code = script.read_text()
        if not re.search(r"^from kraken.common import buildscript", code, re.M):
            return False
        if not re.search(r"^buildscript\s*\(", code, re.M):
            return False
        return True

    def get_buildscript_call_recommendation(self, metadata: BuildscriptMetadata) -> str:
        code = "from kraken.common import buildscript\nbuildscript("
        if metadata.index_url:
            code += f"\n    index_url={metadata.index_url!r},"
        if metadata.extra_index_urls:
            if len(metadata.extra_index_urls) == 1:
                code += f"\n    extra_index_urls={metadata.extra_index_urls!r},"
            else:
                code += "\n    extra_index_urls=["
                for url in metadata.extra_index_urls:
                    code += f"\n        {url!r},"
                code += "\n    ],"
        if metadata.requirements:
            if sum(map(len, metadata.requirements)) < 50:
                code += f"\n    requirements={metadata.requirements!r},"
            else:
                code += "\n    requirements=["
                for req in metadata.requirements:
                    code += f"\n        {req!r},"
                code += "\n    ],"
        if not code.endswith("("):
            code += "\n"
        code += ")"
        return code

Requirement

Bases: ABC

Source code in kraken/common/_requirements.py
class Requirement(abc.ABC):
    name: str  #: The distribution name.

    @abc.abstractmethod
    def to_args(self, base_dir: Path) -> list[str]:
        """Convert the requirement to Pip args."""

        raise NotImplementedError
to_args abstractmethod
to_args(base_dir: Path) -> list[str]

Convert the requirement to Pip args.

Source code in kraken/common/_requirements.py
@abc.abstractmethod
def to_args(self, base_dir: Path) -> list[str]:
    """Convert the requirement to Pip args."""

    raise NotImplementedError

RequirementSpec dataclass

Represents the requirements for a kraken build script.

Source code in kraken/common/_requirements.py
@dataclasses.dataclass(frozen=True)
class RequirementSpec:
    """Represents the requirements for a kraken build script."""

    requirements: tuple[Requirement, ...]
    index_url: "str | None" = None
    extra_index_urls: tuple[str, ...] = ()
    interpreter_constraint: "str | None" = None
    pythonpath: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        for req in self.requirements:
            assert isinstance(req, Requirement), type(req)

    def __eq__(self, other: Any) -> bool:
        # NOTE (@NiklasRosenstein): packaging.requirements.Requirement is not properly equality comparable, so
        #       we implement a custom comparison based on the hash digest.
        if isinstance(other, RequirementSpec):
            return (type(self), self.to_hash()) == (type(other), other.to_hash())
        return False

    def with_requirements(self, reqs: Iterable["str | Requirement"]) -> "RequirementSpec":
        """Adds the given requirements and returns a new instance."""

        requirements = list(self.requirements)
        for req in reqs:
            if isinstance(req, str):
                req = parse_requirement(req)
            requirements.append(req)

        return self.replace(requirements=tuple(requirements))

    def with_pythonpath(self, path: Iterable[str]) -> "RequirementSpec":
        """Adds the given pythonpath and returns a new instance."""

        return self.replace(pythonpath=(*self.pythonpath, *path))

    def replace(
        self,
        requirements: "Iterable[Requirement] | None" = None,
        index_url: "str | None | NotSet" = NotSet.Value,
        extra_index_urls: "Iterable[str] | None" = None,
        interpreter_constraint: "str | None | NotSet" = NotSet.Value,
        pythonpath: "Iterable[str] | None" = None,
    ) -> "RequirementSpec":
        return RequirementSpec(
            requirements=self.requirements if requirements is None else tuple(requirements),
            index_url=self.index_url if index_url is NotSet.Value else index_url,
            extra_index_urls=self.extra_index_urls if extra_index_urls is None else tuple(extra_index_urls),
            interpreter_constraint=self.interpreter_constraint
            if interpreter_constraint is NotSet.Value
            else interpreter_constraint,
            pythonpath=self.pythonpath if pythonpath is None else tuple(pythonpath),
        )

    @staticmethod
    def from_json(data: dict[str, Any]) -> "RequirementSpec":
        return RequirementSpec(
            requirements=tuple(parse_requirement(x) for x in data["requirements"]),
            index_url=data.get("index_url"),
            extra_index_urls=tuple(data.get("extra_index_urls", ())),
            interpreter_constraint=data.get("interpreter_constraint"),
            pythonpath=tuple(data.get("pythonpath", ())),
        )

    def to_json(self) -> dict[str, Any]:
        result: dict[str, Any] = {"requirements": [str(x) for x in self.requirements], "pythonpath": self.pythonpath}
        if self.index_url is not None:
            result["index_url"] = self.index_url
        if self.extra_index_urls:
            result["extra_index_urls"] = self.extra_index_urls
        if self.interpreter_constraint:
            result["interpreter_constraint"] = self.interpreter_constraint
        return result

    @staticmethod
    def from_args(args: list[str]) -> "RequirementSpec":
        """Parses the arguments using :mod:`argparse` as if they are Pip install arguments.

        :raise ValueError: If an invalid argument is encountered."""

        parser = argparse.ArgumentParser()
        parser.add_argument("packages", nargs="*")
        parser.add_argument("--index-url")
        parser.add_argument("--extra-index-url", action="append")
        parser.add_argument("--interpreter-constraint")
        parsed, unknown = parser.parse_known_args(args)
        if unknown:
            raise ValueError(f"encountered unknown arguments in requirements: {unknown}")

        return RequirementSpec(
            requirements=tuple(parse_requirement(x) for x in parsed.packages or []),
            index_url=parsed.index_url,
            extra_index_urls=tuple(parsed.extra_index_url or ()),
            interpreter_constraint=parsed.interpreter_constraint,
        )

    def to_args(
        self,
        base_dir: Path = Path("."),
        with_options: bool = True,
        with_requirements: bool = True,
    ) -> list[str]:
        """Converts the requirements back to Pip install arguments.

        :param base_dir: The base directory that relative :class:`LocalRequirement`s should be considered relative to.
        :param with_requirements: Can be set to `False` to not return requirements in the argument, just the index URLs.
        """

        args = []
        if with_options and self.index_url:
            args += ["--index-url", self.index_url]
        if with_options:
            for url in self.extra_index_urls:
                args += ["--extra-index-url", url]
        if with_requirements:
            args += flatten(req.to_args(base_dir) for req in self.requirements)
        return args

    def to_hash(self, algorithm: str = "sha256") -> str:
        """Hash the requirements spec to a hexdigest."""

        hash_parts = [str(req) for req in self.requirements] + ["::pythonpath"] + list(self.pythonpath)
        hash_parts += ["::interpreter_constraint", self.interpreter_constraint or ""]
        return hashlib.new(algorithm, ":".join(hash_parts).encode()).hexdigest()

    @classmethod
    def from_metadata(cls, metadata: BuildscriptMetadata) -> "RequirementSpec":
        return RequirementSpec(
            requirements=tuple(map(parse_requirement, metadata.requirements)),
            index_url=metadata.index_url,
            extra_index_urls=tuple(metadata.extra_index_urls),
            interpreter_constraint=metadata.interpreter_constraint or DEFAULT_INTERPRETER_CONSTRAINT,
            pythonpath=tuple(metadata.additional_sys_paths) + (DEFAULT_BUILD_SUPPORT_FOLDER,),
        )

    def to_metadata(self) -> BuildscriptMetadata:
        return BuildscriptMetadata(
            index_url=self.index_url,
            extra_index_urls=list(self.extra_index_urls),
            requirements=[str(x) for x in self.requirements],
            additional_sys_paths=[x for x in self.pythonpath if x != DEFAULT_BUILD_SUPPORT_FOLDER],
            interpreter_constraint=DEFAULT_INTERPRETER_CONSTRAINT,
        )
from_args staticmethod
from_args(args: list[str]) -> RequirementSpec

Parses the arguments using :mod:argparse as if they are Pip install arguments.

:raise ValueError: If an invalid argument is encountered.

Source code in kraken/common/_requirements.py
@staticmethod
def from_args(args: list[str]) -> "RequirementSpec":
    """Parses the arguments using :mod:`argparse` as if they are Pip install arguments.

    :raise ValueError: If an invalid argument is encountered."""

    parser = argparse.ArgumentParser()
    parser.add_argument("packages", nargs="*")
    parser.add_argument("--index-url")
    parser.add_argument("--extra-index-url", action="append")
    parser.add_argument("--interpreter-constraint")
    parsed, unknown = parser.parse_known_args(args)
    if unknown:
        raise ValueError(f"encountered unknown arguments in requirements: {unknown}")

    return RequirementSpec(
        requirements=tuple(parse_requirement(x) for x in parsed.packages or []),
        index_url=parsed.index_url,
        extra_index_urls=tuple(parsed.extra_index_url or ()),
        interpreter_constraint=parsed.interpreter_constraint,
    )
to_args
to_args(
    base_dir: Path = Path("."),
    with_options: bool = True,
    with_requirements: bool = True,
) -> list[str]

Converts the requirements back to Pip install arguments.

:param base_dir: The base directory that relative :class:LocalRequirements should be considered relative to. :param with_requirements: Can be set to False to not return requirements in the argument, just the index URLs.

Source code in kraken/common/_requirements.py
def to_args(
    self,
    base_dir: Path = Path("."),
    with_options: bool = True,
    with_requirements: bool = True,
) -> list[str]:
    """Converts the requirements back to Pip install arguments.

    :param base_dir: The base directory that relative :class:`LocalRequirement`s should be considered relative to.
    :param with_requirements: Can be set to `False` to not return requirements in the argument, just the index URLs.
    """

    args = []
    if with_options and self.index_url:
        args += ["--index-url", self.index_url]
    if with_options:
        for url in self.extra_index_urls:
            args += ["--extra-index-url", url]
    if with_requirements:
        args += flatten(req.to_args(base_dir) for req in self.requirements)
    return args
to_hash
to_hash(algorithm: str = 'sha256') -> str

Hash the requirements spec to a hexdigest.

Source code in kraken/common/_requirements.py
def to_hash(self, algorithm: str = "sha256") -> str:
    """Hash the requirements spec to a hexdigest."""

    hash_parts = [str(req) for req in self.requirements] + ["::pythonpath"] + list(self.pythonpath)
    hash_parts += ["::interpreter_constraint", self.interpreter_constraint or ""]
    return hashlib.new(algorithm, ":".join(hash_parts).encode()).hexdigest()
with_pythonpath
with_pythonpath(path: Iterable[str]) -> RequirementSpec

Adds the given pythonpath and returns a new instance.

Source code in kraken/common/_requirements.py
def with_pythonpath(self, path: Iterable[str]) -> "RequirementSpec":
    """Adds the given pythonpath and returns a new instance."""

    return self.replace(pythonpath=(*self.pythonpath, *path))
with_requirements
with_requirements(
    reqs: Iterable[str | Requirement],
) -> RequirementSpec

Adds the given requirements and returns a new instance.

Source code in kraken/common/_requirements.py
def with_requirements(self, reqs: Iterable["str | Requirement"]) -> "RequirementSpec":
    """Adds the given requirements and returns a new instance."""

    requirements = list(self.requirements)
    for req in reqs:
        if isinstance(req, str):
            req = parse_requirement(req)
        requirements.append(req)

    return self.replace(requirements=tuple(requirements))

ScriptPicker

Bases: ScriptRunner

Base class for picking the right script file in a directory based on a few criteria.

Source code in kraken/common/_runner.py
class ScriptPicker(ScriptRunner):
    """
    Base class for picking the right script file in a directory based on a few criteria.
    """

    def __init__(self, filenames: Sequence[str]) -> None:
        self.filenames = list(filenames)

    def find_script(self, directory: Path) -> "Path | None":
        for filename in self.filenames:
            script = directory / filename
            if script.is_file():
                return script
        return None

ScriptRunner

Bases: ABC

Abstract class for script runners. Implementations of this class are used to detect a script in a directory and to actually run it. The Kraken wrapper and build system both use this to run a build script, which for the wrapper is needed to extract the build script metadata.

Source code in kraken/common/_runner.py
class ScriptRunner(ABC):
    """
    Abstract class for script runners. Implementations of this class are used to detect a script in a directory
    and to actually run it. The Kraken wrapper and build system both use this to run a build script, which for
    the wrapper is needed to extract the build script metadata.
    """

    @abstractmethod
    def find_script(self, directory: Path) -> "Path | None":
        raise NotImplementedError(self)

    @abstractmethod
    def execute_script(self, script: Path, scope: dict[str, Any]) -> None:
        raise NotImplementedError(self)

    @abstractmethod
    def has_buildscript_call(self, script: Path) -> bool:
        """
        Implement a heuristic to check if the script implements a call to the :func:`buildscript` function.
        """

        raise NotImplementedError(self)

    @abstractmethod
    def get_buildscript_call_recommendation(self, metadata: BuildscriptMetadata) -> str:
        """
        Make a recommendation to the user for the code the user should put into their build script for the
        :func:`buildscript` call that is required by Kraken wrapper.
        """

        raise NotImplementedError(self)
get_buildscript_call_recommendation abstractmethod
get_buildscript_call_recommendation(
    metadata: BuildscriptMetadata,
) -> str

Make a recommendation to the user for the code the user should put into their build script for the :func:buildscript call that is required by Kraken wrapper.

Source code in kraken/common/_runner.py
@abstractmethod
def get_buildscript_call_recommendation(self, metadata: BuildscriptMetadata) -> str:
    """
    Make a recommendation to the user for the code the user should put into their build script for the
    :func:`buildscript` call that is required by Kraken wrapper.
    """

    raise NotImplementedError(self)
has_buildscript_call abstractmethod
has_buildscript_call(script: Path) -> bool

Implement a heuristic to check if the script implements a call to the :func:buildscript function.

Source code in kraken/common/_runner.py
@abstractmethod
def has_buildscript_call(self, script: Path) -> bool:
    """
    Implement a heuristic to check if the script implements a call to the :func:`buildscript` function.
    """

    raise NotImplementedError(self)

Supplier

Bases: Generic[T], ABC

Base class for value suppliers.

Source code in kraken/common/supplier.py
class Supplier(Generic[T], abc.ABC):
    """Base class for value suppliers."""

    class Empty(Exception):
        """Raised when a supplier cannot provide a value."""

        def __init__(self, supplier: "Supplier[Any]", message: "str | None" = None) -> None:
            self.supplier = supplier
            self.message = message

        def __str__(self) -> str:
            if self.message:
                return f"{self.message} ({self.supplier})"
            else:
                return str(self.supplier)

    @abc.abstractmethod
    def derived_from(self) -> Iterable["Supplier[Any]"]:
        """Return an iterable that yields all suppliers that this supplier is derived from."""

    @abc.abstractmethod
    def get(self) -> T:
        """Return the value of the supplier. Depending on the implemenmtation, this may defer to other suppliers."""

    def get_or(self, fallback: U) -> "T | U":
        """Return the value of the supplier, or the *fallback* value if the supplier is empty."""
        try:
            return self.get()
        except Supplier.Empty:
            return fallback

    def get_or_raise(self, get_exception: Callable[[], Exception]) -> T:
        """Return the value of the supplier, or raise the exception provided by *get_exception* if empty."""
        try:
            return self.get()
        except Supplier.Empty:
            raise get_exception()

    def is_empty(self) -> bool:
        """Returns `True` if the supplier is empty."""
        try:
            self.get()
        except Supplier.Empty:
            return True
        else:
            return False

    def is_filled(self) -> bool:
        """Returns `True` if the supplier is not empty."""
        return not self.is_empty()

    def is_void(self) -> bool:
        return False

    def map(self, func: Callable[[T], U]) -> "Supplier[U]":
        """Maps *func* over the value in the supplier."""

        return MapSupplier(func, self)

    def once(self) -> "Supplier[T]":
        """Cache the value forever once :attr:`get` is called."""

        return OnceSupplier(self)

    def __getitem__(self: "Supplier[Mapping[K, V]]", key: K) -> "GetItemSupplier[V]":
        return GetItemSupplier(key, self)

    def lineage(self) -> Iterable[tuple["Supplier[Any]", list["Supplier[Any]"]]]:
        """Iterates over all suppliers in the lineage.

        Yields:
            A supplier and the suppliers it is derived from.
        """

        stack: list["Supplier[Any]"] = [self]
        while stack:
            current = stack.pop(0)
            derived_from = list(current.derived_from())
            yield current, derived_from
            stack += derived_from

    @staticmethod
    def of(value: "T | Supplier[T]", derived_from: Sequence["Supplier[Any]"] = ()) -> "Supplier[T]":
        if isinstance(value, Supplier):
            return value
        return OfSupplier(value, derived_from)

    @staticmethod
    def of_callable(func: Callable[[], T], derived_from: Sequence["Supplier[Any]"] = ()) -> "Supplier[T]":
        return OfCallableSupplier(func, derived_from)

    @staticmethod
    def void(from_exc: "Exception | None" = None, derived_from: Sequence["Supplier[Any]"] = ()) -> "Supplier[T]":
        """Returns a supplier that always raises :class:`Empty`."""

        return VoidSupplier(from_exc, derived_from)

    def __repr__(self) -> str:
        try:
            value = self.get_or(NotSet.Value)
        except Exception as exc:
            inner = f"<exception reading value: {exc}>"
        else:
            if value is NotSet.Value:
                inner = "<empty>"
            else:
                inner = f"value={value!r}"
        return f"{type(self).__name__}({inner})"
Empty

Bases: Exception

Raised when a supplier cannot provide a value.

Source code in kraken/common/supplier.py
class Empty(Exception):
    """Raised when a supplier cannot provide a value."""

    def __init__(self, supplier: "Supplier[Any]", message: "str | None" = None) -> None:
        self.supplier = supplier
        self.message = message

    def __str__(self) -> str:
        if self.message:
            return f"{self.message} ({self.supplier})"
        else:
            return str(self.supplier)
derived_from abstractmethod
derived_from() -> Iterable[Supplier[Any]]

Return an iterable that yields all suppliers that this supplier is derived from.

Source code in kraken/common/supplier.py
@abc.abstractmethod
def derived_from(self) -> Iterable["Supplier[Any]"]:
    """Return an iterable that yields all suppliers that this supplier is derived from."""
get abstractmethod
get() -> T

Return the value of the supplier. Depending on the implemenmtation, this may defer to other suppliers.

Source code in kraken/common/supplier.py
@abc.abstractmethod
def get(self) -> T:
    """Return the value of the supplier. Depending on the implemenmtation, this may defer to other suppliers."""
get_or
get_or(fallback: U) -> T | U

Return the value of the supplier, or the fallback value if the supplier is empty.

Source code in kraken/common/supplier.py
def get_or(self, fallback: U) -> "T | U":
    """Return the value of the supplier, or the *fallback* value if the supplier is empty."""
    try:
        return self.get()
    except Supplier.Empty:
        return fallback
get_or_raise
get_or_raise(get_exception: Callable[[], Exception]) -> T

Return the value of the supplier, or raise the exception provided by get_exception if empty.

Source code in kraken/common/supplier.py
def get_or_raise(self, get_exception: Callable[[], Exception]) -> T:
    """Return the value of the supplier, or raise the exception provided by *get_exception* if empty."""
    try:
        return self.get()
    except Supplier.Empty:
        raise get_exception()
is_empty
is_empty() -> bool

Returns True if the supplier is empty.

Source code in kraken/common/supplier.py
def is_empty(self) -> bool:
    """Returns `True` if the supplier is empty."""
    try:
        self.get()
    except Supplier.Empty:
        return True
    else:
        return False
is_filled
is_filled() -> bool

Returns True if the supplier is not empty.

Source code in kraken/common/supplier.py
def is_filled(self) -> bool:
    """Returns `True` if the supplier is not empty."""
    return not self.is_empty()
lineage
lineage() -> (
    Iterable[tuple[Supplier[Any], list[Supplier[Any]]]]
)

Iterates over all suppliers in the lineage.

Yields:

Type Description
Iterable[tuple[Supplier[Any], list[Supplier[Any]]]]

A supplier and the suppliers it is derived from.

Source code in kraken/common/supplier.py
def lineage(self) -> Iterable[tuple["Supplier[Any]", list["Supplier[Any]"]]]:
    """Iterates over all suppliers in the lineage.

    Yields:
        A supplier and the suppliers it is derived from.
    """

    stack: list["Supplier[Any]"] = [self]
    while stack:
        current = stack.pop(0)
        derived_from = list(current.derived_from())
        yield current, derived_from
        stack += derived_from
map
map(func: Callable[[T], U]) -> Supplier[U]

Maps func over the value in the supplier.

Source code in kraken/common/supplier.py
def map(self, func: Callable[[T], U]) -> "Supplier[U]":
    """Maps *func* over the value in the supplier."""

    return MapSupplier(func, self)
once
once() -> Supplier[T]

Cache the value forever once :attr:get is called.

Source code in kraken/common/supplier.py
def once(self) -> "Supplier[T]":
    """Cache the value forever once :attr:`get` is called."""

    return OnceSupplier(self)
void staticmethod
void(
    from_exc: Exception | None = None,
    derived_from: Sequence[Supplier[Any]] = (),
) -> Supplier[T]

Returns a supplier that always raises :class:Empty.

Source code in kraken/common/supplier.py
@staticmethod
def void(from_exc: "Exception | None" = None, derived_from: Sequence["Supplier[Any]"] = ()) -> "Supplier[T]":
    """Returns a supplier that always raises :class:`Empty`."""

    return VoidSupplier(from_exc, derived_from)

TomlConfigFile

Bases: MutableMapping[str, Any]

A helper class that reads and writes a TOML configuration file.

Source code in kraken/common/_tomlconfig.py
class TomlConfigFile(MutableMapping[str, Any]):
    """
    A helper class that reads and writes a TOML configuration file.
    """

    def __init__(self, path: Path) -> None:
        self.path = path
        self._data: "Dict[str, Any] | None" = None

    def _get_data(self) -> "Dict[str, Any]":
        if self._data is None:
            if self.path.is_file():
                self._data = tomli.loads(self.path.read_text())
            else:
                self._data = {}
        return self._data

    def __getitem__(self, key: str) -> Any:
        return self._get_data()[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._get_data()[key] = value

    def __delitem__(self, key: str) -> None:
        del self._get_data()[key]

    def __len__(self) -> int:
        return len(self._get_data())

    def __iter__(self) -> Iterator[str]:
        return iter(self._get_data())

    def save(self) -> None:
        self.path.parent.mkdir(exist_ok=True, parents=True)
        self.path.write_text(tomli_w.dumps(self._get_data()))

lazy_str

Delegates to a function to convert to a string.

Source code in kraken/common/_text.py
class lazy_str:
    """
    Delegates to a function to convert to a string.
    """

    def __init__(self, func: Callable[[], str]) -> None:
        self._func = func

    def __str__(self) -> str:
        return self._func()

appending_to_sys_path

appending_to_sys_path(
    path: Iterable[str],
) -> Iterator[None]

A context manager to temporarily append to sys.path.

Source code in kraken/common/_importlib.py
@contextlib.contextmanager
def appending_to_sys_path(path: Iterable[str]) -> Iterator[None]:
    """
    A context manager to temporarily append to `sys.path`.
    """

    prev_path = sys.path[:]
    try:
        sys.path += path
        yield
    finally:
        sys.path[:] = prev_path

atomic_file_swap

atomic_file_swap(
    path: str | Path,
    mode: Literal["w", "wb"],
    always_revert: bool = False,
    create_dirs: bool = False,
) -> Iterator[IO[AnyStr]]

Performs an atomic write to a file while temporarily moving the original file to a different random location.

:param path: The path to replace. :param mode: The open mode for the file (text or binary). :param always_revert: If enabled, swap the old file back into place even if the with context has no errors. :param create_dirs: If the file does not exist, and neither do its parent directories, create the directories. The directory will be removed if the operation is reverted.

Source code in kraken/common/_fs.py
@contextlib.contextmanager  # type: ignore[arg-type, misc]
def atomic_file_swap(
    path: "str | Path",
    mode: Literal["w", "wb"],
    always_revert: bool = False,
    create_dirs: bool = False,
) -> Iterator[IO[AnyStr]]:
    """
    Performs an atomic write to a file while temporarily moving the original file to a different random location.

    :param path: The path to replace.
    :param mode: The open mode for the file (text or binary).
    :param always_revert: If enabled, swap the old file back into place even if the with context has no errors.
    :param create_dirs: If the file does not exist, and neither do its parent directories, create the directories.
            The directory will be removed if the operation is reverted.
    """

    path = Path(path)

    with contextlib.ExitStack() as exit_stack:
        if path.is_file():
            old = exit_stack.enter_context(
                tempfile.NamedTemporaryFile(
                    mode,
                    prefix=path.stem + "~",
                    suffix="~" + path.suffix,
                    dir=path.parent,
                )
            )
            old.close()
            os.rename(path, old.name)
        else:
            old = None

        def _revert() -> None:
            assert isinstance(path, Path)
            if path.is_file():
                path.unlink()
            if old is not None:
                os.rename(old.name, path)

        if not path.parent.is_dir() and create_dirs:
            path.parent.mkdir(exist_ok=True)
            _old_revert = _revert

            def _revert() -> None:
                assert isinstance(path, Path)
                try:
                    shutil.rmtree(path.parent)
                finally:
                    _old_revert()

        try:
            with path.open(mode) as new:
                yield new
        except BaseException:
            _revert()
            raise
        else:
            if always_revert:
                _revert()
            else:
                if old is not None:
                    os.remove(old.name)

buildscript

buildscript(
    *,
    index_url: str | None = None,
    extra_index_urls: Sequence[str] | None = None,
    requirements: Sequence[str] | None = None,
    additional_sys_paths: Sequence[str] | None = None,
    interpreter_constraint: str | None = None
) -> BuildscriptMetadata

Use this function to the dependencies and additional install options for the build environment of your Kraken build script that is installed and managed by Kraken-wrapper. This function must be called at the very beginning of your .kraken.py build script at the root of your project.

Example:

from kraken.common import buildscript
buildscript(
    requirements=["kraken-build"],
)

from kraken.std import ...

You can depend on local dependencies and Python packages from URLs by prefixing them with <package name> @:

buildscript(requirements=[
    "kraken-build @ git+https://github.com/kraken-build/kraken.git@nr/python-project#subdirectory=kraken-build"
])

Parameters:

Name Type Description Default
index_url str | None

The index URL for Python packages to install from. If this is a private package registry, the credentials can be configured with the krakenw auth command.

None
extra_index_urls Sequence[str] | None

Additional index URLs for Python packages to install from.

None
requirements Sequence[str] | None

A list of Python package requirements to install. This usually contains at least kraken-build or some internal extension module that in turn depends on kraken-build.

None
additional_sys_paths Sequence[str] | None

Additional system paths to add to the Python environment.

None
interpreter_constraint str | None

Constraints for the Python interpreter that the build must be run with.

None
Source code in kraken/common/_buildscript.py
def buildscript(
    *,
    index_url: "str | None" = None,
    extra_index_urls: "Sequence[str] | None" = None,
    requirements: "Sequence[str] | None" = None,
    additional_sys_paths: "Sequence[str] | None" = None,
    interpreter_constraint: str | None = None,
) -> BuildscriptMetadata:
    """
    Use this function to the dependencies and additional install options for the build environment of your Kraken
    build script that is installed and managed by Kraken-wrapper. This function must be called at the very beginning
    of your `.kraken.py` build script at the root of your project.

    __Example:__

    ```py
    from kraken.common import buildscript
    buildscript(
        requirements=["kraken-build"],
    )

    from kraken.std import ...
    ```

    You can depend on local dependencies and Python packages from URLs by prefixing them with `<package name> @`:

    ```py
    buildscript(requirements=[
        "kraken-build @ git+https://github.com/kraken-build/kraken.git@nr/python-project#subdirectory=kraken-build"
    ])
    ```

    Args:
        index_url: The index URL for Python packages to install from. If this is a private package registry, the
            credentials can be configured with the `krakenw auth` command.
        extra_index_urls: Additional index URLs for Python packages to install from.
        requirements: A list of Python package requirements to install. This usually contains at least `kraken-build`
            or some internal extension module that in turn depends on `kraken-build`.
        additional_sys_paths: Additional system paths to add to the Python environment.
        interpreter_constraint: Constraints for the Python interpreter that the build must be run with.
    """

    from kraken.core import Project

    if (project := Project.current(None)) and project != project.context.root_project:
        raise RuntimeError("buildscript() should be called only from the root project")

    metadata = BuildscriptMetadata(
        index_url=index_url,
        extra_index_urls=list(extra_index_urls or ()),
        requirements=list(requirements or ()),
        additional_sys_paths=list(additional_sys_paths or ()),
        interpreter_constraint=interpreter_constraint,
    )

    if _global.mode == _Mode.RAISE:
        raise BuildscriptMetadataException(metadata)
    elif _global.mode == _Mode.CALLBACK:
        assert _global.func is not None
        _global.func(metadata)

    return metadata

flatten

flatten(it: Iterable[Iterable[T]]) -> Iterable[T]

Flatten a nested iterable into a single iterable.

Source code in kraken/common/_generic.py
def flatten(it: Iterable[Iterable[T]]) -> Iterable[T]:
    """
    Flatten a nested iterable into a single iterable.
    """

    for item in it:
        yield from item

get_terminal_width

get_terminal_width(default: int = 80) -> int

Returns the terminal width through :func:os.get_terminal_size, falling back to the COLUMNS environment variable. If neither is available, return default.

Source code in kraken/common/_terminal.py
def get_terminal_width(default: int = 80) -> int:
    """
    Returns the terminal width through :func:`os.get_terminal_size`, falling back to the `COLUMNS` environment
    variable. If neither is available, return *default*.
    """

    try:
        terminal_width = os.get_terminal_size().columns
    except OSError:
        try:
            terminal_width = int(os.getenv("COLUMNS", ""))
        except ValueError:
            terminal_width = default
    return terminal_width

inline_text

inline_text(text: str) -> str

A helper that dedents text and replaces a single newline with a single whitespace, yet double newlines are kept in place. To enforce a normal linebreak, add a backslash before a single newline.

Source code in kraken/common/_text.py
def inline_text(text: str) -> str:
    """
    A helper that dedents *text* and replaces a single newline with a single whitespace, yet double newlines are
    kept in place. To enforce a normal linebreak, add a backslash before a single newline.
    """

    marker = f"---{uuid.uuid1()}---"

    text = textwrap.dedent(text).strip()
    text = text.replace("\n\n", marker)
    text = re.sub(r"(?<!\\)\n(?!\n)", " ", text)  # Replace single line newlines by whitespace unless escaped
    text = re.sub(r"\\\n", "\n", text)
    text = text.replace(marker, "\n\n")
    return text

not_none

not_none(
    v: T | None,
    message: str | Callable[[], str] = "expected not-None",
) -> T

Raise a :class:RuntimeError if v is None, otherwise return v.

Source code in kraken/common/_generic.py
def not_none(v: T | None, message: str | Callable[[], str] = "expected not-None") -> T:
    """
    Raise a :class:`RuntimeError` if *v* is `None`, otherwise return *v*.
    """

    if v is None:
        if callable(message):
            message = message()
        raise RuntimeError(message)
    return v

parse_requirement

parse_requirement(
    value: str,
) -> PipRequirement | LocalRequirement | UrlRequirement

Parse a string as a requirement. Return a :class:PipRequirement or :class:LocalRequirement.

Source code in kraken/common/_requirements.py
def parse_requirement(value: str) -> "PipRequirement | LocalRequirement | UrlRequirement":
    """
    Parse a string as a requirement. Return a :class:`PipRequirement` or :class:`LocalRequirement`.
    """

    if match := re.match(r"(.+?)@(.+)", value):
        name, value = match.group(1).strip(), match.group(2).strip()
        if match := re.match(r"(git\+)?(ssh|https?)://.+", value):
            return UrlRequirement(name, value)
        return LocalRequirement(name, Path(value))

    match = re.match(r"([\w\d\-\_]+)(.*)", value)
    if match:
        return PipRequirement(match.group(1), match.group(2).strip() or None)

    raise ValueError(f"invalid requirement: {value!r}")

pluralize

pluralize(word: str, count: int | SupportsLen) -> str

Very naive implementation to pluralize english words (simply appends an s).

Source code in kraken/common/_text.py
def pluralize(word: str, count: "int | SupportsLen") -> str:
    """
    Very naive implementation to pluralize english words (simply appends an s).
    """

    if not isinstance(count, int):
        count = len(count)
    return word if count == 1 else f"{word}s"

propagate_argparse_formatter_to_subparser

propagate_argparse_formatter_to_subparser(
    parser: ArgumentParser,
) -> None

Propagates the formatter on parser to all subparsers.

Source code in kraken/common/_argparse.py
def propagate_argparse_formatter_to_subparser(parser: argparse.ArgumentParser) -> None:
    """Propagates the formatter on *parser* to all subparsers."""

    for action in parser._actions:
        if isinstance(action, argparse._SubParsersAction):
            for subparser in set(action._name_parser_map.values()):
                subparser.formatter_class = parser.formatter_class
                propagate_argparse_formatter_to_subparser(subparser)

safe_rmpath

safe_rmpath(path: Path) -> None

Removes the specified path from the file system. If it is a directory, :func:shutil.rmtree will be used with ignore_errors enabled.

Source code in kraken/common/_fs.py
def safe_rmpath(path: Path) -> None:
    """
    Removes the specified *path* from the file system. If it is a directory, :func:`shutil.rmtree` will be used
    with `ignore_errors` enabled.
    """

    if path.is_dir():
        shutil.rmtree(path, ignore_errors=True)
    else:
        try:
            path.unlink()
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise