API Reference: delayed
- @grain.delayed.delayed(fn=None, nout=None, copy_on_setitem=True, cache_hfn=None)
Wraps an async function into a delayed function (DelayedFn). It can be used as a decorator, or applied to the function directly (i.e. fn = delayed(fn)).
- Parameters:
nout (int) – If set, the return value of the function is assumed to be an iterable of length nout, so that indexing and unpacking of the delayed object are allowed.
copy_on_setitem (bool) – The default (True) is the safe option: it ensures that values previously unpacked from a delayed object are not affected by a later setitem operation on that object. This is achieved by copying the object at result evaluation, so it can be turned off for performance. Unlike other operations, which return a separate delayed object, setitem (e.g. a[1] = 1) is an in-place mutating operation. The following snippet illustrates a situation where a copy is needed:
    r_ = afn()  # assume `afn` is a delayed function that returns `[3, 4]`
    a_, b_ = r_
    r_[1] = 9
    assert (await r_ == [3, 9])
    assert (await a_ == 3) and (await b_ == 4)   # copy_on_setitem=True
    #assert (await a_ == 3) and (await b_ == 9)  # copy_on_setitem=False
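The effect of copying at evaluation time can be reproduced with plain lists. The following is a minimal sketch, not Grain's implementation: `evaluate_with_setitem` is a hypothetical helper that applies one recorded `result[index] = value` operation, with or without a copy.

```python
import copy

def evaluate_with_setitem(result, index, value, copy_on_setitem=True):
    """Apply a recorded `result[index] = value` at evaluation time (toy model)."""
    # With a copy, the original result (and anything unpacked from it) is untouched.
    target = copy.copy(result) if copy_on_setitem else result
    target[index] = value
    return target

original_safe = [3, 4]   # what the wrapped function returned
safe = evaluate_with_setitem(original_safe, 1, 9, copy_on_setitem=True)

original_fast = [3, 4]
fast = evaluate_with_setitem(original_fast, 1, 9, copy_on_setitem=False)
```

In the first case `original_safe` still holds 4 at index 1, so a value unpacked earlier would be unaffected. In the second case the original list itself is mutated, which avoids the copy at the cost of that guarantee.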
- class grain.delayed.DelayedFn
A DelayedFn submits the function it wraps and returns a Delayed object when called. Resources can be bound to the function with the @ operator.
Do not construct this directly; use the @delayed decorator instead.
- class grain.delayed.Delayed
A Delayed object represents a value to be computed by Grain.
Do not construct this directly; it is intended to be the return value of a DelayedFn. Alternatively, use delayedval() to wrap a value into a Delayed object.
A Delayed supports most Python operations, each of which creates another Delayed representing the result:
- Most operators (*, -, += (treated as +), …)
- Item iteration, indexing, and slicing (a[0])
- Item mutation __setitem__ (a[0] = 1)
- Attribute access (a.size)
- Method calls (a.index(0))
- Most NumPy functions (np.mean(a))
Operations that aren’t supported include:
- Attr mutation __setattr__ (a.foo = 1)
- Use as a predicate (if a: ...)
This is mostly a copy of the implementation in Dask, but there are several main differences:
- Grain’s Delayed assumes all ops are cheap and performs them locally.
- As all delayed objects are reduced locally, and the Grain worker does not cache results, passing delayed objects to a delayed function is not allowed; we want to be explicit about the intention of dependent/serial jobs, e.g.:
    r1 = await dfn1()
    r2 = await dfn2(r1)
- Calling a delayed function submits the calculation immediately.
- A Delayed object is somewhat mutable: setitem is allowed but setattr is not (implemented).
Note
Some words on structured concurrency: As you may have noticed, it is
possible to pass delayed objects around (e.g. to other functions), but in most
cases it makes more sense to evaluate (“await”) a delayed object in the same
function where it is created. Concurrent programming typically involves “spawn”
(e.g. calling a delayed function) and “join” (e.g. await) actions. If the two
happen in different scopes, it is hard to track the status of the coroutines.
Hence the idea of structured concurrency, which requires each “spawn” and its
“join” to be fully resolved within a single block or scope. Structured
concurrency is the design principle behind Grain’s previous frontend, combine,
and Grain’s underlying async library, Trio. Trio’s author has written a more
thorough discussion on structured concurrency.
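The "spawn and join in the same scope" pattern can be sketched with the standard library. This example uses asyncio as a stand-in for Trio and Grain (an assumption made so the block is self-contained); `square` and `structured` are hypothetical names. Note that asyncio only allows this style, whereas Trio's nurseries enforce it.

```python
import asyncio

async def square(x):
    await asyncio.sleep(0)  # stand-in for remote work
    return x * x

async def structured(values):
    # "Spawn": start every job inside this function.
    tasks = [asyncio.ensure_future(square(v)) for v in values]
    # "Join": wait for all of them before leaving the same function.
    return await asyncio.gather(*tasks)

squares = asyncio.run(structured([1, 2, 3]))
```

Because nothing spawned in `structured` outlives it, a caller never has to track leftover work.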
Note
Since the submission / enqueuing of a delayed function is synchronous,
while function dispatching behind the scenes is asynchronous (because it involves
network or subprocess IO), making too many submissions without a checkpoint
might overload the queue. Most awaits are checkpoints; you can always
use trio.sleep(0) as a trivial checkpoint. For more information, read Trio’s
documentation on checkpoints. Imposing a rate limit is another way to include
checkpoints; see grain.util.QueueLimiter().
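The backlog effect described in this note can be reproduced with a toy model. The sketch below uses asyncio rather than Trio (an assumption, to keep it dependency-free), with `asyncio.sleep(0)` playing the role of `trio.sleep(0)`. The list `queue` and the `dispatcher` task are hypothetical stand-ins for the executor's internals.

```python
import asyncio

async def submit_all(n_jobs, checkpoint_every=None):
    queue = []      # stand-in for the executor's submission queue
    peak = 0        # largest backlog observed
    done = False

    async def dispatcher():
        # Drains whatever has been queued each time it gets a chance to run.
        while not done or queue:
            queue.clear()
            await asyncio.sleep(0)

    task = asyncio.ensure_future(dispatcher())
    for i in range(n_jobs):
        queue.append(i)                 # synchronous submission
        peak = max(peak, len(queue))
        if checkpoint_every and (i + 1) % checkpoint_every == 0:
            await asyncio.sleep(0)      # checkpoint: let the dispatcher run
    done = True
    await task
    return peak

peak_without = asyncio.run(submit_all(100))
peak_with = asyncio.run(submit_all(100, checkpoint_every=10))
```

Without a checkpoint the dispatcher never runs until all submissions are queued, so the backlog grows to the full job count. With a periodic checkpoint the backlog stays bounded by the checkpoint interval.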
- await grain.delayed.each(*dos)
A convenient helper function to await a list of delayed objects.
- Parameters:
*dos (*Delayed or [Iterable[Delayed],]) – list of delayed objects
- Returns:
list of corresponding results
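The two accepted calling forms (several objects as separate arguments, or one iterable of them) can be handled as in the following sketch. It is one plausible approach written against asyncio awaitables, not Grain's code; `each_sketch` and `double` are hypothetical names.

```python
import asyncio
from collections.abc import Awaitable, Iterable

async def each_sketch(*items):
    """Await all items; accepts each_sketch(a, b) or each_sketch([a, b])."""
    single = items[0] if len(items) == 1 else None
    # Unpack a lone argument only if it is a container, not itself awaitable.
    if single is not None and not isinstance(single, Awaitable) and isinstance(single, Iterable):
        items = tuple(single)
    return [await item for item in items]   # results keep the input order

async def double(x):
    await asyncio.sleep(0)
    return 2 * x

async def main():
    jobs = [asyncio.ensure_future(double(i)) for i in range(3)]  # already started
    from_list = await each_sketch(jobs)
    from_args = await each_sketch(double(10), double(20))
    return from_list, from_args

from_list, from_args = asyncio.run(main())
```

Awaiting one item after another is sufficient when the work was already started at creation time, as is the case for the tasks in `jobs`.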
- grain.delayed.delayedval(v, length=None, copy_on_setitem=True)
A convenient helper function that wraps any value into a delayed object that immediately returns that value at evaluation. This is useful for testing and for the following edge case in NumPy:
    r_ = afn()  # `afn` is a delayed function returning a len-2 array
    a = np.arange(3)
    #a[[0,1]] += r_  # ValueError; __setitem__ cannot turn `a` into a delayed object
    a = delayedval(np.arange(3))
    a[[0,1]] += r_  # Ok
- grain.delayed.run(subtasks, *args, **kwargs)
Delayed’s main entry point. Start your “root” async function(s) here. Except for the first two args, all other args are optional and are passed to grain.remote_exer.RemoteExecutor (or to grain.head.GrainExecutor if local is set or Gnaw is disabled in the config). Common options are listed below; for more options (usually for internal debugging), see the executors.
- Parameters:
subtasks – Root async function(s) that spawn all the other calculations. This can be an async function or an iterable of async functions. If an iterable is passed, all functions are run concurrently.
local (Optional[Resource]) – Resource for the local worker. Defaults to None (the Gnaw executor has no local worker). If set, the built-in grain.head.GrainExecutor will be used.
config_file (str | Literal[False] | None) – Grain’s config file name. If not set or None, Grain will use the name provided by the environment variable GRAIN_CONFIG, and finally fall back to the name grain.toml. If set to False, Grain will use the default profile (see config.py).
name (str) – For the Gnaw executor only. Name to be recognized in Gnaw’s log.
prioritized (bool) – Defaults to False. For the Gnaw executor only. If true, submit tasks to the prioritized queue.
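The lookup order described for config_file can be summarized as a small function. This is a sketch of the documented precedence only, not Grain's code. `resolve_config_name` is a hypothetical helper, and treating an empty GRAIN_CONFIG as unset is an assumption.

```python
import os

DEFAULT_CONFIG_NAME = "grain.toml"

def resolve_config_name(config_file=None, environ=None):
    """Return the config file name to load, or None for the default profile."""
    environ = os.environ if environ is None else environ
    if config_file is False:
        return None            # False: skip config files, use the default profile
    if config_file is not None:
        return config_file     # an explicit name takes precedence
    # Not set or None: try GRAIN_CONFIG, then fall back to grain.toml.
    return environ.get("GRAIN_CONFIG") or DEFAULT_CONFIG_NAME

explicit = resolve_config_name("my.toml", environ={})
from_env = resolve_config_name(None, environ={"GRAIN_CONFIG": "cluster.toml"})
fallback = resolve_config_name(None, environ={})
default_profile = resolve_config_name(False, environ={"GRAIN_CONFIG": "cluster.toml"})
```

Passing `environ` explicitly keeps the function easy to test without touching the real process environment.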