Skip to content

Task State

State management for DAG tasks.

Task state tracking for DAG contexts, including completion and retry logic.

AsyncTaskState

Asynchronous wrapper for task state operations.

Source code in shutils/dag/task_state.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class AsyncTaskState:
    """Asynchronous wrapper for task state operations."""

    def __init__(self, task_state: TaskStateMixin):
        """Initialize with the underlying task state mixin.

        Args:
            task_state: The mixin instance to wrap.
        """
        self.__task_state = task_state

    async def read_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
        """Execute a function under an async read lock."""
        async with self.__task_state._task_alock.read():
            return func(*args, **kwargs)

    async def write_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
        """Execute a function under an async write lock."""
        async with self.__task_state._task_alock.write():
            return func(*args, **kwargs)

    def _complete(self, task: "TaskBase"):
        return self.write_wrapper(self.__task_state.sync_task_state._complete, task)

    def avaliable_task(self):
        """Return available tasks via async read wrapper."""
        return self.read_wrapper(self.__task_state.sync_task_state.avaliable_task)

    async def retry(self, task: "TaskBase"):
        """Increment and return the retry count for a task.

        Args:
            task: The task being retried.

        Returns:
            The new retry count.
        """
        async with self.__task_state._retry_alock:
            if task in self.__task_state._retry_dict:
                self.__task_state._retry_dict[task] += 1
            else:
                self.__task_state._retry_dict[task] = 1
            return self.__task_state._retry_dict[task]

__init__(task_state)

Initialize with the underlying task state mixin.

Parameters:

Name Type Description Default
task_state TaskStateMixin

The mixin instance to wrap.

required
Source code in shutils/dag/task_state.py
131
132
133
134
135
136
137
def __init__(self, task_state: TaskStateMixin):
    """Initialize with the underlying task state mixin.

    The instance is kept in a double-underscore (name-mangled) attribute
    and is the object whose locks and state the other methods use.

    Args:
        task_state: The mixin instance to wrap.
    """
    self.__task_state = task_state

avaliable_task()

Return available tasks via async read wrapper.

Source code in shutils/dag/task_state.py
152
153
154
def avaliable_task(self):
    """Return an awaitable for the currently available tasks.

    The sync wrapper's ``avaliable_task`` is handed to ``read_wrapper``;
    the value returned here is whatever ``read_wrapper`` returns.
    """
    sync_state = self.__task_state.sync_task_state
    return self.read_wrapper(sync_state.avaliable_task)

read_wrapper(func, *args, **kwargs) async

Execute a function under an async read lock.

Source code in shutils/dag/task_state.py
139
140
141
142
async def read_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
    """Execute a function under an async read lock."""
    async with self.__task_state._task_alock.read():
        return func(*args, **kwargs)

retry(task) async

Increment and return the retry count for a task.

Parameters:

Name Type Description Default
task TaskBase

The task being retried.

required

Returns:

Type Description

The new retry count.

Source code in shutils/dag/task_state.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
async def retry(self, task: "TaskBase"):
    """Increment and return the retry count for a task.

    Args:
        task: The task being retried.

    Returns:
        The new retry count.
    """
    async with self.__task_state._retry_alock:
        if task in self.__task_state._retry_dict:
            self.__task_state._retry_dict[task] += 1
        else:
            self.__task_state._retry_dict[task] = 1
        return self.__task_state._retry_dict[task]

write_wrapper(func, *args, **kwargs) async

Execute a function under an async write lock.

Source code in shutils/dag/task_state.py
144
145
146
147
async def write_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
    """Execute a function under an async write lock."""
    async with self.__task_state._task_alock.write():
        return func(*args, **kwargs)

ErrorInfo dataclass

Information about an error that occurred during task execution.

Attributes:

Name Type Description
has_error bool

Whether an error occurred.

exception Exception | None

The exception instance, if any.

error_node str | None

The ID of the task that caused the error.

Source code in shutils/dag/task_state.py
21
22
23
24
25
26
27
28
29
30
31
32
@dataclass
class ErrorInfo:
    """Information about an error that occurred during task execution.

    All fields have defaults, so ``ErrorInfo()`` describes the
    "no error" state.

    Attributes:
        has_error: Whether an error occurred.
        exception: The exception instance, if any.
        error_node: The ID of the task that caused the error.
    """
    # False until an error is recorded.
    has_error: bool = False
    # The recorded exception; None when there is none.
    exception: Exception | None = None
    # ID of the failing task; None when there is none.
    error_node: str | None = None

SyncTaskState

Synchronous thread-safe wrapper for task state operations.

Source code in shutils/dag/task_state.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class SyncTaskState:
    """Synchronous wrapper that runs task state operations under ``_task_lock``."""

    def __init__(self, task_state: TaskStateMixin):
        """Wrap the given task state.

        Args:
            task_state: The mixin instance whose state and lock are used.
        """
        self.__task_state = task_state

    def _complete(self, task: "TaskBase"):
        """Mark ``task`` as completed while holding the write lock."""
        state = self.__task_state
        with state._task_lock.write():
            state._complete(task)

    def avaliable_task(self):
        """Return a new list of the available tasks, built under the read lock."""
        state = self.__task_state
        with state._task_lock.read():
            return [*state._available_tasks]

__init__(task_state)

Initialize with the underlying task state mixin.

Parameters:

Name Type Description Default
task_state TaskStateMixin

The mixin instance to wrap.

required
Source code in shutils/dag/task_state.py
109
110
111
112
113
114
115
def __init__(self, task_state: TaskStateMixin):
    """Initialize with the underlying task state mixin.

    The instance is kept in a double-underscore (name-mangled) attribute
    and is the object whose lock and state the other methods use.

    Args:
        task_state: The mixin instance to wrap.
    """
    self.__task_state = task_state

avaliable_task()

Return a list of currently available tasks under a read lock.

Source code in shutils/dag/task_state.py
121
122
123
124
def avaliable_task(self):
    """Return a new list of the available tasks, built under the read lock."""
    state = self.__task_state
    with state._task_lock.read():
        return [*state._available_tasks]

TaskStateMixin

Mixin providing task completion tracking and available-task discovery.

Source code in shutils/dag/task_state.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class TaskStateMixin:
    """Mixin that tracks completed tasks and the tasks that are ready to run."""

    def __init__(self):
        """Create empty task sets, locks, retry counters and status flags."""
        # Completion bookkeeping plus the sync/async locks used by the wrappers.
        self._completed_tasks: set[TaskBase] = set()
        self._available_tasks: set[TaskBase] = set()
        self._task_lock: RWLock = RWLock()
        self._task_alock: AsyncRWLock = AsyncRWLock()

        # Per-task retry counters and the lock guarding them.
        self._retry_dict: dict[TaskBase, int] = {}
        self._retry_alock: asyncio.Lock = asyncio.Lock()

        self._destory: bool = False

        self._error_info: ErrorInfo = ErrorInfo()
        # When True, the next _complete() call is dropped and the flag resets.
        self._skip_complete: bool = False

    def __repr__(self):
        return f"TaskState(destory={self._destory}, error_info={self._error_info})"

    def is_destory(self) -> bool:
        """Return the current value of the destroyed flag."""
        return self._destory

    def set_destory(self, value: bool):
        """Store ``value`` as the destroyed flag."""
        self._destory = value

    @property
    def available_tasks(self) -> set["TaskBase"]:
        """The internal set of tasks currently marked as available (not a copy)."""
        return self._available_tasks

    def _add_available_task(self, task: "TaskBase"):
        """Add ``task`` to the set of available tasks."""
        self._available_tasks.add(task)

    @property
    def error_info(self) -> ErrorInfo:
        """The currently stored error information."""
        return self._error_info

    @error_info.setter
    def error_info(self, value: ErrorInfo):
        self._error_info = value

    @property
    def sync_task_state(self) -> "SyncTaskState":
        """Return a new ``SyncTaskState`` wrapping this object on each access."""
        return SyncTaskState(self)

    @property
    def async_task_state(self) -> "AsyncTaskState":
        """Return a new ``AsyncTaskState`` wrapping this object on each access."""
        return AsyncTaskState(self)

    def _complete(self, task: "TaskBase"):
        """Record ``task`` as completed and release downstream tasks that are ready.

        If the skip flag is set, the call only clears the flag. A task that
        is already completed is left untouched.
        """
        if self._skip_complete:
            # One-shot skip: consume the flag and ignore this completion.
            self._skip_complete = False
            return

        finished = self._completed_tasks
        if task in finished:
            return

        ready = self._available_tasks
        ready.discard(task)
        finished.add(task)

        for successor in task.downstream_tasks:
            if successor in finished:
                continue
            # A successor becomes available once every upstream task is done.
            if all(parent in finished for parent in successor.upstream_tasks):
                ready.add(successor)

async_task_state property

Returns a new AsyncTaskState wrapper around this task state on every access.

available_tasks property

Set of tasks whose upstream dependencies are all completed.

sync_task_state property

Returns a new SyncTaskState wrapper around this task state on every access.

__init__()

Initialize task state with empty completion and availability sets.

Source code in shutils/dag/task_state.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self):
    """Initialize task state with empty completion and availability sets.

    Also creates the sync and async read/write locks, the retry counters
    with their asyncio lock, and the destroyed / error / skip flags.
    """
    # Task bookkeeping: what has finished and what is ready to run.
    self._completed_tasks: set[TaskBase] = set()
    self._available_tasks: set[TaskBase] = set()
    # One read/write lock for synchronous callers, one for async callers.
    self._task_lock: RWLock = RWLock()
    self._task_alock: AsyncRWLock = AsyncRWLock()

    # Retry count per task and the lock guarding it.
    self._retry_dict: dict[TaskBase, int] = {}
    self._retry_alock: asyncio.Lock = asyncio.Lock()

    # Destroyed flag (spelled "destory" throughout this module).
    self._destory: bool = False

    # Starts as a default ErrorInfo instance.
    self._error_info: ErrorInfo = ErrorInfo()
    # NOTE(review): presumably a one-shot flag consumed by _complete() — confirm there.
    self._skip_complete: bool = False

is_destory()

Check if this context has been destroyed.

Source code in shutils/dag/task_state.py
56
57
58
def is_destory(self) -> bool:
    """Check if this context has been destroyed.

    Returns:
        The current value of the ``_destory`` flag.
    """
    return self._destory

set_destory(value)

Set the destroyed flag.

Source code in shutils/dag/task_state.py
60
61
62
def set_destory(self, value: bool) -> None:
    """Set the destroyed flag.

    Args:
        value: New value stored in ``_destory``.
    """
    self._destory = value