Skip to content

Context Queue

Queue for managing DAG execution contexts.

Priority-based context queue for DAG executors.

AsyncContextQueue

Asynchronous interface for the context queue.

Source code in shutils/dag/context_queue.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class AsyncContextQueue:
    """Asynchronous interface for the context queue.

    A thin view over a shared ``ContextQueue``: reads and writes go through
    the ``async_q`` side of the underlying priority queue, and each enqueue
    is handed to the shared lock's ``async_run``.
    """

    def __init__(self, context_queue: ContextQueue):
        """Initialize with the underlying context queue.

        Args:
            context_queue: The shared context queue instance.
        """
        self.__context_queue = context_queue

    async def get(self) -> Context:
        """Async get the highest-priority context from the queue.

        Returns:
            The ``item`` payload of the entry returned by the underlying
            queue. ``task_done()`` is not called by this method.
        """
        entry = await self.__context_queue._priority_queue.async_q.get()
        return entry.item

    @asynccontextmanager
    async def _get_with_context(self) -> AsyncGenerator[Context]:
        """Yield the next context and mark it done when the block exits.

        ``task_done()`` is called exactly once for every context that was
        successfully fetched, whether the ``async with`` body finishes
        normally or raises. If the fetch itself fails or is cancelled,
        nothing was taken from the queue and ``task_done()`` is not called.

        Yields:
            The context obtained from :meth:`get`.
        """
        # Fetch before entering the ``try`` so a failed/cancelled get never
        # reaches the ``finally`` below.
        context = await self.get()
        try:
            yield context
        finally:
            # An exception raised in the ``async with`` body is re-raised at
            # the ``yield``; a flag set after the yield would be skipped in
            # that case, so the acknowledgement is unconditional here.
            self.__context_queue._priority_queue.async_q.task_done()

    async def put(self, context: Context, priority: ContextPriority = ContextPriority.FIFO_HIGH):
        """Async put a context into the queue with the given priority.

        Args:
            context: The context to enqueue.
            priority: The priority level for scheduling.
        """
        async def runner():
            counters = self.__context_queue._counter_dict
            # The current counter value is the entry's tie-breaker within
            # its priority level.
            item = PrioritizedItem(priority.value, counters[priority], context)
            await self.__context_queue._priority_queue.async_q.put(item)
            # LIFO counts downward so later entries get smaller sequence
            # numbers; every other priority counts upward.
            if priority == ContextPriority.LIFO:
                counters[priority] -= 1
            else:
                counters[priority] += 1

        await self.__context_queue._lock.async_run(runner)

__init__(context_queue)

Initialize with the underlying context queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context_queue | ContextQueue | The shared context queue instance. | required |

Source code in shutils/dag/context_queue.py
104
105
106
107
108
109
110
def __init__(self, context_queue: ContextQueue):
    """Bind this async interface to a shared context queue.

    Args:
        context_queue: The ``ContextQueue`` whose priority queue, counters
            and lock this interface operates on.
    """
    self.__context_queue = context_queue

get() async

Async get the highest-priority context from the queue.

Source code in shutils/dag/context_queue.py
112
113
114
115
async def get(self) -> Context:
    """Await the next entry on the async side of the queue and return its context.

    Returns:
        The ``item`` attribute of the entry obtained from ``async_q.get()``.
    """
    async_side = self.__context_queue._priority_queue.async_q
    entry = await async_side.get()
    return entry.item

put(context, priority=ContextPriority.FIFO_HIGH) async

Async put a context into the queue with the given priority.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The context to enqueue. | required |
| priority | ContextPriority | The priority level for scheduling. | FIFO_HIGH |

Source code in shutils/dag/context_queue.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def put(self, context: Context, priority: ContextPriority = ContextPriority.FIFO_HIGH):
    """Enqueue a context on the async side of the queue.

    The enqueue and the counter update are wrapped in one coroutine function
    that is handed to the shared lock's ``async_run``.

    Args:
        context: The context to enqueue.
        priority: The priority level for scheduling.
    """
    async def enqueue():
        counters = self.__context_queue._counter_dict
        # The current counter value is the entry's tie-breaker within its
        # priority level.
        entry = PrioritizedItem(priority.value, counters[priority], context)
        await self.__context_queue._priority_queue.async_q.put(entry)
        # LIFO counts downward; every other priority counts upward.
        step = -1 if priority == ContextPriority.LIFO else 1
        counters[priority] += step

    await self.__context_queue._lock.async_run(enqueue)

ContextPriority

Bases: Enum

Priority levels for context scheduling.

Source code in shutils/dag/context_queue.py
20
21
22
23
24
25
class ContextPriority(Enum):
    """Scheduling priority levels for queued contexts.

    The numeric value is used as the ``priority`` field of the queue
    entries built by the ``put`` methods in this module.
    """

    # The put() methods count this level's sequence number downward.
    LIFO = 0
    # Default priority of the put() methods; sequence numbers count upward.
    FIFO_HIGH = 1
    # Sequence numbers count upward, as for FIFO_HIGH.
    FIFO_LOW = 2

ContextQueue

Priority queue for contexts with sync and async interfaces.

Source code in shutils/dag/context_queue.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ContextQueue:
    """Priority queue for contexts, exposed through sync and async views."""

    def __init__(self):
        """Create the underlying queue, the lock and one zeroed counter per priority."""
        self._priority_queue: PriorityQueue[PrioritizedItem] = PriorityQueue()
        # One sequence counter per priority level, all starting at zero.
        self._counter_dict: dict[ContextPriority, int] = {
            level: 0 for level in ContextPriority
        }
        self._lock = SmartLock()
        # The two interface objects are created on first property access.
        self._sync_queue = None
        self._async_queue = None

    @property
    def sync_queue(self) -> "SyncContextQueue":
        """Return the synchronous interface, creating it on first access."""
        view = self._sync_queue
        if view is None:
            view = self._sync_queue = SyncContextQueue(self)
        return view

    @property
    def async_queue(self) -> "AsyncContextQueue":
        """Return the asynchronous interface, creating it on first access."""
        view = self._async_queue
        if view is None:
            view = self._async_queue = AsyncContextQueue(self)
        return view

async_queue property

Lazy accessor for the asynchronous queue interface.

sync_queue property

Lazy accessor for the synchronous queue interface.

__init__()

Initialize the context queue with per-priority counters.

Source code in shutils/dag/context_queue.py
44
45
46
47
48
49
50
51
52
def __init__(self):
    """Create the underlying queue, the lock and one zeroed counter per priority."""
    self._priority_queue: PriorityQueue[PrioritizedItem] = PriorityQueue()
    # One sequence counter per priority level, all starting at zero.
    self._counter_dict: dict[ContextPriority, int] = {
        level: 0 for level in ContextPriority
    }
    self._lock = SmartLock()
    # The two interface objects are created lazily, so they start unset.
    self._sync_queue = None
    self._async_queue = None

PrioritizedItem dataclass

Context queue item with priority and sequence for ordering.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| priority | int | Numeric priority value. |
| sequence | int | Sequence number for ordering within the same priority. |
| item | Context | The context payload. |


Source code in shutils/dag/context_queue.py
27
28
29
30
31
32
33
34
35
36
37
38
@dataclass(order=True)
class PrioritizedItem:
    """Queue entry that orders by ``(priority, sequence)`` and carries a context.

    The payload is excluded from comparisons, so two entries with the same
    priority and sequence compare equal regardless of their contexts.

    Attributes:
        priority: Numeric priority value; compared first.
        sequence: Tie-breaker among entries with the same priority.
        item: The context payload; never compared.
    """
    priority: int
    sequence: int
    item: Context = field(compare=False)

SyncContextQueue

Synchronous blocking interface for the context queue.

Source code in shutils/dag/context_queue.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class SyncContextQueue:
    """Blocking interface over a shared ``ContextQueue``.

    Reads and writes go through the ``sync_q`` side of the underlying
    priority queue; each enqueue is handed to the shared lock's ``sync_run``.
    """

    def __init__(self, context_queue: ContextQueue):
        """Bind this interface to the shared context queue.

        Args:
            context_queue: The shared context queue instance.
        """
        self.__context_queue = context_queue

    def get(self) -> Context:
        """Take the next entry from the sync side of the queue and return its context."""
        entry = self.__context_queue._priority_queue.sync_q.get()
        return entry.item

    def put(self, context: Context, priority: ContextPriority = ContextPriority.FIFO_HIGH):
        """Enqueue a context with the given priority.

        Args:
            context: The context to enqueue.
            priority: The priority level for scheduling.
        """
        def enqueue():
            counters = self.__context_queue._counter_dict
            # The current counter value is the entry's tie-breaker within
            # its priority level.
            entry = PrioritizedItem(priority.value, counters[priority], context)
            self.__context_queue._priority_queue.sync_q.put(entry)
            # LIFO counts downward; every other priority counts upward.
            step = -1 if priority == ContextPriority.LIFO else 1
            counters[priority] += step

        self.__context_queue._lock.sync_run(enqueue)

__init__(context_queue)

Initialize with the underlying context queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context_queue | ContextQueue | The shared context queue instance. | required |

Source code in shutils/dag/context_queue.py
71
72
73
74
75
76
77
def __init__(self, context_queue: ContextQueue):
    """Bind this sync interface to a shared context queue.

    Args:
        context_queue: The ``ContextQueue`` whose priority queue, counters
            and lock this interface operates on.
    """
    self.__context_queue = context_queue

get()

Get the highest-priority context from the queue.

Source code in shutils/dag/context_queue.py
79
80
81
def get(self) -> Context:
    """Take the next entry from the sync side of the queue and return its context.

    Returns:
        The ``item`` attribute of the entry obtained from ``sync_q.get()``.
    """
    entry = self.__context_queue._priority_queue.sync_q.get()
    return entry.item

put(context, priority=ContextPriority.FIFO_HIGH)

Put a context into the queue with the given priority.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Context | The context to enqueue. | required |
| priority | ContextPriority | The priority level for scheduling. | FIFO_HIGH |

Source code in shutils/dag/context_queue.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def put(self, context: Context, priority: ContextPriority = ContextPriority.FIFO_HIGH):
    """Enqueue a context on the sync side of the queue.

    The enqueue and the counter update are wrapped in one function that is
    handed to the shared lock's ``sync_run``.

    Args:
        context: The context to enqueue.
        priority: The priority level for scheduling.
    """
    def enqueue():
        counters = self.__context_queue._counter_dict
        # The current counter value is the entry's tie-breaker within its
        # priority level.
        entry = PrioritizedItem(priority.value, counters[priority], context)
        self.__context_queue._priority_queue.sync_q.put(entry)
        # LIFO counts downward; every other priority counts upward.
        step = -1 if priority == ContextPriority.LIFO else 1
        counters[priority] += step

    self.__context_queue._lock.sync_run(enqueue)