API Reference: delayed

@grain.delayed.delayed(fn=None, nout=None, copy_on_setitem=True, cache_hfn=None)

Wraps an async function into a delayed function (DelayedFn). It can be used as a decorator or applied to the function directly (e.g. fn = delayed(fn)).
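Because fn defaults to None, the same callable can act both as a bare decorator and as a decorator factory that takes options such as nout. The following is a minimal sketch of that common Python pattern; toy_delayed is hypothetical, only records nout, submits nothing, and is not Grain's implementation.

```python
import asyncio
import functools


def toy_delayed(fn=None, nout=None):
    """Hypothetical sketch of the `fn=None` decorator pattern; submits nothing."""

    def wrap(f):
        @functools.wraps(f)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        wrapper.nout = nout  # remember the declared output length
        return wrapper

    if fn is None:
        return wrap      # used with options: @toy_delayed(nout=2)
    return wrap(fn)      # used bare (@toy_delayed) or as toy_delayed(f)


@toy_delayed
async def square(x):
    return x * x


@toy_delayed(nout=2)
async def div_mod(a, b):
    return divmod(a, b)


async def increment(x):
    return x + 1


increment_d = toy_delayed(increment)  # wrapping without decorator syntax

quotient, remainder = asyncio.run(div_mod(7, 2))
```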

Parameters:
  • nout (int) – If set, the return value of the function is assumed to be an iterable of length nout, so that indexing and unpacking on the delayed object are allowed.

  • copy_on_setitem (bool) –

    The default (True) is the safe option: it ensures that values previously unpacked from a delayed object are not affected by a later setitem on that object. This is achieved by copying the object at result evaluation; the copy can be turned off for performance. Unlike other operations, which return a separate delayed object, setitem (e.g. a[1] = 1) is an in-place mutating operation. The following snippet illustrates a situation where a copy is needed:

    r_ = afn() # assume `afn` is a delayed function returning `[3, 4]`
    a_, b_ = r_
    r_[1] = 9
    assert (await r_ == [3,9])
    assert (await a_ == 3) and (await b_ == 4) # copy_on_setitem=True
    #assert (await a_ == 3) and (await b_ == 9) # copy_on_setitem=False
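To see why the copy matters, here is a minimal synchronous toy model. ToyDelayed and demo are hypothetical and not Grain's implementation; the model assumes that every node derived from one call reads the same underlying result object, and that setitem rebinds the node in place. Without the copy, evaluating r_ mutates the shared object before b_ reads it.

```python
import copy


class ToyDelayed:
    """Hypothetical stand-in for a Delayed; not Grain's implementation.

    A node stores a zero-argument callable that produces its value. Nodes
    derived from the same call all read one shared result object.
    """

    def __init__(self, compute, copy_on_setitem=True):
        self._compute = compute
        self.copy_on_setitem = copy_on_setitem

    def result(self):
        return self._compute()

    def __getitem__(self, key):
        parent = self._compute  # capture the state *before* any later setitem
        return ToyDelayed(lambda: parent()[key], self.copy_on_setitem)

    def __setitem__(self, key, value):
        parent = self._compute

        def compute():
            obj = parent()
            if self.copy_on_setitem:
                obj = copy.copy(obj)  # shallow copy protects the shared result
            obj[key] = value          # the in-place setitem
            return obj

        self._compute = compute  # rebind: this node now means "mutated value"


def demo(copy_on_setitem):
    shared = [3, 4]  # the single result object of a hypothetical `afn()`
    r_ = ToyDelayed(lambda: shared, copy_on_setitem)
    a_, b_ = r_[0], r_[1]  # stands in for unpacking with nout=2
    r_[1] = 9
    # Evaluate in the same order as the snippet above: r_ first.
    return r_.result(), a_.result(), b_.result()


print(demo(True))   # ([3, 9], 3, 4)
print(demo(False))  # ([3, 9], 3, 9)
```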
    

class grain.delayed.DelayedFn

A DelayedFn submits the function it wraps and returns a Delayed object when called. Resources can be bound to the function with the @ operator.

Do not construct this directly; use the @delayed decorator instead.
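The text above does not spell out the operand order or the resource type, so the sketch below assumes the form `fn @ resource` and uses a plain string as a hypothetical resource. ToyDelayedFn is not Grain's class; it only shows the two behaviours described: `@` returns a new callable bound to a resource, and calling it submits the work right away (here via asyncio.create_task, standing in for Grain's scheduler) and returns an awaitable.

```python
import asyncio


class ToyDelayedFn:
    """Hypothetical stand-in for DelayedFn; not Grain's implementation."""

    def __init__(self, fn, resource=None):
        self._fn = fn
        self.resource = resource
        self.submitted = []  # record of (args, resource) for each call

    def __matmul__(self, resource):
        # Assumed form `fn @ resource`: return a bound copy, leave self unchanged.
        return ToyDelayedFn(self._fn, resource)

    def __call__(self, *args):
        self.submitted.append((args, self.resource))
        # Schedule immediately (needs a running event loop); the caller awaits later.
        return asyncio.create_task(self._fn(*args))


async def add(a, b):
    return a + b


async def main():
    dfn = ToyDelayedFn(add)
    bound = dfn @ "cpu=2"  # hypothetical resource spec
    task = bound(1, 2)     # submitted now, not at await time
    return await task, bound.submitted, dfn.resource


result, log, unbound_resource = asyncio.run(main())
```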

class grain.delayed.Delayed

A Delayed object represents a value to be computed by Grain.

Do not construct this directly; it is intended to be the return value of a DelayedFn. Alternatively, use delayedval() to wrap a value into a Delayed object.

A Delayed supports most python operations, each of which creates another Delayed representing the result:

  • Most operators (*, -, += (treated as +), …)

  • Item iteration, indexing, and slicing (a[0])

  • Item mutation __setitem__ (a[0] = 1)

  • Attribute access (a.size)

  • Method calls (a.index(0))

  • Most NumPy functions (np.mean(a))

Operations that aren’t supported include:

  • Attribute mutation __setattr__ (a.foo = 1)

  • Use as a predicate (if a: ...)
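A minimal sketch of how a class can record operations instead of executing them: each supported dunder method returns a new lazy node, while __setattr__ and __bool__ raise. ToyLazy is hypothetical and evaluates synchronously; Grain's Delayed is modelled on Dask's and covers many more operations (including NumPy functions), which this sketch omits.

```python
import operator


class ToyLazy:
    """Hypothetical lazy value: operations build new nodes, compute() evaluates."""

    __slots__ = ("_thunk",)

    def __init__(self, thunk):
        object.__setattr__(self, "_thunk", thunk)  # bypass the guard below

    def compute(self):
        return self._thunk()

    def _binop(self, other, op):
        other_thunk = other._thunk if isinstance(other, ToyLazy) else (lambda: other)
        return ToyLazy(lambda: op(self._thunk(), other_thunk()))

    def __add__(self, other):  # no __iadd__, so `x += 1` falls back to this
        return self._binop(other, operator.add)

    def __mul__(self, other):
        return self._binop(other, operator.mul)

    def __getitem__(self, key):
        return ToyLazy(lambda: self._thunk()[key])

    def __getattr__(self, name):
        # Attribute access becomes a lazy node; calling it (below) is lazy too.
        return ToyLazy(lambda: getattr(self._thunk(), name))

    def __call__(self, *args):
        return ToyLazy(lambda: self._thunk()(*args))

    def __setattr__(self, name, value):
        raise AttributeError("attribute mutation is not supported")

    def __bool__(self):
        raise TypeError("a lazy value cannot be used as a predicate")


a = ToyLazy(lambda: [3, 4, 5])
expr = a[0] * 2 + 1          # builds nodes; nothing is evaluated yet
print(expr.compute())        # 7
print(a.index(4).compute())  # 1 (method call recorded lazily)
```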

This is mostly a copy of the implementation in Dask, but there are several main differences:

  1. Grain’s Delayed assumes all ops are cheap and performs them locally.

  2. Because all delayed objects are reduced locally and Grain workers do not cache results, passing delayed objects to a delayed function is not allowed; this keeps the intent of dependent/serial jobs explicit. Instead, await the first result and pass the value on, e.g.:

    r1 = await dfn1()
    r2 = await dfn2(r1)
    
  3. Calling a delayed function submits the calculation immediately.

  4. A Delayed object is partially mutable: setitem is allowed, but setattr is not (implemented).
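The rule in point 2 can be enforced with an argument check at submission time. The sketch below is hypothetical (toy_delayed is not Grain's decorator) and uses asyncio tasks to stand in for immediate submission (point 3): calling the wrapped function schedules it at once and rejects any awaitable argument, so dependent jobs must be written as an explicit await followed by a new call.

```python
import asyncio
import functools
import inspect


def toy_delayed(fn):
    """Hypothetical decorator: submit immediately, refuse unevaluated inputs."""

    @functools.wraps(fn)
    def submit(*args, **kwargs):
        for arg in (*args, *kwargs.values()):
            if inspect.isawaitable(arg):
                raise TypeError("await delayed objects before passing them on")
        return asyncio.create_task(fn(*args, **kwargs))  # scheduled right now

    return submit


@toy_delayed
async def dfn1():
    return 20


@toy_delayed
async def dfn2(x):
    return x + 1


async def main():
    r1 = await dfn1()    # explicit join on the first job
    r2 = await dfn2(r1)  # the second job receives a plain value

    pending = dfn1()
    rejected = False
    try:
        dfn2(pending)    # passing an unevaluated result is rejected
    except TypeError:
        rejected = True
    await pending        # join the extra job so nothing is left running
    return r2, rejected


r2, rejected = asyncio.run(main())
```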

Note

Some words on structured concurrency: as you may have noticed, it is possible to pass delayed objects around (e.g. to other functions), but in most cases it makes more sense to evaluate (“await”) a delayed object in the same function that created it. Concurrent programming typically involves “spawn” (e.g. calling a delayed function) and “join” (e.g. await) actions. If the two happen in different scopes, it is hard to track the status of the coroutines. Hence the idea of structured concurrency, which requires every “spawn” to be joined within the same block or scope. Structured concurrency is the design principle behind combine, Grain’s previous frontend, and behind Trio, Grain’s underlying async library. Trio’s author has written a more thorough discussion on structured concurrency.
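In code, the recommended shape is to spawn and join inside the same function, so that no task outlives the scope that created it. The sketch below uses asyncio as a stand-in for Trio, and the hypothetical `fetch` coroutine stands in for a delayed function call. Python 3.11's asyncio.TaskGroup enforces this automatically, much like a Trio nursery; here it is done by hand with asyncio.gather so the sketch also runs on older versions.

```python
import asyncio


async def fetch(i):
    """Hypothetical stand-in for a delayed function call."""
    await asyncio.sleep(0)
    return i * i


async def squares(n):
    # Spawn: all tasks start here ...
    tasks = [asyncio.create_task(fetch(i)) for i in range(n)]
    # ... and join: every task is awaited before this function returns,
    # so callers never receive a half-finished task.
    return await asyncio.gather(*tasks)


results = asyncio.run(squares(4))
```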

Note

Since submitting (enqueuing) a delayed function is synchronous, while dispatching it behind the scenes is asynchronous (because it involves network or subprocess IO), submitting too many tasks without a checkpoint might overload the queue. Most awaits are checkpoints, and you can always use trio.sleep(0) as a trivial checkpoint. For more information, read Trio’s documentation on checkpoints. Imposing a rate limit is another way to add checkpoints; see grain.util.QueueLimiter().
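A minimal sketch of both ideas, assuming asyncio in place of Trio: `await asyncio.sleep(0)` is asyncio's trivial checkpoint (the counterpart of trio.sleep(0)), and an asyncio.Semaphore makes the submitting loop wait once a hypothetical limit of 3 jobs is in flight. The `job` and `submit_all` helpers are hypothetical; this is not how grain.util.QueueLimiter works internally.

```python
import asyncio


async def job(i, state):
    """Hypothetical job that tracks how many jobs are in flight at once."""
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    await asyncio.sleep(0.01)  # pretend to wait on network or subprocess IO
    state["running"] -= 1
    return i


async def submit_all(n, limit=3):
    state = {"running": 0, "peak": 0}
    sem = asyncio.Semaphore(limit)
    tasks = []
    for i in range(n):
        await sem.acquire()  # waits once `limit` jobs are in flight
        task = asyncio.create_task(job(i, state))
        task.add_done_callback(lambda _task: sem.release())
        tasks.append(task)
        await asyncio.sleep(0)  # trivial checkpoint, like trio.sleep(0)
    results = await asyncio.gather(*tasks)
    return results, state["peak"]


results, peak = asyncio.run(submit_all(10))
```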

await grain.delayed.each(*dos)

A convenient helper function to await a list of delayed objects.

Parameters:

*dos (*Delayed or [Iterable[Delayed],]) – list of delayed objects

Returns:

list of corresponding results
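The accepted argument shapes can be illustrated with a small sketch. toy_each is hypothetical and not Grain's code; it assumes that `each(a, b)` and `each([a, b])` are equivalent and that results come back in argument order. Because the tasks were already started when created, awaiting them one after another still lets them run concurrently.

```python
import asyncio
import inspect


async def toy_each(*dos):
    """Hypothetical: await each(a, b, c) or each([a, b, c]), in order."""
    if len(dos) == 1 and not inspect.isawaitable(dos[0]):
        dos = tuple(dos[0])  # a single iterable of awaitables
    return [await d for d in dos]


async def value(x):
    await asyncio.sleep(0)
    return x


async def main():
    a = asyncio.create_task(value(1))
    b = asyncio.create_task(value(2))
    first = await toy_each(a, b)      # varargs form
    c = asyncio.create_task(value(3))
    second = await toy_each([c])      # single-iterable form
    return first, second


first, second = asyncio.run(main())
```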

grain.delayed.delayedval(v, length=None, copy_on_setitem=True)

A convenient helper function that wraps any value into a delayed object that immediately returns that value at evaluation. This is useful for testing and for the following edge case in NumPy:

import numpy as np
from grain.delayed import delayedval

r_ = afn() # `afn` is a delayed function returning a length-2 array

a = np.arange(3)
#a[[0,1]] += r_ # ValueError: __setitem__ cannot turn `a` into a delayed object

a = delayedval(np.arange(3))
a[[0,1]] += r_ # OK: `a` is a Delayed, which supports setitem

Parameters:
  • v (Any) – value to be wrapped

  • length (int) – see nout of delayed()

  • copy_on_setitem (bool) – see copy_on_setitem of delayed()

Returns:

a Delayed object evaluated to v
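For testing, a wrapped constant lets code that expects delayed inputs run without submitting any work. The sketch below is hypothetical: toy_delayedval returns an asyncio future that is already resolved, which mimics “immediately returns that value at evaluation” but, unlike delayedval, does not support further operations such as indexing or +=.

```python
import asyncio


def toy_delayedval(v):
    """Hypothetical: an awaitable that is already resolved to `v`."""
    fut = asyncio.get_running_loop().create_future()
    fut.set_result(v)
    return fut


async def total(delayed_items):
    # Code under test: it only knows that its inputs are awaitable.
    return sum([await d for d in delayed_items])


async def check_total():
    return await total([toy_delayedval(1), toy_delayedval(2), toy_delayedval(3)])


result = asyncio.run(check_total())
```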

grain.delayed.run(subtasks, *args, **kwargs)

Delayed’s main entry point. Start your “root” async function(s) here. Except for the first two args, all other args are optional and are passed to grain.remote_exer.RemoteExecutor (or to grain.head.GrainExecutor if local is set or Gnaw is disabled in the config). Common options are listed below; for more options (usually for internal debugging), see the executors.

Parameters:
  • subtasks – Root async function(s) that spawn all the other calculations. This can be an async function or an iterable of async functions. If an iterable is passed, all functions are run concurrently.

  • local (Optional[Resource]) – Resource for the local worker. Defaults to None (the Gnaw executor has no local worker). If set, the built-in grain.head.GrainExecutor will be used.

  • config_file (str | Literal[False] | None) – Grain’s config file name. If not set or None, Grain uses the name given by the environment variable GRAIN_CONFIG, and finally falls back to grain.toml. If set to False, Grain uses the default profile (see config.py).

  • name (str) – For Gnaw executor only. Name shown in Gnaw’s log.

  • prioritized (bool) – Defaults to False. For Gnaw executor only. If True, tasks are submitted to the prioritized queue.
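The handling of subtasks (one async function or an iterable of them, all run concurrently) can be sketched as follows. toy_run is hypothetical, uses asyncio instead of Trio, accepts none of the executor options listed above, and returns the roots' results only for illustration; the documentation above does not say what run returns.

```python
import asyncio


def toy_run(subtasks):
    """Hypothetical: run one or many root async functions concurrently."""
    if callable(subtasks):
        subtasks = [subtasks]  # a single root function
    subtasks = list(subtasks)

    async def main():
        # All roots start together; gather waits for every one of them.
        return await asyncio.gather(*(fn() for fn in subtasks))

    return asyncio.run(main())


async def root_a():
    await asyncio.sleep(0.01)
    return "a"


async def root_b():
    return "b"


single = toy_run(root_a)          # ["a"]
both = toy_run([root_a, root_b])  # ["a", "b"], in argument order
```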