Skip to content

Data White Board

Shared data board for inter-task communication within a DAG.

Thread-safe and async-safe data whiteboard for sharing data between DAG tasks.

AsyncDataWhiteBoard

Asynchronous interface for the data whiteboard with async lock support.

Source code in shutils/dag/data_white_board.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class AsyncDataWhiteBoard:
    """Asynchronous interface for the data whiteboard with async lock support."""

    async def read_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
        """Execute a function under an async read lock."""
        async with self.__data_white_board._async_lock.read():
            return func(*args, **kwargs)

    async def write_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
        """Execute a function under an async write lock."""
        async with self.__data_white_board._async_lock.write():
            return func(*args, **kwargs)

    def __init__(self, data_white_board: DataWhiteBoardMixin):
        """Initialize with the underlying whiteboard mixin.

        Args:
            data_white_board: The mixin instance to wrap.
        """
        self.__data_white_board = data_white_board

    def set_item(self, key, value):
        """Set a key-value pair via async write wrapper."""
        return self.write_wrapper(self.__data_white_board.sync_white_board.__setitem__, key, value)

    def get_item(self, key):
        """Get a value by key via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.__getitem__, key)

    def contains(self, key):
        """Check if a key exists via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.__contains__, key)

    def len(self):
        """Get the number of items via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.__len__)

    def iter(self):
        """Iterate over keys via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.__iter__)

    def repr(self):
        """Get string representation via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.__repr__)

    def bool(self):
        """Check if the whiteboard is non-empty via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.__bool__)

    def del_item(self, key):
        """Delete a key via async write wrapper."""
        return self.write_wrapper(self.__data_white_board.sync_white_board.__delitem__, key)

    def set_data(self, **kwargs):
        """Set multiple key-value pairs via async write wrapper."""
        return self.write_wrapper(self.__data_white_board.sync_white_board.set_data, **kwargs)

    def keys(self):
        """Iterate over keys via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.keys)

    def values(self):
        """Iterate over values via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.values)

    def items(self):
        """Iterate over items via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.items)

    def get(self, key, default=None):
        """Get a value by key with an optional default via async read wrapper."""
        return self.read_wrapper(self.__data_white_board.sync_white_board.get, key, default)

    @asynccontextmanager
    async def rlock(self):
        """Acquire both async and sync read locks."""
        async with self.__data_white_board._async_lock.read():
            with self.__data_white_board._sync_lock.read():
                yield

    @asynccontextmanager
    async def wlock(self):
        """Acquire both async and sync write locks."""
        async with self.__data_white_board._async_lock.write():
            with self.__data_white_board._sync_lock.write():
                yield

    async def copy(self, new_white_board: "DataWhiteBoardMixin", deep_copy: bool = False):
        """Async copy data to another whiteboard.

        Args:
            new_white_board: The target whiteboard mixin.
            deep_copy: Whether to deep-copy data instead of sharing references.
        """
        async with self.__data_white_board._async_lock.read():
            with self.__data_white_board._sync_lock.read():
                if deep_copy:
                    new_white_board._data = copy.deepcopy(self.__data_white_board._data)
                else:
                    new_white_board._sync_lock = self.__data_white_board._sync_lock
                    new_white_board._async_lock = self.__data_white_board._async_lock
                    new_white_board._data = self.__data_white_board._data

__init__(data_white_board)

Initialize with the underlying whiteboard mixin.

Parameters:

Name Type Description Default
data_white_board DataWhiteBoardMixin

The mixin instance to wrap.

required
Source code in shutils/dag/data_white_board.py
150
151
152
153
154
155
156
def __init__(self, data_white_board: DataWhiteBoardMixin):
    """Initialize with the underlying whiteboard mixin.

    Args:
        data_white_board: The mixin instance to wrap. It is only stored
            here; nothing is read from it during construction.
    """
    # Double-underscore attribute: Python name-mangles it per class when this
    # method lives inside a class body.
    self.__data_white_board = data_white_board

bool()

Check if the whiteboard is non-empty via async read wrapper.

Source code in shutils/dag/data_white_board.py
182
183
184
def bool(self):
    """Return the read-wrapped call to the sync whiteboard's ``__bool__``.

    The result is whatever ``read_wrapper`` returns for that call.
    """
    sync_board = self.__data_white_board.sync_white_board
    return self.read_wrapper(sync_board.__bool__)

contains(key)

Check if a key exists via async read wrapper.

Source code in shutils/dag/data_white_board.py
166
167
168
def contains(self, key):
    """Return the read-wrapped membership test for ``key`` on the sync whiteboard."""
    sync_board = self.__data_white_board.sync_white_board
    return self.read_wrapper(sync_board.__contains__, key)

copy(new_white_board, deep_copy=False) async

Async copy data to another whiteboard.

Parameters:

Name Type Description Default
new_white_board DataWhiteBoardMixin

The target whiteboard mixin.

required
deep_copy bool

Whether to deep-copy data instead of sharing references.

False
Source code in shutils/dag/data_white_board.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
async def copy(self, new_white_board: "DataWhiteBoardMixin", deep_copy: bool = False):
    """Async copy data to another whiteboard.

    Args:
        new_white_board: The target whiteboard mixin.
        deep_copy: Whether to deep-copy data instead of sharing references.
    """
    async with self.__data_white_board._async_lock.read():
        with self.__data_white_board._sync_lock.read():
            if deep_copy:
                new_white_board._data = copy.deepcopy(self.__data_white_board._data)
            else:
                new_white_board._sync_lock = self.__data_white_board._sync_lock
                new_white_board._async_lock = self.__data_white_board._async_lock
                new_white_board._data = self.__data_white_board._data

del_item(key)

Delete a key via async write wrapper.

Source code in shutils/dag/data_white_board.py
186
187
188
def del_item(self, key):
    """Return the write-wrapped deletion of ``key`` from the sync whiteboard."""
    sync_board = self.__data_white_board.sync_white_board
    return self.write_wrapper(sync_board.__delitem__, key)

get(key, default=None)

Get a value by key with an optional default via async read wrapper.

Source code in shutils/dag/data_white_board.py
206
207
208
def get(self, key, default=None):
    """Return the read-wrapped lookup of ``key``, falling back to ``default``."""
    sync_board = self.__data_white_board.sync_white_board
    return self.read_wrapper(sync_board.get, key, default)

get_item(key)

Get a value by key via async read wrapper.

Source code in shutils/dag/data_white_board.py
162
163
164
def get_item(self, key):
    """Return the read-wrapped item lookup for ``key`` on the sync whiteboard."""
    sync_board = self.__data_white_board.sync_white_board
    return self.read_wrapper(sync_board.__getitem__, key)

items()

Iterate over items via async read wrapper.

Source code in shutils/dag/data_white_board.py
202
203
204
def items(self):
    """Return the read-wrapped iterator over a snapshot of the items.

    The sync whiteboard's ``items`` may be lazy (a generator function), in
    which case merely calling it reads nothing. The pairs are therefore
    drained into a list inside the wrapped call, so they are collected while
    the read wrapper's lock is held rather than after it has been released.
    """
    sync_board = self.__data_white_board.sync_white_board

    def snapshot():
        return iter(list(sync_board.items()))

    return self.read_wrapper(snapshot)

iter()

Iterate over keys via async read wrapper.

Source code in shutils/dag/data_white_board.py
174
175
176
def iter(self):
    """Return the read-wrapped iterator over a snapshot of the keys.

    The sync whiteboard's ``__iter__`` may be lazy (a generator function), in
    which case merely calling it reads nothing. The keys are therefore
    drained into a list inside the wrapped call, so they are collected while
    the read wrapper's lock is held rather than after it has been released.
    """
    sync_board = self.__data_white_board.sync_white_board

    def snapshot():
        # The outermost iterable of a generator expression is evaluated
        # immediately, so list(...) runs here, under the lock. The builtin
        # iter() is avoided because this method shares its name.
        return (key for key in list(sync_board.__iter__()))

    return self.read_wrapper(snapshot)

keys()

Iterate over keys via async read wrapper.

Source code in shutils/dag/data_white_board.py
194
195
196
def keys(self):
    """Return the read-wrapped iterator over a snapshot of the keys.

    The sync whiteboard's ``keys`` may be lazy (a generator function), in
    which case merely calling it reads nothing. The keys are therefore
    drained into a list inside the wrapped call, so they are collected while
    the read wrapper's lock is held rather than after it has been released.
    """
    sync_board = self.__data_white_board.sync_white_board

    def snapshot():
        return iter(list(sync_board.keys()))

    return self.read_wrapper(snapshot)

len()

Get the number of items via async read wrapper.

Source code in shutils/dag/data_white_board.py
170
171
172
def len(self):
    """Return the read-wrapped item count of the sync whiteboard."""
    sync_board = self.__data_white_board.sync_white_board
    return self.read_wrapper(sync_board.__len__)

read_wrapper(func, *args, **kwargs) async

Execute a function under an async read lock.

Source code in shutils/dag/data_white_board.py
140
141
142
143
async def read_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
    """Execute a function under an async read lock."""
    async with self.__data_white_board._async_lock.read():
        return func(*args, **kwargs)

repr()

Get string representation via async read wrapper.

Source code in shutils/dag/data_white_board.py
178
179
180
def repr(self):
    """Return the read-wrapped call to the sync whiteboard's ``__repr__``."""
    sync_board = self.__data_white_board.sync_white_board
    return self.read_wrapper(sync_board.__repr__)

rlock() async

Acquire both async and sync read locks.

Source code in shutils/dag/data_white_board.py
210
211
212
213
214
215
@asynccontextmanager
async def rlock(self):
    """Hold the async read lock and the sync read lock for the ``async with`` body.

    The async lock is entered first and the sync lock second; they are
    released in the reverse order.
    """
    board = self.__data_white_board
    async with board._async_lock.read():
        with board._sync_lock.read():
            yield

set_data(**kwargs)

Set multiple key-value pairs via async write wrapper.

Source code in shutils/dag/data_white_board.py
190
191
192
def set_data(self, **kwargs):
    """Return the write-wrapped bulk update of the sync whiteboard.

    Args:
        **kwargs: Key-value pairs forwarded to the sync whiteboard's ``set_data``.
    """
    sync_board = self.__data_white_board.sync_white_board
    return self.write_wrapper(sync_board.set_data, **kwargs)

set_item(key, value)

Set a key-value pair via async write wrapper.

Source code in shutils/dag/data_white_board.py
158
159
160
def set_item(self, key, value):
    """Return the write-wrapped assignment of ``value`` to ``key``."""
    sync_board = self.__data_white_board.sync_white_board
    return self.write_wrapper(sync_board.__setitem__, key, value)

values()

Iterate over values via async read wrapper.

Source code in shutils/dag/data_white_board.py
198
199
200
def values(self):
    """Return the read-wrapped iterator over a snapshot of the values.

    The sync whiteboard's ``values`` may be lazy (a generator function), in
    which case merely calling it reads nothing. The values are therefore
    drained into a list inside the wrapped call, so they are collected while
    the read wrapper's lock is held rather than after it has been released.
    """
    sync_board = self.__data_white_board.sync_white_board

    def snapshot():
        return iter(list(sync_board.values()))

    return self.read_wrapper(snapshot)

wlock() async

Acquire both async and sync write locks.

Source code in shutils/dag/data_white_board.py
217
218
219
220
221
222
@asynccontextmanager
async def wlock(self):
    """Hold the async write lock and the sync write lock for the ``async with`` body.

    The async lock is entered first and the sync lock second; they are
    released in the reverse order.
    """
    board = self.__data_white_board
    async with board._async_lock.write():
        with board._sync_lock.write():
            yield

write_wrapper(func, *args, **kwargs) async

Execute a function under an async write lock.

Source code in shutils/dag/data_white_board.py
145
146
147
148
async def write_wrapper[T](self, func: Callable[..., T], *args, **kwargs):
    """Execute a function under an async write lock."""
    async with self.__data_white_board._async_lock.write():
        return func(*args, **kwargs)

DataWhiteBoardMixin

Mixin providing a thread-safe and async-safe key-value data store.

Source code in shutils/dag/data_white_board.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class DataWhiteBoardMixin:
    """Mixin providing a thread-safe and async-safe key-value data store.

    Holds the data dict, one sync and one async read/write lock, and lazily
    created sync/async facade objects that operate on them.
    """

    def __init__(self):
        """Create the two locks, an empty data dict, and unset facade slots."""
        self._sync_lock = RWLock()
        self._async_lock = AsyncRWLock()
        self._data = {}
        # Facades are built on first access by the properties below.
        self._sync_white_board = self._async_white_board = None

    @property
    def sync_white_board(self):
        """Return the sync data whiteboard, creating it on first access."""
        board = self._sync_white_board
        if board is None:
            board = self._sync_white_board = SyncDataWhiteBoard(self)
        return board

    @property
    def async_white_board(self):
        """Return the async data whiteboard, creating it on first access."""
        board = self._async_white_board
        if board is None:
            board = self._async_white_board = AsyncDataWhiteBoard(self)
        return board

    def __repr__(self):
        """Return a string showing the keys view of the stored data."""
        key_view = self._data.keys()
        return f"DataWhiteBoard({key_view})"

async_white_board property

Lazy accessor for the async data whiteboard.

sync_white_board property

Lazy accessor for the sync data whiteboard.

__init__()

Initialize the whiteboard with sync and async locks and an empty data dict.

Source code in shutils/dag/data_white_board.py
23
24
25
26
27
28
29
def __init__(self):
    """Create the sync and async locks, an empty data dict, and unset facade slots."""
    self._sync_lock = RWLock()
    self._async_lock = AsyncRWLock()
    self._data = {}
    # Both facade slots start empty; they are filled in elsewhere on demand.
    self._sync_white_board = self._async_white_board = None

SyncDataWhiteBoard

Synchronous thread-safe interface for the data whiteboard.

Source code in shutils/dag/data_white_board.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class SyncDataWhiteBoard:
    """Synchronous thread-safe interface for the data whiteboard.

    Every operation takes the wrapped whiteboard's sync read or write lock
    around its access to the shared data dict.
    """

    def __init__(self, data_white_board: DataWhiteBoardMixin):
        """Initialize with the underlying whiteboard mixin.

        Args:
            data_white_board: The mixin instance to wrap.
        """
        self.__data_white_board = data_white_board

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` while holding the write lock."""
        with self.__data_white_board._sync_lock.write():
            self.__data_white_board._data[key] = value

    def __getitem__(self, key):
        """Return the value for ``key`` under the read lock.

        Raises:
            KeyError: If ``key`` is not present.
        """
        with self.__data_white_board._sync_lock.read():
            return self.__data_white_board._data[key]

    def __contains__(self, key):
        """Return whether ``key`` is present, checked under the read lock."""
        with self.__data_white_board._sync_lock.read():
            return key in self.__data_white_board._data

    def __len__(self):
        """Return the number of stored items, read under the read lock."""
        with self.__data_white_board._sync_lock.read():
            return len(self.__data_white_board._data)

    def __iter__(self):
        """Return an iterator over a snapshot of the keys.

        The snapshot is taken under the read lock and the lock is released
        before the iterator is handed back, so a caller that iterates slowly
        or abandons the iterator never keeps the lock held.
        """
        with self.__data_white_board._sync_lock.read():
            snapshot = list(self.__data_white_board._data)
        return iter(snapshot)

    def __bool__(self):
        """Return whether any item is stored, checked under the read lock."""
        with self.__data_white_board._sync_lock.read():
            return bool(self.__data_white_board._data)

    def __delitem__(self, key):
        """Remove ``key`` while holding the write lock.

        Raises:
            KeyError: If ``key`` is not present.
        """
        with self.__data_white_board._sync_lock.write():
            del self.__data_white_board._data[key]

    def set_data(self, **kwargs):
        """Set multiple key-value pairs at once."""
        with self.__data_white_board._sync_lock.write():
            self.__data_white_board._data.update(kwargs)

    def keys(self):
        """Return an iterator over a snapshot of the keys.

        The snapshot is taken under the read lock; the lock is released
        before the iterator is returned.
        """
        with self.__data_white_board._sync_lock.read():
            snapshot = list(self.__data_white_board._data)
        return iter(snapshot)

    def values(self):
        """Return an iterator over a snapshot of the values.

        The snapshot is taken under the read lock; the lock is released
        before the iterator is returned.
        """
        with self.__data_white_board._sync_lock.read():
            snapshot = list(self.__data_white_board._data.values())
        return iter(snapshot)

    def items(self):
        """Return an iterator over a snapshot of the ``(key, value)`` pairs.

        The snapshot is taken under the read lock; the lock is released
        before the iterator is returned.
        """
        with self.__data_white_board._sync_lock.read():
            snapshot = list(self.__data_white_board._data.items())
        return iter(snapshot)

    def get(self, key, default=None) -> Any:
        """Get a value by key with an optional default."""
        with self.__data_white_board._sync_lock.read():
            return self.__data_white_board._data.get(key, default)

    def rlock(self):
        """Return the sync read lock context manager (not yet entered)."""
        return self.__data_white_board._sync_lock.read()

    def wlock(self):
        """Return the sync write lock context manager (not yet entered)."""
        return self.__data_white_board._sync_lock.write()

    def copy(self, new_white_board: "DataWhiteBoardMixin", deep_copy: bool = False):
        """Copy data to another whiteboard, optionally deep-copying.

        Runs with the source's sync read lock held.

        Args:
            new_white_board: The target whiteboard mixin.
            deep_copy: Whether to deep-copy data instead of sharing references.
                When false, the target also takes over the source's two lock
                objects, so both boards share one data dict and one lock pair.
        """
        with self.__data_white_board._sync_lock.read():
            if deep_copy:
                new_white_board._data = copy.deepcopy(self.__data_white_board._data)
            else:
                new_white_board._sync_lock = self.__data_white_board._sync_lock
                new_white_board._async_lock = self.__data_white_board._async_lock
                new_white_board._data = self.__data_white_board._data

__init__(data_white_board)

Initialize with the underlying whiteboard mixin.

Parameters:

Name Type Description Default
data_white_board DataWhiteBoardMixin

The mixin instance to wrap.

required
Source code in shutils/dag/data_white_board.py
52
53
54
55
56
57
58
def __init__(self, data_white_board: DataWhiteBoardMixin):
    """Initialize with the underlying whiteboard mixin.

    Args:
        data_white_board: The mixin instance to wrap. It is only stored
            here; nothing is read from it during construction.
    """
    # Double-underscore attribute: Python name-mangles it per class when this
    # method lives inside a class body.
    self.__data_white_board = data_white_board

copy(new_white_board, deep_copy=False)

Copy data to another whiteboard, optionally deep-copying.

Parameters:

Name Type Description Default
new_white_board DataWhiteBoardMixin

The target whiteboard mixin.

required
deep_copy bool

Whether to deep-copy data instead of sharing references.

False
Source code in shutils/dag/data_white_board.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def copy(self, new_white_board: "DataWhiteBoardMixin", deep_copy: bool = False):
    """Copy this whiteboard's data into another whiteboard.

    The source's sync read lock is held for the duration of the copy.

    Args:
        new_white_board: The target whiteboard mixin.
        deep_copy: When true, the target receives a deep copy of the data.
            When false, the target is pointed at the source's data dict and
            at the source's two lock objects.
    """
    source = self.__data_white_board
    with source._sync_lock.read():
        if not deep_copy:
            # Shared mode: both boards use the same locks and the same dict.
            new_white_board._sync_lock = source._sync_lock
            new_white_board._async_lock = source._async_lock
            new_white_board._data = source._data
        else:
            new_white_board._data = copy.deepcopy(source._data)

get(key, default=None)

Get a value by key with an optional default.

Source code in shutils/dag/data_white_board.py
108
109
110
111
def get(self, key, default=None) -> Any:
    """Return the value stored under ``key``, or ``default`` when it is absent.

    The lookup runs while the sync read lock is held.
    """
    board = self.__data_white_board
    with board._sync_lock.read():
        return board._data.get(key, default)

items()

Iterate over data items under a read lock.

Source code in shutils/dag/data_white_board.py
103
104
105
106
def items(self):
    """Return an iterator over a snapshot of the ``(key, value)`` pairs.

    The pairs are copied into a list while the sync read lock is held and
    the lock is released before the iterator is returned. A generator that
    yields inside the ``with`` block would instead take the lock only on
    the first ``next()`` and keep it held for as long as the caller left the
    generator suspended.
    """
    board = self.__data_white_board
    with board._sync_lock.read():
        snapshot = list(board._data.items())
    return iter(snapshot)

keys()

Iterate over data keys under a read lock.

Source code in shutils/dag/data_white_board.py
93
94
95
96
def keys(self):
    """Return an iterator over a snapshot of the keys.

    The keys are copied into a list while the sync read lock is held and
    the lock is released before the iterator is returned. A generator that
    yields inside the ``with`` block would instead take the lock only on
    the first ``next()`` and keep it held for as long as the caller left the
    generator suspended.
    """
    board = self.__data_white_board
    with board._sync_lock.read():
        snapshot = list(board._data)
    return iter(snapshot)

rlock()

Acquire a sync read lock context manager.

Source code in shutils/dag/data_white_board.py
113
114
115
def rlock(self):
    """Return the object produced by the sync lock's ``read()``.

    Nothing is entered here; the caller is expected to use the result in a
    ``with`` statement.
    """
    sync_lock = self.__data_white_board._sync_lock
    return sync_lock.read()

set_data(**kwargs)

Set multiple key-value pairs at once.

Source code in shutils/dag/data_white_board.py
88
89
90
91
def set_data(self, **kwargs):
    """Merge the given keyword arguments into the data dict under the write lock.

    Args:
        **kwargs: Key-value pairs to store; existing keys are overwritten.
    """
    board = self.__data_white_board
    with board._sync_lock.write():
        board._data.update(kwargs)

values()

Iterate over data values under a read lock.

Source code in shutils/dag/data_white_board.py
 98
 99
100
101
def values(self):
    """Return an iterator over a snapshot of the values.

    The values are copied into a list while the sync read lock is held and
    the lock is released before the iterator is returned. A generator that
    yields inside the ``with`` block would instead take the lock only on
    the first ``next()`` and keep it held for as long as the caller left the
    generator suspended.
    """
    board = self.__data_white_board
    with board._sync_lock.read():
        snapshot = list(board._data.values())
    return iter(snapshot)

wlock()

Acquire a sync write lock context manager.

Source code in shutils/dag/data_white_board.py
117
118
119
def wlock(self):
    """Return the object produced by the sync lock's ``write()``.

    Nothing is entered here; the caller is expected to use the result in a
    ``with`` statement.
    """
    sync_lock = self.__data_white_board._sync_lock
    return sync_lock.write()