Task Queue
Queue for scheduling and dispatching DAG tasks.
Task-based priority queue for single-level DAG executor scheduling.
TaskItem
dataclass
Represents a single task to be executed in the global task queue.
Combines context and task information for task-level scheduling.
Source code in shutils/dag/task_queue.py
35 36 37 38 39 40 41 42 43 44 | |
TaskPriority
Bases: Enum
Priority levels for task scheduling.
LIFO_HIGH (0): Tasks from a context that was just executed. Ensures the same DAG path is prioritized. FIFO_HIGH (1): Tasks from newly created contexts. FIFO_LOW (2): Tasks from LoopContext (background/loop work).
Source code in shutils/dag/task_queue.py
22 23 24 25 26 27 28 29 30 31 32 | |
TaskPriorityQueue
Global task queue with priority support.
Replaces context-based queue with task-based scheduling for SimplifiedExecutor. Each task is individually queued with its priority, allowing fine-grained concurrency control.
Source code in shutils/dag/task_queue.py
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | |
size
property
Get the current queue size.
async_get_task()
async
Get a task from the queue.
Returns:
| Type | Description |
|---|---|
TaskItem
|
TaskItem containing the context and task to execute |
Raises:
| Type | Description |
|---|---|
CancelledError
|
If the get operation is cancelled |
Source code in shutils/dag/task_queue.py
89 90 91 92 93 94 95 96 97 98 99 100 | |
async_put_context_tasks(context, priority=TaskPriority.FIFO_HIGH)
async
Put all available tasks from a context into the queue.
Used when a context first enters the system or when new tasks become available after task completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
Context
|
The context to get tasks from |
required |
priority
|
TaskPriority
|
Priority level for all tasks from this context |
FIFO_HIGH
|
context_priority
|
Original context priority for tracking |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of tasks enqueued |
Source code in shutils/dag/task_queue.py
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | |
async_put_task(context, task, priority=TaskPriority.FIFO_HIGH)
async
Put a single task into the queue with priority.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
Context
|
The context containing this task |
required |
task
|
TaskBase
|
The task to execute |
required |
priority
|
TaskPriority
|
Priority level for this task |
FIFO_HIGH
|
context_priority
|
Original context priority for tracking |
required |
Source code in shutils/dag/task_queue.py
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | |
join()
async
Wait for all items in the queue to be processed.
Source code in shutils/dag/task_queue.py
140 141 142 | |