Skip to content

kraken.std.cargo

kraken.std.cargo

Provides tasks for Rust projects that build using Cargo.

CargoAuthProxyTask

Bases: BackgroundTask

This task starts a local proxy server that injects HTTP basic authentication credentials in HTTP(S) requests to a Cargo repository to work around Cargo's current inability to interface with private repositories.

Source code in kraken/std/cargo/tasks/cargo_auth_proxy_task.py
class CargoAuthProxyTask(BackgroundTask):
    """This task starts a local proxy server that injects HTTP basic authentication credentials in HTTP(S) requests
    to a Cargo repository to work around Cargo's current inability to interface with private repositories."""

    description = "Creates a proxy that injects credentials to route Cargo traffic through."

    #: The Cargo config file to update.
    cargo_config_file: Property[Path] = Property.default(".cargo/config.toml")

    #: A list of the Cargo registries for which to inject credentials for based on matching paths.
    registries: Property[list[CargoRegistry]]

    #: The URL of the proxy. This property is only valid and accessible for tasks that immediately directly depend
    #: on this task and will be invalidated at the task teardown.
    proxy_url: Property[str] = Property.output()

    #: The path to the certificate file that needs to be trusted in order to talk to the proxy over HTTPS.
    proxy_cert_file: Property[Path] = Property.output()

    #: Path to the mitmweb binary.
    mitmweb_bin: Property[str] = Property.default("mitmweb")

    #: Additional args for the mitmproxy.
    #: We pass `--no-http2` by default as that breaks Cargo HTTP/2 multiplexing. See
    #: https://github.com/rust-lang/cargo/issues/12202
    mitmproxy_additional_args: Property[Sequence[str]] = Property.default_factory(lambda: ["--no-http2"])

    @contextlib.contextmanager
    def _inject_config(self) -> Iterator[None]:
        """Injects the proxy URL and cert file into the Cargo and Git configuration."""

        # TODO (@NiklasRosenstein): Can we get away without temporarily modifying the GLOBAL Git config?

        cargo_config_toml = self.project.directory / self.cargo_config_file.get()
        cargo_config = tomli.loads(cargo_config_toml.read_text()) if cargo_config_toml.is_file() else {}

        git_config_file = Path("~/.gitconfig").expanduser()
        git_config = load_gitconfig(git_config_file) if git_config_file.is_file() else {}

        with contextlib.ExitStack() as exit_stack:
            # Temporarily update the Cargo configuration file to inject the HTTP(S) proxy and CA info.
            cargo_http = cargo_config.setdefault("http", {})
            cargo_http["proxy"] = self.proxy_url.get()
            cargo_http["cainfo"] = str(self.proxy_cert_file.get().absolute())

            for registry in self.registries.get():
                if not registry.read_credentials:
                    continue
                if registry.alias in cargo_config["registries"]:
                    entry = cargo_config["registries"][registry.alias]
                    entry["token"] = f"Bearer {registry.read_credentials[1]}"

            logger.info("updating %s", cargo_config_toml)
            fp = exit_stack.enter_context(
                atomic_file_swap(cargo_config_toml, "w", always_revert=True, create_dirs=True)
            )
            fp.write(tomli_w.dumps(cargo_config))
            fp.close()

            # Temporarily update the Git configuration file to inject the HTTP(S) proxy and CA info.
            git_http = git_config.setdefault("http", {})
            git_http["proxy"] = self.proxy_url.get()
            git_http["sslCAInfo"] = str(self.proxy_cert_file.get().absolute())
            logger.info("updating %s", git_config_file)
            fp = exit_stack.enter_context(atomic_file_swap(git_config_file, "w", always_revert=True, create_dirs=True))
            fp.write(dump_gitconfig(git_config))
            fp.close()

            yield

    # Task

    def start_background_task(self, exit_stack: contextlib.ExitStack) -> TaskStatus:
        auth: dict[str, tuple[str, str]] = {}
        for registry in self.registries.get():
            if not registry.read_credentials:
                continue
            host = not_none(urlparse(registry.index).hostname)
            auth[host] = registry.read_credentials

        proxy_url, cert_file = start_mitmweb_proxy(
            auth=auth, mitmweb_bin=self.mitmweb_bin.get(), additional_args=self.mitmproxy_additional_args.get()
        )
        self.proxy_url.set(proxy_url)
        self.proxy_cert_file.set(cert_file)
        exit_stack.callback(lambda: self.proxy_url.clear())
        exit_stack.callback(lambda: self.proxy_cert_file.clear())
        exit_stack.enter_context(self._inject_config())
        return TaskStatus.started()

CargoBuildTask

Bases: Task

This task runs cargo build using the specified parameters. It will respect the authentication credentials configured in :attr:CargoProjectSettings.auth.

Source code in kraken/std/cargo/tasks/cargo_build_task.py
class CargoBuildTask(Task):
    """This task runs `cargo build` using the specified parameters. It will respect the authentication
    credentials configured in :attr:`CargoProjectSettings.auth`."""

    #: The build target (debug or release). If this is anything else, the :attr:`out_binaries` will be set
    #: to an empty list instead of parsed from the Cargo manifest.
    target: Property[str]

    #: Additional arguments to pass to the Cargo command-line.
    additional_args: Property[list[str]] = Property.default_factory(list)

    #: Whether to build incrementally or not.
    incremental: Property[bool | None] = Property.default(None)

    #: Whether to pass --locked to cargo or not.
    #:
    #: When set to None, --locked is passed if Cargo.lock exists.
    locked: Property[bool | None] = Property.default(None)

    #: Environment variables for the Cargo command.
    env: Property[dict[str, str]] = Property.default_factory(dict)

    #: Number of times to retry before failing this job
    retry_attempts: Property[int] = Property.default(0)

    #: An output property for the Cargo binaries that are being produced by this build.
    out_binaries: Property[list[CargoBinaryArtifact]] = Property.output()

    #: An output property for the Cargo libraries that are being produced by this build.
    out_libraries: Property[list[CargoLibraryArtifact]] = Property.output()

    #: Flag indicating if we should execute this command from the project directory
    from_project_dir: Property[bool] = Property.default(False)

    def __init__(self, name: str, project: Project) -> None:
        super().__init__(name, project)

    def get_description(self) -> str | None:
        command = self.get_cargo_command({})
        self.make_safe(command, {})
        return f"Run `{' '.join(command)}`."

    def get_cargo_command_additional_flags(self) -> list[str]:
        return shlex.split(os.environ.get("KRAKEN_CARGO_BUILD_FLAGS", ""))

    def should_add_locked_flag(self) -> bool:
        locked = self.locked.get()
        if locked is None:
            # pass --locked if we have a lock file
            # since we may be in a workspace member, we need to search up!
            for parent in (Path.cwd() / "Cargo.toml").parents:
                if (parent / "Cargo.lock").exists():
                    return True
        elif locked:
            # if locked is True, we should *always* pass --locked.
            # the expectation is that the command will fail w/o Cargo.lock.
            return True
        return False

    def get_additional_args(self) -> list[str]:
        args = self.additional_args.get()
        if "--locked" not in args and self.should_add_locked_flag():
            args = ["--locked", *args]
        return args

    def get_cargo_command(self, env: dict[str, str]) -> list[str]:
        incremental = self.incremental.get()
        if incremental is not None:
            env["CARGO_INCREMENTAL"] = "1" if incremental else "0"
        return ["cargo", "build"] + self.get_additional_args()

    def make_safe(self, args: list[str], env: dict[str, str]) -> None:
        pass

    def execute(self) -> TaskStatus:
        env = self.env.get()
        command = self.get_cargo_command(env) + self.get_cargo_command_additional_flags()

        safe_command = command[:]
        safe_env = env.copy()
        self.make_safe(safe_command, safe_env)
        self.logger.info("%s [env: %s]", safe_command, safe_env)

        out_binaries: list[CargoBinaryArtifact] = []
        out_libraries_candidates: list[CargoLibraryArtifact] = []
        if self.target.get_or(None) in ("debug", "release"):
            # Expose the output binaries that are produced by this task.
            # We only expect a binary to be built if the target is debug or release.
            manifest = CargoMetadata.read(self.project.directory, self.from_project_dir.get())
            target_dir = manifest.target_directory / self.target.get()
            for artifact in manifest.artifacts:
                # Rust binaries have an extensionless name whereas libraries are prefixed with "lib" and suffixed with
                #
                # - ".rlib" for Rust libraries
                # - ".so" (Linux), ".dylib" (macOS) or ".dll" (Windows) for dynamic Rust and system libraries
                # - ".a" (Linux, macOS) or ".lib" (Windows) for static system libraries
                if artifact.kind is ArtifactKind.BIN:
                    out_binaries.append(CargoBinaryArtifact(artifact.name, target_dir / artifact.name))
                elif artifact.kind is ArtifactKind.LIB:
                    base_name = f"lib{artifact.name}"
                    for file_extension in ["rlib", "so", "dylib", "dll", "a", "lib"]:
                        filename = ".".join([base_name.replace("-", "_"), file_extension])
                        out_libraries_candidates.append(CargoLibraryArtifact(base_name, target_dir / filename))

        total_attempts = self.retry_attempts.get() + 1

        result = -1
        while total_attempts > 0:
            result = sp.call(command, cwd=self.project.directory, env={**os.environ, **env})

            if result == 0:
                # Check that binaries which were due have been built.
                for out_bin in out_binaries:
                    assert out_bin.path.is_file(), out_bin
                self.out_binaries.set(out_binaries)

                # Check that at least one library has been built if libraries were due.
                out_libraries = []
                if len(out_libraries_candidates) != 0:
                    for out_libraries_candidate in out_libraries_candidates:
                        # Since we generate all possible file extensions, we must only keep the ones that exist
                        if out_libraries_candidate.path.is_file():
                            out_libraries.append(out_libraries_candidate)
                    assert (
                        len(out_libraries) != 0
                    ), f'No libraries were built even though some were due, e.g. "{out_libraries_candidates[0].name}"'
                self.out_libraries.set(out_libraries)
                break
            else:
                total_attempts -= 1
                self.logger.warn("%s failed with result %s", safe_command, result)
                self.logger.warn("There are %s attempts remaining", total_attempts)
                if total_attempts > 0:
                    self.logger.info("Waiting for 10 seconds before retrying..")
                    time.sleep(10)

        return TaskStatus.from_exit_code(safe_command, result)

CargoClippyTask

Bases: CargoBuildTask

Runs cargo clippy for linting or applying suggestions.

Source code in kraken/std/cargo/tasks/cargo_clippy_task.py
class CargoClippyTask(CargoBuildTask):
    """Runs `cargo clippy` for linting or applying suggestions."""

    workspace: Property[bool] = Property.default(True)
    all_features: Property[bool] = Property.default(True)
    fix: Property[bool] = Property.default(False)
    allow: Property[str | None] = Property.default("staged")
    deny_warnings: Property[bool] = Property.default(False)

    # CargoBuildTask

    def get_cargo_command(self, env: dict[str, str]) -> list[str]:
        command = ["cargo", "clippy"]
        if self.workspace.get():
            command += ["--workspace"]
        if self.all_features.get():
            command += ["--all-features"]
        if self.fix.get():
            command += ["--fix"]
            allow = self.allow.get()
            if allow == "staged":
                command += ["--allow-staged"]
            elif allow == "dirty":
                command += ["--allow-dirty", "--allow-staged"]
            elif allow is not None:
                raise ValueError(f"invalid allow: {allow!r}")
        # must be last, as this argument it passed to cargo check
        if self.deny_warnings.get():
            command += ["--", "-D warnings"]
        return command

CargoProject dataclass

Container for all Cargo related settings that can be automatically managed from a Kraken build.

Source code in kraken/std/cargo/config.py
@dataclasses.dataclass
class CargoProject:
    """Container for all Cargo related settings that can be automatically managed from a Kraken build."""

    #: The registries for the Cargo project. We store the registrie's by their alias.
    registries: dict[str, CargoRegistry] = dataclasses.field(default_factory=dict)

    #: Environment variables for cargo build steps.
    build_env: dict[str, str] = dataclasses.field(default_factory=dict)

    def add_registry(
        self,
        alias: str,
        index: str,
        read_credentials: tuple[str, str] | None = None,
        publish_token: str | None = None,
    ) -> None:
        """Add a registry to the project.

        :param alias: The alias of the registry. This alias is used in` Cargo.toml` to describe which registry to look
            up a create in. It is also used to designate the registry to publish to in `cargo publish`.
        :param index: The registry index URL.
        :param read_credentials: A `(username, password)` tuple for reading from the repository (optional).
        :param publish_token: A token to publish to the repository (optional).
        """

        self.registries[alias] = CargoRegistry(alias, index, read_credentials, publish_token)

    @staticmethod
    def get_or_create(project: Project | None) -> CargoProject:
        project = project or Project.current()
        return project.find_metadata(CargoProject, CargoProject)
add_registry
add_registry(
    alias: str,
    index: str,
    read_credentials: tuple[str, str] | None = None,
    publish_token: str | None = None,
) -> None

Add a registry to the project.

:param alias: The alias of the registry. This alias is used inCargo.toml to describe which registry to look up a create in. It is also used to designate the registry to publish to in cargo publish. :param index: The registry index URL. :param read_credentials: A (username, password) tuple for reading from the repository (optional). :param publish_token: A token to publish to the repository (optional).

Source code in kraken/std/cargo/config.py
def add_registry(
    self,
    alias: str,
    index: str,
    read_credentials: tuple[str, str] | None = None,
    publish_token: str | None = None,
) -> None:
    """Add a registry to the project.

    :param alias: The alias of the registry. This alias is used in` Cargo.toml` to describe which registry to look
        up a create in. It is also used to designate the registry to publish to in `cargo publish`.
    :param index: The registry index URL.
    :param read_credentials: A `(username, password)` tuple for reading from the repository (optional).
    :param publish_token: A token to publish to the repository (optional).
    """

    self.registries[alias] = CargoRegistry(alias, index, read_credentials, publish_token)

CargoPublishTask

Bases: CargoBuildTask

Publish a Cargo crate.

Source code in kraken/std/cargo/tasks/cargo_publish_task.py
class CargoPublishTask(CargoBuildTask):
    """Publish a Cargo crate."""

    #: Path to the Cargo configuration file (defaults to `.cargo/config.toml`).
    cargo_config_file: Property[Path] = Property.default(".cargo/config.toml")

    #: Name of the package to publish (only requried for publishing packages from workspace)
    package_name: Property[str | None] = Property.default(None)

    #: The registry to publish the package to.
    registry: Property[CargoRegistry]

    #: Verify (build the crate).
    verify: Property[bool] = Property.default(True)

    #: Allow dirty worktree.
    allow_dirty: Property[bool] = Property.default(False)

    #: Version to be bumped up to
    version: Property[str | None] = Property.default(None)

    #: Cargo.toml which to temporarily bump
    cargo_toml_file: Property[Path] = Property.default("Config.toml")

    #: Allow Overwrite of existing packages
    allow_overwrite: Property[bool] = Property.default(False)

    def prepare(self) -> TaskStatus | None:
        """Checks if the crate@version already exists in the registry. If so, the task will be skipped"""
        if self.allow_overwrite.get():
            return TaskStatus.pending()

        manifest = CargoManifest.read(self.cargo_toml_file.get())
        manifest_package = manifest.package
        manifest_package_name = manifest_package.name if manifest_package is not None else None
        manifest_version = manifest_package.version if manifest_package is not None else None

        package_name = self.package_name.get() or manifest_package_name
        version = self.version.get() or manifest_version

        if not package_name:
            return TaskStatus.pending("Unable to verify package existence - unknown package name")
        if not version:
            return TaskStatus.pending("Unable to verify package existence - unknown version")

        try:
            return self._check_package_existence(package_name, version, self.registry.get())
        except Exception as e:
            logger.warn(
                "An error happened while checking for {} existence in %s, %s",
                package_name,
                self.registry.get().alias,
                e,
            )
            return TaskStatus.pending("Unable to verify package existence")

    def get_cargo_command(self, env: dict[str, str]) -> list[str]:
        super().get_cargo_command(env)
        registry = self.registry.get()
        if registry.publish_token is None:
            raise ValueError(f'registry {registry.alias!r} missing a "publish_token"')
        command = (
            ["cargo", "publish"]
            + (["--locked"] if self.should_add_locked_flag() else [])
            + self.additional_args.get()
            + ["--registry", registry.alias, "--token", registry.publish_token]
            + ([] if self.verify.get() else ["--no-verify"])
        )
        package_name = self.package_name.get()
        if package_name is not None:
            command += ["--package", package_name]
        if self.allow_dirty.get() and "--allow-dirty" not in command:
            command.append("--allow-dirty")
        return command

    def make_safe(self, args: list[str], env: dict[str, str]) -> None:
        args[args.index(not_none(self.registry.get().publish_token))] = "[MASKED]"
        super().make_safe(args, env)

    def __init__(self, name: str, project: Project) -> None:
        super().__init__(name, project)
        self._base_command = ["cargo", "publish"]

    def _get_updated_cargo_toml(self, version: str) -> str:
        manifest = CargoManifest.read(self.cargo_toml_file.get())
        if manifest.package is None:
            return manifest.to_toml_string()

        fixed_version_string = self._sanitize_version(version)
        manifest.package.version = fixed_version_string
        if manifest.workspace and manifest.workspace.package:
            manifest.workspace.package.version = version

        if self.registry.is_filled():
            CargoProject.get_or_create(self.project)
            registry = self.registry.get()
            if manifest.dependencies:
                self._push_version_to_path_deps(fixed_version_string, manifest.dependencies.data, registry.alias)
            if manifest.build_dependencies:
                self._push_version_to_path_deps(fixed_version_string, manifest.build_dependencies.data, registry.alias)
        return manifest.to_toml_string()

    def _push_version_to_path_deps(
        self, version_string: str, dependencies: dict[str, Any], registry_alias: str
    ) -> None:
        """For each dependency in the given dependencies, if the dependency is a `path` dependency, injects the current
        version and registry (required for publishing - path dependencies cannot be published alone)."""
        for dep_name in dependencies:
            dependency = dependencies[dep_name]
            if isinstance(dependency, dict):
                if "path" in dependency:
                    dependency["version"] = f"={version_string}"
                    dependency["registry"] = registry_alias

    def execute(self) -> TaskStatus:
        with contextlib.ExitStack() as stack:
            if (version := self.version.get()) is not None:
                content = self._get_updated_cargo_toml(version)
                fp = stack.enter_context(atomic_file_swap(self.cargo_toml_file.get(), "w", always_revert=True))
                fp.write(content)
                fp.close()
            result = super().execute()
        return result

    @staticmethod
    def _sanitize_version(version: str) -> str:
        """
        Cargo does not play nicely with semver metadata (ie. 1.0.1-dev3+abc123)
        We replace that to 1.0.1-dev3abc123
        """
        return version.replace("+", "")

    @classmethod
    def _check_package_existence(cls, package_name: str, version: str, registry: CargoRegistry) -> TaskStatus | None:
        """
        Checks wether the given `package_name`@`version` is indexed in the provided `registry`.

        Checking is done by reading from the registry's index HTTP API, following the
        [Index Format](https://doc.rust-lang.org/cargo/reference/registry-index.html) documentation
        """
        if not registry.index.startswith("sparse+"):
            return TaskStatus.pending("Unable to verify package existence - Only sparse registries are supported")
        index = registry.index.removeprefix("sparse+")
        index = index.removesuffix("/")

        # >> Index authentication
        session = requests.sessions.Session()
        config_response = session.get(f"{index}/config.json")
        if config_response.status_code == 401:
            if registry.read_credentials is None:
                return TaskStatus.pending(
                    "Unable to verify package existence - registry requires authentication, but no credentials set"
                )
            session.auth = registry.read_credentials
            config_response = session.get(f"{index}/config.json")
            if config_response.status_code % 200 != 0:
                logger.warn(config_response.text)
                return TaskStatus.pending(
                    "Unable to verify package existence - failed to download config.json file from registry"
                )

        # >> Index files layout
        # Reference: https://doc.rust-lang.org/cargo/reference/registry-index.html#index-files
        path = []
        if len(package_name) == 1:
            path = ["1"]
        elif len(package_name) == 2:
            path = ["2"]
        elif len(package_name) == 3:
            path = ["3", package_name.lower()[0]]
        else:
            package_name_lower = package_name.lower()
            path = [package_name_lower[0:2], package_name_lower[2:4]]

        # >> Download the index file
        index_path = "/".join(path + [package_name])
        index_response = session.get(f"{index}/{index_path}")

        if index_response.status_code in [404, 410, 451]:
            return TaskStatus.pending(f"Package {package_name} does not already exists in {registry.alias}")
        elif index_response.status_code % 200 != 0:
            logger.warn(index_response.text)
            return TaskStatus.pending("Unable to verify package existence - error when fetching package information")

        sanitized_version = cls._sanitize_version(version)

        # >> Search for relevant version in the index file
        for registry_version in index_response.text.split("\n"):
            # Index File is sometimes newline terminated
            if not registry_version:
                continue
            registry_version = cls._sanitize_version(json.loads(registry_version).get("vers", ""))
            if registry_version == sanitized_version:
                return TaskStatus.skipped(
                    f"Package {package_name} with version {version} already exists in {registry.alias}"
                )
        return TaskStatus.pending(
            f"Package {package_name} with version {version} does not already exists in {registry.alias}"
        )
prepare
prepare() -> TaskStatus | None

Checks if the crate@version already exists in the registry. If so, the task will be skipped

Source code in kraken/std/cargo/tasks/cargo_publish_task.py
def prepare(self) -> TaskStatus | None:
    """Checks if the crate@version already exists in the registry. If so, the task will be skipped"""
    if self.allow_overwrite.get():
        return TaskStatus.pending()

    manifest = CargoManifest.read(self.cargo_toml_file.get())
    manifest_package = manifest.package
    manifest_package_name = manifest_package.name if manifest_package is not None else None
    manifest_version = manifest_package.version if manifest_package is not None else None

    package_name = self.package_name.get() or manifest_package_name
    version = self.version.get() or manifest_version

    if not package_name:
        return TaskStatus.pending("Unable to verify package existence - unknown package name")
    if not version:
        return TaskStatus.pending("Unable to verify package existence - unknown version")

    try:
        return self._check_package_existence(package_name, version, self.registry.get())
    except Exception as e:
        logger.warn(
            "An error happened while checking for {} existence in %s, %s",
            package_name,
            self.registry.get().alias,
            e,
        )
        return TaskStatus.pending("Unable to verify package existence")

CargoRegistry dataclass

Represents a Cargo registry.

Source code in kraken/std/cargo/config.py
@dataclasses.dataclass
class CargoRegistry:
    """Represents a Cargo registry."""

    #: The registrt alias. This is used as an identifier when publishing the registry and when referencing a crate
    #: from the registry in the `Cargo.toml` dependencies.
    alias: str

    #: The URL of the Cargo registry index. This usually points to a Git repository, as that is how Cargo registries
    #: are stored. The index URL must be present in `.cargo/config.toml` for Cargo to consume crates from it.
    index: str

    #: Authentication credentials for reading from the registry. This is only needed if the registry is private and the
    #: index URL is an HTTP(S) URL. The credentials will be passed using HTTP Basic authentication.
    read_credentials: tuple[str, str] | None = None

    #: The publish token for this registry.
    publish_token: str | None = None

CargoSqlxDatabaseCreateTask

Bases: CargoBaseSqlxTask

Create a database using sqlx-cli.

Source code in kraken/std/cargo/tasks/cargo_sqlx_database_create.py
class CargoSqlxDatabaseCreateTask(CargoBaseSqlxTask):
    """Create a database using sqlx-cli."""

    description = "Create a database using sqlx-cli"

    def execute(self) -> TaskStatus:
        return self._execute_command(["database", "create"])

CargoSqlxDatabaseDropTask

Bases: CargoBaseSqlxTask

Drop a database using sqlx-cli.

Source code in kraken/std/cargo/tasks/cargo_sqlx_database_drop.py
class CargoSqlxDatabaseDropTask(CargoBaseSqlxTask):
    """Drop a database using sqlx-cli."""

    description = "Drop a database using sqlx-cli"

    def execute(self) -> TaskStatus:
        return self._execute_command(["database", "drop"])

CargoSqlxMigrateTask

Bases: CargoBaseSqlxTask

Apply SQL migrations using sqlx-cli.

Source code in kraken/std/cargo/tasks/cargo_sqlx_migrate.py
class CargoSqlxMigrateTask(CargoBaseSqlxTask):
    """Apply SQL migrations using sqlx-cli."""

    description = "Apply SQL migrations using sqlx-cli"
    migrations: Property[Path]

    def execute(self) -> TaskStatus:
        arguments = ["migrate", "run"]
        if self.migrations.is_filled():
            arguments.extend(["--source", str(self.migrations.get().absolute())])

        return self._execute_command(arguments)

CargoSqlxPrepareTask

Bases: CargoBaseSqlxTask

Generate sqlx's query-.json files for offline mode using sqlx-cli. If check=True, verify that the query-.json files are up-to-date with the current database schema and code queries.

Source code in kraken/std/cargo/tasks/cargo_sqlx_prepare.py
class CargoSqlxPrepareTask(CargoBaseSqlxTask):
    """Generate sqlx's query-*.json files for offline mode using sqlx-cli. If check=True, verify that the query-*.json
    files are up-to-date with the current database schema and code queries."""

    migrations: Property[Path]
    check: Property[bool] = Property.default(False)

    def execute(self) -> TaskStatus:
        arguments = ["prepare"]
        if self.check.get():
            arguments.append("--check")

        return self._execute_command(arguments)

    def get_description(self) -> str | None:
        if self.check.get():
            return "Check that query-*.json files are up-to-date with the current database schema and code queries"
        return "Generate the query-*.json files for offline mode"

CargoSyncConfigTask

Bases: RenderFileTask

This task updates the .cargo/config.toml file to inject configuration values.

Source code in kraken/std/cargo/tasks/cargo_sync_config_task.py
class CargoSyncConfigTask(RenderFileTask):
    """This task updates the `.cargo/config.toml` file to inject configuration values."""

    file: Property[Path]

    #: If enabled, the configuration file will be replaced rather than updated.
    replace: Property[bool] = Property.default(False)

    #: The global-credential-providers to set in the config. If not set, the config won't be touched. The providers
    #: must be specified in reverse order of precedence. Read more about credential providers here:
    #: https://doc.rust-lang.org/cargo/reference/registry-authentication.html
    global_credential_providers: Property[Sequence[str]] = Property.default(["cargo:token"])

    #: The registries to insert into the configuration.
    registries: Property[list[CargoRegistry]] = Property.default_factory(list)

    #: Enable fetching Cargo indexes with the Git CLI.
    git_fetch_with_cli: Property[bool]

    #: Whether to use the sparse protocol for crates.io.
    crates_io_protocol: Property[Literal["git", "sparse"]] = Property.default("sparse")

    def __init__(self, name: str, project: Project) -> None:
        super().__init__(name, project)
        self.file.setcallable(lambda: project.directory / ".cargo" / "config.toml")
        self.content.setcallable(lambda: self.get_file_contents(self.file.get()))

    def get_file_contents(self, file: Path) -> str | bytes:
        content = tomli.loads(file.read_text()) if not self.replace.get() and file.exists() else {}
        if self.global_credential_providers.is_set():
            if self.global_credential_providers.get() is None:
                content.setdefault("registry", {}).pop("global-credential-providers", None)
            else:
                content.setdefault("registry", {})["global-credential-providers"] = list(
                    self.global_credential_providers.get()
                )
        content.setdefault("registries", {})["crates-io"] = {"protocol": self.crates_io_protocol.get()}
        for registry in self.registries.get():
            content.setdefault("registries", {})[registry.alias] = {"index": registry.index}

        if self.git_fetch_with_cli.is_filled():
            if self.git_fetch_with_cli.get():
                content.setdefault("net", {})["git-fetch-with-cli"] = True
            else:
                if "net" in content:
                    content["net"].pop("git-fetch-with-cli", None)
        lines = []
        if self.replace.get():
            lines.append("# This file is managed by Kraken. Manual edits to this file will be overwritten.")
        else:
            lines.append(
                "# This file is partially managed by Kraken. Comments and manually added "
                "repositories are not preserved."
            )
        lines.append(tomli_w.dumps(content))
        return "\n".join(lines)

CargoTestTask

Bases: CargoBuildTask

This task runs cargo test using the specified parameters. It will respect the authentication credentials configured in :attr:CargoProjectSettings.auth.

Source code in kraken/std/cargo/tasks/cargo_test_task.py
class CargoTestTask(CargoBuildTask):
    """This task runs `cargo test` using the specified parameters. It will respect the authentication
    credentials configured in :attr:`CargoProjectSettings.auth`."""

    description = "Run `cargo test`."

    def get_cargo_command(self, env: dict[str, str]) -> list[str]:
        super().get_cargo_command(env)
        return ["cargo", "test"] + self.get_additional_args()

cargo_auth_proxy

cargo_auth_proxy(
    *, project: Project | None = None
) -> CargoAuthProxyTask

Creates a background task that the :func:cargo_build and :func:cargo_publish tasks will depend on to inject the read credentials for private registries into HTTPS requests made by Cargo. This is only needed when private registries are used.

Source code in kraken/std/cargo/__init__.py
def cargo_auth_proxy(*, project: Project | None = None) -> CargoAuthProxyTask:
    """Creates a background task that the :func:`cargo_build` and :func:`cargo_publish` tasks will depend on to
    inject the read credentials for private registries into HTTPS requests made by Cargo. This is only needed when
    private registries are used."""

    project = project or Project.current()
    cargo = CargoProject.get_or_create(project)

    mitmweb_bin = pex_build(
        "mitmweb", requirements=["mitmproxy>=10.0.0,<11.0.0"], console_script="mitmweb"
    ).output_file.map(lambda p: str(p.absolute()))

    task = project.task("cargoAuthProxy", CargoAuthProxyTask, group=CARGO_BUILD_SUPPORT_GROUP_NAME)
    task.registries = Supplier.of_callable(lambda: list(cargo.registries.values()))
    task.mitmweb_bin = mitmweb_bin

    # The auth proxy is required for both building and publishing cargo packages with private cargo project dependencies
    project.group(CARGO_PUBLISH_SUPPORT_GROUP_NAME).add(task)

    # The auth proxy injects values into the cargo config, the cargoSyncConfig.check ensures that it reflects
    # the temporary changes that should be made to the config. The check has to run before the auth proxy,
    # otherwise it is guaranteed to fail.
    task.depends_on(":cargoSyncConfig.check?", mode="order-only")
    return task

cargo_build

cargo_build(
    mode: Literal["debug", "release"],
    incremental: bool | None = None,
    env: dict[str, str] | None = None,
    workspace: bool = False,
    *,
    exclude: Collection[str] = (),
    group: str | None = "build",
    name: str | None = None,
    project: Project | None = None,
    features: list[str] | None = None,
    depends_on: Sequence[Task] = (),
    locked: bool | None = None
) -> CargoBuildTask

Creates a task that runs cargo build.

:param mode: Whether to create a task that runs the debug or release build. :param incremental: Whether to build incrementally or not (with the --incremental= option). If not specified, the option is not specified and the default behaviour is used. :param env: Override variables for the build environment variables. Values in this dictionary override variables in :attr:CargoProject.build_env. :param exclude: List of workspace crates to exclude from the build. :param name: The name of the task. If not specified, defaults to :cargoBuild{mode.capitalised()}. :param features: List of Cargo features to enable in the build.

Source code in kraken/std/cargo/__init__.py
def cargo_build(
    mode: Literal["debug", "release"],
    incremental: bool | None = None,
    env: dict[str, str] | None = None,
    workspace: bool = False,
    *,
    exclude: Collection[str] = (),
    group: str | None = "build",
    name: str | None = None,
    project: Project | None = None,
    features: list[str] | None = None,
    depends_on: Sequence[Task] = (),
    locked: bool | None = None,
) -> CargoBuildTask:
    """Creates a task that runs `cargo build`.

    :param mode: Whether to create a task that runs the debug or release build.
    :param incremental: Whether to build incrementally or not (with the `--incremental=` option). If not
        specified, the option is not specified and the default behaviour is used.
    :param env: Override variables for the build environment variables. Values in this dictionary override
        variables in :attr:`CargoProject.build_env`.
    :param exclude: List of workspace crates to exclude from the build.
    :param name: The name of the task. If not specified, defaults to `:cargoBuild{mode.capitalised()}`.
    :param features: List of Cargo features to enable in the build."""

    assert mode in ("debug", "release"), repr(mode)
    project = project or Project.current()
    cargo = CargoProject.get_or_create(project)

    additional_args = []
    if workspace:
        additional_args.append("--workspace")
    for crate in exclude:
        additional_args.append("--exclude")
        additional_args.append(crate)
    if mode == "release":
        additional_args.append("--release")
    if features:
        additional_args.append("--features")
        # `cargo build` expects features to be comma separated, in one string.
        # For example `cargo build --features abc,efg` instead of `cargo build --features abc efg`.
        additional_args.append(",".join(features))

    task = project.task(
        f"cargoBuild{mode.capitalize()}" if name is None else name,
        CargoBuildTask,
        group=group,
    )
    task.incremental = incremental
    task.target = mode
    task.additional_args = additional_args
    task.locked = locked
    task.env = Supplier.of_callable(lambda: {**cargo.build_env, **(env or {})})

    task.depends_on(f":{CARGO_BUILD_SUPPORT_GROUP_NAME}?")

    for dependency in depends_on:
        task.depends_on(dependency)

    return task

cargo_check_toolchain_version

cargo_check_toolchain_version(
    minimal_version: str, *, project: Project | None = None
) -> CargoCheckToolchainVersionTask

Creates a task that checks that cargo is at least at version minimal_version

Source code in kraken/std/cargo/__init__.py
def cargo_check_toolchain_version(
    minimal_version: str, *, project: Project | None = None
) -> CargoCheckToolchainVersionTask:
    """Creates a task that checks that cargo is at least at version `minimal_version`"""

    project = project or Project.current()
    task = project.task(
        f"cargoCheckVersion/{minimal_version}",
        CargoCheckToolchainVersionTask,
        group=CARGO_BUILD_SUPPORT_GROUP_NAME,
    )
    task.minimal_version = minimal_version
    return task

cargo_deny

cargo_deny(
    *,
    project: Project | None = None,
    checks: Sequence[str] | Supplier[Sequence[str]] = (),
    config_file: Path | Supplier[Path] | None = None,
    error_message: str | None = None
) -> CargoDenyTask

Adds a task running cargo-deny for cargo projects. This checks different rules on dependencies, such as scanning for vulnerabilities, unwanted licences, or custom bans.

:param checks: The list of cargo-deny checks to run, as defined in https://embarkstudios.github.io/cargo-deny/checks/index.html. If not provided, defaults to all of them. :param config_file: The configuration file as defined in https://embarkstudios.github.io/cargo-deny/checks/cfg.html If not provided defaults to cargo-deny default location. :param error_message: The error message to show if the task fails.

Source code in kraken/std/cargo/__init__.py
def cargo_deny(
    *,
    project: Project | None = None,
    checks: Sequence[str] | Supplier[Sequence[str]] = (),
    config_file: Path | Supplier[Path] | None = None,
    error_message: str | None = None,
) -> CargoDenyTask:
    """Adds a task running cargo-deny for cargo projects. This checks different rules on dependencies, such as scanning
    for vulnerabilities, unwanted licences, or custom bans.

    :param checks: The list of cargo-deny checks to run, as defined in
    https://embarkstudios.github.io/cargo-deny/checks/index.html. If not provided, defaults to all of them.
    :param config_file: The configuration file as defined in https://embarkstudios.github.io/cargo-deny/checks/cfg.html
    If not provided defaults to cargo-deny default location.
    :param error_message: The error message to show if the task fails.
    """

    project = project or Project.current()
    task = project.task("cargoDeny", CargoDenyTask)
    task.checks = checks
    task.config_file = config_file
    task.error_message = error_message
    return task

cargo_login

cargo_login(
    *, project: Project | None = None
) -> CargoLoginTask

Creates a task that will be added to the build and publish support groups to login in the Cargo registries

Source code in kraken/std/cargo/__init__.py
def cargo_login(
    *,
    project: Project | None = None,
) -> CargoLoginTask:
    """Creates a task that will be added to the build and publish support groups
    to login in the Cargo registries"""

    project = project or Project.current()
    cargo = CargoProject.get_or_create(project)
    task = project.task("cargoLogin", CargoLoginTask, group="apply")
    task.registries = Supplier.of_callable(lambda: list(cargo.registries.values()))
    project.group(CARGO_BUILD_SUPPORT_GROUP_NAME).add(task)
    project.group(CARGO_PUBLISH_SUPPORT_GROUP_NAME).add(task)

    # We need to have the credentials providers set up by cargoSyncConfig
    task.depends_on(":cargoSyncConfig")
    return task

cargo_publish

cargo_publish(
    registry: str,
    incremental: bool | None = None,
    env: dict[str, str] | None = None,
    *,
    verify: bool = True,
    retry_attempts: int = 0,
    additional_args: Sequence[str] = (),
    name: str = "cargoPublish",
    package_name: str | None = None,
    project: Project | None = None,
    version: str | None = None,
    cargo_toml_file: Path = Path("Cargo.toml"),
    allow_overwrite: bool = False
) -> CargoPublishTask

Creates a task that publishes the create to the specified registry.

:param registry: The alias of the registry to publish to. :param incremental: Incremental builds on or off. :param env: Environment variables (overrides :attr:CargoProject.build_env). :param verify: If this is enabled, the cargo publish task will build the crate after it is packaged. Disabling this just packages the crate and publishes it. Only if this is enabled will the created task depend on the auth proxy. :param retry_attempts: Retry the publish task if it fails, up to a maximum number of attempts. Sometimes cargo publishes can be flakey depending on the destination. Defaults to 0 retries.

Source code in kraken/std/cargo/__init__.py
def cargo_publish(
    registry: str,
    incremental: bool | None = None,
    env: dict[str, str] | None = None,
    *,
    verify: bool = True,
    retry_attempts: int = 0,
    additional_args: Sequence[str] = (),
    name: str = "cargoPublish",
    package_name: str | None = None,
    project: Project | None = None,
    version: str | None = None,
    cargo_toml_file: Path = Path("Cargo.toml"),
    allow_overwrite: bool = False,
) -> CargoPublishTask:
    """Creates a task that publishes the create to the specified *registry*.

    :param registry: The alias of the registry to publish to.
    :param incremental: Incremental builds on or off.
    :param env: Environment variables (overrides :attr:`CargoProject.build_env`).
    :param verify: If this is enabled, the `cargo publish` task will build the crate after it is packaged.
        Disabling this just packages the crate and publishes it. Only if this is enabled will the created
        task depend on the auth proxy.
    :param retry_attempts: Retry the publish task if it fails, up to a maximum number of attempts. Sometimes
        cargo publishes can be flakey depending on the destination. Defaults to 0 retries.
    """

    project = project or Project.current()
    cargo = CargoProject.get_or_create(project)

    task = project.task(
        f"{name}/{package_name}" if package_name is not None else name,
        CargoPublishTask,
        group="publish",
    )
    task.registry = Supplier.of_callable(lambda: cargo.registries[registry])
    task.additional_args = list(additional_args)
    task.allow_dirty = True
    task.incremental = incremental
    task.verify = verify
    task.retry_attempts = retry_attempts
    task.package_name = package_name
    task.env = Supplier.of_callable(lambda: {**cargo.build_env, **(env or {})})
    task.version = version
    task.cargo_toml_file = cargo_toml_file
    task.depends_on(f":{CARGO_PUBLISH_SUPPORT_GROUP_NAME}?")
    task.allow_overwrite = allow_overwrite
    return task

cargo_registry

cargo_registry(
    alias: str,
    index: str,
    read_credentials: tuple[str, str] | None = None,
    publish_token: str | None = None,
    project: Project | None = None,
) -> None

Adds a Cargo registry to the project. The registry must be synced to disk into the .cargo/config.toml configuration file. You need to make sure to add a sync task using :func:cargo_sync_config if you manage your Cargo registries with this function. Can be called multiple times.

:param alias: The registry alias. :param index: The registry index URL (usually an HTTPS URL that ends in .git). :param read_credentials: Username/password to read from the registry (only for private registries). :param publish_token: The token to use with cargo publish.

Note

It appears that for Artifactory, the publish_token must be of the form Bearer <TOKEN> where the token is a token generated manually via the JFrog UI. It cannot be an API key.

Source code in kraken/std/cargo/__init__.py
def cargo_registry(
    alias: str,
    index: str,
    read_credentials: tuple[str, str] | None = None,
    publish_token: str | None = None,
    project: Project | None = None,
) -> None:
    """Adds a Cargo registry to the project. The registry must be synced to disk into the `.cargo/config.toml`
    configuration file. You need to make sure to add a sync task using :func:`cargo_sync_config` if you manage
    your Cargo registries with this function. Can be called multiple times.

    :param alias: The registry alias.
    :param index: The registry index URL (usually an HTTPS URL that ends in `.git`).
    :param read_credentials: Username/password to read from the registry (only for private registries).
    :param publish_token: The token to use with `cargo publish`.

    !!! note Artifactory

        It appears that for Artifactory, the *publish_token* must be of the form `Bearer <TOKEN>` where the token
        is a token generated manually via the JFrog UI. It cannot be an API key.
    """

    cargo = CargoProject.get_or_create(project)
    cargo.add_registry(alias, index, read_credentials, publish_token)

cargo_sync_config

cargo_sync_config(
    *, replace: bool = False, project: Project | None = None
) -> CargoSyncConfigTask

Creates a task that the :func:cargo_build and :func:cargo_publish tasks will depend on to synchronize the .cargo/config.toml configuration file, ensuring that the Cargo registries configured with the :func:cargo_registry function are present and up to date.

Source code in kraken/std/cargo/__init__.py
def cargo_sync_config(
    *,
    replace: bool = False,
    project: Project | None = None,
) -> CargoSyncConfigTask:
    """Creates a task that the :func:`cargo_build` and :func:`cargo_publish` tasks will depend on to synchronize
    the `.cargo/config.toml` configuration file, ensuring that the Cargo registries configured with the
    :func:`cargo_registry` function are present and up to date."""

    project = project or Project.current()
    cargo = CargoProject.get_or_create(project)
    task = project.task("cargoSyncConfig", CargoSyncConfigTask, group="apply")
    task.registries = Supplier.of_callable(lambda: list(cargo.registries.values()))
    task.replace = replace
    check_task = task.create_check()
    project.group(CARGO_BUILD_SUPPORT_GROUP_NAME).add(check_task)
    return task

cargo_test

cargo_test(
    incremental: bool | None = None,
    env: dict[str, str] | None = None,
    *,
    group: str | None = "test",
    project: Project | None = None,
    features: list[str] | None = None,
    depends_on: Sequence[Task] = (),
    workspace: bool | None = None
) -> CargoTestTask

Creates a task that runs cargo test.

:param incremental: Whether to build the tests incrementally or not (with the --incremental= option). If not specified, the option is not specified and the default behaviour is used. :param env: Override variables for the build environment variables. Values in this dictionary override variables in :attr:CargoProject.build_env. :param features: List of Cargo features to enable in the build. :param workspace: Run tests in all workspace crates (by default only the default members are selected).

Source code in kraken/std/cargo/__init__.py
def cargo_test(
    incremental: bool | None = None,
    env: dict[str, str] | None = None,
    *,
    group: str | None = "test",
    project: Project | None = None,
    features: list[str] | None = None,
    depends_on: Sequence[Task] = (),
    workspace: bool | None = None,
) -> CargoTestTask:
    """Creates a task that runs `cargo test`.

    :param incremental: Whether to build the tests incrementally or not (with the `--incremental=` option). If not
        specified, the option is not specified and the default behaviour is used.
    :param env: Override variables for the build environment variables. Values in this dictionary override
        variables in :attr:`CargoProject.build_env`.
    :param features: List of Cargo features to enable in the build.
    :param workspace: Run tests in all workspace crates (by default only the default members are selected)."""

    project = project or Project.current()
    cargo = CargoProject.get_or_create(project)

    additional_args = []
    if features:
        additional_args.append("--features")
        # `cargo build` expects features to be comma separated, in one string.
        # for example `cargo build --features abc,efg` instead of `cargo build --features abc efg`.
        additional_args.append(",".join(features))
    if workspace:
        additional_args.append("--workspace")

    task = project.task("cargoTest", CargoTestTask, group=group)
    task.incremental = incremental
    task.additional_args = additional_args
    task.env = Supplier.of_callable(lambda: {**cargo.build_env, **(env or {})})
    task.depends_on(f":{CARGO_BUILD_SUPPORT_GROUP_NAME}?")

    for dependency in depends_on:
        task.depends_on(dependency)

    return task

rustup_target_add

rustup_target_add(
    target: str,
    *,
    group: str | None = None,
    project: Project | None = None
) -> RustupTargetAddTask

Creates a task that installs a given target for Cargo

Source code in kraken/std/cargo/__init__.py
def rustup_target_add(target: str, *, group: str | None = None, project: Project | None = None) -> RustupTargetAddTask:
    """Creates a task that installs a given target for Cargo"""

    project = project or Project.current()
    task = project.task(f"rustupTargetAdd/{target}", RustupTargetAddTask, group=group)
    task.target = target
    return task