Skip to content

Context

Execution context for DAG runs.

Context management for DAG execution, including sync/async context wrappers.

AsyncContext

Bases: AsyncDataWhiteBoard, AsyncTaskState

Asynchronous wrapper around Context for async-safe operations.

Source code in shutils/dag/context.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class AsyncContext(AsyncDataWhiteBoard, AsyncTaskState):
    """Asynchronous wrapper around a core ``Context``.

    The wrapper holds no state besides the core context. Lifecycle and
    parent/child operations are coroutines that take the core context's
    ``parent_rwlock`` through its async acquire methods.
    """

    def __init__(self, context: Context):
        """Initialize with the underlying context.

        Args:
            context: The core context object; it is also handed to both
                base-class initializers.
        """
        AsyncDataWhiteBoard.__init__(self, context)
        AsyncTaskState.__init__(self, context)
        self.__context = context

    @property
    def id(self) -> str:
        """Unique identifier of the context (the core context's ``id``)."""
        return self.__context.id

    @property
    def context(self) -> Context:
        """Access the underlying core context."""
        return self.__context

    @property
    def sync_context(self) -> "SyncContext":
        """Access the sync context wrapper for the same core context."""
        return self.__context.sync_context

    def freeze(self):
        """Freeze the context (sets ``freezing`` to True on the core context)."""
        self.__context.freezing = True

    def thraw(self):
        """Unfreeze the context (sets ``freezing`` to False on the core context)."""
        self.__context.freezing = False

    async def destory(self, destory_parent: bool = False):
        """Destroy the context and release runtime resources.

        A context that is already marked destroyed is left untouched, so
        repeated calls are harmless.

        Args:
            destory_parent: If True, also destroy the parent when this is its last child.
        """
        if self.__context.is_destory():
            return

        # Mark first: the recursive calls below rely on the early return above
        # to avoid destroying (and un-counting) the same context twice.
        self.__context.set_destory(True)
        if self.__context._runtime:
            await self.__context._runtime.async_counter.decrease()
        if self.__context._parent_context:
            # The child counter is mutated here, so take the write lock, as
            # SyncContext.destory does for the same update.
            async with self.__context._parent_context.parent_rwlock.async_write():
                self.__context._parent_context._child_context_num -= 1
                if destory_parent and self.__context._parent_context._child_context_num == 0:
                    await self.__context._parent_context.async_context.destory()
        if self.__context._child_context_list:
            for child in self.__context._child_context_list:
                await child.async_context.destory()

    async def create(
        self, copy_data: bool = False, deep_copy: bool = False, name: str = "", skip_complete: bool = False
    ) -> "AsyncContext":
        """Create a new context, optionally copying data from this one.

        The new context shares this context's runtime but is not registered
        as a child of it.

        Args:
            copy_data: Whether to copy data to the new context.
            deep_copy: Whether to deep-copy data instead of sharing references.
            name: Optional name for the new context.
            skip_complete: Value stored in the new context's ``_skip_complete`` flag.

        Returns:
            The async wrapper of the new context.
        """
        new_context = Context(self.__context._runtime, name=name)
        if copy_data:
            await self.copy(new_context, deep_copy)
        new_context._skip_complete = skip_complete
        return new_context.async_context

    async def child_context_num(self) -> int:
        """Get the number of child contexts."""
        async with self.__context.parent_rwlock.async_read():
            return self.__context._child_context_num

    async def iter_child_context(self) -> AsyncGenerator["AsyncContext"]:
        """Iterate over child contexts under an async read lock.

        The read lock stays held for as long as the generator is suspended
        inside the loop.
        """
        async with self.__context.parent_rwlock.async_read():
            for child in self.__context._child_context_list:
                yield child.async_context

    async def parent_context(self) -> "AsyncContext | None":
        """Get the parent context, or None if this is a root context."""
        async with self.__context.parent_rwlock.async_read():
            if self.__context._parent_context:
                return self.__context._parent_context.async_context
            else:
                return None

    @overload
    async def create_child(self, num: int = 0, name: str | None = None) -> "AsyncContext": ...

    @overload
    async def create_child(self, num: int, name: str | list[str] | None = None) -> list["AsyncContext"]: ...

    async def create_child(
        self, num: int = 0, name: str | list[str] | None = None
    ) -> "AsyncContext | list[AsyncContext]":
        """Create one or more child contexts.

        Args:
            num: Number of children to create. 0 creates a single child.
            name: Optional name(s) for the children. A single string is used
                for every child; a list must hold one name per child.

        Returns:
            A single AsyncContext if num is 0, otherwise a list of AsyncContexts.

        Raises:
            ValueError: If ``name`` is a list whose length differs from the
                number of children being created.
        """
        # NOTE: children may themselves have children; the former
        # "parent must be a root context" restriction is disabled.
        return_context = num == 0
        num = num if num else 1
        if name is None:
            name = [""] * num
        elif isinstance(name, str):
            name = [name] * num
        elif isinstance(name, list) and len(name) != num:
            raise ValueError("name list length must be equal to num")
        context = []
        for idx in range(num):
            # The child is built without a parent and linked by hand so the
            # registration can go through the async write lock.
            sub_context = Context(self.__context._runtime, name=name[idx])
            sub_context._parent_context = self.__context
            async with self.__context.parent_rwlock.async_write():
                self.__context._child_context_list.append(sub_context)
                self.__context._child_context_num += 1
            context.append(sub_context.async_context)

        if not return_context:
            return context
        else:
            return context[0]

    async def complete(self, task: "TaskBase"):
        """Mark a task as completed and propagate to the parent context."""
        await super()._complete(task)
        if self.__context._parent_context:
            await self.__context._parent_context.async_context.complete(task)

context property

Access the underlying core context.

id property

Unique identifier of the context.

sync_context property

Access the sync context wrapper for the same core context.

__init__(context)

Initialize with the underlying context.

Parameters:

Name Type Description Default
context Context

The core context object.

required
Source code in shutils/dag/context.py
218
219
220
221
222
223
224
225
226
def __init__(self, context: Context):
    """Bind this async wrapper to its core context.

    Args:
        context: The core context, passed to both base-class initializers
            and kept as the wrapper's private reference.
    """
    for base in (AsyncDataWhiteBoard, AsyncTaskState):
        base.__init__(self, context)
    self.__context = context

child_context_num() async

Get the number of child contexts.

Source code in shutils/dag/context.py
292
293
294
295
async def child_context_num(self) -> int:
    """Return the core context's child counter, read under the async read lock."""
    core = self.__context
    async with core.parent_rwlock.async_read():
        count = core._child_context_num
    return count

complete(task) async

Mark a task as completed and propagate to the parent context.

Source code in shutils/dag/context.py
353
354
355
356
357
async def complete(self, task: "TaskBase"):
    """Record ``task`` as completed here, then repeat on the parent context if any."""
    await super()._complete(task)
    parent = self.__context._parent_context
    if not parent:
        return
    await parent.async_context.complete(task)

create(copy_data=False, deep_copy=False, name='', skip_complete=False) async

Create a new context, optionally copying data from this one.

Parameters:

Name Type Description Default
copy_data bool

Whether to copy data to the new context.

False
deep_copy bool

Whether to deep-copy data instead of sharing references.

False
name str

Optional name for the new context.

''
skip_complete bool

If True, skip completion tracking for the new context.

False

Returns:

Type Description
AsyncContext

The async wrapper of the new context.

Source code in shutils/dag/context.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
async def create(
    self, copy_data: bool = False, deep_copy: bool = False, name: str = "", skip_complete: bool = False
) -> "AsyncContext":
    """Build a fresh context on the same runtime, optionally seeded with this one's data.

    The new context is not registered as a child of this one.

    Args:
        copy_data: When True, copy this context's data into the new one.
        deep_copy: Forwarded to ``copy``; requests a deep copy instead of shared references.
        name: Name for the new context (empty string lets the context pick its own id).
        skip_complete: Value stored in the new context's ``_skip_complete`` flag.

    Returns:
        The async wrapper of the newly created context.
    """
    fresh = Context(self.__context._runtime, name=name)
    if copy_data:
        await self.copy(fresh, deep_copy)
    fresh._skip_complete = skip_complete
    wrapper = fresh.async_context
    return wrapper

create_child(num=0, name=None) async

create_child(num: int = 0, name: str | None = None) -> AsyncContext
create_child(num: int, name: str | list[str] | None = None) -> list[AsyncContext]

Create one or more child contexts.

Parameters:

Name Type Description Default
num int

Number of children to create. 0 creates a single child.

0
name str | list[str] | None

Optional name(s) for the children.

None

Returns:

Type Description
AsyncContext | list[AsyncContext]

A single AsyncContext if num is 0, otherwise a list of AsyncContexts.

Source code in shutils/dag/context.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
async def create_child(
    self, num: int = 0, name: str | list[str] | None = None
) -> "AsyncContext | list[AsyncContext]":
    """Create child contexts and register them on this context.

    Args:
        num: How many children to create; 0 means "one child, returned bare".
        name: Name(s) for the children. None gives every child an empty name,
            a string is reused for all of them, a list supplies one per child.

    Returns:
        The single child's AsyncContext when num is 0, else a list of AsyncContexts.

    Raises:
        ValueError: If ``name`` is a list whose length differs from the child count.
    """
    # NOTE: nesting is allowed; the former root-only parent check is disabled.
    want_single = num == 0
    count = num or 1

    if name is None:
        names = [""] * count
    elif isinstance(name, str):
        names = [name] * count
    else:
        if isinstance(name, list) and len(name) != count:
            raise ValueError("name list length must be equal to num")
        names = name

    core = self.__context
    wrappers = []
    for position in range(count):
        # Built without a parent, then linked by hand under the async write lock.
        child = Context(core._runtime, name=names[position])
        child._parent_context = core
        async with core.parent_rwlock.async_write():
            core._child_context_list.append(child)
            core._child_context_num += 1
        wrappers.append(child.async_context)

    return wrappers[0] if want_single else wrappers

destory(destory_parent=False) async

Destroy the context and release runtime resources.

Parameters:

Name Type Description Default
destory_parent bool

If True, also destroy the parent when this is its last child.

False
Source code in shutils/dag/context.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def destory(self, destory_parent: bool = False):
    """Destroy the context and release runtime resources.

    A context that is already marked destroyed is left untouched, so repeated
    calls are harmless.

    Args:
        destory_parent: If True, also destroy the parent when this is its last child.
    """
    core = self.__context
    if core.is_destory():
        return

    # Mark first: the recursive calls below rely on the early return above to
    # avoid destroying (and un-counting) the same context twice.
    core.set_destory(True)
    if core._runtime:
        await core._runtime.async_counter.decrease()

    parent = core._parent_context
    if parent:
        # The child counter is mutated here, so take the write lock rather
        # than the read lock (the sync wrapper does the same).
        async with parent.parent_rwlock.async_write():
            parent._child_context_num -= 1
            if destory_parent and parent._child_context_num == 0:
                await parent.async_context.destory()

    if core._child_context_list:
        for child in core._child_context_list:
            await child.async_context.destory()

freeze()

Freeze the context to prevent it from being scheduled.

Source code in shutils/dag/context.py
243
244
245
def freeze(self):
    """Mark the core context as frozen by setting its ``freezing`` flag to True."""
    core = self.__context
    core.freezing = True

iter_child_context() async

Iterate over child contexts under an async read lock.

Source code in shutils/dag/context.py
297
298
299
300
301
async def iter_child_context(self) -> AsyncGenerator["AsyncContext"]:
    """Yield the async wrapper of each child context.

    The async read lock stays held while the generator is suspended inside the loop.
    """
    core = self.__context
    async with core.parent_rwlock.async_read():
        for core_child in core._child_context_list:
            wrapper = core_child.async_context
            yield wrapper

parent_context() async

Get the parent context, or None if this is a root context.

Source code in shutils/dag/context.py
303
304
305
306
307
308
309
async def parent_context(self) -> "AsyncContext | None":
    """Return the parent's async wrapper, or None when there is no parent."""
    core = self.__context
    async with core.parent_rwlock.async_read():
        parent = core._parent_context
        return parent.async_context if parent else None

thraw()

Unfreeze the context to allow scheduling again.

Source code in shutils/dag/context.py
247
248
249
def thraw(self):
    """Clear the frozen state by setting the core context's ``freezing`` flag to False."""
    core = self.__context
    core.freezing = False

Context

Bases: DataWhiteBoardMixin, TaskStateMixin

Core context object carrying data and task state through the DAG.

Source code in shutils/dag/context.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class Context(DataWhiteBoardMixin, TaskStateMixin):
    """Core context object carrying data and task state through the DAG.

    Contexts form a tree through ``_parent_context`` / ``_child_context_list``;
    ``parent_rwlock`` guards a context's child list and child counter. The
    sync and async wrappers are created lazily and cached on the instance.
    """

    def __init__(self, runtime: "Runtime | None", parent: "Context | None" = None, name: str = ""):
        """Initialize a context with optional runtime, parent, and name.

        Args:
            runtime: The runtime for tracking active context count; when given,
                its ``sync_counter`` is increased.
            parent: Optional parent context; when given, this context is
                appended to the parent's child list under the parent's write lock.
            name: Optional context name; defaults to a UUID.
        """
        DataWhiteBoardMixin.__init__(self)
        TaskStateMixin.__init__(self)
        self.id = name if name else str(uuid.uuid4())
        self.parent_rwlock = SmartRWLock()
        self._parent_context: Context | None = parent
        self._child_context_list: list[Context] = []
        self._child_context_num: int = 0
        self._runtime = runtime
        self.freezing: bool = False
        # Maps a task to the absolute time.time() value set by _awake_interval.
        self.awake_time: dict[TaskBase, float] = {}
        # Wrapper caches must exist before this context becomes reachable
        # through the parent's child list; otherwise another thread walking
        # that list could hit AttributeError in sync_context/async_context.
        self._sync_context: SyncContext | None = None
        self._async_context: AsyncContext | None = None
        if self._runtime:
            self._runtime.sync_counter.increase()
        if parent:
            # NOTE: nesting is allowed; the former root-only parent check is disabled.
            with parent.parent_rwlock.write():
                parent._child_context_num += 1
                parent._child_context_list.append(self)

    def __repr__(self):
        """Return a verbose dump when ``debug_mode`` is set, else just the class name and id."""
        if debug_mode:
            return (
                f"{self.__class__.__name__}("
                f"data={DataWhiteBoardMixin.__repr__(self)}, "
                f"state={TaskStateMixin.__repr__(self)}, "
                f"parent={self._parent_context}, "
                f"child_context_num={self._child_context_num}, "
                f"complete_tasks={self._completed_tasks}, "
                f"available_tasks={self.available_tasks})"
            )
        else:
            return f"{self.__class__.__name__}(id={self.id})"

    @property
    def sync_context(self) -> "SyncContext":
        """Lazy accessor for the sync context wrapper."""
        if self._sync_context is None:
            self._sync_context = SyncContext(self)
        return self._sync_context

    @property
    def async_context(self) -> "AsyncContext":
        """Lazy accessor for the async context wrapper."""
        if self._async_context is None:
            self._async_context = AsyncContext(self)
        return self._async_context

    def _awake_interval(self, time_interval: float | int, task: "TaskBase") -> None:
        """Schedule a task to re-awaken after a time interval.

        Stores ``time.time() + time_interval`` in ``awake_time`` for the task.

        Args:
            time_interval: Seconds to wait before the task can re-awaken.
            task: The task to schedule.
        """
        self.awake_time[task] = time.time() + time_interval

async_context property

Lazy accessor for the async context wrapper.

sync_context property

Lazy accessor for the sync context wrapper.

__init__(runtime, parent=None, name='')

Initialize a context with optional runtime, parent, and name.

Parameters:

Name Type Description Default
runtime Runtime | None

The runtime for tracking active context count.

required
parent Context | None

Optional parent context for hierarchical contexts.

None
name str

Optional context name; defaults to a UUID.

''
Source code in shutils/dag/context.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(self, runtime: "Runtime | None", parent: "Context | None" = None, name: str = ""):
    """Initialize a context with optional runtime, parent, and name.

    Args:
        runtime: The runtime for tracking active context count; when given,
            its ``sync_counter`` is increased.
        parent: Optional parent context; when given, this context is appended
            to the parent's child list under the parent's write lock.
        name: Optional context name; defaults to a UUID.
    """
    DataWhiteBoardMixin.__init__(self)
    TaskStateMixin.__init__(self)
    self.id = name if name else str(uuid.uuid4())
    self.parent_rwlock = SmartRWLock()
    self._parent_context: Context | None = parent
    self._child_context_list: list[Context] = []
    self._child_context_num: int = 0
    self._runtime = runtime
    self.freezing: bool = False
    self.awake_time: dict[TaskBase, float] = {}
    # Wrapper caches must exist before this context becomes reachable through
    # the parent's child list; otherwise another thread walking that list
    # could hit AttributeError in the sync_context/async_context properties.
    self._sync_context = None
    self._async_context = None
    if self._runtime:
        self._runtime.sync_counter.increase()
    if parent:
        # NOTE: nesting is allowed; the former root-only parent check is disabled.
        with parent.parent_rwlock.write():
            parent._child_context_num += 1
            parent._child_context_list.append(self)

LoopContext

Bases: Context

Context used for loop tasks, pre-activating the loop task.

Source code in shutils/dag/context.py
360
361
362
363
364
365
366
367
368
369
370
371
372
class LoopContext(Context):
    """Context for loop tasks; the loop task is made available at construction."""

    def __init__(self, runtime: "Runtime | None", task: "TaskBase", name: str = "LoopContext"):
        """Create a parentless context and register ``task`` as available.

        Args:
            runtime: Runtime passed through to ``Context.__init__``.
            task: The loop task handed to ``_add_available_task``.
            name: Context name; defaults to ``"LoopContext"``.
        """
        super().__init__(runtime=runtime, name=name)
        self._add_available_task(task)

__init__(runtime, task, name='LoopContext')

Initialize a LoopContext with the loop task already available.

Parameters:

Name Type Description Default
runtime Runtime | None

The runtime for tracking active context count.

required
task TaskBase

The loop task to make immediately available.

required
name str

Optional context name.

'LoopContext'
Source code in shutils/dag/context.py
363
364
365
366
367
368
369
370
371
372
def __init__(self, runtime: "Runtime | None", task: "TaskBase", name: str = "LoopContext"):
    """Create a parentless context and register ``task`` as available.

    Args:
        runtime: Runtime passed through to the base initializer.
        task: The loop task handed to ``_add_available_task``.
        name: Context name; defaults to ``"LoopContext"``.
    """
    super().__init__(runtime=runtime, name=name)
    self._add_available_task(task)

OutputContext

Bases: Context

Terminal context that holds the final output of DAG execution.

Source code in shutils/dag/context.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
class OutputContext(Context):
    """Terminal context holding the final output of a DAG execution."""

    def __init__(self, context: Context | None = None, name: str = "OutputContext"):
        """Create a runtime-less context, optionally cloned from ``context``.

        Args:
            context: Source context; when given, its id replaces ``name`` and
                its white-board data is copied into this context.
            name: Context name used when no source context is supplied.
        """
        super().__init__(runtime=None, name=name)
        if not context:
            return
        self.id = context.id
        context.sync_white_board.copy(self)

    async def acopy(self, context: Context):
        """Take over the id of ``context`` and copy its data asynchronously.

        Args:
            context: The source context to copy from.
        """
        self.id = context.id
        board = context.async_white_board
        await board.copy(self)

    def asdit(self) -> dict:
        """Return the underlying ``_data`` dictionary itself (not a copy)."""
        raw = self._data
        return raw

__init__(context=None, name='OutputContext')

Initialize an OutputContext, optionally copying data from another context.

Parameters:

Name Type Description Default
context Context | None

Optional source context to copy ID and data from.

None
name str

Optional context name.

'OutputContext'
Source code in shutils/dag/context.py
400
401
402
403
404
405
406
407
408
409
410
def __init__(self, context: Context | None = None, name: str = "OutputContext"):
    """Create a runtime-less context, optionally cloned from ``context``.

    Args:
        context: Source context; when given, its id replaces ``name`` and its
            white-board data is copied into this context.
        name: Context name used when no source context is supplied.
    """
    super().__init__(runtime=None, name=name)
    if not context:
        return
    self.id = context.id
    context.sync_white_board.copy(self)

acopy(context) async

Async copy ID and data from another context.

Parameters:

Name Type Description Default
context Context

The source context to copy from.

required
Source code in shutils/dag/context.py
412
413
414
415
416
417
418
419
async def acopy(self, context: Context):
    """Take over the id of ``context`` and copy its data asynchronously.

    Args:
        context: The source context to copy from.
    """
    self.id = context.id
    board = context.async_white_board
    await board.copy(self)

asdit()

Return the raw data dictionary.

Source code in shutils/dag/context.py
422
423
424
def asdit(self) -> dict:
    """Return the underlying ``_data`` dictionary itself (not a copy)."""
    raw = self._data
    return raw

RateLimitContext

Bases: Context

Context wrapper indicating a rate-limit throttle event.

Source code in shutils/dag/context.py
375
376
377
378
379
380
381
382
383
384
385
386
class RateLimitContext(Context):
    """Context wrapper indicating a rate-limit throttle event."""

    def __init__(self, context: Context):
        """Initialize from an existing context.

        The base initializer is run with no runtime and no parent, so the
        marker is a fully formed ``Context`` (lock, child list, wrapper
        caches, ``freezing = False``) without touching any runtime counter
        or any parent's child list.

        Args:
            context: The original context that was rate-limited; kept on
                ``self.context``.
        """
        # Without this call inherited members such as sync_context,
        # async_context and parent_rwlock raise AttributeError.
        super().__init__(None, name=f"RateLimit#{context.id}")
        self.context = context

__init__(context)

Initialize from an existing context.

Parameters:

Name Type Description Default
context Context

The original context that was rate-limited.

required
Source code in shutils/dag/context.py
378
379
380
381
382
383
384
385
386
def __init__(self, context: Context):
    """Initialize from an existing context.

    The base initializer is run with no runtime and no parent, so the marker
    is a fully formed context (lock, child list, wrapper caches,
    ``freezing = False``) without touching any runtime counter or any
    parent's child list.

    Args:
        context: The original context that was rate-limited; kept on ``self.context``.
    """
    # Without this call inherited members such as sync_context,
    # async_context and parent_rwlock raise AttributeError.
    super().__init__(None, name=f"RateLimit#{context.id}")
    self.context = context

StopContext

Bases: Context

Signal context that instructs workers to stop processing.

Source code in shutils/dag/context.py
389
390
391
392
393
394
class StopContext(Context):
    """Sentinel context used to tell workers to stop processing."""

    def __init__(self, name: str = "StopContext"):
        """Create the stop signal as a runtime-less, parentless context.

        Args:
            name: Context name; defaults to ``"StopContext"``.
        """
        super().__init__(runtime=None, name=name)

__init__(name='StopContext')

Initialize a stop signal context.

Source code in shutils/dag/context.py
392
393
394
def __init__(self, name: str = "StopContext"):
    """Create the stop signal as a runtime-less, parentless context.

    Args:
        name: Context name; defaults to ``"StopContext"``.
    """
    super().__init__(runtime=None, name=name)

SyncContext

Bases: SyncDataWhiteBoard, SyncTaskState

Synchronous wrapper around Context for thread-safe operations.

Source code in shutils/dag/context.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class SyncContext(SyncDataWhiteBoard, SyncTaskState):
    """Synchronous wrapper around a core ``Context``.

    The wrapper holds no state besides the core context. Parent/child
    operations go through the core context's ``parent_rwlock`` using its
    blocking ``read()`` / ``write()`` acquire methods.
    """

    def __init__(self, context: Context):
        """Initialize with the underlying context.

        Args:
            context: The core context object; it is also handed to both
                base-class initializers.
        """
        SyncDataWhiteBoard.__init__(self, context)
        SyncTaskState.__init__(self, context)
        self.__context = context

    @property
    def id(self) -> str:
        """Unique identifier of the context (the core context's ``id``)."""
        return self.__context.id

    @property
    def context(self) -> Context:
        """Access the underlying core context."""
        return self.__context

    @property
    def async_context(self) -> "AsyncContext":
        """Access the async context wrapper for the same core context."""
        return self.__context.async_context

    def freeze(self):
        """Freeze the context (sets ``freezing`` to True on the core context)."""
        self.__context.freezing = True

    def thraw(self):
        """Unfreeze the context (sets ``freezing`` to False on the core context)."""
        self.__context.freezing = False

    def destory(self, destory_parent: bool = False):
        """Destroy the context and release runtime resources.

        A context that is already marked destroyed is left untouched, so
        repeated calls are harmless.

        Args:
            destory_parent: If True, also destroy the parent when this is its last child.
        """
        if self.__context.is_destory():
            return

        # Mark first: the recursive calls below rely on the early return above
        # to avoid destroying (and un-counting) the same context twice.
        self.__context.set_destory(True)
        if self.__context._runtime:
            self.__context._runtime.sync_counter.decrease()
        if self.__context._parent_context:
            with self.__context._parent_context.parent_rwlock.write():
                self.__context._parent_context._child_context_num -= 1
                if destory_parent and self.__context._parent_context._child_context_num == 0:
                    self.__context._parent_context.sync_context.destory()
        if self.__context._child_context_list:
            for child in self.__context._child_context_list:
                child.sync_context.destory()

    def create(
        self, copy_data: bool = False, deep_copy: bool = False, name: str = "", skip_complete: bool = False
    ) -> "SyncContext":
        """Create a new context, optionally copying data from this one.

        The new context shares this context's runtime but is not registered
        as a child of it.

        Args:
            copy_data: Whether to copy data to the new context.
            deep_copy: Whether to deep-copy data instead of sharing references.
            name: Optional name for the new context.
            skip_complete: Value stored in the new context's ``_skip_complete`` flag.

        Returns:
            The sync wrapper of the new context.
        """
        new_context = Context(self.__context._runtime, name=name)
        if copy_data:
            self.copy(new_context, deep_copy)
        new_context._skip_complete = skip_complete
        return new_context.sync_context

    def child_context_num(self) -> int:
        """Get the number of child contexts."""
        with self.__context.parent_rwlock.read():
            return self.__context._child_context_num

    def iter_child_context(self):
        """Iterate over child contexts under a read lock.

        Yields the core ``Context`` objects (not their sync wrappers); the
        read lock stays held while the generator is suspended.
        """
        with self.__context.parent_rwlock.read():
            yield from self.__context._child_context_list

    def parent_context(self) -> "Context | None":
        """Get the parent core context (not its wrapper), or None if there is none."""
        with self.__context.parent_rwlock.read():
            return self.__context._parent_context

    def create_child(
        self, num: int = 0, name: str | list[str] | None = None
    ) -> "list[SyncContext] | SyncContext":
        """Create one or more child contexts.

        Registration on this context happens inside ``Context.__init__``,
        which receives this core context as the parent.

        Args:
            num: Number of children to create. 0 creates a single child.
            name: Optional name(s) for the children, mirroring the async
                wrapper: None leaves every child unnamed, a string is reused
                for all children, a list supplies one name per child.

        Returns:
            A single SyncContext if num is 0, otherwise a list of SyncContexts.

        Raises:
            ValueError: If ``name`` is a list whose length differs from the
                number of children being created.
        """
        count = num if num else 1
        if name is None:
            names = [""] * count
        elif isinstance(name, str):
            names = [name] * count
        elif len(name) != count:
            raise ValueError("name list length must be equal to num")
        else:
            names = list(name)

        core = self.__context
        children = [Context(core._runtime, core, child_name).sync_context for child_name in names]
        return children if num else children[0]

    def complete(self, task: "TaskBase"):
        """Mark a task as completed and propagate to the parent context."""
        super()._complete(task)
        if self.__context._parent_context:
            self.__context._parent_context.sync_context.complete(task)

async_context property

Access the async context wrapper for the same core context.

context property

Access the underlying core context.

id property

Unique identifier of the context.

__init__(context)

Initialize with the underlying context.

Parameters:

Name Type Description Default
context Context

The core context object.

required
Source code in shutils/dag/context.py
105
106
107
108
109
110
111
112
113
def __init__(self, context: Context):
    """Bind this sync wrapper to its core context.

    Args:
        context: The core context, passed to both base-class initializers and
            kept as the wrapper's private reference.
    """
    for base in (SyncDataWhiteBoard, SyncTaskState):
        base.__init__(self, context)
    self.__context = context

child_context_num()

Get the number of child contexts.

Source code in shutils/dag/context.py
179
180
181
182
def child_context_num(self) -> int:
    """Return the core context's child counter, read under the read lock."""
    core = self.__context
    with core.parent_rwlock.read():
        count = core._child_context_num
    return count

complete(task)

Mark a task as completed and propagate to the parent context.

Source code in shutils/dag/context.py
208
209
210
211
212
def complete(self, task: "TaskBase"):
    """Record ``task`` as completed here, then repeat on the parent context if any."""
    super()._complete(task)
    parent = self.__context._parent_context
    if not parent:
        return
    parent.sync_context.complete(task)

create(copy_data=False, deep_copy=False, name='', skip_complete=False)

Create a new context, optionally copying data from this one.

Parameters:

Name Type Description Default
copy_data bool

Whether to copy data to the new context.

False
deep_copy bool

Whether to deep-copy data instead of sharing references.

False
name str

Optional name for the new context.

''
skip_complete bool

If True, skip completion tracking for the new context.

False

Returns:

Type Description
SyncContext

The sync wrapper of the new context.

Source code in shutils/dag/context.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def create(
    self, copy_data: bool = False, deep_copy: bool = False, name: str = "", skip_complete: bool = False
) -> "SyncContext":
    """Build a fresh context on the same runtime, optionally seeded with this one's data.

    The new context is not registered as a child of this one.

    Args:
        copy_data: When True, copy this context's data into the new one.
        deep_copy: Forwarded to ``copy``; requests a deep copy instead of shared references.
        name: Name for the new context (empty string lets the context pick its own id).
        skip_complete: Value stored in the new context's ``_skip_complete`` flag.

    Returns:
        The sync wrapper of the newly created context.
    """
    fresh = Context(self.__context._runtime, name=name)
    if copy_data:
        self.copy(fresh, deep_copy)
    fresh._skip_complete = skip_complete
    wrapper = fresh.sync_context
    return wrapper

create_child(num=0)

Create one or more child contexts.

Parameters:

Name Type Description Default
num int

Number of children to create. 0 creates a single child.

0

Returns:

Type Description
list[SyncContext] | SyncContext

A single SyncContext if num is 0, otherwise a list of SyncContexts.

Source code in shutils/dag/context.py
194
195
196
197
198
199
200
201
202
203
204
205
206
def create_child(
    self, num: int = 0, name: str | list[str] | None = None
) -> "list[SyncContext] | SyncContext":
    """Create one or more child contexts.

    Registration on this context happens inside the ``Context`` constructor,
    which receives this core context as the parent.

    Args:
        num: Number of children to create. 0 creates a single child.
        name: Optional name(s) for the children, mirroring the async wrapper:
            None leaves every child unnamed, a string is reused for all
            children, a list supplies one name per child.

    Returns:
        A single SyncContext if num is 0, otherwise a list of SyncContexts.

    Raises:
        ValueError: If ``name`` is a list whose length differs from the number
            of children being created.
    """
    count = num if num else 1
    if name is None:
        names = [""] * count
    elif isinstance(name, str):
        names = [name] * count
    elif len(name) != count:
        raise ValueError("name list length must be equal to num")
    else:
        names = list(name)

    core = self.__context
    children = [Context(core._runtime, core, child_name).sync_context for child_name in names]
    return children if num else children[0]

destory(destory_parent=False)

Destroy the context and release runtime resources.

Parameters:

Name Type Description Default
destory_parent bool

If True, also destroy the parent when this is its last child.

False
Source code in shutils/dag/context.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def destory(self, destory_parent: bool = False):
    """Tear down this context, its bookkeeping on the parent, and its children.

    Does nothing when the context is already marked destroyed.

    Args:
        destory_parent: When True and this was the parent's last counted child,
            the parent is destroyed as well.
    """
    core = self.__context
    if core.is_destory():
        return

    # Mark first so the recursive calls below return early for this context.
    core.set_destory(True)

    runtime = core._runtime
    if runtime:
        runtime.sync_counter.decrease()

    parent = core._parent_context
    if parent:
        with parent.parent_rwlock.write():
            parent._child_context_num -= 1
            if destory_parent and parent._child_context_num == 0:
                parent.sync_context.destory()

    for child in core._child_context_list or ():
        child.sync_context.destory()

freeze()

Freeze the context to prevent it from being scheduled.

Source code in shutils/dag/context.py
130
131
132
def freeze(self):
    """Mark the core context as frozen by setting its ``freezing`` flag to True."""
    core = self.__context
    core.freezing = True

iter_child_context()

Iterate over child contexts under a read lock.

Source code in shutils/dag/context.py
184
185
186
187
def iter_child_context(self):
    """Yield each child core context while holding the read lock.

    The yielded objects are the core contexts themselves, not their sync wrappers.
    """
    core = self.__context
    with core.parent_rwlock.read():
        children = core._child_context_list
        yield from children

parent_context()

Get the parent context.

Source code in shutils/dag/context.py
189
190
191
192
def parent_context(self) -> "Context | None":
    """Return the parent core context (not its wrapper), read under the read lock."""
    core = self.__context
    with core.parent_rwlock.read():
        parent = core._parent_context
    return parent

thraw()

Unfreeze the context to allow scheduling again.

Source code in shutils/dag/context.py
134
135
136
def thraw(self):
    """Clear the frozen state by setting the core context's ``freezing`` flag to False."""
    core = self.__context
    core.freezing = False