Skip to content

Runtime

Runtime environment and lifecycle management for DAG execution.

Runtime context counter for tracking active DAG contexts.

AsyncRuntimeCounter

Asynchronous counter for active contexts.

Source code in shutils/dag/runtime.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class AsyncRuntimeCounter:
    """Asynchronous counter for active contexts."""

    def __init__(self, runtime_counter: RuntimeCounterMixin):
        """Initialize with the underlying counter mixin.

        Args:
            runtime_counter: The mixin instance to wrap.
        """
        self.__runtime_counter = runtime_counter

    async def increase(self):
        """Atomically increment the context counter."""
        async with self.__runtime_counter._conter_lock.async_lock():
            self.__runtime_counter._context_counter += 1

    async def decrease(self):
        """Atomically decrement the context counter."""
        async with self.__runtime_counter._conter_lock.async_lock():
            self.__runtime_counter._context_counter -= 1

__init__(runtime_counter)

Initialize with the underlying counter mixin.

Parameters:

Name Type Description Default
runtime_counter RuntimeCounterMixin

The mixin instance to wrap.

required
Source code in shutils/dag/runtime.py
69
70
71
72
73
74
75
def __init__(self, runtime_counter: RuntimeCounterMixin):
    """Initialize with the underlying counter mixin.

    Args:
        runtime_counter: The mixin instance to wrap.
    """
    self.__runtime_counter = runtime_counter

decrease() async

Atomically decrement the context counter.

Source code in shutils/dag/runtime.py
82
83
84
85
async def decrease(self):
    """Atomically decrement the context counter."""
    async with self.__runtime_counter._conter_lock.async_lock():
        self.__runtime_counter._context_counter -= 1

increase() async

Atomically increment the context counter.

Source code in shutils/dag/runtime.py
77
78
79
80
async def increase(self):
    """Atomically increment the context counter."""
    async with self.__runtime_counter._conter_lock.async_lock():
        self.__runtime_counter._context_counter += 1

Runtime

Bases: RuntimeCounterMixin

Runtime object for tracking active DAG contexts.

Source code in shutils/dag/runtime.py
88
89
90
91
92
93
class Runtime(RuntimeCounterMixin):
    """Runtime object for tracking active DAG contexts."""

    def __init__(self):
        """Initialize the runtime counter."""
        RuntimeCounterMixin.__init__(self)

__init__()

Initialize the runtime counter.

Source code in shutils/dag/runtime.py
91
92
93
def __init__(self):
    """Initialize the runtime counter."""
    RuntimeCounterMixin.__init__(self)

RuntimeCounterMixin

Mixin providing a thread-safe counter for tracking active contexts.

Source code in shutils/dag/runtime.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class RuntimeCounterMixin:
    """Mixin providing a thread-safe counter for tracking active contexts."""

    def __init__(self):
        """Initialize the counter with sync and async interfaces."""
        self._context_counter: int = 0
        self._conter_lock = SmartLock()

        self._async_counter = None
        self._sync_counter = None

    @property
    def counter(self) -> int:
        """Current number of active contexts."""
        return self._context_counter

    @property
    def sync_counter(self) -> "SyncRuntimeCounter":
        """Lazy accessor for the synchronous counter interface."""
        if self._sync_counter is None:
            self._sync_counter = SyncRuntimeCounter(self)
        return self._sync_counter

    @property
    def async_counter(self) -> "AsyncRuntimeCounter":
        """Lazy accessor for the asynchronous counter interface."""
        if self._async_counter is None:
            self._async_counter = AsyncRuntimeCounter(self)
        return self._async_counter

async_counter property

Lazy accessor for the asynchronous counter interface.

counter property

Current number of active contexts.

sync_counter property

Lazy accessor for the synchronous counter interface.

__init__()

Initialize the counter with sync and async interfaces.

Source code in shutils/dag/runtime.py
16
17
18
19
20
21
22
def __init__(self):
    """Initialize the counter with sync and async interfaces."""
    self._context_counter: int = 0
    self._conter_lock = SmartLock()

    self._async_counter = None
    self._sync_counter = None

SyncRuntimeCounter

Synchronous thread-safe counter for active contexts.

Source code in shutils/dag/runtime.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class SyncRuntimeCounter:
    """Synchronous thread-safe counter for active contexts."""

    def __init__(self, runtime_counter: RuntimeCounterMixin):
        """Initialize with the underlying counter mixin.

        Args:
            runtime_counter: The mixin instance to wrap.
        """
        self.__runtime_counter = runtime_counter

    def increase(self):
        """Atomically increment the context counter."""
        with self.__runtime_counter._conter_lock.sync_lock():
            self.__runtime_counter._context_counter += 1

    def decrease(self):
        """Atomically decrement the context counter."""
        with self.__runtime_counter._conter_lock.sync_lock():
            self.__runtime_counter._context_counter -= 1

__init__(runtime_counter)

Initialize with the underlying counter mixin.

Parameters:

Name Type Description Default
runtime_counter RuntimeCounterMixin

The mixin instance to wrap.

required
Source code in shutils/dag/runtime.py
47
48
49
50
51
52
53
def __init__(self, runtime_counter: RuntimeCounterMixin):
    """Initialize with the underlying counter mixin.

    Args:
        runtime_counter: The mixin instance to wrap.
    """
    self.__runtime_counter = runtime_counter

decrease()

Atomically decrement the context counter.

Source code in shutils/dag/runtime.py
60
61
62
63
def decrease(self):
    """Atomically decrement the context counter."""
    with self.__runtime_counter._conter_lock.sync_lock():
        self.__runtime_counter._context_counter -= 1

increase()

Atomically increment the context counter.

Source code in shutils/dag/runtime.py
55
56
57
58
def increase(self):
    """Atomically increment the context counter."""
    with self.__runtime_counter._conter_lock.sync_lock():
        self.__runtime_counter._context_counter += 1