Skip to content

Task Queue

Queue for scheduling and dispatching DAG tasks.

Task-based priority queue for single-level DAG executor scheduling.

TaskItem dataclass

Represents a single task to be executed in the global task queue.

Combines context and task information for task-level scheduling.

Source code in shutils/dag/task_queue.py
35
36
37
38
39
40
41
42
43
44
@dataclass(order=True)
class TaskItem:
    """Represents a single task to be executed in the global task queue.

    Combines context and task information for task-level scheduling.
    """
    priority: int
    sequence: int
    context: Context = field(compare=False)
    task: TaskBase = field(compare=False)

TaskPriority

Bases: Enum

Priority levels for task scheduling.

LIFO_HIGH (0): Tasks from a context that was just executed. Ensures the same DAG path is prioritized. FIFO_HIGH (1): Tasks from newly created contexts. FIFO_LOW (2): Tasks from LoopContext (background/loop work).

Source code in shutils/dag/task_queue.py
22
23
24
25
26
27
28
29
30
31
32
class TaskPriority(Enum):
    """Priority levels for task scheduling.

    LIFO_HIGH (0): Tasks from a context that was just executed.
                   Ensures the same DAG path is prioritized.
    FIFO_HIGH (1): Tasks from newly created contexts.
    FIFO_LOW (2): Tasks from LoopContext (background/loop work).
    """
    LIFO_HIGH = 0
    FIFO_HIGH = 1
    FIFO_LOW = 2

TaskPriorityQueue

Global task queue with priority support.

Replaces context-based queue with task-based scheduling for SimplifiedExecutor. Each task is individually queued with its priority, allowing fine-grained concurrency control.

Source code in shutils/dag/task_queue.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class TaskPriorityQueue:
    """Global task queue with priority support.

    Replaces context-based queue with task-based scheduling for
    SimplifiedExecutor. Each task is individually queued with its
    priority, allowing fine-grained concurrency control.
    """

    def __init__(self):
        self._queue: PriorityQueue[TaskItem] = PriorityQueue()
        self._sequence_counters: dict[int, int] = {
            TaskPriority.LIFO_HIGH.value: 0,
            TaskPriority.FIFO_HIGH.value: 0,
            TaskPriority.FIFO_LOW.value: 0,
        }
        self._lock = SmartLock()

    async def async_put_task(
        self,
        context: Context,
        task: TaskBase,
        priority: TaskPriority = TaskPriority.FIFO_HIGH
    ) -> None:
        """Put a single task into the queue with priority.

        Args:
            context: The context containing this task
            task: The task to execute
            priority: Priority level for this task
            context_priority: Original context priority for tracking
        """
        async with self._lock.async_lock():
            seq = self._sequence_counters[priority.value]
            self._sequence_counters[priority.value] += 1

            item = TaskItem(
                priority=priority.value,
                sequence=seq,
                context=context,
                task=task,
            )
            await self._queue.async_q.put(item)

    async def async_get_task(self) -> TaskItem:
        """Get a task from the queue.

        Returns:
            TaskItem containing the context and task to execute

        Raises:
            asyncio.CancelledError: If the get operation is cancelled
        """
        ret = await self._queue.async_q.get()
        self._queue.async_q.task_done()
        return ret

    async def async_put_context_tasks(
        self,
        context: Context,
        priority: TaskPriority = TaskPriority.FIFO_HIGH
    ) -> int:
        """Put all available tasks from a context into the queue.

        Used when a context first enters the system or when new tasks
        become available after task completion.

        Args:
            context: The context to get tasks from
            priority: Priority level for all tasks from this context
            context_priority: Original context priority for tracking

        Returns:
            Number of tasks enqueued
        """


        if context.is_destory():
            return 0

        available_tasks = await context.async_task_state.avaliable_task()
        count = 0

        logger.debug(f"async_put_context_tasks: context={context}, available_tasks={available_tasks}")
        for task in available_tasks:
            await self.async_put_task(context, task, priority)
            count += 1

        return count

    @property
    def size(self) -> int:
        """Get the current queue size."""
        return self._queue.async_q.qsize()

    async def join(self) -> None:
        """Wait for all items in the queue to be processed."""
        await self._queue.async_q.join()

size property

Get the current queue size.

async_get_task() async

Get a task from the queue.

Returns:

Type Description
TaskItem

TaskItem containing the context and task to execute

Raises:

Type Description
CancelledError

If the get operation is cancelled

Source code in shutils/dag/task_queue.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def async_get_task(self) -> TaskItem:
    """Get a task from the queue.

    Returns:
        TaskItem containing the context and task to execute

    Raises:
        asyncio.CancelledError: If the get operation is cancelled
    """
    ret = await self._queue.async_q.get()
    self._queue.async_q.task_done()
    return ret

async_put_context_tasks(context, priority=TaskPriority.FIFO_HIGH) async

Put all available tasks from a context into the queue.

Used when a context first enters the system or when new tasks become available after task completion.

Parameters:

Name Type Description Default
context Context

The context to get tasks from

required
priority TaskPriority

Priority level for all tasks from this context

FIFO_HIGH
context_priority

Original context priority for tracking

required

Returns:

Type Description
int

Number of tasks enqueued

Source code in shutils/dag/task_queue.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def async_put_context_tasks(
    self,
    context: Context,
    priority: TaskPriority = TaskPriority.FIFO_HIGH
) -> int:
    """Put all available tasks from a context into the queue.

    Used when a context first enters the system or when new tasks
    become available after task completion.

    Args:
        context: The context to get tasks from
        priority: Priority level for all tasks from this context
        context_priority: Original context priority for tracking

    Returns:
        Number of tasks enqueued
    """


    if context.is_destory():
        return 0

    available_tasks = await context.async_task_state.avaliable_task()
    count = 0

    logger.debug(f"async_put_context_tasks: context={context}, available_tasks={available_tasks}")
    for task in available_tasks:
        await self.async_put_task(context, task, priority)
        count += 1

    return count

async_put_task(context, task, priority=TaskPriority.FIFO_HIGH) async

Put a single task into the queue with priority.

Parameters:

Name Type Description Default
context Context

The context containing this task

required
task TaskBase

The task to execute

required
priority TaskPriority

Priority level for this task

FIFO_HIGH
context_priority

Original context priority for tracking

required
Source code in shutils/dag/task_queue.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def async_put_task(
    self,
    context: Context,
    task: TaskBase,
    priority: TaskPriority = TaskPriority.FIFO_HIGH
) -> None:
    """Put a single task into the queue with priority.

    Args:
        context: The context containing this task
        task: The task to execute
        priority: Priority level for this task
        context_priority: Original context priority for tracking
    """
    async with self._lock.async_lock():
        seq = self._sequence_counters[priority.value]
        self._sequence_counters[priority.value] += 1

        item = TaskItem(
            priority=priority.value,
            sequence=seq,
            context=context,
            task=task,
        )
        await self._queue.async_q.put(item)

join() async

Wait for all items in the queue to be processed.

Source code in shutils/dag/task_queue.py
140
141
142
async def join(self) -> None:
    """Wait for all items in the queue to be processed."""
    await self._queue.async_q.join()