kraken.core

Address

An address is an immutable parsed representation of a task or project reference, comparable to a filesystem path. The separator between elements in the address path is a colon (:). Similarly, a dot (.) refers to the current project, and a double dot (..) refers to the parent project.

The elements of an address can only contain characters matching Address.Element.VALIDATION_REGEX.

Asterisks are accepted to permit glob pattern matching on the addressable space, where one asterisk (*) is intended to match only within the same hierarchical level (aka. wildcard), whereas a double asterisk (**) is used to match any number of levels (aka. recursive wildcard). A trailing question mark on an element marks it as fallible, allowing address resolution to fail at that element.

>>> Address(":a?:b").elements
[Address.Element(value='a', fallible=True), Address.Element(value='b', fallible=False)]
>>> Address("a:..:b").normalize()
Address('b')
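
For example, globbing elements parse like ordinary elements but make the address non-concrete (a small doctest-style sketch; it assumes only the parsing behavior documented on this page):

>>> Address(":*:child").is_concrete()
False
>>> Address("**:lint")[0].value
'**'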
Source code in kraken/core/address/_address.py
class Address(metaclass=AddressMeta):
    """
    An address is an immutable parsed representation of a task or project reference, comparable to a filesystem path.
    The separator between elements in the address path is a colon (`:`). Similarly, a dot (`.`) refers to the current
    project, and a double dot (`..`) refers to the parent project.

    The elements of an address can only contain characters matching the :data:`Address.Element.VALIDATION_REGEX`.

    Asterisks are accepted to permit glob pattern matching on the addressable space, where one asterisk (`*`) is
    intended to match only within the same hierarchical level (aka. wildcard), whereas a double asterisk (`**`) is
    used to match any number of levels (aka. recursive wildcard). A trailing question mark on an element marks it
    as fallible, allowing address resolution to fail at that element.

        >>> Address(":a?:b").elements
        [Address.Element(value='a', fallible=True), Address.Element(value='b', fallible=False)]
        >>> Address("a:..:b").normalize()
        Address('b')
    """

    SEPARATOR: ClassVar[str] = ":"
    ROOT: ClassVar[Address]
    CURRENT: ClassVar[Address]
    PARENT: ClassVar[Address]
    EMPTY: ClassVar[Address]
    WILDCARD: ClassVar[Address]
    RECURSIVE_WILDCARD: ClassVar[Address]

    _is_absolute: bool
    _is_container: bool
    _elements: list[Element]

    @staticmethod
    def _parse(value: str | Sequence[str]) -> tuple[bool, bool, list[Element]]:
        """Parses a list or strings or lists into (is_absolute, is_container, elements)."""

        # Convert the accepted types of value to a list of strings representing the elements of the address.
        is_absolute = False
        element_strings: list[str]
        if isinstance(value, str):
            if not value:
                element_strings = []
            elif value == Address.SEPARATOR:
                element_strings = [""]
            else:
                element_strings = value.split(Address.SEPARATOR)
        elif isinstance(value, Sequence):
            element_strings = list(value)
        else:
            assert False, type(value)

        # The first element may be an empty string to denote a "root" address.
        is_absolute = False
        if element_strings and not element_strings[0]:
            is_absolute = True
            element_strings.pop(0)

        # The last element may be an empty string to denote a "folder" address.
        is_container = False
        if element_strings and not element_strings[-1]:
            is_container = True
            element_strings.pop(-1)

        # Also, `:` is both absolute and a container
        if is_absolute and len(element_strings) == 0:
            is_container = True

        try:
            elements = [Address.Element.of(x) for x in element_strings]
        except ValueError as exc:
            raise ValueError(f"invalid Address: {Address.SEPARATOR.join(element_strings)!r} (reason: {exc})")

        if len(elements) == 0:
            # For some pathological addresses that are semantically equivalent to `:` (e.g. `:a:..`),
            # it is hard for the caller to easily detect this is a container.
            # Thus, we'll ensure it ourselves here.

            # The root object (":") is both absolute and a container
            # The empty address ("") is neither absolute nor a container
            if is_container or is_absolute:
                is_container = True
                is_absolute = True

        return is_absolute, is_container, elements

    @classmethod
    def create(cls, is_absolute: bool, is_container: bool, elements: list[Element]) -> Address:
        """
        Create a new address object.

        :param is_absolute: Whether the address is absolute (starts with `:`)
        :param elements: The address elements.

            >>> Address.create(True, False, [Address.Element("a", fallible=True), Address.Element("b")])
            Address(':a?:b')
        """

        obj = object.__new__(cls)
        obj._is_absolute = is_absolute
        obj._is_container = is_container
        obj._elements = elements
        obj._hash_key = None

        if len(elements) == 0:
            # For some pathological addresses that are semantically equivalent to `:` (e.g. `:a:..`),
            # it is hard for the caller to easily detect this is a container.
            # Thus, we'll ensure it ourselves here.

            # The root object (":") is both absolute and a container
            # The empty address ("") is neither absolute nor a container
            if obj._is_container or obj._is_absolute:
                obj._is_container = True
                obj._is_absolute = True

        return obj

    def __init__(self, value: str | Sequence[str] | Address) -> None:
        """Create a new Address from a string, sequence of strings or Address.

            >>> Address(":a:b")
            Address(':a:b')
            >>> Address(":a:b".split(":"))
            Address(':a:b')
            >>> Address(["", "a", "b"])
            Address(':a:b')

        Address objects are immutable and are not copied by the constructor (this is implemented via the
        meta class).

            >>> a = Address(':a')
            >>> a is Address(a)
            True

        Use `Address.create()` to construct a new address object from a list of `Address.Element`.
        """

        assert not isinstance(value, Address)
        self._is_absolute, self._is_container, self._elements = self._parse(value)
        self._hash_key: int | None = None

    def __getstate__(self) -> tuple[str]:
        return (str(self),)

    def __setstate__(self, state: tuple[str]) -> None:
        Address.__init__(self, state[0])

    def __str__(self) -> str:
        """
        Returns the string format of the address. Use the `Address` constructor to parse it back into an address.

            >>> str(Address(":a:b"))
            ':a:b'
        """

        value = Address.SEPARATOR.join(str(x) for x in self._elements)
        if self._is_absolute:
            value = f"{Address.SEPARATOR}{value}"
        if self._is_container and not self.is_root():
            # The address must end with a separator...unless it is the root address `:`.
            value = f"{value}{Address.SEPARATOR}"
        return value

    def __repr__(self) -> str:
        """
        Example:

            >>> repr(Address(":a?:b*"))
            "Address(':a?:b*')"
        """

        return f"Address({str(self)!r})"

    def __hash__(self) -> int:
        """
        Returns a stable hash key of the address.
        """

        if self._hash_key is None:
            self._hash_key = hash((Address, self._is_absolute, tuple(self._elements)))
        return self._hash_key

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Address):
            return (self._is_absolute, self._is_container, self._elements) == (
                other._is_absolute,
                other._is_container,
                other._elements,
            )
        return False

    def __len__(self) -> int:
        """
        Returns the number of elements in the address.

        >>> len(Address(":a:b:c"))
        3
        >>> len(Address("a:b:c"))
        3
        """

        return len(self._elements)

    def __getitem__(self, element_index: int) -> Address.Element:
        """
        Returns the _nth_ element in the address.

            >>> Address(":a:b")[1]
            Address.Element(value='b', fallible=False)
        """

        return self._elements[element_index]

    def is_empty(self) -> bool:
        """
        Returns `True` if the address is empty. The empty state is the only invalid state of an address.

            >>> Address("").is_empty()
            True
            >>> Address("a").is_empty()
            False
            >>> bool(Address(""))
            False
            >>> bool(Address("a"))
            True
            >>> Address.EMPTY == Address("")
            True
        """

        return not self._is_absolute and not self._elements

    def __bool__(self) -> bool:
        """
        Returns False if the address is empty, otherwise True.

            >>> bool(Address(":a:b"))
            True
            >>> bool(Address(""))
            False
        """

        return not self.is_empty()

    def is_absolute(self) -> bool:
        """
        Returns `True` if the address is absolute.

            >>> Address(":a").is_absolute()
            True
            >>> Address("a").is_absolute()
            False
            >>> Address("").is_absolute()
            False
        """

        return self._is_absolute

    def is_root(self) -> bool:
        """
        Returns `True` if the address is the root address (`:`).

            >>> Address(":").is_root()
            True
            >>> Address(":a").is_root()
            False
            >>> Address("a").is_root()
            False
            >>> Address("").is_root()
            False
        """
        return self._is_absolute and not self._elements

    def is_concrete(self) -> bool:
        """
        Returns `True` if this is a concrete address. A concrete address is one that is absolute and
        has no globbing elements (see #Address.Element.is_globbing()).

            >>> Address(":a:b").is_concrete()
            True
            >>> Address("a:b").is_concrete()
            False
            >>> Address(":*:b").is_concrete()
            False
            >>> Address(":a:b?").is_concrete()
            False
        """

        return self._is_absolute and all(x.is_concrete() for x in self._elements)

    def is_container(self) -> bool:
        """
        Returns `True` if this is a container address, that is, if it ends with a separator.

            >>> Address(":a:b").is_container()
            False
            >>> Address(":a:b:").is_container()
            True
        """

        return self._is_container

    def normalize(self, *, keep_container: bool = False) -> Address:
        """
        Normalize the address, removing any superfluous elements (`.` for current, `..` for parent). A normalized
        address is not a container address. Use #set_container() after #normalize() to make it a container address,
        or pass
        `True` to the *keep_container* argument to keep the container state.

            >>> Address("").normalize()
            Address('.')
            >>> Address("").normalize(keep_container=True)
            Address('.')
            >>> Address(".").normalize()
            Address('.')
            >>> Address(".").normalize(keep_container=True)
            Address('.')
            >>> Address(".:").normalize()
            Address('.')
            >>> Address(".:").normalize(keep_container=True)
            Address('.:')
            >>> Address(":a:.:b").normalize()
            Address(':a:b')
            >>> Address(":a:.:b").normalize(keep_container=True)
            Address(':a:b')
            >>> Address(":a:..:b").normalize()
            Address(':b')
            >>> Address("..:.:b").normalize()
            Address('..:b')
            >>> Address("..:.:b").normalize(keep_container=True)
            Address('..:b')
            >>> Address("a:b:").normalize()
            Address('a:b')
            >>> Address("a:b:").normalize(keep_container=True)
            Address('a:b:')
            >>> Address("a:b:.").normalize(keep_container=True)
            Address('a:b')
        """

        elements: list[Address.Element] = []
        stack = list(reversed(self._elements))
        while stack:
            current = stack.pop()
            if current.is_parent() and elements:
                elements.pop()
            elif current.is_current():
                pass
            else:
                elements.append(current)
        if not self._is_absolute and not elements:
            elements = [Address.Element(Address.Element.CURRENT, False)]
        return Address.create(self._is_absolute, self.is_container() and keep_container, elements)

    def concat(self, address: str | Address) -> Address:
        """
        Concatenate two addresses. If *address* is absolute, return *address*.

            >>> Address(":a").concat("b:c")
            Address(':a:b:c')
            >>> Address(":a").concat(Address(":b"))
            Address(':b')
            >>> Address(":a").concat(Address("."))
            Address(':a:.')
        """

        if isinstance(address, str):
            address = Address(address)
        if address._is_absolute:
            return address
        return Address.create(self._is_absolute, address._is_container, self._elements + address._elements)

    def append(self, element: str | Element) -> Address:
        """
        Return a new address with one element appended.

            >>> Address(":").append("a")
            Address(':a')
            >>> Address(":a:.").append(".")
            Address(':a:.:.')
        """

        if isinstance(element, str):
            element = Address.Element.of(element)
        assert isinstance(element, Address.Element), type(element)
        return Address.create(self._is_absolute, False, self._elements + [element])

    def set_container(self, is_container: bool) -> Address:
        """
        Return a copy of this address with the container flag set to the given value. The container flag indicates
        whether the string representation of the address is followed by a colon (`:`). This status is irrelevant
        for the root address, as it is always a container.

            >>> Address(":a").set_container(True)
            Address(':a:')
            >>> Address(":a:").set_container(False)
            Address(':a')

        Attempting to set the container status to `False` for the root address will raise a #ValueError. Attempting
        to set any container status to the empty address will also raise a #ValueError.

            >>> Address(":").set_container(True)
            Address(':')
            >>> Address(":").set_container(False)
            Traceback (most recent call last):
            ValueError: Cannot set container status to False for root address
            >>> Address("").set_container(True)
            Traceback (most recent call last):
            ValueError: Cannot set container status for empty address
        """

        if self.is_root():
            if not is_container:
                raise ValueError("Cannot set container status to False for root address")
            return self
        if self.is_empty():
            raise ValueError("Cannot set container status for empty address")
        return Address.create(self._is_absolute, is_container, self._elements)

    @property
    def name(self) -> str:
        """
        Returns the value of the last element in the Address. If the address has no elements, which is
        the case for the root address or an empty address, a #ValueError will be raised.

            >>> Address(":a:b").name
            'b'
            >>> Address("a:b?").name
            'b'
            >>> Address(":").name
            Traceback (most recent call last):
            ValueError: Address(':') has no elements, and thus no name
            >>> Address("").name
            Traceback (most recent call last):
            ValueError: Address('') has no elements, and thus no name
        """

        if not self._elements:
            raise ValueError(f"{self!r} has no elements, and thus no name")
        return self._elements[-1].value

    @property
    def elements(self) -> list[Element]:
        """
        Returns the individual elements of the address. Note that you should also check #is_absolute() to
        understand whether the elements are to be interpreted relative or absolute.

            >>> Address(":").elements
            []
            >>> Address(":a:b").elements
            [Address.Element(value='a', fallible=False), Address.Element(value='b', fallible=False)]
            >>> Address(":a:b").elements == Address("a:b").elements
            True
        """

        return self._elements

    @property
    def parent(self) -> Address:
        """
        Returns the parent address.

            >>> Address(":a:b").parent
            Address(':a')
            >>> Address(":a").parent
            Address(':')
            >>> Address("a").parent
            Address('.')
            >>> Address(".").parent
            Address('..')
            >>> Address("..").parent
            Address('..:..')

        The container status of the address is preserved.

            >>> Address(":a:b").parent
            Address(':a')
            >>> Address(":a:b:").parent
            Address(':a:')

        Use the `set_container()` method to change the container status.

        The root and empty address have no parent.

            >>> Address(":").parent
            Traceback (most recent call last):
            ValueError: Root address has no parent
            >>> Address("").parent
            Traceback (most recent call last):
            ValueError: Empty address has no parent
        """

        if self._is_absolute and not self._elements:
            raise ValueError("Root address has no parent")
        if not self._is_absolute and not self._elements:
            raise ValueError("Empty address has no parent")

        # When we currently have a relative address '.' we want to return '..'.
        if not self._is_absolute and self._elements and self._elements[-1].is_current():
            return Address.create(False, self._is_container, [Address.Element(Address.Element.PARENT, False)])

        # When we currently have a relative address '..' we want to return '..:..'
        if not self._is_absolute and self._elements and self._elements[-1].is_parent():
            return Address.create(
                False, self._is_container, self._elements + [Address.Element(Address.Element.PARENT, False)]
            )

        if not self._is_absolute and len(self._elements) == 1:
            return Address.CURRENT

        # Strip the last element.
        assert self._elements, self
        return Address.create(self._is_absolute, self._is_container, self._elements[:-1])

    Element: ClassVar[TypeAlias] = Element
elements property
elements: list[Element]

Returns the individual elements of the address. Note that you should also check is_absolute() to understand whether the elements are to be interpreted as relative or absolute.

>>> Address(":").elements
[]
>>> Address(":a:b").elements
[Address.Element(value='a', fallible=False), Address.Element(value='b', fallible=False)]
>>> Address(":a:b").elements == Address("a:b").elements
True
name property
name: str

Returns the value of the last element in the Address. If the address has no elements, which is the case for the root address or an empty address, a ValueError will be raised.

>>> Address(":a:b").name
'b'
>>> Address("a:b?").name
'b'
>>> Address(":").name
Traceback (most recent call last):
ValueError: Address(':') has no elements, and thus no name
>>> Address("").name
Traceback (most recent call last):
ValueError: Address('') has no elements, and thus no name
parent property
parent: Address

Returns the parent address.

>>> Address(":a:b").parent
Address(':a')
>>> Address(":a").parent
Address(':')
>>> Address("a").parent
Address('.')
>>> Address(".").parent
Address('..')
>>> Address("..").parent
Address('..:..')

The container status of the address is preserved.

>>> Address(":a:b").parent
Address(':a')
>>> Address(":a:b:").parent
Address(':a:')

Use the set_container() method to change the container status.

The root and empty address have no parent.

>>> Address(":").parent
Traceback (most recent call last):
ValueError: Root address has no parent
>>> Address("").parent
Traceback (most recent call last):
ValueError: Empty address has no parent
__bool__
__bool__() -> bool

Returns False if the address is empty, otherwise True.

>>> bool(Address(":a:b"))
True
>>> bool(Address(""))
False
Source code in kraken/core/address/_address.py
def __bool__(self) -> bool:
    """
    Returns False if the address is empty, otherwise True.

        >>> bool(Address(":a:b"))
        True
        >>> bool(Address(""))
        False
    """

    return not self.is_empty()
__getitem__
__getitem__(element_index: int) -> Element

Returns the nth element in the address.

>>> Address(":a:b")[1]
Address.Element(value='b', fallible=False)
Source code in kraken/core/address/_address.py
def __getitem__(self, element_index: int) -> Address.Element:
    """
    Returns the _nth_ element in the address.

        >>> Address(":a:b")[1]
        Address.Element(value='b', fallible=False)
    """

    return self._elements[element_index]
__hash__
__hash__() -> int

Returns a stable hash key of the address.

Source code in kraken/core/address/_address.py
def __hash__(self) -> int:
    """
    Returns a stable hash key of the address.
    """

    if self._hash_key is None:
        self._hash_key = hash((Address, self._is_absolute, tuple(self._elements)))
    return self._hash_key
__init__
__init__(value: str | Sequence[str] | Address) -> None

Create a new Address from a string, sequence of strings or Address.

>>> Address(":a:b")
Address(':a:b')
>>> Address(":a:b".split(":"))
Address(':a:b')
>>> Address(["", "a", "b"])
Address(':a:b')

Address objects are immutable and are not copied by the constructor (this is implemented via the meta class).

>>> a = Address(':a')
>>> a is Address(a)
True

Use Address.create() to construct a new address object from a list of Address.Element.

Source code in kraken/core/address/_address.py
def __init__(self, value: str | Sequence[str] | Address) -> None:
    """Create a new Address from a string, sequence of strings or Address.

        >>> Address(":a:b")
        Address(':a:b')
        >>> Address(":a:b".split(":"))
        Address(':a:b')
        >>> Address(["", "a", "b"])
        Address(':a:b')

    Address objects are immutable and are not copied by the constructor (this is implemented via the
    meta class).

        >>> a = Address(':a')
        >>> a is Address(a)
        True

    Use `Address.create()` to construct a new address object from a list of `Address.Element`.
    """

    assert not isinstance(value, Address)
    self._is_absolute, self._is_container, self._elements = self._parse(value)
    self._hash_key: int | None = None
__len__
__len__() -> int

Returns the number of elements in the address.

>>> len(Address(":a:b:c"))
3
>>> len(Address("a:b:c"))
3

Source code in kraken/core/address/_address.py
def __len__(self) -> int:
    """
    Returns the number of elements in the address.

    >>> len(Address(":a:b:c"))
    3
    >>> len(Address("a:b:c"))
    3
    """

    return len(self._elements)
__repr__
__repr__() -> str

Example:

>>> repr(Address(":a?:b*"))
"Address(':a?:b*')"
Source code in kraken/core/address/_address.py
def __repr__(self) -> str:
    """
    Example:

        >>> repr(Address(":a?:b*"))
        "Address(':a?:b*')"
    """

    return f"Address({str(self)!r})"
__str__
__str__() -> str

Returns the string format of the address. Use the Address constructor to parse it back into an address.

>>> str(Address(":a:b"))
':a:b'
Source code in kraken/core/address/_address.py
def __str__(self) -> str:
    """
    Returns the string format of the address. Use the `Address` constructor to parse it back into an address.

        >>> str(Address(":a:b"))
        ':a:b'
    """

    value = Address.SEPARATOR.join(str(x) for x in self._elements)
    if self._is_absolute:
        value = f"{Address.SEPARATOR}{value}"
    if self._is_container and not self.is_root():
        # The address must end with a separator...unless it is the root address `:`.
        value = f"{value}{Address.SEPARATOR}"
    return value
append
append(element: str | Element) -> Address

Return a new address with one element appended.

>>> Address(":").append("a")
Address(':a')
>>> Address(":a:.").append(".")
Address(':a:.:.')
Source code in kraken/core/address/_address.py
def append(self, element: str | Element) -> Address:
    """
    Return a new address with one element appended.

        >>> Address(":").append("a")
        Address(':a')
        >>> Address(":a:.").append(".")
        Address(':a:.:.')
    """

    if isinstance(element, str):
        element = Address.Element.of(element)
    assert isinstance(element, Address.Element), type(element)
    return Address.create(self._is_absolute, False, self._elements + [element])
concat
concat(address: str | Address) -> Address

Concatenate two addresses. If address is absolute, return address.

>>> Address(":a").concat("b:c")
Address(':a:b:c')
>>> Address(":a").concat(Address(":b"))
Address(':b')
>>> Address(":a").concat(Address("."))
Address(':a:.')
Source code in kraken/core/address/_address.py
def concat(self, address: str | Address) -> Address:
    """
    Concatenate two addresses. If *address* is absolute, return *address*.

        >>> Address(":a").concat("b:c")
        Address(':a:b:c')
        >>> Address(":a").concat(Address(":b"))
        Address(':b')
        >>> Address(":a").concat(Address("."))
        Address(':a:.')
    """

    if isinstance(address, str):
        address = Address(address)
    if address._is_absolute:
        return address
    return Address.create(self._is_absolute, address._is_container, self._elements + address._elements)
create classmethod
create(
    is_absolute: bool,
    is_container: bool,
    elements: list[Element],
) -> Address

Create a new address object.

:param is_absolute: Whether the address is absolute (starts with :)
:param elements: The address elements.

>>> Address.create(True, False, [Address.Element("a", fallible=True), Address.Element("b")])
Address(':a?:b')
Source code in kraken/core/address/_address.py
@classmethod
def create(cls, is_absolute: bool, is_container: bool, elements: list[Element]) -> Address:
    """
    Create a new address object.

    :param is_absolute: Whether the address is absolute (starts with `:`)
    :param elements: The address elements.

        >>> Address.create(True, False, [Address.Element("a", fallible=True), Address.Element("b")])
        Address(':a?:b')
    """

    obj = object.__new__(cls)
    obj._is_absolute = is_absolute
    obj._is_container = is_container
    obj._elements = elements
    obj._hash_key = None

    if len(elements) == 0:
        # For some pathological addresses that are semantically equivalent to `:` (e.g. `:a:..`),
        # it is hard for the caller to easily detect this is a container.
        # Thus, we'll ensure it ourselves here.

        # The root object (":") is both absolute and a container
        # The empty address ("") is neither absolute nor a container
        if obj._is_container or obj._is_absolute:
            obj._is_container = True
            obj._is_absolute = True

    return obj
is_absolute
is_absolute() -> bool

Returns True if the address is absolute.

>>> Address(":a").is_absolute()
True
>>> Address("a").is_absolute()
False
>>> Address("").is_absolute()
False
Source code in kraken/core/address/_address.py
def is_absolute(self) -> bool:
    """
    Returns `True` if the address is absolute.

        >>> Address(":a").is_absolute()
        True
        >>> Address("a").is_absolute()
        False
        >>> Address("").is_absolute()
        False
    """

    return self._is_absolute
is_concrete
is_concrete() -> bool

Returns True if this is a concrete address. A concrete address is one that is absolute and has no globbing elements (see Address.Element.is_globbing()).

>>> Address(":a:b").is_concrete()
True
>>> Address("a:b").is_concrete()
False
>>> Address(":*:b").is_concrete()
False
>>> Address(":a:b?").is_concrete()
False
Source code in kraken/core/address/_address.py
def is_concrete(self) -> bool:
    """
    Returns `True` if this is a concrete address. A concrete address is one that is absolute and
    has no globbing elements (see #Address.Element.is_globbing()).

        >>> Address(":a:b").is_concrete()
        True
        >>> Address("a:b").is_concrete()
        False
        >>> Address(":*:b").is_concrete()
        False
        >>> Address(":a:b?").is_concrete()
        False
    """

    return self._is_absolute and all(x.is_concrete() for x in self._elements)
is_container
is_container() -> bool

Returns True if this is a container address, that is, if it ends with a separator.

>>> Address(":a:b").is_container()
False
>>> Address(":a:b:").is_container()
True
Source code in kraken/core/address/_address.py
def is_container(self) -> bool:
    """
    Returns `True` if this is a container address, that is, if it ends with a separator.

        >>> Address(":a:b").is_container()
        False
        >>> Address(":a:b:").is_container()
        True
    """

    return self._is_container
is_empty
is_empty() -> bool

Returns True if the address is empty. The empty state is the only invalid state of an address.

>>> Address("").is_empty()
True
>>> Address("a").is_empty()
False
>>> bool(Address(""))
False
>>> bool(Address("a"))
True
>>> Address.EMPTY == Address("")
True
Source code in kraken/core/address/_address.py
def is_empty(self) -> bool:
    """
    Returns `True` if the address is empty. The empty state is the only invalid state of an address.

        >>> Address("").is_empty()
        True
        >>> Address("a").is_empty()
        False
        >>> bool(Address(""))
        False
        >>> bool(Address("a"))
        True
        >>> Address.EMPTY == Address("")
        True
    """

    return not self._is_absolute and not self._elements
is_root
is_root() -> bool

Returns True if the address is the root address (:).

>>> Address(":").is_root()
True
>>> Address(":a").is_root()
False
>>> Address("a").is_root()
False
>>> Address("").is_root()
False
Source code in kraken/core/address/_address.py
def is_root(self) -> bool:
    """
    Returns `True` if the address is the root address (`:`).

        >>> Address(":").is_root()
        True
        >>> Address(":a").is_root()
        False
        >>> Address("a").is_root()
        False
        >>> Address("").is_root()
        False
    """
    return self._is_absolute and not self._elements
normalize
normalize(*, keep_container: bool = False) -> Address

Normalize the address, removing any superfluous elements (. for current, .. for parent). A normalized address is not a container address. Use set_container() after normalize() to make it a container address, or pass True to the keep_container argument to keep the container state.

>>> Address("").normalize()
Address('.')
>>> Address("").normalize(keep_container=True)
Address('.')
>>> Address(".").normalize()
Address('.')
>>> Address(".").normalize(keep_container=True)
Address('.')
>>> Address(".:").normalize()
Address('.')
>>> Address(".:").normalize(keep_container=True)
Address('.:')
>>> Address(":a:.:b").normalize()
Address(':a:b')
>>> Address(":a:.:b").normalize(keep_container=True)
Address(':a:b')
>>> Address(":a:..:b").normalize()
Address(':b')
>>> Address("..:.:b").normalize()
Address('..:b')
>>> Address("..:.:b").normalize(keep_container=True)
Address('..:b')
>>> Address("a:b:").normalize()
Address('a:b')
>>> Address("a:b:").normalize(keep_container=True)
Address('a:b:')
>>> Address("a:b:.").normalize(keep_container=True)
Address('a:b')
Source code in kraken/core/address/_address.py
def normalize(self, *, keep_container: bool = False) -> Address:
    """
    Normalize the address, removing any superfluous elements (`.` for current, `..` for parent). A normalized
    address is not a container address. Use #set_container() after #normalize() to make it a container address,
    or pass
    `True` to the *keep_container* argument to keep the container state.

        >>> Address("").normalize()
        Address('.')
        >>> Address("").normalize(keep_container=True)
        Address('.')
        >>> Address(".").normalize()
        Address('.')
        >>> Address(".").normalize(keep_container=True)
        Address('.')
        >>> Address(".:").normalize()
        Address('.')
        >>> Address(".:").normalize(keep_container=True)
        Address('.:')
        >>> Address(":a:.:b").normalize()
        Address(':a:b')
        >>> Address(":a:.:b").normalize(keep_container=True)
        Address(':a:b')
        >>> Address(":a:..:b").normalize()
        Address(':b')
        >>> Address("..:.:b").normalize()
        Address('..:b')
        >>> Address("..:.:b").normalize(keep_container=True)
        Address('..:b')
        >>> Address("a:b:").normalize()
        Address('a:b')
        >>> Address("a:b:").normalize(keep_container=True)
        Address('a:b:')
        >>> Address("a:b:.").normalize(keep_container=True)
        Address('a:b')
    """

    elements: list[Address.Element] = []
    stack = list(reversed(self._elements))
    while stack:
        current = stack.pop()
        if current.is_parent() and elements:
            elements.pop()
        elif current.is_current():
            pass
        else:
            elements.append(current)
    if not self._is_absolute and not elements:
        elements = [Address.Element(Address.Element.CURRENT, False)]
    return Address.create(self._is_absolute, self.is_container() and keep_container, elements)
set_container
set_container(is_container: bool) -> Address

Return a copy of this address with the container flag set to the given value. The container flag indicates whether the string representation of the address is followed by a colon (:). This status is irrelevant for the root address, as it is always a container.

>>> Address(":a").set_container(True)
Address(':a:')
>>> Address(":a:").set_container(False)
Address(':a')

Attempting to set the container status to False for the root address will raise a ValueError. Attempting to set any container status to the empty address will also raise a ValueError.

>>> Address(":").set_container(True)
Address(':')
>>> Address(":").set_container(False)
Traceback (most recent call last):
ValueError: Cannot set container status to False for root address
>>> Address("").set_container(True)
Traceback (most recent call last):
ValueError: Cannot set container status for empty address
Source code in kraken/core/address/_address.py
def set_container(self, is_container: bool) -> Address:
    """
    Return a copy of this address with the container flag set to the given value. The container flag indicates
    whether the string representation of the address is followed by a colon (`:`). This status is irrelevant
    for the root address, as it is always a container.

        >>> Address(":a").set_container(True)
        Address(':a:')
        >>> Address(":a:").set_container(False)
        Address(':a')

    Attempting to set the container status to `False` for the root address will raise a #ValueError. Attempting
    to set any container status to the empty address will also raise a #ValueError.

        >>> Address(":").set_container(True)
        Address(':')
        >>> Address(":").set_container(False)
        Traceback (most recent call last):
        ValueError: Cannot set container status to False for root address
        >>> Address("").set_container(True)
        Traceback (most recent call last):
        ValueError: Cannot set container status for empty address
    """

    if self.is_root():
        if not is_container:
            raise ValueError("Cannot set container status to False for root address")
        return self
    if self.is_empty():
        raise ValueError("Cannot set container status for empty address")
    return Address.create(self._is_absolute, is_container, self._elements)

BackgroundTask

Bases: Task

This base class represents a task that starts a long-running process in the background, which is then terminated when all directly dependent tasks are completed and no work is left. A common use case for this type of task is to spawn sidecar processes that other tasks rely on being available during their execution.
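
A minimal sketch of such a subclass (not part of kraken itself; the import path, the command line and the port are assumptions made for illustration). It starts a sidecar process and registers its termination on the exit stack so it is cleaned up during teardown:

import contextlib
import subprocess

from kraken.core.system.task import BackgroundTask, TaskStatus


class SidecarTask(BackgroundTask):
    """Hypothetical task that keeps a sidecar process running while dependent tasks execute."""

    def start_background_task(self, exit_stack: contextlib.ExitStack) -> TaskStatus | None:
        # Spawn the long-running process; the command line is a placeholder.
        process = subprocess.Popen(["my-sidecar", "--port", "8080"])
        # Ensure the process is terminated when teardown() closes the exit stack.
        exit_stack.callback(process.terminate)
        return TaskStatus.started()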

Source code in kraken/core/system/task.py
class BackgroundTask(Task):
    """This base class represents a task that starts some process in the background that keeps running which is
    then terminated when all direct dependent tasks are completed and no work is left. A common use case for this
    type of task is to spawn sidecar processes which are relied on by other tasks to be available during their
    execution."""

    @abc.abstractmethod
    def start_background_task(self, exit_stack: contextlib.ExitStack) -> TaskStatus | None:
        """Start some task or process in the background. Use the *exit_stack* to ensure cleanup of your allocated
        resources in case of an unexpected error or when the background task is torn down. Returning not-None and
        not :attr:`TaskStatusType.STARTED`, or causing an exception will immediately close the exit stack."""

        raise NotImplementedError

    def __del__(self) -> None:
        try:
            self.__exit_stack
        except AttributeError:
            pass
        else:
            logger.warning(
                'BackgroundTask.teardown() did not get called on task "{}". This may cause some issues, such '
                "as an error during serialization or zombie processes.",
                self.address,
            )

    # Task

    def execute(self) -> TaskStatus | None:
        self.__exit_stack = contextlib.ExitStack()
        try:
            status = self.start_background_task(self.__exit_stack)
            if status is None:
                status = TaskStatus.started()
            elif not status.is_started():
                self.__exit_stack.close()
            return status
        except BaseException:
            self.__exit_stack.close()
            raise

    def teardown(self) -> None:
        self.__exit_stack.close()
        del self.__exit_stack
start_background_task abstractmethod
start_background_task(
    exit_stack: ExitStack,
) -> TaskStatus | None

Start some task or process in the background. Use the exit_stack to ensure cleanup of your allocated resources in case of an unexpected error or when the background task is torn down. Returning a status that is neither None nor TaskStatusType.STARTED, or raising an exception, will immediately close the exit stack.

Source code in kraken/core/system/task.py
@abc.abstractmethod
def start_background_task(self, exit_stack: contextlib.ExitStack) -> TaskStatus | None:
    """Start some task or process in the background. Use the *exit_stack* to ensure cleanup of your allocated
    resources in case of an unexpected error or when the background task is torn down. Returning not-None and
    not :attr:`TaskStatusType.STARTED`, or causing an exception will immediately close the exit stack."""

    raise NotImplementedError

Context

Bases: MetadataContainer, Currentable['Context']

This class is the single instance where all components of a build process come together.

Source code in kraken/core/system/context.py
class Context(MetadataContainer, Currentable["Context"]):
    """This class is the single instance where all components of a build process come together."""

    #: The focus project is the one that maps to the current working directory when invoking kraken.
    #: Kraken may be invoked in a directory that does not map to a project, in which case this is None.
    focus_project: Project | None = None

    def __init__(
        self,
        build_directory: Path,
        project_finder: ProjectFinder | None = None,
        executor: GraphExecutor | None = None,
        observer: GraphExecutorObserver | None = None,
    ) -> None:
        """
        :param build_directory: The directory in which all files generated during the build should be stored.
        :param project_finder: This project finder should only search within the directory it was given, not
            around or in parent folders. Defaults to :class:`CurrentDirectoryProjectFinder`.
        :param executor: The executor to use when the graph is executed.
        :param observer: The executor observer to use when the graph is executed.
        """

        super().__init__()
        self.build_directory = build_directory
        self.project_finder = project_finder or CurrentDirectoryProjectFinder.default()
        self.executor = executor or DefaultGraphExecutor(DefaultTaskExecutor())
        self.observer = observer or DefaultPrintingExecutorObserver()
        self._finalized: bool = False
        self._root_project: Project | None = None
        self._listeners: MutableMapping[ContextEvent.Type, list[ContextEvent.Listener]] = collections.defaultdict(list)
        self.focus_project: Project | None = None

    @property
    def root_project(self) -> Project:
        assert self._root_project is not None, "Context.root_project is not set"
        return self._root_project

    @root_project.setter
    def root_project(self, project: Project) -> None:
        assert self._root_project is None, "Context.root_project is already set"
        self._root_project = project

    def load_project(
        self,
        directory: Path,
        parent: Project | None = None,
        require_buildscript: bool = True,
        runner: ScriptRunner | None = None,
        script: Path | None = None,
    ) -> Project:
        """Loads a project from a file or directory.

        :param directory: The directory to load the project from.
        :param parent: The parent project. If no parent is specified, then the :attr:`root_project`
            must not have been initialized yet and the loaded project will initialize it.
            If the root project is initialized but no parent is specified, an error will be
            raised.
        :param require_buildscript: If set to `True`, a build script must exist in *directory*.
            Otherwise, it will be accepted if no build script exists in the directory.
        :param runner: If the :class:`ScriptRunner` for this project is already known, it can be passed here.
        :param script: If the script to load for the project is already known, it can be passed here. Cannot be
            specified without a *runner*.
        """

        if not runner:
            if script is not None:
                raise ValueError("cannot specify `script` parameter without a `runner` parameter")
            project_info = self.project_finder.find_project(directory)
            if project_info is not None:
                script, runner = project_info
        if not script and runner:
            script = runner.find_script(directory)

        has_root_project = self._root_project is not None
        project = Project(directory.name, directory, parent, self)
        try:
            if parent:
                parent.add_child(project)

            self.trigger(ContextEvent.Type.on_project_init, project)

            with self.as_current(), project.as_current():
                if not has_root_project:
                    self._root_project = project

                if script is None and require_buildscript:
                    raise ProjectLoaderError(
                        project,
                        f"no buildscript found for {project} (directory: {project.directory.absolute().resolve()})",
                    )
                if script is not None:
                    assert runner is not None
                    runner.execute_script(script, {"project": project})

            self.trigger(ContextEvent.Type.on_project_loaded, project)

        except ProjectLoaderError as exc:
            if exc.project is project:
                # Revert changes if the project that the error occurred with is the current project.
                if not has_root_project:
                    self._root_project = None
                if parent:
                    parent.remove_child(project)
            raise

        return project

    def iter_projects(self, relative_to: Project | None = None) -> Iterator[Project]:
        """Iterates over all projects in the context."""

        def _recurse(project: Project) -> Iterator[Project]:
            yield project
            for child_project in project.subprojects().values():
                yield from _recurse(child_project)

        yield from _recurse(relative_to or self.root_project)

    def get_project(self, address: Address) -> Project:
        """
        Find a project by its address. The address must be absolute.
        """

        if not address.is_absolute():
            raise ValueError(f"address '{address}' is not absolute")

        project: Project | None = self.root_project
        assert project is not None

        for element in address.elements:
            project = project.subproject(element.value, "or-none")
            if not project:
                raise ProjectNotFoundError(address)

        return project

    def resolve_tasks(
        self,
        addresses: Iterable[Task | str | Address] | None,
        relative_to: Project | Address | None = None,
        set_selected: bool = False,
    ) -> list[Task]:
        """
        This method finds Kraken tasks by their address, relative to a given project. If no project is
        specified, the address is resolved relative to the root project.

        :param addresses: A list of task addresses to resolve. Task addresses may contain glob patterns
            (`*` and `**` as well as `?` at the end of an address element, see the #Address class for
            more details).

            Any address that consists of only a single non-globbing path element (such as `lint` or `test`)
            will be prefixed by a wildcard (such that they are semantically equivalent to `**:lint` and
            `**:test`, respectively).

            In case the address specifies a container (that is, if it ends with a colon), then this will
            resolve the default tasks of this container.
            As an example, `:` will get the default tasks of the current project, and `:**:` will get the
            default tasks of all sub-projects.
            Specifying `None` is a shorthand for resolving `:` and `:**:`, that is, will resolve to the
            default tasks of the current project and its sub-projects.

        :param relative_to: The Kraken project to resolve the task addresses relative to. If this is not
            specified, the #root_project is used instead.

        :param set_selected: If enabled, addresses that resolve to tasks immediately will be marked as selected
            before they are returned. Note that this does not mark tasks as selected when they are picked up
            via the default tasks of a project. For example, when `:*` is resolved, the default tasks of all
            sub-projects will be returned, but they will not be marked as selected. The tasks of the root project
            however, will be marked as selected.
        """

        if not isinstance(relative_to, Address):
            relative_to = relative_to.address if relative_to is not None else Address.ROOT

        if not relative_to.is_absolute():
            raise ValueError(f"'relative_to' must be an absolute address (got {relative_to!r})")

        if addresses is None:
            addresses = [
                ".:",  # The current project (will be "expanded" to its default tasks)
                "**:",  # All sub-projects (will be "expanded" to their default tasks)
            ]

        results: list[Task] = []
        space = KrakenAddressSpace(self.root_project)
        for address in addresses:
            if isinstance(address, Task):
                results.append(address)
                continue
            try:
                results += self._resolve_single_address(Address(address), relative_to, space, set_selected)
            except TaskResolutionException:
                if address == "**:":
                    # In case the project has no sub-projects, it is expected not to find any tasks there
                    pass
                else:
                    raise

        return results

    def _resolve_single_address(
        self,
        address: Address,
        relative_to: Address,
        space: KrakenAddressSpace,
        set_selected: bool,
    ) -> list[Task]:
        """
        Resolve a single address in the context.

        Any address that contains only a single path element (such as `lint` or `test`) will be prefixed
        with `**:`, such that they are semantically equivalent to `**:lint` and `**:test`, respectively.
        """

        if address.is_empty():
            raise TaskResolutionException("Impossible to resolve the empty address.")

        # Prefix single-element addresses with `**:`, unless the last element already is `**`.
        if (
            not address.is_absolute()
            and not address.is_container()
            and len(address) == 1
            and not address.elements[0].is_recursive_wildcard()
        ):
            address = Address.RECURSIVE_WILDCARD.concat(address)
        if not address.is_absolute():
            address = relative_to.concat(address).normalize(keep_container=True)

        matches = list(resolve_address(space, self.root_project, address).matches())
        tasks = [t for t in matches if isinstance(t, Task)]
        if set_selected:
            for task in tasks:
                task.selected = True
        projects = [p for p in matches if isinstance(p, Project)]
        if projects:
            # Using the address of a project means we want to select its default tasks
            for proj in projects:
                tasks += [task for task in proj.tasks().values() if task.default]
        if not tasks:
            raise TaskResolutionException(f"'{address}' refers to no tasks.")
        return tasks

    def finalize(self) -> None:
        """Call :meth:`Task.finalize()` on all tasks. This should be called before a graph is created."""

        if self._finalized:
            logger.warning("Context.finalize() called more than once", stack_info=True)
            return

        self._finalized = True
        self.trigger(ContextEvent.Type.on_context_begin_finalize, self)

        # Delegate to finalize calls in all tasks of all projects.
        for project in self.iter_projects():
            self.trigger(ContextEvent.Type.on_project_begin_finalize, project)
            for task in project.tasks().values():
                task.finalize()
            self.trigger(ContextEvent.Type.on_project_finalized, project)

        self.trigger(ContextEvent.Type.on_context_finalized, self)

    def get_build_graph(self, targets: Sequence[str | Address | Task] | None) -> TaskGraph:
        """Returns the :class:`TaskGraph` that contains either all default tasks or the tasks specified with
        the *targets* argument.

        :param targets: A list of targets to resolve and to build the graph from.
        :raise ValueError: If no tasks were selected.
        """

        if targets is None:
            tasks = self.resolve_tasks(None)
        else:
            needs_resolving, resolved = bipartition(lambda t: isinstance(t, Task), targets)
            tasks = cast(list[Task], list(resolved))
            tasks.extend(self.resolve_tasks(needs_resolving))

        if not tasks:
            raise ValueError("no tasks selected")

        graph = TaskGraph(self).trim(tasks)

        assert graph, "TaskGraph cannot be empty"
        return graph

    def execute(self, tasks: list[str | Address | Task] | TaskGraph | None = None) -> TaskGraph:
        """Execute all default tasks or the tasks specified by *targets* using the default executor.
        If :meth:`finalize` was not called already it will be called by this function before the build
        graph is created, unless a build graph is passed in the first place.

        :param tasks: The list of tasks to execute, or the build graph. If none specified, all default
            tasks will be executed.
        :raise BuildError: If any task fails to execute.
        """

        if isinstance(tasks, TaskGraph):
            assert self._finalized, "no, no, this is all wrong. you need to finalize the context first"
            graph = tasks
        else:
            if not self._finalized:
                self.finalize()
            graph = self.get_build_graph(tasks)

        self.executor.execute_graph(graph, self.observer)

        if not graph.is_complete():
            raise BuildError(list(graph.tasks(failed=True)))
        return graph

    @overload
    def listen(
        self, event_type: str | ContextEvent.Type
    ) -> Callable[[ContextEvent.T_Listener], ContextEvent.T_Listener]: ...

    @overload
    def listen(self, event_type: str | ContextEvent.Type, listener: ContextEvent.Listener) -> None: ...

    def listen(self, event_type: str | ContextEvent.Type, listener: ContextEvent.Listener | None = None) -> Any:
        """Registers a listener to the context for the given event type."""

        if isinstance(event_type, str):
            event_type = ContextEvent.Type[event_type]

        def register(listener: ContextEvent.T_Listener) -> ContextEvent.T_Listener:
            assert callable(listener), "listener must be callable, got: %r" % listener
            self._listeners[event_type].append(listener)
            return listener

        if listener is None:
            return register

        register(listener)

    def trigger(self, event_type: ContextEvent.Type, data: Any) -> None:
        assert isinstance(event_type, ContextEvent.Type), repr(event_type)
        assert event_type != ContextEvent.Type.any, "cannot trigger event of type 'any'"
        listeners = (*self._listeners.get(ContextEvent.Type.any, ()), *self._listeners.get(event_type, ()))
        for listener in listeners:
            # TODO(NiklasRosenstein): Should we catch errors in listeners or let them propagate?
            listener(ContextEvent(event_type, data))
__init__
__init__(
    build_directory: Path,
    project_finder: ProjectFinder | None = None,
    executor: GraphExecutor | None = None,
    observer: GraphExecutorObserver | None = None,
) -> None

:param build_directory: The directory in which all files generated during the build should be stored.

:param project_finder: This project finder should only search within the directory it was given, not around or in parent folders. Defaults to :class:CurrentDirectoryProjectFinder.

:param executor: The executor to use when the graph is executed.

:param observer: The executor observer to use when the graph is executed.

Source code in kraken/core/system/context.py
def __init__(
    self,
    build_directory: Path,
    project_finder: ProjectFinder | None = None,
    executor: GraphExecutor | None = None,
    observer: GraphExecutorObserver | None = None,
) -> None:
    """
    :param build_directory: The directory in which all files generated during the build should be stored.
    :param project_finder: This project finder should only search within the directory it was given, not
        around or in parent folders. Defaults to :class:`CurrentDirectoryProjectFinder`.
    :param executor: The executor to use when the graph is executed.
    :param observer: The executor observer to use when the graph is executed.
    """

    super().__init__()
    self.build_directory = build_directory
    self.project_finder = project_finder or CurrentDirectoryProjectFinder.default()
    self.executor = executor or DefaultGraphExecutor(DefaultTaskExecutor())
    self.observer = observer or DefaultPrintingExecutorObserver()
    self._finalized: bool = False
    self._root_project: Project | None = None
    self._listeners: MutableMapping[ContextEvent.Type, list[ContextEvent.Listener]] = collections.defaultdict(list)
    self.focus_project: Project | None = None
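
For orientation, here is a minimal construction sketch. It relies only on the defaults wired up above; the .build directory name is arbitrary.

from pathlib import Path

from kraken.core.system.context import Context

# Create a context that stores all generated files under ./.build.
# Omitting project_finder, executor and observer selects the defaults shown above
# (CurrentDirectoryProjectFinder, DefaultGraphExecutor, DefaultPrintingExecutorObserver).
context = Context(build_directory=Path(".build"))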
execute
execute(
    tasks: (
        list[str | Address | Task] | TaskGraph | None
    ) = None,
) -> TaskGraph

Execute all default tasks or the tasks specified by the tasks argument using the default executor. If :meth:finalize has not been called yet, it will be called by this function before the build graph is created, unless a build graph is passed in the first place.

:param tasks: The list of tasks to execute, or the build graph. If none specified, all default tasks will be executed.

:raise BuildError: If any task fails to execute.

Source code in kraken/core/system/context.py
def execute(self, tasks: list[str | Address | Task] | TaskGraph | None = None) -> TaskGraph:
    """Execute all default tasks or the tasks specified by *targets* using the default executor.
    If :meth:`finalize` was not called already it will be called by this function before the build
    graph is created, unless a build graph is passed in the first place.

    :param tasks: The list of tasks to execute, or the build graph. If none specified, all default
        tasks will be executed.
    :raise BuildError: If any task fails to execute.
    """

    if isinstance(tasks, TaskGraph):
        assert self._finalized, "no, no, this is all wrong. you need to finalize the context first"
        graph = tasks
    else:
        if not self._finalized:
            self.finalize()
        graph = self.get_build_graph(tasks)

    self.executor.execute_graph(graph, self.observer)

    if not graph.is_complete():
        raise BuildError(list(graph.tasks(failed=True)))
    return graph
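
A hedged usage sketch: execute() accepts task selectors (strings or Address objects), Task objects, or a pre-built TaskGraph. The "lint" selector below is only an example, and a build script is assumed to exist in the current directory; a BuildError propagates if any task fails.

from pathlib import Path

from kraken.core.system.context import Context

context = Context(build_directory=Path(".build"))
context.load_project(Path("."))  # load the root project from the current directory

# Runs all "lint" tasks across the project tree; finalize() is called implicitly.
graph = context.execute(["lint"])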
finalize
finalize() -> None

Call :meth:Task.finalize() on all tasks. This should be called before a graph is created.

Source code in kraken/core/system/context.py
def finalize(self) -> None:
    """Call :meth:`Task.finalize()` on all tasks. This should be called before a graph is created."""

    if self._finalized:
        logger.warning("Context.finalize() called more than once", stack_info=True)
        return

    self._finalized = True
    self.trigger(ContextEvent.Type.on_context_begin_finalize, self)

    # Delegate to finalize calls in all tasks of all projects.
    for project in self.iter_projects():
        self.trigger(ContextEvent.Type.on_project_begin_finalize, project)
        for task in project.tasks().values():
            task.finalize()
        self.trigger(ContextEvent.Type.on_project_finalized, project)

    self.trigger(ContextEvent.Type.on_context_finalized, self)
get_build_graph
get_build_graph(
    targets: Sequence[str | Address | Task] | None,
) -> TaskGraph

Returns the :class:TaskGraph that contains either all default tasks or the tasks specified with the targets argument.

:param targets: A list of targets to resolve and to build the graph from.

:raise ValueError: If no tasks were selected.

Source code in kraken/core/system/context.py
def get_build_graph(self, targets: Sequence[str | Address | Task] | None) -> TaskGraph:
    """Returns the :class:`TaskGraph` that contains either all default tasks or the tasks specified with
    the *targets* argument.

    :param targets: A list of targets to resolve and to build the graph from.
    :raise ValueError: If no tasks were selected.
    """

    if targets is None:
        tasks = self.resolve_tasks(None)
    else:
        needs_resolving, resolved = bipartition(lambda t: isinstance(t, Task), targets)
        tasks = cast(list[Task], list(resolved))
        tasks.extend(self.resolve_tasks(needs_resolving))

    if not tasks:
        raise ValueError("no tasks selected")

    graph = TaskGraph(self).trim(tasks)

    assert graph, "TaskGraph cannot be empty"
    return graph
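
As a sketch, the graph can also be built without executing it, for example to inspect which tasks a selector resolves to. The "test" selector is illustrative, and a loaded project tree is assumed.

# Assumes `context` is a Context with a loaded root project.
context.finalize()
graph = context.get_build_graph(["test"])
for task in graph.tasks():
    print(task.address)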
get_project
get_project(address: Address) -> Project

Find a project by its address. The address must be absolute.

Source code in kraken/core/system/context.py
def get_project(self, address: Address) -> Project:
    """
    Find a project by its address. The address must be absolute.
    """

    if not address.is_absolute():
        raise ValueError(f"address '{address}' is not absolute")

    project: Project | None = self.root_project
    assert project is not None

    for element in address.elements:
        project = project.subproject(element.value, "or-none")
        if not project:
            raise ProjectNotFoundError(address)

    return project
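
A short lookup sketch; the :libs:core address is hypothetical, and Address is assumed to be importable from kraken.core.address (its source lives in kraken/core/address/_address.py).

from kraken.core.address import Address

# Assumes `context` has a loaded project tree containing a libs/core sub-project.
project = context.get_project(Address(":libs:core"))
print(project.directory)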
iter_projects
iter_projects(
    relative_to: Project | None = None,
) -> Iterator[Project]

Iterates over all projects in the context.

Source code in kraken/core/system/context.py
def iter_projects(self, relative_to: Project | None = None) -> Iterator[Project]:
    """Iterates over all projects in the context."""

    def _recurse(project: Project) -> Iterator[Project]:
        yield project
        for child_project in project.subprojects().values():
            yield from _recurse(child_project)

    yield from _recurse(relative_to or self.root_project)
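
For example (a sketch), listing the address of every project in the build:

# Assumes `context` has a loaded root project.
for project in context.iter_projects():
    print(project.address)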
listen
listen(
    event_type: str | Type, listener: Listener | None = None
) -> Any

Registers a listener to the context for the given event type.

Source code in kraken/core/system/context.py
def listen(self, event_type: str | ContextEvent.Type, listener: ContextEvent.Listener | None = None) -> Any:
    """Registers a listener to the context for the given event type."""

    if isinstance(event_type, str):
        event_type = ContextEvent.Type[event_type]

    def register(listener: ContextEvent.T_Listener) -> ContextEvent.T_Listener:
        assert callable(listener), "listener must be callable, got: %r" % listener
        self._listeners[event_type].append(listener)
        return listener

    if listener is None:
        return register

    register(listener)
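
Because listen() returns a decorator when no listener is passed, both forms below work. This is a sketch; it assumes ContextEvent is importable from kraken.core.system.context and that the event carries its payload in ContextEvent.data, matching the constructor call in trigger() above.

from pathlib import Path

from kraken.core.system.context import Context, ContextEvent

context = Context(build_directory=Path(".build"))

# Decorator form: register a listener for a single event type by name.
@context.listen("on_project_loaded")
def _on_project_loaded(event: ContextEvent) -> None:
    print("loaded project:", event.data)

# Direct form: pass the listener explicitly.
context.listen(ContextEvent.Type.on_context_finalized, lambda event: print("context finalized"))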
load_project
load_project(
    directory: Path,
    parent: Project | None = None,
    require_buildscript: bool = True,
    runner: ScriptRunner | None = None,
    script: Path | None = None,
) -> Project

Loads a project from a file or directory.

:param directory: The directory to load the project from.

:param parent: The parent project. If no parent is specified, the :attr:root_project must not have been initialized yet, and the loaded project will be used to initialize it. If the root project is initialized but no parent is specified, an error will be raised.

:param require_buildscript: If set to True, a build script must exist in directory. Otherwise, it is accepted if no build script exists in the directory.

:param runner: If the :class:ScriptRunner for this project is already known, it can be passed here.

:param script: If the script to load for the project is already known, it can be passed here. Cannot be specified without a runner.

Source code in kraken/core/system/context.py
def load_project(
    self,
    directory: Path,
    parent: Project | None = None,
    require_buildscript: bool = True,
    runner: ScriptRunner | None = None,
    script: Path | None = None,
) -> Project:
    """Loads a project from a file or directory.

    :param directory: The directory to load the project from.
    :param parent: The parent project. If no parent is specified, then the :attr:`root_project`
        must not have been initialized yet, and the loaded project will be used to initialize it.
        If the root project is initialized but no parent is specified, an error will be
        raised.
    :param require_buildscript: If set to `True`, a build script must exist in *directory*.
        Otherwise, it will be accepted if no build script exists in the directory.
    :param runner: If the :class:`ScriptRunner` for this project is already known, it can be passed here.
    :param script: If the script to load for the project is already known, it can be passed here. Cannot be
        specified without a *runner*.
    """

    if not runner:
        if script is not None:
            raise ValueError("cannot specify `script` parameter without a `runner` parameter")
        project_info = self.project_finder.find_project(directory)
        if project_info is not None:
            script, runner = project_info
    if not script and runner:
        script = runner.find_script(directory)

    has_root_project = self._root_project is not None
    project = Project(directory.name, directory, parent, self)
    try:
        if parent:
            parent.add_child(project)

        self.trigger(ContextEvent.Type.on_project_init, project)

        with self.as_current(), project.as_current():
            if not has_root_project:
                self._root_project = project

            if script is None and require_buildscript:
                raise ProjectLoaderError(
                    project,
                    f"no buildscript found for {project} (directory: {project.directory.absolute().resolve()})",
                )
            if script is not None:
                assert runner is not None
                runner.execute_script(script, {"project": project})

        self.trigger(ContextEvent.Type.on_project_loaded, project)

    except ProjectLoaderError as exc:
        if exc.project is project:
            # Revert changes if the project that the error occurred with is the current project.
            if not has_root_project:
                self._root_project = None
            if parent:
                parent.remove_child(project)
        raise

    return project
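
A loading sketch: the directory names are illustrative, and build scripts are expected to exist on disk except where require_buildscript=False.

from pathlib import Path

from kraken.core.system.context import Context

context = Context(build_directory=Path(".build"))

# Load the root project from the current directory (a build script is required by default).
root = context.load_project(Path("."))

# Load a sub-project explicitly; tolerate a missing build script in the docs/ directory.
docs = context.load_project(Path("docs"), parent=root, require_buildscript=False)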
resolve_tasks
resolve_tasks(
    addresses: Iterable[Task | str | Address] | None,
    relative_to: Project | Address | None = None,
    set_selected: bool = False,
) -> list[Task]

This method finds Kraken tasks by their address, relative to a given project. If no project is specified, the address is resolved relative to the root project.

:param addresses: A list of task addresses to resolve. Task addresses may contain glob patterns (* and ** as well as ? at the end of an address element, see the #Address class for more details).

Any address that consists of only a single non-globbing path element (such as `lint` or `test`)
will be prefixed by a wildcard (such that they are semantically equivalent to `**:lint` and
`**:test`, respectively).

In case the address specifies a container (that is, if it ends with a colon), then this will
resolve the default tasks of this container.
As an example, `:` will get the default tasks of the current project, and `:**:` will get the
default tasks of all sub-projects.
Specifying `None` is a shorthand for resolving `:` and `:**:`, that is, will resolve to the
default tasks of the current project and its sub-projects.

:param relative_to: The Kraken project to resolve the task addresses relative to. If this is not specified, the #root_project is used instead.

:param set_selected: If enabled, addresses that resolve to tasks immediately will be marked as selected before they are returned. Note that this does not mark tasks as selected when they are picked up via the default tasks of a project. For example, when :* is resolved, the default tasks of all sub-projects will be returned, but they will not be marked as selected. The tasks of the root project, however, will be marked as selected.

Source code in kraken/core/system/context.py
def resolve_tasks(
    self,
    addresses: Iterable[Task | str | Address] | None,
    relative_to: Project | Address | None = None,
    set_selected: bool = False,
) -> list[Task]:
    """
    This method finds Kraken tasks by their address, relative to a given project. If no project is
    specified, the address is resolved relative to the root project.

    :param addresses: A list of task addresses to resolve. Task addresses may contain glob patterns
        (`*` and `**` as well as `?` at the end of an address element, see the #Address class for
        more details).

        Any address that consists of only a single non-globbing path element (such as `lint` or `test`)
        will be prefixed by a wildcard (such that they are semantically equivalent to `**:lint` and
        `**:test`, respectively).

        In case the address specifies a container (that is, if it ends with a colon), then this will
        resolve the default tasks of this container.
        As an example, `:` will get the default tasks of the current project, and `:**:` will get the
        default tasks of all sub-projects.
        Specifying `None` is a shorthand for resolving `:` and `:**:`, that is, will resolve to the
        default tasks of the current project and its sub-projects.

    :param relative_to: The Kraken project to resolve the task addresses relative to. If this is not
        specified, the #root_project is used instead.

    :param set_selected: If enabled, addresses that resolve to tasks immediately will be marked as selected
        before they are returned. Note that this does not mark tasks as selected when they are picked up
        via the default tasks of a project. For example, when `:*` is resolved, the default tasks of all
        sub-projects will be returned, but they will not be marked as selected. The tasks of the root project
        however, will be marked as selected.
    """

    if not isinstance(relative_to, Address):
        relative_to = relative_to.address if relative_to is not None else Address.ROOT

    if not relative_to.is_absolute():
        raise ValueError(f"'relative_to' must be an absolute address (got {relative_to!r})")

    if addresses is None:
        addresses = [
            ".:",  # The current project (will be "expanded" to its default tasks)
            "**:",  # All sub-projects (will be "expanded" to their default tasks)
        ]

    results: list[Task] = []
    space = KrakenAddressSpace(self.root_project)
    for address in addresses:
        if isinstance(address, Task):
            results.append(address)
            continue
        try:
            results += self._resolve_single_address(Address(address), relative_to, space, set_selected)
        except TaskResolutionException:
            if address == "**:":
                # In case the project has no sub-projects, it is expected not to find any tasks there
                pass
            else:
                raise

    return results
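
A sketch of the different selector forms described above (the task and project names are made up):

# Assumes `context` has a loaded project tree.
tasks = context.resolve_tasks(None)             # default tasks of the root project and all sub-projects
tasks = context.resolve_tasks(["lint"])         # single element, treated like "**:lint"
tasks = context.resolve_tasks([":docs:build"])  # a specific task by absolute address
tasks = context.resolve_tasks([":**:"])         # default tasks of all sub-projects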

Graph

Bases: ABC

Interface for task graphs required for execution.

Source code in kraken/core/system/executor/__init__.py
class Graph(abc.ABC):
    """Interface for task graphs required for execution."""

    @abc.abstractmethod
    def ready(self) -> list[Task]:
        """Block until new tasks are ready to be executed. Return empty if no tasks are left. If no tasks are left
        but :meth:`is_complete` returns `False`, the build was unsuccessful."""

    @abc.abstractmethod
    def get_successors(self, task: Task) -> list[Task]:
        """Return all active dependants of the given task."""

    @abc.abstractmethod
    def get_task(self, task_path: Address) -> Task:
        """Return a task by its path."""

    @abc.abstractmethod
    def set_status(self, task: Task, status: TaskStatus) -> None:
        """Set the result of a task. Can be called twice for the same task unless the previous call was passing
        a status with type :attr:`TaskStatusType.STARTED`."""

    @abc.abstractmethod
    def is_complete(self) -> bool:
        """Return `True` if all tasks in the graph are done and successful."""

    @abc.abstractmethod
    def tasks(
        self,
        goals: bool = False,
        pending: bool = False,
        failed: bool = False,
        not_executed: bool = False,
    ) -> Iterator[Task]:
        """Returns the tasks in the graph in arbitrary order.

        :param goals: Return only goal tasks (i.e. leaf nodes).
        :param pending: Return only pending tasks.
        :param failed: Return only failed tasks.
        :param not_executed: Return only not executed tasks (i.e. downstream of failed tasks)"""
get_successors abstractmethod
get_successors(task: Task) -> list[Task]

Return all active dependants of the given task.

Source code in kraken/core/system/executor/__init__.py
@abc.abstractmethod
def get_successors(self, task: Task) -> list[Task]:
    """Return all active dependants of the given task."""
get_task abstractmethod
get_task(task_path: Address) -> Task

Return a task by its path.

Source code in kraken/core/system/executor/__init__.py
@abc.abstractmethod
def get_task(self, task_path: Address) -> Task:
    """Return a task by its path."""
is_complete abstractmethod
is_complete() -> bool

Return True if all tasks in the graph are done and successful.

Source code in kraken/core/system/executor/__init__.py
@abc.abstractmethod
def is_complete(self) -> bool:
    """Return `True` if all tasks in the graph are done and successful."""
ready abstractmethod
ready() -> list[Task]

Block until new tasks are ready to be executed. Return empty if no tasks are left. If no tasks are left but :meth:is_complete returns False, the build was unsuccessful.

Source code in kraken/core/system/executor/__init__.py
@abc.abstractmethod
def ready(self) -> list[Task]:
    """Block until new tasks are ready to be executed. Return empty if no tasks are left. If no tasks are left
    but :meth:`is_complete` returns `False`, the build was unsuccessful."""
set_status abstractmethod
set_status(task: Task, status: TaskStatus) -> None

Set the result of a task. Can be called twice for the same task unless the previous call was passing a status with type :attr:TaskStatusType.STARTED.

Source code in kraken/core/system/executor/__init__.py
@abc.abstractmethod
def set_status(self, task: Task, status: TaskStatus) -> None:
    """Set the result of a task. Can be called twice for the same task unless the previous call was passing
    a status with type :attr:`TaskStatusType.STARTED`."""
tasks abstractmethod
tasks(
    goals: bool = False,
    pending: bool = False,
    failed: bool = False,
    not_executed: bool = False,
) -> Iterator[Task]

Returns the tasks in the graph in arbitrary order.

:param goals: Return only goal tasks (i.e. leaf nodes).

:param pending: Return only pending tasks.

:param failed: Return only failed tasks.

:param not_executed: Return only not executed tasks (i.e. downstream of failed tasks).

Source code in kraken/core/system/executor/__init__.py
@abc.abstractmethod
def tasks(
    self,
    goals: bool = False,
    pending: bool = False,
    failed: bool = False,
    not_executed: bool = False,
) -> Iterator[Task]:
    """Returns the tasks in the graph in arbitrary order.

    :param goals: Return only goal tasks (i.e. leaf nodes).
    :param pending: Return only pending tasks.
    :param failed: Return only failed tasks.
    :param not_executed: Return only not executed tasks (i.e. downstream of failed tasks)"""
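
For intuition, a deliberately simplified serial driver over this interface might look as follows. This is only a sketch, not the actual DefaultGraphExecutor: it skips Task.prepare(), detailed status bookkeeping and parallelism, and it assumes TaskStatus is importable from kraken.core.system.task with a succeeded() constructor.

from kraken.core.system.executor import Graph
from kraken.core.system.task import TaskStatus  # assumed import location

def run_serially(graph: Graph) -> None:
    # Pull batches of ready tasks and record their results until the graph is drained.
    while tasks := graph.ready():
        for task in tasks:
            status = task.execute() or TaskStatus.succeeded()  # succeeded() is assumed
            graph.set_status(task, status)
    if not graph.is_complete():
        raise RuntimeError("build was unsuccessful")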

GroupTask

Bases: Task

This task can be used to group tasks under a common name. Ultimately it is just another task that depends on the tasks in the group, forcing them to be executed when this task is targeted. Group tasks are not enabled by default.

Source code in kraken/core/system/task.py
class GroupTask(Task):
    """This task can be used to group tasks under a common name. Ultimately it is just another task that depends on
    the tasks in the group, forcing them to be executed when this task is targeted. Group tasks are not enabled
    by default."""

    tasks: list[Task]

    def __init__(self, name: str, project: Project) -> None:
        super().__init__(name, project)
        self.tasks = []
        self.default = False

    def add(self, tasks: str | Address | Task | Iterable[str | Address | Task]) -> None:
        """Add one or more tasks by name or task object to this group.

        This is different from adding a task via :meth:`add_relationship` because the task is instead stored in the
        :attr:`tasks` list which can be used to access the members of the task. Relationships for a group task can
        still be used to express relationships between groups or tasks and groups.

        Also note that :meth:`add_relationship` supports lazy evaluation of task selectors, whereas using this method
        to add a task to the group by a selector string requires that the task already exists.
        """

        if isinstance(tasks, (str, Address, Task)):
            tasks = [tasks]

        for task in tasks:
            if isinstance(task, (str, Address)):
                self.tasks += [
                    t for t in self.project.context.resolve_tasks([task], self.project) if t not in self.tasks
                ]
            elif task not in self.tasks:
                self.tasks.append(task)

    # Task

    def get_outputs(self, output_type: type[T] | type[object] = object) -> Iterable[T] | Iterable[Any]:
        yield from super().get_outputs(output_type)
        for task in self.tasks:
            yield from task.get_outputs(output_type)

    def get_relationships(self) -> Iterable[TaskRelationship]:
        for task in self.tasks:
            yield TaskRelationship(task, True, False)
        yield from super().get_relationships()

    def prepare(self) -> TaskStatus | None:
        return TaskStatus.skipped("is a GroupTask")

    def execute(self) -> TaskStatus | None:
        raise RuntimeError("GroupTask cannot be executed")
add
add(
    tasks: (
        str
        | Address
        | Task
        | Iterable[str | Address | Task]
    ),
) -> None

Add one or more tasks by name or task object to this group.

This is different from adding a task via :meth:add_relationship because the task is instead stored in the :attr:tasks list which can be used to access the members of the task. Relationships for a group task can still be used to express relationships between groups or tasks and groups.

Also note that :meth:add_relationship supports lazy evaluation of task selectors, whereas using this method to add a task to the group by a selector string requires that the task already exists.

Source code in kraken/core/system/task.py
def add(self, tasks: str | Address | Task | Iterable[str | Address | Task]) -> None:
    """Add one or more tasks by name or task object to this group.

    This is different from adding a task via :meth:`add_relationship` because the task is instead stored in the
    :attr:`tasks` list which can be used to access the members of the task. Relationships for a group task can
    still be used to express relationships between groups or tasks and groups.

    Also note that :meth:`add_relationship` supports lazy evaluation of task selectors, whereas using this method
    to add a task to the group by a selector string requires that the task already exists.
    """

    if isinstance(tasks, (str, Address, Task)):
        tasks = [tasks]

    for task in tasks:
        if isinstance(task, (str, Address)):
            self.tasks += [
                t for t in self.project.context.resolve_tasks([task], self.project) if t not in self.tasks
            ]
        elif task not in self.tasks:
            self.tasks.append(task)
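
A grouping sketch; the group and task names are illustrative, and tasks added by name must already exist, as noted in the docstring.

# Assumes `project` is a loaded Project that already has "buildDocs" and "checkLinks" tasks.
docs_group = project.group("docs", description="Tasks that build the documentation.")
docs_group.add(["buildDocs", "checkLinks"])  # by name (resolved immediately)
docs_group.add(project.task("buildDocs"))    # adding a Task object that is already a member is a no-op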

Project

Bases: KrakenObject, MetadataContainer, Currentable['Project']

A project consolidates tasks related to a directory on the filesystem.

Source code in kraken/core/system/project.py
class Project(KrakenObject, MetadataContainer, Currentable["Project"]):
    """A project consolidates tasks related to a directory on the filesystem."""

    directory: Path
    context: Context
    metadata: list[Any]  #: A list of arbitrary objects that are usually looked up by type.

    def __init__(self, name: str, directory: Path, parent: Project | None, context: Context) -> None:
        assert isinstance(name, str), type(name)
        assert isinstance(directory, Path), type(directory)
        assert isinstance(parent, Project) or parent is None, type(parent)
        KrakenObject.__init__(self, name, parent)
        MetadataContainer.__init__(self)
        Currentable.__init__(self)

        self.directory = directory
        self.context = context
        self.metadata = []

        # We store all members that can be referenced by a fully qualified name in the same dictionary to ensure
        # we're not accidentally allocating the same name twice.
        self._members: dict[str, Task | Project] = {}

        apply_group = self.group(
            "apply", description="Tasks that perform automatic updates to the project consistency."
        )
        fmt_group = self.group("fmt", description="Tasks that that perform code formatting operations.")
        fmt_group.depends_on(apply_group, mode="strict")

        check_group = self.group("check", description="Tasks that perform project consistency checks.", default=True)

        gen_group = self.group("gen", description="Tasks that perform code generation.", default=True)

        lint_group = self.group("lint", description="Tasks that perform code linting.", default=True)
        lint_group.depends_on(check_group, mode="strict")
        lint_group.depends_on(gen_group, mode="strict")

        build_group = self.group("build", description="Tasks that produce build artefacts.")
        build_group.depends_on(lint_group, mode="order-only")
        build_group.depends_on(gen_group, mode="strict")

        audit_group = self.group("audit", description="Tasks that perform auditing on built artefacts and code")
        audit_group.depends_on(build_group, mode="strict")
        audit_group.depends_on(gen_group, mode="strict")

        test_group = self.group("test", description="Tasks that perform unit tests.", default=True)
        test_group.depends_on(build_group, mode="order-only")
        test_group.depends_on(gen_group, mode="strict")

        integration_test_group = self.group("integrationTest", description="Tasks that perform integration tests.")
        integration_test_group.depends_on(test_group, mode="order-only")
        integration_test_group.depends_on(gen_group, mode="strict")

        publish_group = self.group("publish", description="Tasks that publish build artefacts.")
        publish_group.depends_on(integration_test_group, mode="order-only")
        publish_group.depends_on(build_group, mode="strict")

        deploy_group = self.group("deploy", description="Tasks that deploy applications.")
        deploy_group.depends_on(publish_group, mode="order-only")

        self.group("update", description="Tasks that update dependencies of the project.")

    def __repr__(self) -> str:
        return f"Project({self.address})"

    @property
    def parent(self) -> Project | None:
        if self._parent is None:
            return None
        assert isinstance(self._parent, Project), "Project.parent must be a Project"
        return self._parent

    @property
    def name(self) -> str:
        if self.address.is_root():
            warnings.warn(
                "Accessing Project.name on the root project is deprecated since kraken-core v0.12.0. "
                "In future versions, this will result ValueError being raised. The project name is now "
                "determined by the Address.name, which is undefined on the root address (`:`). "
                "The fallback behaviour for this version is that we return the Project.directory.name.",
                DeprecationWarning,
            )
            return self.directory.name
        return self.address.name

    @property
    def build_directory(self) -> Path:
        """Returns the recommended build directory for the project; this is a directory inside the context
        build directory amended by the project name."""

        return self.context.build_directory / str(self.address).replace(":", "/").lstrip("/")

    @overload
    def task(self, name: str, /) -> Task:
        """
        Get a task from the project by name. Raises a #TaskNotFound exception if the task does not exist.
        """

    @overload
    def task(
        self,
        name: str,
        type_: type[T_Task],
        /,
        *,
        default: bool | None = None,
        group: str | GroupTask | None = None,
        description: str | None = None,
    ) -> T_Task:
        """
        Create a new task in the project with the specified name. If a member of the project with the same name already
        exists, a #DuplicateMember exception is raised.
        """

    def task(
        self,
        name: str,
        type_: type[T_Task] | None = None,
        /,
        *,
        default: bool | None = None,
        group: str | GroupTask | None = None,
        description: str | None = None,
    ) -> Task | T_Task:
        if type_ is None:
            assert default is None
            assert group is None
            assert description is None

            try:
                task = self._members[name]
                if not isinstance(task, Task):
                    raise KeyError("Not a task")
            except KeyError:
                raise TaskNotFound(self.address.concat(name))
            return task

        if type_ is None or not isinstance(type_, type) or not issubclass(type_, Task):
            raise TypeError(f"Expected a Task type, got {type(type_).__name__}")

        if name in self._members:
            raise DuplicateMember(f"{self} already has a member {name!r}")

        task = type_(name, self)
        if default is not None:
            task.default = default

        match group:
            case str():
                self.group(group).add(task)
            case GroupTask():
                group.add(task)
            case None:
                pass
            case _:
                raise TypeError(f"Expected str or GroupTask, got {type(group)}")

        if description is not None:
            task.description = description

        self._members[name] = task
        return task

    def tasks(self) -> Mapping[str, Task]:
        return {t.name: t for t in self._members.values() if isinstance(t, Task)}

    def subprojects(self) -> Mapping[str, Project]:
        return {p.name: p for p in self._members.values() if isinstance(p, Project)}

    @overload
    def subproject(self, name: str, mode: Literal["empty", "execute"] = "execute") -> Project:
        """
        Mount a sub-project of this project with the specified *name*.

        :param name: The name of the sub-project. The address of the returned project will be the current project's
            address appended by the given *name*. The name must not contain special characters reserved to the Address
            syntax. The sub-project will be bound to the directory with the same *name* in the directory of the current
            project.
        :param mode: Specifies how the project should be created. If set to "empty", the project will be created
            without loading any build scripts. If set to "execute" (default), the project will be created and its
            build scripts will be executed.
        """

    @overload
    def subproject(self, name: str, mode: Literal["if-exists", "or-none"]) -> Project | None:
        """
        Mount a sub-project of this project with the specified *name* and execute it if the directory matching the
        *name* exists. If such a directory does not exist, no project is created and `None` is returned. If you want
        to create a project in any case, you can use this method, and if you get `None` back you can call
        #subproject() again with the *mode* set to "empty".

        Using the `"or-none"` mode, the sub-project will only be returned if it was already loaded.
        """

    def subproject(
        self,
        name: str,
        mode: Literal["empty", "execute", "if-exists", "or-none"] = "execute",
    ) -> Project | None:
        assert isinstance(mode, str), f"mode must be a string, got {type(mode).__name__}"

        obj = self._members.get(name)
        if obj is None and mode == "or-none":
            return None
        if obj is not None:
            if not isinstance(obj, Project):
                raise ValueError(
                    f"{self.address}:{name} does not refer to a project (got {type(obj).__name__} instead)"
                )
            return obj

        directory = self.directory / name
        if mode == "empty":
            project = Project(name, directory, self, self.context)
            self._members[name] = project
        elif mode == "execute" or mode == "if-exists":
            if not directory.is_dir():
                if mode == "if-exists":
                    return None
                raise FileNotFoundError(
                    f"{self.address}:{name} cannot be loaded because the directory {directory} does not exist"
                )
            project = self.context.load_project(directory, self, require_buildscript=False)
            assert name in self._members
            assert self._members[name] is project
        else:
            raise ValueError(f"invalid mode {mode!r}")

        return project

    def has_subproject(self, name: str) -> bool:
        """
        Returns `True` if *name* refers to a subproject that exists in the current project.
        """

        return isinstance(self._members.get(name), Project)

    def add_task(self, task: Task) -> None:
        """Adds a task to the project.

        Raises:
            ValueError: If a member with the same name already exists or if the task's project does not match
        """

        if task.name in self._members:
            raise ValueError(f"{self} already has a member {task.name!r}, cannot add {task}")
        if task.project is not self:
            raise ValueError(f"{task}.project mismatch")
        self._members[task.name] = task

    def add_child(self, project: Project) -> None:
        """Adds a project as a child project.

        Raises:
            ValueError: If a member with the same name already exists or if the project's parent does not match
        """

        if project.name in self._members:
            raise ValueError(f"{self} already has a member {project.name!r}, cannot add {project}")
        if project.parent is not self:
            raise ValueError(f"{project}.parent mismatch")
        self._members[project.name] = project

    def remove_child(self, project: Project) -> None:
        assert project.parent is self
        assert self._members[project.name] is project

        del self._members[project.name]

    def group(self, name: str, *, description: str | None = None, default: bool | None = None) -> GroupTask:
        """Create or get a group of the given name. If a task with the given name already exists, it must refer
        to a task of type :class:`GroupTask`, otherwise a :class:`RuntimeError` is raised.

        :param name: The name of the group in the project.
        :param description: If specified, set the group's description.
        :param default: Whether the task group is run by default."""

        task = self.tasks().get(name)
        if task is None:
            task = self.task(name, GroupTask)
        elif not isinstance(task, GroupTask):
            raise RuntimeError(f"{task.address!r} must be a GroupTask, but got {type(task).__name__}")
        if description is not None:
            task.description = description
        if default is not None:
            task.default = default

        return task
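
To illustrate how the predefined groups above are typically used, here is a hedged build-script sketch that registers a custom task into the "build" group. The task class and names are made up, and TaskStatus.succeeded() is assumed to exist (the listings above only show TaskStatus.skipped()).

from kraken.core.system.task import Task, TaskStatus

class HelloTask(Task):
    """A made-up task that prints a greeting."""

    def execute(self) -> TaskStatus | None:
        print("hello from", self.project.address)
        return TaskStatus.succeeded()  # assumed helper for a successful status

# Assumes `project` is the current Project, e.g. as available inside a build script.
project.task("hello", HelloTask, group="build", description="Prints a greeting.")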
add_child
add_child(project: Project) -> None

Adds a project as a child project.

Raises:

ValueError: If a member with the same name already exists or if the project's parent does not match.

Source code in kraken/core/system/project.py
def add_child(self, project: Project) -> None:
    """Adds a project as a child project.

    Raises:
        ValueError: If a member with the same name already exists or if the project's parent does not match
    """

    if project.name in self._members:
        raise ValueError(f"{self} already has a member {project.name!r}, cannot add {project}")
    if project.parent is not self:
        raise ValueError(f"{project}.parent mismatch")
    self._members[project.name] = project
add_task
add_task(task: Task) -> None

Adds a task to the project.

Raises:

ValueError: If a member with the same name already exists or if the task's project does not match.

Source code in kraken/core/system/project.py
def add_task(self, task: Task) -> None:
    """Adds a task to the project.

    Raises:
        ValueError: If a member with the same name already exists or if the task's project does not match
    """

    if task.name in self._members:
        raise ValueError(f"{self} already has a member {task.name!r}, cannot add {task}")
    if task.project is not self:
        raise ValueError(f"{task}.project mismatch")
    self._members[task.name] = task
build_directory
build_directory() -> Path

Returns the recommended build directory for the project; this is a directory inside the context build directory amended by the project name.

Source code in kraken/core/system/project.py
@property
def build_directory(self) -> Path:
    """Returns the recommended build directory for the project; this is a directory inside the context
    build directory amended by the project name."""

    return self.context.build_directory / str(self.address).replace(":", "/").lstrip("/")
group
group(
    name: str,
    *,
    description: str | None = None,
    default: bool | None = None
) -> GroupTask

Create or get a group of the given name. If a task with the given name already exists, it must refer to a task of type :class:GroupTask, otherwise a :class:RuntimeError is raised.

:param name: The name of the group in the project.

:param description: If specified, set the group's description.

:param default: Whether the task group is run by default.

Source code in kraken/core/system/project.py
def group(self, name: str, *, description: str | None = None, default: bool | None = None) -> GroupTask:
    """Create or get a group of the given name. If a task with the given name already exists, it must refer
    to a task of type :class:`GroupTask`, otherwise a :class:`RuntimeError` is raised.

    :param name: The name of the group in the project.
    :param description: If specified, set the group's description.
    :param default: Whether the task group is run by default."""

    task = self.tasks().get(name)
    if task is None:
        task = self.task(name, GroupTask)
    elif not isinstance(task, GroupTask):
        raise RuntimeError(f"{task.address!r} must be a GroupTask, but got {type(task).__name__}")
    if description is not None:
        task.description = description
    if default is not None:
        task.default = default

    return task
has_subproject
has_subproject(name: str) -> bool

Returns True if name refers to a subproject that exists in the current project.

Source code in kraken/core/system/project.py
def has_subproject(self, name: str) -> bool:
    """
    Returns `True` if *name* refers to a subproject that exists in the current project.
    """

    return isinstance(self._members.get(name), Project)
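
A sub-project mounting sketch (the directory names are illustrative):

# Assumes `project` is a loaded Project whose directory contains a "backend" folder.
backend = project.subproject("backend")                  # loads and executes backend's build script
frontend = project.subproject("frontend", "if-exists")   # None if the directory does not exist
if not project.has_subproject("tools"):
    project.subproject("tools", "empty")                 # mount an empty project without a build script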

Property

Bases: Supplier[T]

A property represents an input or output parameter of an :class:Object.

Source code in kraken/core/system/property.py
class Property(Supplier[T]):
    """A property represents an input or output parameter of an :class:`Object`."""

    class Deferred(Exception):
        """
        This exception is raised when an output property has no value set. It is distinct from the
        :class:`Supplier.Empty` exception in that it will propagate to the caller in any case.
        """

        def __init__(self, property: Property[Any], message: str | None = None) -> None:
            self.property = property
            self.message = message

        def __str__(self) -> str:
            if self.message:
                return f"{self.message} ({self.property})"
            else:
                return f"the value of {self.property} will be known at a later time"

    ValueAdapter = Callable[[Any], Any]

    # This dictionary is a registry for type adapters that are used to ensure that values passed
    # into a property with :meth:`set()` are of the appropriate type. If a type adapter for a
    # particular type does not exist, a basic type check is performed. Note that the type adaptation
    # is not particularly sophisticated at this point and will not apply on items in nested structures.
    VALUE_ADAPTERS: ClassVar[dict[type, ValueAdapter]] = {}

    @staticmethod
    def output(*, help: str | None = None) -> Any:
        """Assign the result of this function as a default value to a property on the class level of an :class:`Object`
        subclass to mark it as an output property. This is an alternative to using the :class:`typing.Annotated` type
        hint.

        .. code:: Example

            from kraken.core.system.property import Object, Property, output

            class MyObj(Object):
                a: Property[int] = output()
        """

        return PropertyConfig(output=True, help=help)

    @staticmethod
    def required(*, help: str | None = None) -> Any:
        """
        Assign the result of this function as a default value to a property class to declare that it is required. This
        is the default behaviour of a property, so this function is only useful to specify a help text or to make
        it more explicit in the code.
        """

        return PropertyConfig(help=help)

    @staticmethod
    def default(value: Any, *, help: str | None = None) -> Any:
        """Assign the result of this function as a default value to a property to declare it's default value."""

        return PropertyConfig(default=value, help=help)

    @staticmethod
    def default_factory(func: Callable[[], Any], help: str | None = None) -> Any:
        """Assign the result of this function as a default value to a property to declare it's default factory."""

        return PropertyConfig(default_factory=func, help=help)

    def __init__(
        self,
        owner: PropertyContainer | type[PropertyContainer],
        name: str,
        item_type: TypeHint | Any,
        deferred: bool = False,
        help: str | None = None,
    ) -> None:
        """
        :param owner: The object that owns the property instance.
        :param name: The name of the property.
        :param item_type: The original inner type hint of the property (excluding the Property type itself).
        :param deferred: Whether the property should be initialized with a :class:`DeferredSupplier`.
        :param help: A help text for the property.
        """

        # NOTE(@NiklasRosenstein): We expect that any union member be a ClassTypeHint or TupleTypeHint.
        def _get_types(hint: TypeHint) -> tuple[type, ...]:
            if isinstance(hint, (ClassTypeHint, TupleTypeHint)):
                return (hint.type,)
            elif isinstance(hint, LiteralTypeHint):
                # TODO(@NiklasRosenstein): Add validation to the property to error if a bad value is set.
                return tuple({type(x) for x in hint.values})
            else:
                raise RuntimeError(f"unexpected Property type hint {hint!r}")

        # Determine the accepted types of the property.
        item_type = item_type if isinstance(item_type, TypeHint) else TypeHint(item_type)
        if isinstance(item_type, UnionTypeHint):
            accepted_types = tuple(concat(*map(_get_types, item_type)))
        else:
            accepted_types = _get_types(item_type)

        # Ensure that we have value adapters for every accepted type.
        for accepted_type in accepted_types:
            if accepted_type not in self.VALUE_ADAPTERS:
                if not isinstance(accepted_type, type):
                    raise ValueError(f"missing value adapter for type {accepted_type!r}")
        assert len(accepted_types) > 0

        self.owner = owner
        self.name = name
        self.help = help
        self.accepted_types = accepted_types
        self.item_type = item_type
        self._value: Supplier[T] = DeferredSupplier(self) if deferred else Supplier.void()
        self._derived_from: Sequence[Supplier[Any]] = ()
        self._finalized = False
        self._error_message: str | None = None

    def __repr__(self) -> str:
        try:
            owner_fmt = str(self.owner)
        except Exception:
            owner_fmt = type(self.owner).__name__ + "(<exception during fmt>)"
        return f"Property({owner_fmt}.{self.name})"

    def _adapt_value(self, value: Any) -> Any:
        errors = []
        for accepted_type in self.accepted_types:
            try:
                adapter = self.VALUE_ADAPTERS[accepted_type]
            except KeyError:
                if isinstance(accepted_type, type):
                    adapter = _type_checking_adapter(accepted_type)
                else:
                    raise
            try:
                return adapter(value)
            except TypeError as exc:
                errors.append(exc)
        raise TypeError(f"{self}: " + "\n".join(map(str, errors))) from (errors[0] if len(errors) == 1 else None)

    @property
    def value(self) -> Supplier[T]:
        return self._value

    def derived_from(self) -> Iterable[Supplier[Any]]:
        yield self._value
        yield from self._value.derived_from()
        yield from self._derived_from

    def get(self) -> T:
        try:
            return self._value.get()
        except Supplier.Empty:
            raise Supplier.Empty(self, self._error_message)

    def set(self, value: T | Supplier[T], derived_from: Iterable[Supplier[Any]] = ()) -> None:
        if self._finalized:
            raise RuntimeError(f"{self} is finalized")
        derived_from = list(derived_from)
        if not isinstance(value, Supplier):
            value = Supplier.of(self._adapt_value(value), derived_from)
            derived_from = ()
        self._value = value
        self._derived_from = derived_from

    def setcallable(self, func: Callable[[], T], derived_from: Iterable[Supplier[Any]] = ()) -> None:
        if self._finalized:
            raise RuntimeError(f"{self} is finalized")
        if not callable(func):
            raise TypeError('"func" must be callable')
        self._value = Supplier.of_callable(func, list(derived_from))
        self._derived_from = ()

    def setmap(self, func: Callable[[T], T]) -> None:
        if self._finalized:
            raise RuntimeError(f"{self} is finalized")
        if not callable(func):
            raise TypeError('"func" must be callable')
        self._value = self._value.map(func)

    def setdefault(self, value: T | Supplier[T]) -> None:
        if self._finalized:
            raise RuntimeError(f"{self} is finalized")
        if self._value.is_void():
            self.set(value)

    def setfinal(self, value: T | Supplier[T]) -> None:
        self.set(value)
        self.finalize()

    def seterror(self, message: str) -> None:
        """Set an error message that should be included when the property is read."""

        self._error_message = message

    def clear(self) -> None:
        self.set(Supplier.void())

    def finalize(self) -> None:
        """Prevent further modification of the value in the property."""

        if not self._finalized:
            self._finalized = True

    def provides(self, type_: type) -> bool:
        """Returns `True` if the property may provide an instance or a sequence of the given *type_*."""

        if isinstance(self.item_type, UnionTypeHint):
            types = list(self.item_type)
        elif isinstance(self.item_type, ClassTypeHint):
            types = [self.item_type]
        else:
            assert False, self.item_type

        for provided in types:
            if not isinstance(provided, ClassTypeHint):
                continue
            if issubclass(provided.type, type_):
                return True
            if issubclass(provided.type, Sequence) and provided.args and len(provided.args) == 1:
                inner = provided.args[0]
                if issubclass(inner, type_):
                    return True

        return False

    def get_of_type(self, type_: type[U]) -> list[U]:
        """Return the inner value or values of the property as a flat list of *t*. If the property returns only a
        a single value of the specified type, the returned list will contain only that value. If the property instead
        provides a sequence that contains one or more objects of the provided type, only those objects will be
        returned.

        Note that this does not work with generic parametrized types."""

        value = self.get()
        if type_ is not object and isinstance(value, type_):
            return [value]
        if isinstance(value, Sequence):
            return [x for x in value if isinstance(x, type_)]
        if type_ is object:
            return [cast(U, value)]
        return []

    @staticmethod
    def value_adapter(type_: type) -> Callable[[ValueAdapter], ValueAdapter]:
        """Decorator for functions that serve as a value adapter for the given *type_*."""

        def decorator(func: Property.ValueAdapter) -> Property.ValueAdapter:
            Property.VALUE_ADAPTERS[type_] = func
            return func

        return decorator

    def is_set(self) -> bool:
        """
        Returns #True if the property has been set to a value, #False otherwise. This is different from #is_empty(),
        because it does not require evaluation of the property value. This method reflects whether #set() has been
        called with any other value than a #VoidSupplier or a #DeferredSupplier.
        """

        return not self._value.is_void()

    # Supplier

    def is_empty(self) -> bool:
        if isinstance(self._value, DeferredSupplier):
            return True
        return super().is_empty()

    # Python Descriptor

    def __set__(self, instance: PropertyContainer, value: T | Supplier[T] | None) -> None:
        instance_prop = vars(instance)[self.name]
        assert isinstance(instance_prop, Property)
        if value is not None or type(None) in self.accepted_types:
            instance_prop.set(value)
        else:
            instance_prop.clear()

    def __get__(self, instance: PropertyContainer | None, owner: type[Any]) -> Property[T]:
        if instance is None:
            return self
        instance_prop = vars(instance)[self.name]
        assert isinstance(instance_prop, Property)
        return instance_prop
Deferred

Bases: Exception

This exception is raised when an output property has no value set. It is distinct from the :class:Supplier.Empty exception in that it will propagate to the caller in any case.

Source code in kraken/core/system/property.py
class Deferred(Exception):
    """
    This exception is raised when an output property has no value set. It is distinct from the
    :class:`Supplier.Empty` exception in that it will propagate to the caller in any case.
    """

    def __init__(self, property: Property[Any], message: str | None = None) -> None:
        self.property = property
        self.message = message

    def __str__(self) -> str:
        if self.message:
            return f"{self.message} ({self.property})"
        else:
            return f"the value of {self.property} will be known at a later time"
__init__
__init__(
    owner: PropertyContainer | type[PropertyContainer],
    name: str,
    item_type: TypeHint | Any,
    deferred: bool = False,
    help: str | None = None,
) -> None

:param owner: The object that owns the property instance.

:param name: The name of the property.

:param item_type: The original inner type hint of the property (excluding the Property type itself).

:param deferred: Whether the property should be initialized with a :class:DeferredSupplier.

:param help: A help text for the property.

Source code in kraken/core/system/property.py
def __init__(
    self,
    owner: PropertyContainer | type[PropertyContainer],
    name: str,
    item_type: TypeHint | Any,
    deferred: bool = False,
    help: str | None = None,
) -> None:
    """
    :param owner: The object that owns the property instance.
    :param name: The name of the property.
    :param item_type: The original inner type hint of the property (excluding the Property type itself).
    :param deferred: Whether the property should be initialized with a :class:`DeferredSupplier`.
    :param help: A help text for the property.
    """

    # NOTE(@NiklasRosenstein): We expect that any union member be a ClassTypeHint or TupleTypeHint.
    def _get_types(hint: TypeHint) -> tuple[type, ...]:
        if isinstance(hint, (ClassTypeHint, TupleTypeHint)):
            return (hint.type,)
        elif isinstance(hint, LiteralTypeHint):
            # TODO(@NiklasRosenstein): Add validation to the property to error if a bad value is set.
            return tuple({type(x) for x in hint.values})
        else:
            raise RuntimeError(f"unexpected Property type hint {hint!r}")

    # Determine the accepted types of the property.
    item_type = item_type if isinstance(item_type, TypeHint) else TypeHint(item_type)
    if isinstance(item_type, UnionTypeHint):
        accepted_types = tuple(concat(*map(_get_types, item_type)))
    else:
        accepted_types = _get_types(item_type)

    # Ensure that we have value adapters for every accepted type.
    for accepted_type in accepted_types:
        if accepted_type not in self.VALUE_ADAPTERS:
            if not isinstance(accepted_type, type):
                raise ValueError(f"missing value adapter for type {accepted_type!r}")
    assert len(accepted_types) > 0

    self.owner = owner
    self.name = name
    self.help = help
    self.accepted_types = accepted_types
    self.item_type = item_type
    self._value: Supplier[T] = DeferredSupplier(self) if deferred else Supplier.void()
    self._derived_from: Sequence[Supplier[Any]] = ()
    self._finalized = False
    self._error_message: str | None = None
default staticmethod
default(value: Any, *, help: str | None = None) -> Any

Assign the result of this function as a default value to a property to declare its default value.

Source code in kraken/core/system/property.py
@staticmethod
def default(value: Any, *, help: str | None = None) -> Any:
    """Assign the result of this function as a default value to a property to declare it's default value."""

    return PropertyConfig(default=value, help=help)
default_factory staticmethod
default_factory(
    func: Callable[[], Any], help: str | None = None
) -> Any

Assign the result of this function as a default value to a property to declare its default factory.

Source code in kraken/core/system/property.py
@staticmethod
def default_factory(func: Callable[[], Any], help: str | None = None) -> Any:
    """Assign the result of this function as a default value to a property to declare it's default factory."""

    return PropertyConfig(default_factory=func, help=help)
finalize
finalize() -> None

Prevent further modification of the value in the property.

Source code in kraken/core/system/property.py
def finalize(self) -> None:
    """Prevent further modification of the value in the property."""

    if not self._finalized:
        self._finalized = True
get_of_type
get_of_type(type_: type[U]) -> list[U]

Return the inner value or values of the property as a flat list of the given type_. If the property returns only a single value of the specified type, the returned list will contain only that value. If the property instead provides a sequence that contains one or more objects of the provided type, only those objects will be returned.

Note that this does not work with generic parametrized types.

Source code in kraken/core/system/property.py
def get_of_type(self, type_: type[U]) -> list[U]:
    """Return the inner value or values of the property as a flat list of *t*. If the property returns only a
    a single value of the specified type, the returned list will contain only that value. If the property instead
    provides a sequence that contains one or more objects of the provided type, only those objects will be
    returned.

    Note that this does not work with generic parametrized types."""

    value = self.get()
    if type_ is not object and isinstance(value, type_):
        return [value]
    if isinstance(value, Sequence):
        return [x for x in value if isinstance(x, type_)]
    if type_ is object:
        return [cast(U, value)]
    return []
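
A hedged sketch of the flattening behaviour described above; the artifacts property and its contents are hypothetical:

from pathlib import Path

# Assuming `task.artifacts` is a Property[list[Path]] holding [Path("a.whl"), Path("b.whl")]:
task.artifacts.get_of_type(Path)  # -> [Path("a.whl"), Path("b.whl")]
task.artifacts.get_of_type(str)   # -> [] (no str values are contained in the sequence)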
is_set
is_set() -> bool

Returns #True if the property has been set to a value, #False otherwise. This is different from #is_empty(), because it does not require evaluation of the property value. This method reflects whether #set() has been called with any other value than a #VoidSupplier or a #DeferredSupplier.

Source code in kraken/core/system/property.py
def is_set(self) -> bool:
    """
    Returns #True if the property has been set to a value, #False otherwise. This is different from #is_empty(),
    because it does not require evaluation of the property value. This method reflects whether #set() has been
    called with any other value than a #VoidSupplier or a #DeferredSupplier.
    """

    return not self._value.is_void()
output staticmethod
output(*, help: str | None = None) -> Any

Assign the result of this function as a default value to a property on the class level of an :class:Object subclass to mark it as an output property. This is an alternative to using the :class:typing.Annotated type hint.

Example:

from kraken.core.system.property import Object, Property, output

class MyObj(Object):
    a: Property[int] = output()
Source code in kraken/core/system/property.py
@staticmethod
def output(*, help: str | None = None) -> Any:
    """Assign the result of this function as a default value to a property on the class level of an :class:`Object`
    subclass to mark it as an output property. This is an alternative to using the :class:`typing.Annotated` type
    hint.

    .. code:: Example

        from kraken.core.system.property import Object, Property, output

        class MyObj(Object):
            a: Property[int] = output()
    """

    return PropertyConfig(output=True, help=help)
provides
provides(type_: type) -> bool

Returns True if the property may provide an instance or a sequence of the given type_.

Source code in kraken/core/system/property.py
def provides(self, type_: type) -> bool:
    """Returns `True` if the property may provide an instance or a sequence of the given *type_*."""

    if isinstance(self.item_type, UnionTypeHint):
        types = list(self.item_type)
    elif isinstance(self.item_type, ClassTypeHint):
        types = [self.item_type]
    else:
        assert False, self.item_type

    for provided in types:
        if not isinstance(provided, ClassTypeHint):
            continue
        if issubclass(provided.type, type_):
            return True
        if issubclass(provided.type, Sequence) and provided.args and len(provided.args) == 1:
            inner = provided.args[0]
            if issubclass(inner, type_):
                return True

    return False
required staticmethod
required(*, help: str | None = None) -> Any

Assign the result of this function as a default value to a property class to declare that it is required. This is the default behaviour of a property, so this function is only useful to specify a help text or to make it more explicit in the code.

Source code in kraken/core/system/property.py
@staticmethod
def required(*, help: str | None = None) -> Any:
    """
    Assign the result of this function as a default value to a property class to declare that it is required. This
    is the default behaviour of a property, so this function is only useful to specify a help text or to make
    it more explicit in the code.
    """

    return PropertyConfig(help=help)
seterror
seterror(message: str) -> None

Set an error message that should be included when the property is read.

Source code in kraken/core/system/property.py
def seterror(self, message: str) -> None:
    """Set an error message that should be included when the property is read."""

    self._error_message = message
value_adapter staticmethod
value_adapter(
    type_: type,
) -> Callable[[ValueAdapter], ValueAdapter]

Decorator for functions that serve as a value adapter for the given type_.

Source code in kraken/core/system/property.py
@staticmethod
def value_adapter(type_: type) -> Callable[[ValueAdapter], ValueAdapter]:
    """Decorator for functions that serve as a value adapter for the given *type_*."""

    def decorator(func: Property.ValueAdapter) -> Property.ValueAdapter:
        Property.VALUE_ADAPTERS[type_] = func
        return func

    return decorator
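
The registration mechanics can be sketched as follows; note that the exact ValueAdapter callable signature is not reproduced on this page, so the adapter's parameter list below is an assumption:

from pathlib import PurePosixPath

from kraken.core.system.property import Property


@Property.value_adapter(PurePosixPath)
def _adapt_pure_posix_path(value, value_type):  # parameter names and arity are assumed, not verified
    # Coerce plain strings into PurePosixPath instances; pass everything else through unchanged.
    return PurePosixPath(value) if isinstance(value, str) else value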

Task

Bases: KrakenObject, PropertyContainer, ABC

A Kraken Task is a unit of work that can be executed.

A task goes through a number of stages during its lifetime:

  • Creation and configuration
  • Finalization (:meth:finalize) -- Mutations to properties of the task are locked after this.
  • Preparation (:meth:prepare) -- The task prepares itself for execution; it may indicate that it does not need to be executed at this stage.
  • Execution (:meth:execute) -- The task executes its logic.

Tasks are uniquely identified by their name and the project they belong to, which is also represented by the task's :property:address. Relationships to other tasks can be added via the :meth:depends_on and required_by methods, or by passing properties of one task into the properties of another.

Source code in kraken/core/system/task.py
class Task(KrakenObject, PropertyContainer, abc.ABC):
    """
    A Kraken Task is a unit of work that can be executed.

    A task goes through a number of stages during its lifetime:

    * Creation and configuration
    * Finalization (:meth:`finalize`) -- Mutations to properties of the task are locked after this.
    * Preparation (:meth:`prepare`) -- The task prepares itself for execution; it may indicate that it
        does not need to be executed at this stage.
    * Execution (:meth:`execute`) -- The task executes its logic.

    Tasks are uniquely identified by their name and the project they belong to, which is also represented
    by the task's :property:`address`. Relationships to other tasks can be added via the :meth:`depends_on`
    and `required_by` methods, or by passing properties of one task into the properties of another.
    """

    #: A human readable description of the task's purpose. This is displayed in the terminal upon
    #: closer inspection of a task.
    description: str | None = None

    #: Whether the task executes by default when no explicit task is selected to run on the command-line.
    default: bool = False

    #: Whether the task was explicitly selected on the command-line.
    selected: bool = False

    #: A logger that is bound to the task's address. Use this logger to log messages related to the task,
    #: for example when implementing :meth:`finalize`, :meth:`prepare` or :meth:`execute`.
    logger: logging.Logger

    def __init__(self, name: str, project: Project) -> None:
        from kraken.core.system.project import Project

        assert isinstance(name, str), type(name)
        assert isinstance(project, Project), type(project)
        KrakenObject.__init__(self, name, project)
        PropertyContainer.__init__(self)
        self.logger = logging.getLogger(f"{str(self.address)} [{type(self).__module__}.{type(self).__qualname__}]")
        self._outputs: list[Any] = []
        self.__tags: dict[str, set[TaskTag]] = {}
        self.__relationships: list[_Relationship[Address | Task]] = []

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.address})"

    @property
    def project(self) -> Project:
        """
        A convenient alias for :attr:`parent` which is a lot easier to understand when reading the code.
        """

        from kraken.core.system.project import Project

        assert isinstance(self._parent, Project), "Task.parent must be a Project"
        return self._parent

    @property
    # @deprecated(reason="Task.outputs is deprecated.")
    def outputs(self) -> list[Any]:
        return self._outputs

    def add_tag(self, name: str, *, reason: str, origin: str | None = None) -> None:
        """
        Add a tag to this task. The built-in tag "skip" is used to indicate that a task should not be executed.
        """

        if name not in self.__tags:
            self.__tags[name] = set()

        logger.debug("Adding tag {!r} (reason: {!r}, origin: {!r}) to {}", name, reason, origin, self.address)
        self.__tags[name].add(TaskTag(name, reason, origin))

    def remove_tag(self, tag: TaskTag) -> None:
        """
        Remove a tag from the task. If the tag does not exist, this is a no-op.
        """

        try:
            self.__tags[tag.name].discard(tag)
        except KeyError:
            logger.debug("Attempted to remove tag {!r} from {}, but it does not exist", tag, self.address)
            pass
        else:
            logger.debug("Removed tag {!r} from {}", tag, self.address)

    def get_tags(self, name: str) -> Collection[TaskTag]:
        """
        Get all tags of the specified name.
        """

        return self.__tags.get(name, set())

    # End: Deprecated APIs

    def depends_on(
        self, *tasks: Task | Address | str, mode: RelationshipMode = "strict", _inverse: bool = False
    ) -> None:
        """
        Declare that this task depends on the specified other tasks. Relationships are lazy, meaning references
        to tasks using an address will be evaluated when :meth:`get_relationships` is called.

        If the *mode* is set to `strict`, the relationship is considered a strong dependency, meaning that the
        dependent task must be executed after the dependency. If the *mode* is set to `order-only`, the relationship
        indicates only the order in which the tasks must be executed if both were to be executed in the same run.
        """

        for idx, task in enumerate(tasks):
            if isinstance(task, str):
                task = Address(task)
            if not isinstance(task, Address | Task):
                raise TypeError(f"tasks[{idx}] must be Address | Task | str, got {type(task).__name__}")
            self.__relationships.append(_Relationship(task, mode == "strict", _inverse))

    def required_by(self, *tasks: Task | Address | str, mode: RelationshipMode = "strict") -> None:
        """
        Declare that this task is required by the specified other tasks. This is the inverse of :meth:`depends_on`,
        effectively declaring the same relationship in the opposite direction.
        """

        self.depends_on(*tasks, mode=mode, _inverse=True)

    def get_properties(self) -> Iterable[Property[Any]]:
        for key in self.__schema__:
            property: Property[Any] = getattr(self, key)
            yield property

    def get_relationships(self) -> Iterable[TaskRelationship]:
        """
        Return an iterable that yields all relationships that this task has to other tasks as indicated by
        information available in the task itself. The method will not return relationships established to
        this task from other tasks.

        The iterable will contain every relationship that is declared via :meth:`depends_on` or :meth:`required_by`,
        as well as relationships that are implied by the task's properties. For example, if a property of this
        task is set to the value of a property of another task, a relationship is implied between the tasks.
        """

        # Derive dependencies through property lineage.
        for property in self.get_properties():
            for supplier, _ in property.lineage():
                if supplier is property:
                    continue
                if isinstance(supplier, Property) and isinstance(supplier.owner, Task) and supplier.owner is not self:
                    yield TaskRelationship(supplier.owner, True, False)
                if isinstance(supplier, TaskSupplier):
                    yield TaskRelationship(supplier.get(), True, False)

        # Manually added relationships.
        for rel in self.__relationships:
            if isinstance(rel.other_task, Address):
                try:
                    resolved_tasks = self.project.context.resolve_tasks([rel.other_task], relative_to=self.project)
                except ValueError as exc:
                    raise ValueError(f"in task {self.address}: {exc}")
                for task in resolved_tasks:
                    yield TaskRelationship(task, rel.strict, rel.inverse)
            else:
                assert isinstance(rel.other_task, Task)
                yield cast(TaskRelationship, rel)

    def get_description(self) -> str | None:
        """
        Return the task's description. The default implementation formats the :attr:`description` string with the
        task's properties. Any Path property will be converted to a relative string to assist the reader.
        """

        class _MappingProxy:
            def __getitem__(_, key: str) -> Any:
                if key not in type(self).__schema__:
                    return f"%({key})s"
                prop = getattr(self, key)
                try:
                    value = prop.get()
                except Supplier.Empty:
                    return "<empty>"
                else:
                    if isinstance(value, Path):
                        try:
                            value = value.relative_to(Path.cwd())
                        except ValueError:
                            pass
                    return value

        if self.description:
            return self.description % _MappingProxy()
        return None

    @overload
    def get_outputs(self) -> Iterable[Any]:
        """Iterate over all outputs of the task. This includes all outputs in :attr:`Task.outputs` and the values
        in all properties defined as outputs. All output properties that return a sequence will be flattened.

        This should be called only after the task was executed, otherwise the output properties are likely empty
        and will error when read."""

    @overload
    def get_outputs(self, output_type: type[T]) -> Iterable[T]:
        """Iterate over all outputs of the task of the specified *output_type*. If a property provides a sequence of
        values of the *output_type*, that list is flattened.

        This should be called only after the task was executed, otherwise the output properties are likely empty
        and will error when read.

        :param output_type: The output type to search for."""

    # @deprecated(reason="Rely on the target-rule system to derive the artifacts of a task.")
    def get_outputs(self, output_type: type[T] | type[object] = object) -> Iterable[T] | Iterable[Any]:
        results = []

        for property_name, property_desc in self.__schema__.items():
            if not property_desc.is_output:
                continue
            property: Property[Any] = getattr(self, property_name)
            if property.provides(output_type):
                results += property.get_of_type(output_type)

        for obj in self.outputs:
            if isinstance(obj, output_type):
                results.append(obj)

        return results

    def finalize(self) -> None:
        """
        This method is called by :meth:`Context.finalize()`. It gives the task a chance to update its
        configuration before the build process is executed. The default implementation finalizes all non-output
        properties, preventing them from being further mutated.
        """

        for key in self.__schema__:
            prop: Property[Any] = getattr(self, key)
            if not self.__schema__[key].is_output:
                prop.finalize()

    def prepare(self) -> TaskStatus | None:
        """
        Called before a task is executed. This is called from the main process to check for example if the task
        is skippable or up to date. The implementation of this method should be quick to determine the task status,
        otherwise it should be done in :meth:`execute`.

        This method should not return :attr:`TaskStatusType.SUCCEEDED` or :attr:`TaskStatusType.FAILED`. If `None`
        is returned, it is assumed that the task is :attr:`TaskStatusType.PENDING`.
        """

        return TaskStatus.pending()

    @abc.abstractmethod
    def execute(self) -> TaskStatus | None:
        """
        Implements the behaviour of the task. The task can assume that all strict dependencies have been executed
        successfully. Output properties of dependency tasks that are only written by the task's execution are now
        accessible.

        This method should not return :attr:`TaskStatusType.PENDING`. If `None` is returned, it is assumed that the
        task is :attr:`TaskStatusType.SUCCEEDED`. If the task fails, it should return :attr:`TaskStatusType.FAILED`.
        If an exception is raised during this method, the task status is also assumed to be
        :attr:`TaskStatusType.FAILED`. If the task finished successfully but with warnings, it should return
        :attr:`TaskStatusType.WARNING`.
        """

        raise NotImplementedError

    def teardown(self) -> TaskStatus | None:
        """
        This method is called only if the task returns :attr:`TaskStatusType.STARTED` from :meth:`execute`. It is
        called if _all_ direct dependants of the task have been executed (whether successfully or not) or if no further
        task execution is queued.
        """

        return None
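
To make the lifecycle above concrete, a minimal sketch of a custom task follows; the class name is illustrative and only :meth:execute is overridden:

from kraken.core.system.task import Task


class HelloTask(Task):
    description = "Prints a greeting."

    def execute(self) -> None:
        # self.logger and self.address are provided by the Task base class.
        self.logger.info("hello from %s", self.address)
        return None  # a None return is interpreted as TaskStatusType.SUCCEEDED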
project property
project: Project

A convenient alias for :attr:parent which is a lot easier to understand when reading the code.

add_tag
add_tag(
    name: str, *, reason: str, origin: str | None = None
) -> None

Add a tag to this task. The built-in tag "skip" is used to indicate that a task should not be executed.

Source code in kraken/core/system/task.py
def add_tag(self, name: str, *, reason: str, origin: str | None = None) -> None:
    """
    Add a tag to this task. The built-in tag "skip" is used to indicate that a task should not be executed.
    """

    if name not in self.__tags:
        self.__tags[name] = set()

    logger.debug("Adding tag {!r} (reason: {!r}, origin: {!r}) to {}", name, reason, origin, self.address)
    self.__tags[name].add(TaskTag(name, reason, origin))
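
For example, to mark a task so that it will not be executed (the reason and origin strings are illustrative):

# "skip" is the built-in tag mentioned above.
task.add_tag("skip", reason="feature disabled in this build", origin="build.kraken.py")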
depends_on
depends_on(
    *tasks: Task | Address | str,
    mode: RelationshipMode = "strict",
    _inverse: bool = False
) -> None

Declare that this task depends on the specified other tasks. Relationships are lazy, meaning references to tasks using an address will be evaluated when :meth:get_relationships is called.

If the mode is set to strict, the relationship is considered a strong dependency, meaning that the dependent task must be executed after the dependency. If the mode is set to order-only, the relationship indicates only the order in which the tasks must be executed if both were to be executed in the same run.

Source code in kraken/core/system/task.py
def depends_on(
    self, *tasks: Task | Address | str, mode: RelationshipMode = "strict", _inverse: bool = False
) -> None:
    """
    Declare that this task depends on the specified other tasks. Relationships are lazy, meaning references
    to tasks using an address will be evaluated when :meth:`get_relationships` is called.

    If the *mode* is set to `strict`, the relationship is considered a strong dependency, meaning that the
    dependent task must be executed after the dependency. If the *mode* is set to `order-only`, the relationship
    indicates only the order in which the tasks must be executed if both were to be executed in the same run.
    """

    for idx, task in enumerate(tasks):
        if isinstance(task, str):
            task = Address(task)
        if not isinstance(task, Address | Task):
            raise TypeError(f"tasks[{idx}] must be Address | Task | str, got {type(task).__name__}")
        self.__relationships.append(_Relationship(task, mode == "strict", _inverse))
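
A hedged usage sketch; publish and build are assumed to be Task objects in the same build, and the address string is illustrative:

publish.depends_on(build)                                # strict: build must run before publish
publish.depends_on(":docs:generate", mode="order-only")  # ordering constraint only
build.required_by(publish)                               # the same strict edge, declared from the other side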
execute abstractmethod
execute() -> TaskStatus | None

Implements the behaviour of the task. The task can assume that all strict dependencies have been executed successfully. Output properties of dependency tasks that are only written by the task's execution are now accessible.

This method should not return :attr:TaskStatusType.PENDING. If None is returned, it is assumed that the task is :attr:TaskStatusType.SUCCEEDED. If the task fails, it should return :attr:TaskStatusType.FAILED. If an exception is raised during this method, the task status is also assumed to be :attr:TaskStatusType.FAILED. If the task finished successfully but with warnings, it should return :attr:TaskStatusType.WARNING.

Source code in kraken/core/system/task.py
@abc.abstractmethod
def execute(self) -> TaskStatus | None:
    """
    Implements the behaviour of the task. The task can assume that all strict dependencies have been executed
    successfully. Output properties of dependency tasks that are only written by the task's execution are now
    accessible.

    This method should not return :attr:`TaskStatusType.PENDING`. If `None` is returned, it is assumed that the
    task is :attr:`TaskStatusType.SUCCEEDED`. If the task fails, it should return :attr:`TaskStatusType.FAILED`.
    If an exception is raised during this method, the task status is also assumed to be
    :attr:`TaskStatusType.FAILED`. If the task finished successfully but with warnings, it should return
    :attr:`TaskStatusType.WARNING`.
    """

    raise NotImplementedError
finalize
finalize() -> None

This method is called by :meth:Context.finalize(). It gives the task a chance to update its configuration before the build process is executed. The default implementation finalizes all non-output properties, preventing them from being further mutated.

Source code in kraken/core/system/task.py
def finalize(self) -> None:
    """
    This method is called by :meth:`Context.finalize()`. It gives the task a chance to update its
    configuration before the build process is executed. The default implementation finalizes all non-output
    properties, preventing them from being further mutated.
    """

    for key in self.__schema__:
        prop: Property[Any] = getattr(self, key)
        if not self.__schema__[key].is_output:
            prop.finalize()
get_description
get_description() -> str | None

Return the task's description. The default implementation formats the :attr:description string with the task's properties. Any Path property will be converted to a relative string to assist the reader.

Source code in kraken/core/system/task.py
def get_description(self) -> str | None:
    """
    Return the task's description. The default implementation formats the :attr:`description` string with the
    task's properties. Any Path property will be converted to a relative string to assist the reader.
    """

    class _MappingProxy:
        def __getitem__(_, key: str) -> Any:
            if key not in type(self).__schema__:
                return f"%({key})s"
            prop = getattr(self, key)
            try:
                value = prop.get()
            except Supplier.Empty:
                return "<empty>"
            else:
                if isinstance(value, Path):
                    try:
                        value = value.relative_to(Path.cwd())
                    except ValueError:
                        pass
                return value

    if self.description:
        return self.description % _MappingProxy()
    return None
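
A hedged sketch of the %-style placeholder substitution described above; the src property is hypothetical:

# Assuming the task declares a Path property named `src`:
task.description = "Uploading %(src)s"
print(task.get_description())  # e.g. "Uploading dist/app.tar.gz", rendered relative to the current directory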
get_relationships
get_relationships() -> Iterable[TaskRelationship]

Return an iterable that yields all relationships that this task has to other tasks as indicated by information available in the task itself. The method will not return relationships established to this task from other tasks.

The iterable will contain every relationship that is declared via :meth:depends_on or :meth:required_by, as well as relationships that are implied by the task's properties. For example, if a property of this task is set to the value of a property of another task, a relationship is implied between the tasks.

Source code in kraken/core/system/task.py
def get_relationships(self) -> Iterable[TaskRelationship]:
    """
    Return an iterable that yields all relationships that this task has to other tasks as indicated by
    information available in the task itself. The method will not return relationships established to
    this task from other tasks.

    The iterable will contain every relationship that is declared via :meth:`depends_on` or :meth:`required_by`,
    as well as relationships that are implied by the task's properties. For example, if a property of this
    task is set to the value of a property of another task, a relationship is implied between the tasks.
    """

    # Derive dependencies through property lineage.
    for property in self.get_properties():
        for supplier, _ in property.lineage():
            if supplier is property:
                continue
            if isinstance(supplier, Property) and isinstance(supplier.owner, Task) and supplier.owner is not self:
                yield TaskRelationship(supplier.owner, True, False)
            if isinstance(supplier, TaskSupplier):
                yield TaskRelationship(supplier.get(), True, False)

    # Manually added relationships.
    for rel in self.__relationships:
        if isinstance(rel.other_task, Address):
            try:
                resolved_tasks = self.project.context.resolve_tasks([rel.other_task], relative_to=self.project)
            except ValueError as exc:
                raise ValueError(f"in task {self.address}: {exc}")
            for task in resolved_tasks:
                yield TaskRelationship(task, rel.strict, rel.inverse)
        else:
            assert isinstance(rel.other_task, Task)
            yield cast(TaskRelationship, rel)
get_tags
get_tags(name: str) -> Collection[TaskTag]

Get all tags of the specified name.

Source code in kraken/core/system/task.py
def get_tags(self, name: str) -> Collection[TaskTag]:
    """
    Get all tags of the specified name.
    """

    return self.__tags.get(name, set())
prepare
prepare() -> TaskStatus | None

Called before a task is executed. This is called from the main process to check, for example, whether the task is skippable or up to date. The implementation of this method should be quick to determine the task status; otherwise the work should be done in :meth:execute.

This method should not return :attr:TaskStatusType.SUCCEEDED or :attr:TaskStatusType.FAILED. If None is returned, it is assumed that the task is :attr:TaskStatusType.PENDING.

Source code in kraken/core/system/task.py
def prepare(self) -> TaskStatus | None:
    """
    Called before a task is executed. This is called from the main process to check for example if the task
    is skippable or up to date. The implementation of this method should be quick to determine the task status,
    otherwise it should be done in :meth:`execute`.

    This method should not return :attr:`TaskStatusType.SUCCEEDED` or :attr:`TaskStatusType.FAILED`. If `None`
    is returned, it is assumed that the task is :attr:`TaskStatusType.PENDING`.
    """

    return TaskStatus.pending()
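
A hedged sketch of a typical override; the output_file property is hypothetical and the TaskStatus import location is assumed:

from pathlib import Path

from kraken.core.system.property import Property
from kraken.core.system.task import Task, TaskStatus  # TaskStatus import path is an assumption


class RenderReport(Task):
    output_file: Property[Path]  # hypothetical output location

    def prepare(self) -> TaskStatus | None:
        # Keep this check cheap; anything expensive belongs in execute().
        if self.output_file.is_set() and self.output_file.get().exists():
            return TaskStatus.skipped("report already rendered")
        return TaskStatus.pending()

    def execute(self) -> TaskStatus | None:
        self.output_file.get().write_text("report contents")
        return None  # a None return is interpreted as SUCCEEDED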
remove_tag
remove_tag(tag: TaskTag) -> None

Remove a tag from the task. If the tag does not exist, this is a no-op.

Source code in kraken/core/system/task.py
def remove_tag(self, tag: TaskTag) -> None:
    """
    Remove a tag from the task. If the tag does not exist, this is a no-op.
    """

    try:
        self.__tags[tag.name].discard(tag)
    except KeyError:
        logger.debug("Attempted to remove tag {!r} from {}, but it does not exist", tag, self.address)
        pass
    else:
        logger.debug("Removed tag {!r} from {}", tag, self.address)
required_by
required_by(
    *tasks: Task | Address | str,
    mode: RelationshipMode = "strict"
) -> None

Declare that this task is required by the specified other tasks. This is the inverse of :meth:depends_on, effectively declaring the same relationship in the opposite direction.

Source code in kraken/core/system/task.py
def required_by(self, *tasks: Task | Address | str, mode: RelationshipMode = "strict") -> None:
    """
    Declare that this task is required by the specified other tasks. This is the inverse of :meth:`depends_on`,
    effectively declaring the same relationship in the opposite direction.
    """

    self.depends_on(*tasks, mode=mode, _inverse=True)
teardown
teardown() -> TaskStatus | None

This method is called only if the task returns :attr:TaskStatusType.STARTED from :meth:execute. It is called if all direct dependants of the task have been executed (whether successfully or not) or if no further task execution is queued.

Source code in kraken/core/system/task.py
def teardown(self) -> TaskStatus | None:
    """
    This method is called only if the task returns :attr:`TaskStatusType.STARTED` from :meth:`execute`. It is
    called if _all_ direct dependants of the task have been executed (whether successfully or not) or if no further
    task execution is queued.
    """

    return None

TaskGraph

Bases: Graph

The task graph represents a Kraken context's tasks as a directed acyclic graph data structure.

Before a task graph is passed to an executor, it is usually trimmed to contain only the tasks that are needed for the successful and complete execution of the desired set of "goal tasks".

Source code in kraken/core/system/graph.py
class TaskGraph(Graph):
    """The task graph represents a Kraken context's tasks as a directed acyclic graph data structure.

    Before a task graph is passed to an executor, it is usually trimmed to contain only the tasks that are
    needed for the successful and complete execution of the desired set of "goal tasks"."""

    def __init__(self, context: Context, populate: bool = True, parent: TaskGraph | None = None) -> None:
        """Create a new build graph from the given task list.

        :param context: The context that the graph belongs to.
        :param populate: If enabled, the task graph will be immediately populated with the tasks in the context.
            The graph can also be later populated with the :meth:`populate` method.
        """

        self._parent = parent
        self._context = context

        # Nodes have the form {'data': _Node} and edges have the form {'data': _Edge}.
        # NOTE: DiGraph is not runtime-subscriptable.
        self._digraph: DiGraph[Address] = DiGraph()

        # Keep track of task execution results.
        self._results: dict[Address, TaskStatus] = {}

        # All tasks that have a successful or skipped status are stored here.
        self._ok_tasks: set[Address] = set()

        # All tasks that have a failed status are stored here.
        self._failed_tasks: set[Address] = set()

        # Keep track of the tasks that returned TaskStatus.STARTED. That means the task is a background task, and
        # if the TaskGraph is deserialized from a state file to continue the build, background tasks need to be
        # reset so they start again if another task requires them.
        self._background_tasks: set[Address] = set()

        if populate:
            self.populate()

    def __bool__(self) -> bool:
        return len(self._digraph.nodes) > 0

    def __len__(self) -> int:
        return len(self._digraph.nodes)

    # Low level internal API

    def _get_task(self, addr: Address) -> Task | None:
        assert isinstance(addr, Address), type(addr)
        data = self._digraph.nodes.get(addr)
        if data is None:
            return None
        try:
            return cast(Task, data["data"])
        except KeyError:
            raise RuntimeError(f"An unexpected error occurred when fetching the task by address {addr!r}.")

    def _add_task(self, task: Task) -> None:
        self._digraph.add_node(task.address, data=task)
        for rel in task.get_relationships():
            if rel.other_task.address not in self._digraph.nodes:
                self._add_task(rel.other_task)
            a, b = (task, rel.other_task) if rel.inverse else (rel.other_task, task)
            self._add_edge(a.address, b.address, rel.strict, False)

            # If this relationship is one implied through group membership, we're done.
            if isinstance(task, GroupTask) and not rel.inverse and rel.other_task in task.tasks:
                continue

            # When a group depends on some other task, we implicitly make each member of that downstream group
            # depend on the upstream task. If we find another group, we unpack the group further.
            upstream, downstream = (task, rel.other_task) if rel.inverse else (rel.other_task, task)
            if isinstance(downstream, GroupTask):
                downstream_tasks = list(downstream.tasks)
                while downstream_tasks:
                    member = downstream_tasks.pop(0)
                    if member.address not in self._digraph.nodes:
                        self._add_task(member)
                    if isinstance(member, GroupTask):
                        downstream_tasks += member.tasks
                        continue

                    # NOTE(niklas.rosenstein): When a group is nested in another group, we would end up declaring
                    #       that the group depends on itself. That's obviously not supposed to happen. :)
                    if upstream != member:
                        self._add_edge(upstream.address, member.address, rel.strict, True)

    def _get_edge(self, task_a: Address, task_b: Address) -> _Edge | None:
        data = self._digraph.edges.get((task_a, task_b)) or self._digraph.edges.get((task_a, task_b))
        if data is None:
            return None
        return cast(_Edge, data["data"])

    def _add_edge(self, task_a: Address, task_b: Address, strict: bool, implicit: bool) -> None:
        # add_edge() would implicitly add a node, we only want to do that once the node actually exists in
        # the graph though.
        assert task_a in self._digraph.nodes, f"{task_a!r} not yet in the graph"
        assert task_b in self._digraph.nodes, f"{task_b!r} not yet in the graph"
        edge = self._get_edge(task_a, task_b) or _Edge(strict, implicit)
        edge.strict = edge.strict or strict
        edge.implicit = edge.implicit and implicit
        self._digraph.add_edge(task_a, task_b, data=edge)

    # High level internal API

    def _get_required_tasks(self, goals: Iterable[Task]) -> set[Address]:
        """Internal. Return the set of tasks that are required transitively from the goal tasks."""

        def _is_empty_group_subtree(addr: Address) -> bool:
            """
            Returns `True` if the task pointed to by *addr* is a GroupTask and it is empty or only depends on
            other empty groups.
            """

            def _is_empty_group(addr: Address) -> bool:
                """Returns `True` if the task pointed to by *addr* is a GroupTask and it is empty."""

                task = self._get_task(addr)
                if not isinstance(task, GroupTask):
                    return False
                return len(task.tasks) == 0

            def _is_empty_group_or_subtree(addr: Address) -> bool:
                """Returns `True` if the task pointed to by *addr* is a GroupTask and it is empty or only depends
                on other empty groups."""

                task = self._get_task(addr)
                if not isinstance(task, GroupTask):
                    return False
                for pred in self._digraph.predecessors(addr):
                    if not _is_empty_group_or_subtree(pred):
                        return False
                return True

            return _is_empty_group(addr) or _is_empty_group_or_subtree(addr)

        def _recurse_task(addr: Address, visited: set[Address], path: list[Address]) -> None:
            if addr in path:
                raise RuntimeError(f"encountered a dependency cycle: {' → '.join(map(str, path))}")
            visited.add(addr)
            for pred in self._digraph.predecessors(addr):
                if self.get_edge(pred, addr).strict:
                    # If the thing we want to pick up is a GroupTask and it doesn't have any members or other
                    # dependencies that are not also empty groups, we can skip it. It really doesn't need to be in the
                    # build graph.
                    if isinstance(self._get_task(pred), GroupTask):
                        # Check if the group is empty or only depends on other empty groups.
                        if _is_empty_group_subtree(pred):
                            continue
                    _recurse_task(pred, visited, path + [addr])

        active_tasks: set[Address] = set()
        for task in goals:
            _recurse_task(task.address, active_tasks, [])

        return active_tasks

    def _remove_nodes_keep_transitive_edges(self, nodes: Iterable[Address]) -> None:
        """Internal. Remove nodes from the graph, but ensure that transitive dependencies are kept in tact."""

        for addr in nodes:
            for in_task_path in self._digraph.predecessors(addr):
                in_edge = self.get_edge(in_task_path, addr)
                for out_task_path in self._digraph.successors(addr):
                    out_edge = self.get_edge(addr, out_task_path)
                    self._add_edge(
                        in_task_path,
                        out_task_path,
                        strict=in_edge.strict or out_edge.strict,
                        implicit=in_edge.implicit and out_edge.implicit,
                    )
            self._digraph.remove_node(addr)

    def _get_ready_graph(self) -> DiGraph[Address]:
        """Updates the ready graph. Remove all ok tasks (successful or skipped) and any non-strict dependencies
        (edges) on failed tasks."""

        removable_edges: set[tuple[Address, Address]] = set()

        def set_non_strict_edge_for_removal(u: Address, v: Address) -> None:
            out_edge = self.get_edge(u, v)
            if not out_edge.strict:
                removable_edges.add((u, v))

        for failed_task_path in self._failed_tasks:
            for out_task_path in self._digraph.successors(failed_task_path):
                out_task = self._digraph.nodes[out_task_path]["data"]

                if isinstance(out_task, GroupTask):
                    # If the successor is a group task, check that all of the group's tasks are either successful
                    # or failed, and then remove any non-strict dependency (edge) on said group task.
                    group_task_paths = {task.address for task in out_task.tasks}
                    if not group_task_paths.issubset(self._failed_tasks | self._ok_tasks):
                        continue

                    for group_successor_path in self._digraph.successors(out_task_path):
                        set_non_strict_edge_for_removal(out_task_path, group_successor_path)
                else:
                    set_non_strict_edge_for_removal(failed_task_path, out_task_path)

        return cast("DiGraph[Address]", restricted_view(self._digraph, self._ok_tasks, removable_edges))  # type: ignore[no-untyped-call]

    # Public API

    @property
    def context(self) -> Context:
        return self._context

    @property
    def parent(self) -> TaskGraph | None:
        return self._parent

    @property
    def root(self) -> TaskGraph:
        if self._parent:
            return self._parent.root
        return self

    def get_edge(self, pred: Task | Address, succ: Task | Address) -> _Edge:
        if isinstance(pred, Task):
            pred = pred.address
        if isinstance(succ, Task):
            succ = succ.address
        return not_none(self._get_edge(pred, succ), f"edge does not exist ({pred} --> {succ})")

    def get_predecessors(self, task: Task, ignore_groups: bool = False) -> list[Task]:
        """Returns the predecessors of the task in the original full build graph."""

        result = []
        for task in (self.get_task(addr) for addr in self._digraph.predecessors(task.address)):
            if ignore_groups and isinstance(task, GroupTask):
                result += task.tasks
            else:
                result.append(task)
        return result

    def get_status(self, task: Task) -> TaskStatus | None:
        """Return the status of a task."""

        return self._results.get(task.address)

    def populate(self, goals: Iterable[Task] | None = None) -> None:
        """Populate the graph with the tasks from the context. This need only be called if the graph was
        not initially populated in the constructor.

        !!! warning "Inverse relationships"

            This does not recognize inverse relationships from tasks that are not part of *goals* or
            any of their relationships. It is therefore recommended to populate the graph with all tasks in the
            context and use #trim() to reduce the graph.
        """

        if goals is None:
            for project in self.context.iter_projects():
                for task in project.tasks().values():
                    if task.address not in self._digraph.nodes:
                        self._add_task(task)
        else:
            for task in goals:
                if task.address not in self._digraph.nodes:
                    self._add_task(task)

    def trim(self, goals: Sequence[Task]) -> TaskGraph:
        """Returns a copy of the graph that is trimmed to execute only *goals* and their strict dependencies."""

        graph = TaskGraph(self.context, parent=self)
        unrequired_tasks = set(graph._digraph.nodes) - graph._get_required_tasks(goals)
        graph._remove_nodes_keep_transitive_edges(unrequired_tasks)
        graph.results_from(self)
        return graph

    def reduce(self, keep_explicit: bool = False) -> TaskGraph:
        """Return a copy of the task graph that has been transitively reduced.

        :param keep_explicit: Keep non-implicit edges intact."""

        digraph = self._digraph
        reduced_graph = transitive_reduction(digraph)
        reduced_graph.add_nodes_from(digraph.nodes(data=True))
        reduced_graph.add_edges_from(
            (u, v, digraph.edges[u, v])
            for u, v in digraph.edges
            if (keep_explicit and not digraph.edges[u, v]["data"].implicit) or (u, v) in reduced_graph.edges
        )

        graph = TaskGraph(self.context, populate=False, parent=self)
        graph._digraph = reduced_graph
        graph.results_from(self)

        return graph

    def results_from(self, other: TaskGraph) -> None:
        """Merge the results from the *other* graph into this graph. Only takes the results of tasks that are
        known to the graph. If the same task has a result in both graphs, and one task result is not successful,
        the unsuccessful result is preferred."""

        self._results = {**other._results, **self._results}
        self._ok_tasks.update(other._ok_tasks)
        self._failed_tasks.update(other._failed_tasks)

        for task in self.tasks():
            status_a = self._results.get(task.address)
            status_b = other._results.get(task.address)
            if status_a is not None and status_b is not None and status_a.type != status_b.type:
                resolved_status: TaskStatus | None = status_a if status_a.is_not_ok() else status_b
            else:
                resolved_status = status_a or status_b
            if resolved_status is not None:
                # NOTE: This will already take care of updating :attr:`_background_tasks`.
                self.set_status(task, resolved_status, _force=True)

    def resume(self) -> None:
        """Reset the result of all background tasks that are required by any pending tasks. This needs to be
        called when a build graph is resumed in a secondary execution to ensure that background tasks are active
        for the tasks that require them."""

        reset_tasks: set[Address] = set()
        for task in self.tasks(pending=True):
            for pred in self.get_predecessors(task, ignore_groups=True):
                if pred.address in self._background_tasks:
                    self._background_tasks.discard(pred.address)
                    self._ok_tasks.discard(pred.address)
                    self._failed_tasks.discard(pred.address)
                    self._results.pop(pred.address, None)
                    reset_tasks.add(pred.address)

        if reset_tasks:
            logger.info(
                "Reset the status of %d background task(s): %s", len(reset_tasks), " ".join(map(str, reset_tasks))
            )

    def restart(self) -> None:
        """Discard the results of all tasks."""

        self._results.clear()
        self._ok_tasks.clear()
        self._background_tasks.clear()
        self._failed_tasks.clear()

    def tasks(
        self,
        goals: bool = False,
        pending: bool = False,
        failed: bool = False,
        not_executed: bool = False,
    ) -> Iterator[Task]:
        """Returns the tasks in the graph in arbitrary order.

        :param goals: Return only goal tasks (i.e. leaf nodes).
        :param pending: Return only pending tasks.
        :param failed: Return only failed tasks.
        :param not_executed: Return only not executed tasks (i.e. downstream of failed tasks)"""

        tasks = (self.get_task(addr) for addr in self._digraph)
        if goals:
            # HACK: Cast because of https://github.com/python/typeshed/pull/12472
            tasks = (t for t in tasks if cast(int, self._digraph.out_degree(t.address)) == 0)
        if pending:
            tasks = (t for t in tasks if t.address not in self._results)
        if failed:
            tasks = (t for t in tasks if t.address in self._results and self._results[t.address].is_failed())
        if not_executed:
            tasks = (
                t
                for t in tasks
                if (
                    (t.address not in self._results)
                    or (t.address in self._results and self._results[t.address].is_pending())
                )
            )
        return tasks

    def execution_order(self, all: bool = False) -> Iterable[Task]:
        """Returns all tasks in the order they need to be executed.

        :param all: Return the execution order of all tasks, not just from the target subgraph."""

        order = topological_sort(self._digraph if all else self._get_ready_graph())
        return (self.get_task(addr) for addr in order)

    def mark_tasks_as_skipped(
        self,
        tasks: Sequence[Task | str | Address] = (),
        recursive_tasks: Sequence[Task | str | Address] = (),
        *,
        set_status: bool = False,
        reason: str,
        origin: str,
        reset: bool,
    ) -> None:
        """
        This method adds the `"skip"` tag to all *tasks* and *recursive_tasks*. For the dependencies of the
        *recursive_tasks*, the tag will only be added if the task in question is not required by another task
        that is not being skipped.

        :param set_status: Whether to set #TaskStatusType.SKIPPED for tasks in the graph using #set_status().
        :param reason: A reason to attach to the `"skip"` tag.
        :param origin: An origin to attach to the `"skip"` tag.
        :param reset: Enable this to first remove the `"skip"` tags of the same *origin* from all mentioned
            tasks (including transitive dependencies of *recursive_tasks*) in the graph. Note that this does not
            unset any pre-existing task statuses.
        """

        tasks = self.context.resolve_tasks(tasks)
        recursive_tasks = self.context.resolve_tasks(recursive_tasks)

        def get_skip_tag(task: Task) -> TaskTag | None:
            """Return the skip tag associated with this mark operation (i.e., "skip" tags of the same origin)."""

            return next((t for t in task.get_tags("skip") if t.origin == origin), None)

        def iter_predecessors(tasks: Iterable[Task], blackout: Collection[Task]) -> Iterable[Task]:
            """
            Iterate over the predecessors of the given tasks. Skip yielding and recursive iterating over tasks in
            *blackout*.
            """

            stack = list(tasks)
            while stack:
                task = stack.pop()
                if task not in blackout:
                    yield task
                    stack.extend(self.get_predecessors(task))

        # This algorithm works in multiple phases:
        #
        # (1) We gather all tasks that are definitely skipped and mark them with the color "red". We are certain
        #     that these tasks must be skipped because they are either already marked so in the graph, or they are
        #     tasks mentioned directly in the arguments to this function call.
        #
        # (2) We mark the subgraphs (i.e. predecessors) of all recursive_tasks with the color "blue". These are tasks
        #     that can potentially be skipped as well, but we are not sure yet.
        #
        # (3) We walk back through the entire task graph from its leafs, discoloring any "blue" task that we encounter.
        #     If we encounter a "red" task, we keep it colored and ignore its subgraph.

        red_tasks = {*tasks, *recursive_tasks}

        # Add any already skipped tasks to the red tasks, unless reset=True.
        for task in self.tasks():
            if (tag := get_skip_tag(task)) is not None:
                if reset:
                    task.remove_tag(tag)
                else:
                    red_tasks.add(task)

        # Mark predecessors of the recursive_tasks in blue.
        blue_tasks: set[Task] = set()
        for task in iter_predecessors(recursive_tasks, blue_tasks):
            blue_tasks.add(task)

        # Discolor any blue tasks from the root, ignoring any red tasks.
        for task in iter_predecessors(self.tasks(goals=True), red_tasks):
            blue_tasks.discard(task)

        for task in blue_tasks:
            task.add_tag("skip", reason=reason, origin=origin)
            if set_status and self.get_status(task) is None:
                self.set_status(task, TaskStatus.skipped(reason))

    # Graph

    def ready(self) -> list[Task]:
        """Returns all tasks that are ready to be executed. This can be used to constantly query the graph for new
        available tasks as the status of tasks in the graph is updated with :meth:`set_status`. An empty list is
        returned if no tasks are ready. At this point, if no tasks are currently running, :meth:`is_complete` can be
        used to check if the entire task graph was executed successfully."""

        ready_graph = self._get_ready_graph()
        root_set = (
            # HACK: Cast because of https://github.com/python/typeshed/pull/12472
            node
            for node in ready_graph.nodes
            if cast(int, ready_graph.in_degree(node)) == 0 and node not in self._results
        )
        tasks = [self.get_task(addr) for addr in root_set]
        if not tasks:
            return []

        # NOTE(NiklasRosenstein): We don't need to return GroupTasks, we can mark them as skipped right away.
        #       In a future version of Kraken, we want to represent groups not as task objects, so this special
        #       handling code will be obsolete.
        result, groups = map(lambda x: list(x), bipartition((lambda t: isinstance(t, GroupTask)), tasks))
        for group in groups:
            self.set_status(group, TaskStatus.skipped())
        if not result:
            result = self.ready()
        return result

    def get_successors(self, task: Task, ignore_groups: bool = True) -> list[Task]:
        """Returns the successors of the task in the original full build graph.

        Never returns group tasks."""

        result = []
        for task in (self.get_task(addr) for addr in self._digraph.successors(task.address)):
            if ignore_groups and isinstance(task, GroupTask):
                result += task.tasks
            else:
                result.append(task)
        return result

    def get_task(self, addr: Address | str) -> Task:
        if isinstance(addr, str):
            addr = Address(addr)
        if self._parent is None:
            return not_none(self._get_task(addr), lambda: f"no task for {addr!r}")
        return self.root.get_task(addr)

    def set_status(self, task: Task, status: TaskStatus, *, _force: bool = False) -> None:
        """Sets the status of a task, marking it as executed."""

        if not _force and (task.address in self._results and not self._results[task.address].is_started()):
            raise RuntimeError(f"already have a status for task `{task.address}`")
        self._results[task.address] = status
        if status.is_started():
            self._background_tasks.add(task.address)
        if status.is_ok():
            self._ok_tasks.add(task.address)
        if status.is_failed():
            self._failed_tasks.add(task.address)

    def is_complete(self) -> bool:
        """Returns `True` if, an only if, all tasks in the target subgraph have a non-failure result."""

        return set(self._digraph.nodes).issubset(self._ok_tasks)
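
A hedged sketch of building and trimming a graph; the context and goals objects (a fully loaded Context and a sequence of goal Task objects) are assumed to exist already:

from kraken.core.system.graph import TaskGraph

graph = TaskGraph(context)    # populated from all projects in the context by default
trimmed = graph.trim(goals)   # keep only the goal tasks and their strict dependencies
for task in trimmed.execution_order():
    print(task.address)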
__init__
__init__(
    context: Context,
    populate: bool = True,
    parent: TaskGraph | None = None,
) -> None

Create a new build graph from the given task list.

:param context: The context that the graph belongs to.
:param populate: If enabled, the task graph will be immediately populated with the tasks in the context. The graph can also be later populated with the :meth:populate method.

Source code in kraken/core/system/graph.py
def __init__(self, context: Context, populate: bool = True, parent: TaskGraph | None = None) -> None:
    """Create a new build graph from the given task list.

    :param context: The context that the graph belongs to.
    :param populate: If enabled, the task graph will be immediately populated with the tasks in the context.
        The graph can also be later populated with the :meth:`populate` method.
    """

    self._parent = parent
    self._context = context

    # Nodes have the form {'data': _Node} and edges have the form {'data': _Edge}.
    # NOTE: DiGraph is not runtime-subscriptable.
    self._digraph: DiGraph[Address] = DiGraph()

    # Keep track of task execution results.
    self._results: dict[Address, TaskStatus] = {}

    # All tasks that have a successful or skipped status are stored here.
    self._ok_tasks: set[Address] = set()

    # All tasks that have a failed status are stored here.
    self._failed_tasks: set[Address] = set()

    # Keep track of the tasks that returned TaskStatus.STARTED. That means the task is a background task, and
    # if the TaskGraph is deserialized from a state file to continue the build, background tasks need to be
    # reset so they start again if another task requires them.
    self._background_tasks: set[Address] = set()

    if populate:
        self.populate()
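A minimal usage sketch, assuming a kraken.core Context instance named `context` is already available (constructing a Context is outside the scope of this class):

from kraken.core.system.graph import TaskGraph

# Populate immediately with all tasks known to the context.
graph = TaskGraph(context)

# Or defer population, e.g. to populate with specific goals later.
lazy_graph = TaskGraph(context, populate=False)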
execution_order
execution_order(all: bool = False) -> Iterable[Task]

Returns all tasks in the order they need to be executed.

:param all: Return the execution order of all tasks, not just from the target subgraph.

Source code in kraken/core/system/graph.py
def execution_order(self, all: bool = False) -> Iterable[Task]:
    """Returns all tasks in the order they need to be executed.

    :param all: Return the execution order of all tasks, not just from the target subgraph."""

    order = topological_sort(self._digraph if all else self._get_ready_graph())
    return (self.get_task(addr) for addr in order)
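A short sketch, assuming `graph` is a populated TaskGraph: iterate the tasks in dependency order, e.g. for logging or a dry run.

# Tasks from the target subgraph, in the order they would be executed.
for task in graph.execution_order():
    print(task.address)

# Pass all=True to include every task in the graph, not just the target subgraph.
for task in graph.execution_order(all=True):
    print(task.address)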
get_predecessors
get_predecessors(
    task: Task, ignore_groups: bool = False
) -> list[Task]

Returns the predecessors of the task in the original full build graph.

Source code in kraken/core/system/graph.py
def get_predecessors(self, task: Task, ignore_groups: bool = False) -> list[Task]:
    """Returns the predecessors of the task in the original full build graph."""

    result = []
    for task in (self.get_task(addr) for addr in self._digraph.predecessors(task.address)):
        if ignore_groups and isinstance(task, GroupTask):
            result += task.tasks
        else:
            result.append(task)
    return result
get_status
get_status(task: Task) -> TaskStatus | None

Return the status of a task.

Source code in kraken/core/system/graph.py
def get_status(self, task: Task) -> TaskStatus | None:
    """Return the status of a task."""

    return self._results.get(task.address)
get_successors
get_successors(
    task: Task, ignore_groups: bool = True
) -> list[Task]

Returns the successors of the task in the original full build graph.

Never returns group tasks.

Source code in kraken/core/system/graph.py
def get_successors(self, task: Task, ignore_groups: bool = True) -> list[Task]:
    """Returns the successors of the task in the original full build graph.

    Never returns group tasks."""

    result = []
    for task in (self.get_task(addr) for addr in self._digraph.successors(task.address)):
        if ignore_groups and isinstance(task, GroupTask):
            result += task.tasks
        else:
            result.append(task)
    return result
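A small sketch, assuming `graph` is a TaskGraph and `task` is one of its tasks: inspect the direct neighbours of a task.

# Direct dependencies of the task; group tasks are returned as-is unless ignore_groups=True.
dependencies = graph.get_predecessors(task)

# Tasks that depend on the task; group tasks are expanded into their members by default.
dependents = graph.get_successors(task)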
is_complete
is_complete() -> bool

Returns True if, and only if, all tasks in the target subgraph have a non-failure result.

Source code in kraken/core/system/graph.py
def is_complete(self) -> bool:
    """Returns `True` if, an only if, all tasks in the target subgraph have a non-failure result."""

    return set(self._digraph.nodes).issubset(self._ok_tasks)
mark_tasks_as_skipped
mark_tasks_as_skipped(
    tasks: Sequence[Task | str | Address] = (),
    recursive_tasks: Sequence[Task | str | Address] = (),
    *,
    set_status: bool = False,
    reason: str,
    origin: str,
    reset: bool
) -> None

This method adds the "skip" tag to all tasks and recursive_tasks. For the dependencies of the recursive_tasks, the tag will only be added if the task in question is not required by another task that is not being skipped.

:param set_status: Whether to set #TaskStatusType.SKIPPED for tasks in the graph using #set_status().
:param reason: A reason to attach to the "skip" tag.
:param origin: An origin to attach to the "skip" tag.
:param reset: Enable this to first remove the "skip" tags of the same origin from all mentioned tasks (including transitive dependencies for recursive_tasks) in the graph. Note that this does not unset any pre-existing task statuses.

Source code in kraken/core/system/graph.py
def mark_tasks_as_skipped(
    self,
    tasks: Sequence[Task | str | Address] = (),
    recursive_tasks: Sequence[Task | str | Address] = (),
    *,
    set_status: bool = False,
    reason: str,
    origin: str,
    reset: bool,
) -> None:
    """
    This method adds the `"skip"` tag to all *tasks* and *recursive_tasks*. For the dependencies of the
    *recursive_tasks*, the tag will only be added if the task in question is not required by another task
    that is not being skipped.

    :param set_status: Whether to set #TaskStatusType.SKIPPED for tasks in the graph using #set_status().
    :param reason: A reason to attach to the `"skip"` tag.
    :param origin: An origin to attach to the `"skip"` tag.
    :param reset: Enable this to first remove the `"skip"` tags of the same *origin* from all mentioned
        tasks (including transitive dependencies for *recursive_tasks*) in the graph. Note that this does not
        unset any pre-existing task statuses.
    """

    tasks = self.context.resolve_tasks(tasks)
    recursive_tasks = self.context.resolve_tasks(recursive_tasks)

    def get_skip_tag(task: Task) -> TaskTag | None:
        """Return the skip tag associated with this mark operation (i.e., "skip" tags of the same origin)."""

        return next((t for t in task.get_tags("skip") if t.origin == origin), None)

    def iter_predecessors(tasks: Iterable[Task], blackout: Collection[Task]) -> Iterable[Task]:
        """
        Iterate over the predecessors of the given tasks. Skip yielding and recursive iterating over tasks in
        *blackout*.
        """

        stack = list(tasks)
        while stack:
            task = stack.pop()
            if task not in blackout:
                yield task
                stack.extend(self.get_predecessors(task))

    # This algorithm works in multiple phases:
    #
    # (1) We gather all tasks that are definitely skipped and mark them with the color "red". We are certain
    #     that these tasks must be skipped because they are either already marked so in the graph, or they are
    #     tasks mentioned directly in the arguments to this function call.
    #
    # (2) We mark the subgraphs (i.e. predecessors) of all recursive_tasks with the color "blue". These are tasks
    #     that can potentially be skipped as well, but we are not sure yet.
    #
    # (3) We walk back through the entire task graph from its leaves, discoloring any "blue" task that we encounter.
    #     If we encounter a "red" task, we keep it colored and ignore its subgraph.

    red_tasks = {*tasks, *recursive_tasks}

    # Add any already skipped tasks to the red tasks, unless reset=True.
    for task in self.tasks():
        if (tag := get_skip_tag(task)) is not None:
            if reset:
                task.remove_tag(tag)
            else:
                red_tasks.add(task)

    # Mark predecessors of the recursive_tasks in blue.
    blue_tasks: set[Task] = set()
    for task in iter_predecessors(recursive_tasks, blue_tasks):
        blue_tasks.add(task)

    # Discolor any blue tasks from the root, ignoring any red tasks.
    for task in iter_predecessors(self.tasks(goals=True), red_tasks):
        blue_tasks.discard(task)

    for task in blue_tasks:
        task.add_tag("skip", reason=reason, origin=origin)
        if set_status and self.get_status(task) is None:
            self.set_status(task, TaskStatus.skipped(reason))
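A hedged usage sketch of the red/blue marking described in the comments above; the task address and origin string are hypothetical, and `graph` is assumed to be a populated TaskGraph.

graph.mark_tasks_as_skipped(
    recursive_tasks=[":docs:build"],  # hypothetical address: skip it and its exclusive dependencies
    set_status=True,                  # also record TaskStatus.skipped() for the affected tasks
    reason="docs are built elsewhere",
    origin="example-plugin",          # hypothetical origin used to identify these "skip" tags
    reset=True,                       # first drop earlier "skip" tags from the same origin
)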
populate
populate(goals: Iterable[Task] | None = None) -> None

Populate the graph with the tasks from the context. This need only be called if the graph was not initially populated in the constructor.

Inverse relationships

This does not recognize inverse relationships from tasks that are not part of goals or any of their relationships. It is therefore recommended to populate the graph with all tasks in the context and use #trim() to reduce the graph.

Source code in kraken/core/system/graph.py
def populate(self, goals: Iterable[Task] | None = None) -> None:
    """Populate the graph with the tasks from the context. This need only be called if the graph was
    not initially populated in the constructor.

    !!! warning "Inverse relationships"

        This does not recognize inverse relationships from tasks that are not part of *goals* or
        any of their relationships. It is therefore recommended to populate the graph with all tasks in the
        context and use #trim() to reduce the graph.
    """

    if goals is None:
        for project in self.context.iter_projects():
            for task in project.tasks().values():
                if task.address not in self._digraph.nodes:
                    self._add_task(task)
    else:
        for task in goals:
            if task.address not in self._digraph.nodes:
                self._add_task(task)
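A sketch of the pattern recommended in the warning above, assuming `context` is a Context and `goals` is a sequence of already resolved Task objects: populate with all tasks so inverse relationships are picked up, then trim down to the goals.

graph = TaskGraph(context, populate=False)
graph.populate()           # all tasks from the context
graph = graph.trim(goals)  # keep only the goals and their strict dependencies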
ready
ready() -> list[Task]

Returns all tasks that are ready to be executed. This can be used to constantly query the graph for new available tasks as the status of tasks in the graph is updated with :meth:set_status. An empty list is returned if no tasks are ready. At this point, if no tasks are currently running, :meth:is_complete can be used to check if the entire task graph was executed successfully.

Source code in kraken/core/system/graph.py
def ready(self) -> list[Task]:
    """Returns all tasks that are ready to be executed. This can be used to constantly query the graph for new
    available tasks as the status of tasks in the graph is updated with :meth:`set_status`. An empty list is
    returned if no tasks are ready. At this point, if no tasks are currently running, :meth:`is_complete` can be
    used to check if the entire task graph was executed successfully."""

    ready_graph = self._get_ready_graph()
    root_set = (
        # HACK: Cast because of https://github.com/python/typeshed/pull/12472
        node
        for node in ready_graph.nodes
        if cast(int, ready_graph.in_degree(node)) == 0 and node not in self._results
    )
    tasks = [self.get_task(addr) for addr in root_set]
    if not tasks:
        return []

    # NOTE(NiklasRosenstein): We don't need to return GroupTasks, we can mark them as skipped right away.
    #       In a future version of Kraken, we want to represent groups not as task objects, so this special
    #       handling code will be obsolete.
    result, groups = map(lambda x: list(x), bipartition((lambda t: isinstance(t, GroupTask)), tasks))
    for group in groups:
        self.set_status(group, TaskStatus.skipped())
    if not result:
        result = self.ready()
    return result
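A minimal, single-threaded execution loop built from ready(), set_status() and is_complete(), assuming `graph` is a populated TaskGraph. Real executors also call Task.prepare() and handle background tasks; treating a None return value from execute() as success is an assumption of this sketch.

from kraken.core.system.task import TaskStatus

while tasks := graph.ready():
    for task in tasks:
        status = task.execute() or TaskStatus.succeeded()  # assume None means success
        graph.set_status(task, status)

if graph.is_complete():
    print("all tasks in the target subgraph finished successfully")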
reduce
reduce(keep_explicit: bool = False) -> TaskGraph

Return a copy of the task graph that has been transitively reduced.

:param keep_explicit: Keep non-implicit edges intact.

Source code in kraken/core/system/graph.py
def reduce(self, keep_explicit: bool = False) -> TaskGraph:
    """Return a copy of the task graph that has been transitively reduced.

    :param keep_explicit: Keep non-implicit edges intact."""

    digraph = self._digraph
    reduced_graph = transitive_reduction(digraph)
    reduced_graph.add_nodes_from(digraph.nodes(data=True))
    reduced_graph.add_edges_from(
        (u, v, digraph.edges[u, v])
        for u, v in digraph.edges
        if (keep_explicit and not digraph.edges[u, v]["data"].implicit) or (u, v) in reduced_graph.edges
    )

    graph = TaskGraph(self.context, populate=False, parent=self)
    graph._digraph = reduced_graph
    graph.results_from(self)

    return graph
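A short sketch, assuming `graph` is a populated TaskGraph: transitively reduce the graph, e.g. before rendering it for display.

reduced = graph.reduce()                     # drop edges implied by transitivity
explicit = graph.reduce(keep_explicit=True)  # but keep explicitly declared (non-implicit) edges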
restart
restart() -> None

Discard the results of all tasks.

Source code in kraken/core/system/graph.py
def restart(self) -> None:
    """Discard the results of all tasks."""

    self._results.clear()
    self._ok_tasks.clear()
    self._background_tasks.clear()
    self._failed_tasks.clear()
results_from
results_from(other: TaskGraph) -> None

Merge the results from the other graph into this graph. Only takes the results of tasks that are known to the graph. If the same task has a result in both graphs, and one task result is not successful, the not successful result is preferred.

Source code in kraken/core/system/graph.py
def results_from(self, other: TaskGraph) -> None:
    """Merge the results from the *other* graph into this graph. Only takes the results of tasks that are
    known to the graph. If the same task has a result in both graphs, and one task result is not successful,
    the not successful result is preferred."""

    self._results = {**other._results, **self._results}
    self._ok_tasks.update(other._ok_tasks)
    self._failed_tasks.update(other._failed_tasks)

    for task in self.tasks():
        status_a = self._results.get(task.address)
        status_b = other._results.get(task.address)
        if status_a is not None and status_b is not None and status_a.type != status_b.type:
            resolved_status: TaskStatus | None = status_a if status_a.is_not_ok() else status_b
        else:
            resolved_status = status_a or status_b
        if resolved_status is not None:
            # NOTE: This will already take care of updating :attr:`_background_tasks`.
            self.set_status(task, resolved_status, _force=True)
resume
resume() -> None

Reset the result of all background tasks that are required by any pending tasks. This needs to be called when a build graph is resumed in a secondary execution to ensure that background tasks are active for the tasks that require them.

Source code in kraken/core/system/graph.py
def resume(self) -> None:
    """Reset the result of all background tasks that are required by any pending tasks. This needs to be
    called when a build graph is resumed in a secondary execution to ensure that background tasks are active
    for the tasks that require them."""

    reset_tasks: set[Address] = set()
    for task in self.tasks(pending=True):
        for pred in self.get_predecessors(task, ignore_groups=True):
            if pred.address in self._background_tasks:
                self._background_tasks.discard(pred.address)
                self._ok_tasks.discard(pred.address)
                self._failed_tasks.discard(pred.address)
                self._results.pop(pred.address, None)
                reset_tasks.add(pred.address)

    if reset_tasks:
        logger.info(
            "Reset the status of %d background task(s): %s", len(reset_tasks), " ".join(map(str, reset_tasks))
        )
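A hedged sketch of resuming a build, assuming `graph` was restored from a previous execution (how the graph is persisted and restored is not covered here):

graph.resume()  # re-activate background tasks required by pending tasks
while tasks := graph.ready():
    ...         # continue executing as in the ready() example above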
set_status
set_status(
    task: Task, status: TaskStatus, *, _force: bool = False
) -> None

Sets the status of a task, marking it as executed.

Source code in kraken/core/system/graph.py
def set_status(self, task: Task, status: TaskStatus, *, _force: bool = False) -> None:
    """Sets the status of a task, marking it as executed."""

    if not _force and (task.address in self._results and not self._results[task.address].is_started()):
        raise RuntimeError(f"already have a status for task `{task.address}`")
    self._results[task.address] = status
    if status.is_started():
        self._background_tasks.add(task.address)
    if status.is_ok():
        self._ok_tasks.add(task.address)
    if status.is_failed():
        self._failed_tasks.add(task.address)
tasks
tasks(
    goals: bool = False,
    pending: bool = False,
    failed: bool = False,
    not_executed: bool = False,
) -> Iterator[Task]

Returns the tasks in the graph in arbitrary order.

:param goals: Return only goal tasks (i.e. leaf nodes).
:param pending: Return only pending tasks.
:param failed: Return only failed tasks.
:param not_executed: Return only not executed tasks (i.e. downstream of failed tasks)

Source code in kraken/core/system/graph.py
def tasks(
    self,
    goals: bool = False,
    pending: bool = False,
    failed: bool = False,
    not_executed: bool = False,
) -> Iterator[Task]:
    """Returns the tasks in the graph in arbitrary order.

    :param goals: Return only goal tasks (i.e. leaf nodes).
    :param pending: Return only pending tasks.
    :param failed: Return only failed tasks.
    :param not_executed: Return only not executed tasks (i.e. downstream of failed tasks)"""

    tasks = (self.get_task(addr) for addr in self._digraph)
    if goals:
        # HACK: Cast because of https://github.com/python/typeshed/pull/12472
        tasks = (t for t in tasks if cast(int, self._digraph.out_degree(t.address)) == 0)
    if pending:
        tasks = (t for t in tasks if t.address not in self._results)
    if failed:
        tasks = (t for t in tasks if t.address in self._results and self._results[t.address].is_failed())
    if not_executed:
        tasks = (
            t
            for t in tasks
            if (
                (t.address not in self._results)
                or (t.address in self._results and self._results[t.address].is_pending())
            )
        )
    return tasks
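A small sketch, assuming `graph` is a TaskGraph with some results already recorded:

failed = list(graph.tasks(failed=True))
pending = list(graph.tasks(pending=True))
goals = list(graph.tasks(goals=True))
print(f"{len(failed)} failed, {len(pending)} pending, {len(goals)} goal task(s)")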
trim
trim(goals: Sequence[Task]) -> TaskGraph

Returns a copy of the graph that is trimmed to execute only goals and their strict dependencies.

Source code in kraken/core/system/graph.py
def trim(self, goals: Sequence[Task]) -> TaskGraph:
    """Returns a copy of the graph that is trimmed to execute only *goals* and their strict dependencies."""

    graph = TaskGraph(self.context, parent=self)
    unrequired_tasks = set(graph._digraph.nodes) - graph._get_required_tasks(goals)
    graph._remove_nodes_keep_transitive_edges(unrequired_tasks)
    graph.results_from(self)
    return graph

TaskSet

Bases: Collection[Task]

Represents a collection of tasks.

Source code in kraken/core/system/task.py
class TaskSet(Collection[Task]):
    """Represents a collection of tasks."""

    @staticmethod
    def build(
        context: Context | Project,
        selector: str | Address | Task | Iterable[str | Address | Task],
        project: Project | None = None,
    ) -> TaskSet:
        """
        For each item in *selector*, resolve tasks using [`Context.resolve_tasks()`]. If a selector is a string,
        assign the resolved tasks to a partition by that selector value.

        Args:
            context: A Kraken context or project to resolve the *selector* in. If it is a project, string selectors
                are treated relative to the project.
            selector: A single selector string or task, or a sequence thereof. Note that selectors of type [`Address`]
                are converted to string partitions.
        """

        from kraken.core.system.project import Project

        if isinstance(context, Project):
            project = context
            context = context.context
        else:
            project = None

        if isinstance(selector, (str, Address, Task)):
            selector = [selector]

        result = TaskSet()
        for item in selector:
            if isinstance(item, (str, Address)):
                result.add(context.resolve_tasks([item], project), partition=str(item))
            else:
                result.add([item])

        return result

    def __init__(self, tasks: Iterable[Task] = ()) -> None:
        self._tasks = set(tasks)
        self._partition_to_task_map: dict[str, set[Task]] = {}
        self._task_to_partition_map: dict[Task, set[str]] = {}

    def __iter__(self) -> Iterator[Task]:
        return iter(self._tasks)

    def __len__(self) -> int:
        return len(self._tasks)

    def __repr__(self) -> str:
        return f"TaskSet(length={len(self._tasks)}, pttm={self._partition_to_task_map}, ttpm={self._task_to_partition_map})"

    def __contains__(self, __x: object) -> bool:
        return __x in self._tasks

    def add(self, tasks: Iterable[Task], *, partition: str | None = None) -> None:
        """Add the given *tasks* to the set.

        :param tasks: The tasks to add.
        :param partition: If specified, this will register the *tasks* under the given string as a "partition"
            within the set. This is used by :meth:`Project.resolve_tasks` to store which tasks were resolved
            through which task selector string. Later, this can be used to map a task back to the selector it
            was resolved from."""

        tasks = set(tasks)
        self._tasks.update(tasks)
        if partition is not None:
            self._partition_to_task_map.setdefault(partition, set()).update(tasks)
            for task in tasks:
                self._task_to_partition_map.setdefault(task, set()).add(partition)

    def select(self, output_type: type[T]) -> TaskSetSelect[T]:
        """Resolve outputs of the given tasks and return them as a dictionary mapping from task to the values. This
        should only be called after the given tasks have been executed, otherwise the outputs are likely not set.
        Use :meth:`resolve_outputs_supplier` to create a :class:`Supplier` that delegates to this method when it is
        retrieved.

        In addition to looking at output properties, this also includes elements contained in :attr:`Task.output`."""

        return TaskSetSelect(self, output_type)

    def partitions(self) -> TaskSetPartitions:
        """Return a helper class to access the partitions in the set."""

        return TaskSetPartitions(self._partition_to_task_map, self._task_to_partition_map)
add
add(
    tasks: Iterable[Task], *, partition: str | None = None
) -> None

Add the given tasks to the set.

:param tasks: The tasks to add.
:param partition: If specified, this will register the tasks under the given string as a "partition" within the set. This is used by :meth:Project.resolve_tasks to store which tasks were resolved through which task selector string. Later, this can be used to map a task back to the selector it was resolved from.

Source code in kraken/core/system/task.py
def add(self, tasks: Iterable[Task], *, partition: str | None = None) -> None:
    """Add the given *tasks* to the set.

    :param tasks: The tasks to add.
    :param partition: If specified, this will register the *tasks* under the given string as a "partition"
        within the set. This is used by :meth:`Project.resolve_tasks` to store which tasks were resolved
        through which task selector string. Later, this can be used to map a task back to the selector it
        was resolved from."""

    tasks = set(tasks)
    self._tasks.update(tasks)
    if partition is not None:
        self._partition_to_task_map.setdefault(partition, set()).update(tasks)
        for task in tasks:
            self._task_to_partition_map.setdefault(task, set()).add(partition)
build staticmethod
build(
    context: Context | Project,
    selector: (
        str
        | Address
        | Task
        | Iterable[str | Address | Task]
    ),
    project: Project | None = None,
) -> TaskSet

For each item in selector, resolve tasks using [Context.resolve_tasks()]. If a selector is a string, assign the resolved tasks to a partition by that selector value.

Parameters:

context (Context | Project, required): A Kraken context or project to resolve the selector in. If it is a project, string selectors are treated relative to the project.
selector (str | Address | Task | Iterable[str | Address | Task], required): A single selector string or task, or a sequence thereof. Note that selectors of type [Address] are converted to string partitions.
Source code in kraken/core/system/task.py
@staticmethod
def build(
    context: Context | Project,
    selector: str | Address | Task | Iterable[str | Address | Task],
    project: Project | None = None,
) -> TaskSet:
    """
    For each item in *selector*, resolve tasks using [`Context.resolve_tasks()`]. If a selector is a string,
    assign the resolved tasks to a partition by that selector value.

    Args:
        context: A Kraken context or project to resolve the *selector* in. If it is a project, string selectors
            are treated relative to the project.
        selector: A single selector string or task, or a sequence thereof. Note that selectors of type [`Address`]
            are converted to string partitions.
    """

    from kraken.core.system.project import Project

    if isinstance(context, Project):
        project = context
        context = context.context
    else:
        project = None

    if isinstance(selector, (str, Address, Task)):
        selector = [selector]

    result = TaskSet()
    for item in selector:
        if isinstance(item, (str, Address)):
            result.add(context.resolve_tasks([item], project), partition=str(item))
        else:
            result.add([item])

    return result
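A hedged sketch, assuming `context` is a Context; the selector strings are hypothetical. String selectors become partitions that can later be inspected via partitions().

from kraken.core.system.task import TaskSet

task_set = TaskSet.build(context, ["lint", ":app:test"])  # hypothetical selectors
for task in task_set:  # TaskSet is a Collection[Task]
    print(task.address)

partitions = task_set.partitions()  # helper to map tasks back to the selectors they came from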
partitions
partitions() -> TaskSetPartitions

Return a helper class to access the partitions in the set.

Source code in kraken/core/system/task.py
def partitions(self) -> TaskSetPartitions:
    """Return a helper class to access the partitions in the set."""

    return TaskSetPartitions(self._partition_to_task_map, self._task_to_partition_map)
select
select(output_type: type[T]) -> TaskSetSelect[T]

Resolve outputs of the given tasks and return them as a dictionary mapping from task to the values. This should only be called after the given tasks have been executed, otherwise the outputs are likely not set. Use :meth:resolve_outputs_supplier to create a :class:Supplier that delegates to this method when it is retrieved.

In addition to looking at output properties, this also includes elements contained in :attr:Task.output.

Source code in kraken/core/system/task.py
def select(self, output_type: type[T]) -> TaskSetSelect[T]:
    """Resolve outputs of the given tasks and return them as a dictionary mapping from task to the values. This
    should only be called after the given tasks have been executed, otherwise the outputs are likely not set.
    Use :meth:`resolve_outputs_supplier` to create a :class:`Supplier` that delegates to this method when it is
    retrieved.

    In addition to looking at output properties, this also includes elements contained in :attr:`Task.output`."""

    return TaskSetSelect(self, output_type)
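A minimal sketch, assuming the tasks in `task_set` have been executed and expose pathlib.Path outputs; the accessors available on the returned TaskSetSelect are not covered here.

from pathlib import Path

selected = task_set.select(Path)  # TaskSetSelect[Path] over the tasks' outputs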

TaskStatus dataclass

Represents a task status with a message.

Source code in kraken/core/system/task.py
@dataclasses.dataclass
class TaskStatus:
    """Represents a task status with a message."""

    type: TaskStatusType
    message: str | None

    def is_ok(self) -> bool:
        return self.type.is_ok()

    def is_not_ok(self) -> bool:
        return self.type.is_not_ok()

    def is_pending(self) -> bool:
        return self.type == TaskStatusType.PENDING

    def is_failed(self) -> bool:
        return self.type == TaskStatusType.FAILED

    def is_interrupted(self) -> bool:
        return self.type == TaskStatusType.INTERRUPTED

    def is_succeeded(self) -> bool:
        return self.type == TaskStatusType.SUCCEEDED

    def is_started(self) -> bool:
        return self.type == TaskStatusType.STARTED

    def is_skipped(self) -> bool:
        return self.type == TaskStatusType.SKIPPED

    def is_up_to_date(self) -> bool:
        return self.type == TaskStatusType.UP_TO_DATE

    def is_warning(self) -> bool:
        return self.type == TaskStatusType.WARNING

    @staticmethod
    def pending(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.PENDING, message)

    @staticmethod
    def failed(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.FAILED, message)

    @staticmethod
    def interrupted(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.INTERRUPTED, message)

    @staticmethod
    def succeeded(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.SUCCEEDED, message)

    @staticmethod
    def started(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.STARTED, message)

    @staticmethod
    def skipped(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.SKIPPED, message)

    @staticmethod
    def up_to_date(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.UP_TO_DATE, message)

    @staticmethod
    def warning(message: str | None = None) -> TaskStatus:
        return TaskStatus(TaskStatusType.WARNING, message)

    @staticmethod
    def from_exit_code(command: list[str] | None, code: int) -> TaskStatus:
        return TaskStatus(
            TaskStatusType.SUCCEEDED if code == 0 else TaskStatusType.FAILED,
            None
            if code == 0 or command is None
            else 'command "' + " ".join(map(shlex.quote, command)) + f'" returned exit code {code}',
        )
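A short sketch of the convenience constructors, derived from the source above:

import subprocess
from kraken.core.system.task import TaskStatus

command = ["python", "--version"]
code = subprocess.call(command)
status = TaskStatus.from_exit_code(command, code)
assert status.is_ok() == (code == 0)  # exit code 0 maps to SUCCEEDED, otherwise FAILED

warn = TaskStatus.warning("deprecated configuration key")
assert warn.is_ok() and warn.is_warning()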

TaskStatusType

Bases: Enum

Represents the possible statuses that a task can return from its execution.

Source code in kraken/core/system/task.py
class TaskStatusType(enum.Enum):
    """Represents the possible statuses that a task can return from its execution."""

    PENDING = enum.auto()  #: The task is pending execution (only to be returned from :meth:`Task.prepare`).
    FAILED = enum.auto()  #: The task failed its preparation or execution.
    INTERRUPTED = enum.auto()  #: The task was interrupted by the user.
    SUCCEEDED = enum.auto()  #: The task succeeded its execution (only to be returned from :meth:`Task.execute`).
    STARTED = enum.auto()  #: The task started a background task that needs to be torn down later.
    SKIPPED = enum.auto()  #: The task was skipped (i.e. it is not applicable).
    UP_TO_DATE = enum.auto()  #: The task is up to date and did not run (or did not run its usual logic).
    WARNING = enum.auto()  #: The task succeeded, but with warnings (only to be returned from :meth:`Task.execute`).

    def is_ok(self) -> bool:
        return not self.is_not_ok()

    def is_not_ok(self) -> bool:
        return self in (TaskStatusType.PENDING, TaskStatusType.FAILED, TaskStatusType.INTERRUPTED)

    def is_pending(self) -> bool:
        return self == TaskStatusType.PENDING

    def is_failed(self) -> bool:
        return self == TaskStatusType.FAILED

    def is_interrupted(self) -> bool:
        return self == TaskStatusType.INTERRUPTED

    def is_succeeded(self) -> bool:
        return self == TaskStatusType.SUCCEEDED

    def is_started(self) -> bool:
        return self == TaskStatusType.STARTED

    def is_skipped(self) -> bool:
        return self == TaskStatusType.SKIPPED

    def is_up_to_date(self) -> bool:
        return self == TaskStatusType.UP_TO_DATE

    def is_warning(self) -> bool:
        return self == TaskStatusType.WARNING
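A tiny sketch, derived directly from the source above: only PENDING, FAILED and INTERRUPTED count as "not ok"; every other status, including WARNING, is ok.

from kraken.core.system.task import TaskStatusType

assert TaskStatusType.WARNING.is_ok()
assert TaskStatusType.UP_TO_DATE.is_ok()
assert TaskStatusType.FAILED.is_not_ok()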

VoidTask

Bases: Task

This task does nothing and can always be skipped.

Source code in kraken/core/system/task.py
class VoidTask(Task):
    """This task does nothing and can always be skipped."""

    skip: Property[bool] = Property.default(True)
    message: Property[str] = Property.default("is a VoidTask")

    def prepare(self) -> TaskStatus | None:
        if self.skip.get():
            return TaskStatus.skipped(self.message.get())
        return TaskStatus.pending()

    def execute(self) -> TaskStatus | None:
        pass