cylc / cylc-flow

Cylc: a workflow engine for cycling systems.
https://cylc.github.io
GNU General Public License v3.0

xtriggers: re-implement as async functions #3497

Open oliver-sanders opened 4 years ago

oliver-sanders commented 4 years ago

Supersedes the same idea from #2917

The Problem:

At the moment XTriggers are run by the subprocess pool, consequences:

Bodging the XTrigger loading mechanism for #3465 seems nasty.

Close #2917

The Proposal:

Convert XTriggers to asynchronous functions and call them directly from the main loop, piggy-backing on the main loop plugin functionality introduced in https://github.com/cylc/cylc-flow/pull/3492.

This will turn XTriggers into a blocking stage of the main loop (where they would effectively be running in an unlimited thread pool), something like this:

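import asyncio

async def main_loop(self):
    # ...
    # run all pending xtriggers concurrently, wait for them all to return
    await asyncio.gather(*[
        trigger(args)
        for trigger in xtriggers
    ])
    # ...
    # sleep until the next main loop iteration
    await asyncio.sleep(interval)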
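A runnable toy version of the loop above, for anyone wanting to poke at it. The xtrigger functions (`wall_clock_check`, `file_check`) and `run_xtriggers` are hypothetical stand-ins that just sleep. The point is that `asyncio.gather` awaits them concurrently, so an iteration takes roughly as long as the slowest xtrigger rather than the sum of all of them.

```python
import asyncio
import time


# Hypothetical xtriggers: each simulates a slow check with asyncio.sleep
# and returns (satisfied, results) like an xtrigger function would.
async def wall_clock_check(delay):
    await asyncio.sleep(delay)
    return True, {"delay": delay}


async def file_check(delay):
    await asyncio.sleep(delay)
    return False, {}


async def run_xtriggers(calls):
    """Await every pending xtrigger call concurrently; results keep order."""
    return await asyncio.gather(*[func(*args) for func, args in calls])


async def main_loop(iterations, interval):
    calls = [
        (wall_clock_check, (0.2,)),
        (file_check, (0.2,)),
        (file_check, (0.2,)),
    ]
    history = []
    for _ in range(iterations):
        start = time.perf_counter()
        results = await run_xtriggers(calls)
        history.append((results, time.perf_counter() - start))
        await asyncio.sleep(interval)  # main loop sleep between iterations
    return history


history = asyncio.run(main_loop(iterations=2, interval=0.05))
for results, elapsed in history:
    # roughly 0.2 s per iteration rather than 0.6 s, because the calls overlap
    print(results, f"{elapsed:.2f}s")
```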

Consequences:

Bonus Marks:

If we move to asynchronous XTriggers we gain the ability to have long-running XTrigger functions with minimal overheads.

This means we can very easily and efficiently implement a push interface for XTriggers (in addition to the pre-existing pull interface):

import json

async def push(*args):
    socket = ...
    msg = await socket.recv()  # could take hours, doesn't matter
    return json.loads(msg)
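A self-contained sketch of the push interface, assuming an `asyncio.Queue` as a stand-in for a real async socket (no particular messaging library is implied). While `push` waits for its message, the hypothetical `heartbeat` coroutine, standing in for the rest of the main loop, keeps running.

```python
import asyncio
import json


async def push(inbox):
    """Push-style xtrigger: wait (possibly for hours) for one message.

    ``inbox`` is an asyncio.Queue standing in for a real async socket;
    awaiting it suspends this coroutine without blocking the event loop.
    """
    msg = await inbox.get()
    return json.loads(msg)


async def heartbeat(count, ticks):
    # Hypothetical: represents the rest of the main loop carrying on.
    for i in range(count):
        ticks.append(i)
        await asyncio.sleep(0.01)


async def external_system(inbox):
    await asyncio.sleep(0.05)  # the message arrives "later"
    await inbox.put(json.dumps({"time": "20240101T00", "status": "ready"}))


async def demo():
    inbox = asyncio.Queue()
    ticks = []
    result, _, _ = await asyncio.gather(
        push(inbox), heartbeat(5, ticks), external_system(inbox)
    )
    return result, ticks


result, ticks = asyncio.run(demo())
print(result, ticks)
```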

Super Bonus Marks:

Another long-lived interface which would also be fairly straightforward to implement is ‘yield’, i.e. a single long-lived asynchronous function which yields cycle points and data as and when they become available:

import json

async def yields(*args):
    socket = ...
    while True:
        # listen for messages
        msg = await socket.recv()
        data = json.loads(msg)
        # yield XTrigger values to Cylc Flow when they arrive
        yield (
            data['time'],  # cycle point
            data
        )
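A runnable sketch of the yield interface as an async generator, again with an `asyncio.Queue` standing in for the socket. The `None` sentinel and the `producer`/`scheduler` coroutines are hypothetical additions so the demo terminates; a real driver would keep listening indefinitely.

```python
import asyncio
import json


async def yields(inbox):
    """Long-lived xtrigger: yield (cycle_point, data) as messages arrive.

    ``inbox`` stands in for a real async socket. A ``None`` message ends
    the stream; this sentinel exists only so the demo terminates.
    """
    while True:
        msg = await inbox.get()
        if msg is None:
            return
        data = json.loads(msg)
        yield data["time"], data


async def producer(inbox, times):
    # Hypothetical external system publishing one message per cycle point.
    for t in times:
        await asyncio.sleep(0.01)
        await inbox.put(json.dumps({"time": t, "obs": f"obs-{t}"}))
    await inbox.put(None)


async def scheduler(times):
    inbox = asyncio.Queue()
    received = []
    feeder = asyncio.create_task(producer(inbox, times))
    # Each (cycle point, data) pair is handled as soon as it arrives.
    async for cycle_point, data in yields(inbox):
        received.append((cycle_point, data["obs"]))
    await feeder
    return received


received = asyncio.run(scheduler(["20240101T00Z", "20240101T06Z"]))
print(received)
```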

A nicer solution for Kafka / message brokers?

Hyper bonus marks:

Once you’ve made it as far as a yield interface, you have effectively achieved the cycle driver interface I’ve been harping on about for the last two years. Use coroutines to provide a cycling interface rather than bodging external triggers into a cycling regime which doesn’t fit; that way they can become a first-class cycling object in their own right:

from time import time

async def drives(*args):
    while True:
        data = await event()  # event() is a placeholder for an external event source
        # kicks off a new cycle point at time()
        # this provides a solution to observation-type workflows where data doesn't arrive on regular sequences
        yield (time(), data)

This is a little more involved, requiring a major abstraction of the cycling interface and classes, and is beyond the scope of Cylc 8, but it is worth keeping in mind so as to keep doors open.
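A sketch of the cycle-driver idea, assuming cycle points are simply arrival times (seconds on the event-loop clock since the driver started). The irregularly spaced `observations` source and the `None` sentinel are hypothetical; nothing here reflects Cylc's actual cycling classes.

```python
import asyncio


async def drives(events):
    """Cycle driver: every external event starts a new cycle.

    The cycle point is the arrival time (event-loop clock, seconds since
    the driver started) rather than a point on a regular sequence.
    ``events`` stands in for a real event source; ``None`` ends the demo.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()
    while True:
        data = await events.get()
        if data is None:
            return
        yield round(loop.time() - start, 2), data


async def observations(events):
    # Hypothetical, irregularly spaced observations (delays in seconds).
    for delay, obs in [(0.05, "obs-a"), (0.01, "obs-b"), (0.1, "obs-c")]:
        await asyncio.sleep(delay)
        await events.put(obs)
    await events.put(None)


async def run():
    events = asyncio.Queue()
    source = asyncio.create_task(observations(events))
    cycles = [(point, data) async for point, data in drives(events)]
    await source
    return cycles


cycles = asyncio.run(run())
print(cycles)
```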

Questions:

The backbone of the work has already been done with main loop plugins (XTriggers are just a special case), but are there any hitches, @cylc/core?

Pull requests welcome!

hjoliver commented 4 years ago

Supersedes #2917 !

oliver-sanders commented 4 years ago

Didn't spot that!

dwsutherland commented 4 years ago

I think this is blocking (hence why WFS => UIS is run in the thread pool executor):

import json

async def push(*args):
    socket = ...
    msg = await socket.recv()  # could take hours, doesn't matter
    return json.loads(msg)

However, there's an option to make socket.recv() non-blocking (then you can loop over it like a queue). And I hadn't tried asyncio.gather(*iterable_of_asyncfuncs) because the iterable_of_asyncfuncs changed size for the UIS (amongst other things).

I may be wrong (?).
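A small experiment illustrating the distinction, assuming `time.sleep` as a stand-in for a blocking `socket.recv()`. Calling the blocking function directly from a coroutine stalls the event loop. Handing it to the default thread pool via `loop.run_in_executor` lets other coroutines (the hypothetical `heartbeat`) keep running. Where the socket library offers an asyncio-native API (pyzmq provides `zmq.asyncio`, for example), `await socket.recv()` suspends without needing an executor.

```python
import asyncio
import time


def blocking_recv(delay):
    """Stand-in for a blocking socket.recv(): holds the thread for `delay` s."""
    time.sleep(delay)
    return "message"


async def push_blocking():
    # Calling the blocking function directly stalls the whole event loop.
    return blocking_recv(0.2)


async def push_in_executor():
    # Hand the blocking call to the default thread pool executor so the
    # event loop keeps running while the call is in progress.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_recv, 0.2)


async def heartbeat(ticks):
    # Hypothetical stand-in for the rest of the main loop; records each run.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def ticks_during(push):
    """Return push()'s result and how many heartbeats ran while it waited."""
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start
    start = time.perf_counter()
    msg = await push()
    end = time.perf_counter()
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return msg, sum(start < t < end for t in ticks)


blocked = asyncio.run(ticks_during(push_blocking))
free = asyncio.run(ticks_during(push_in_executor))
print("direct call:", blocked, "| run_in_executor:", free)
```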

oliver-sanders commented 4 years ago

There'll be a way of making it non-blocking, it's an untested illustrative example.

[edit] I think this example is non-blocking, provided your socket implementation doesn't have additional blocking parts. But, as an aside, putting the xtrigger runner into another process wouldn't be a bad shout: it would reduce the potential for xtriggers to block the Scheduler's main loop, similar to how they can currently block the subprocpool's main loop by saturating the pool.

hjoliver commented 3 years ago

Comment from a NIWA colleague:

3497: looks good, especially if taken as far as the yield interface, which would allow Cylc external triggering via something like RabbitMQ etc. This would allow multiple Cylc suites to collaborate using a pub/sub pattern rather than the current inter-suite polling.
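A toy illustration of the pub/sub pattern described above, with a hypothetical in-memory `Broker` standing in for something like RabbitMQ (no real broker API is being modelled). Downstream workflows each await a message on a topic instead of repeatedly polling the upstream workflow.

```python
import asyncio
from collections import defaultdict


class Broker:
    """Toy in-memory pub/sub broker (a stand-in for e.g. RabbitMQ).

    Each subscriber gets its own queue, so every subscriber receives every
    message published on the topics it follows.
    """

    def __init__(self):
        self._subs = defaultdict(list)

    def subscribe(self, topic):
        queue = asyncio.Queue()
        self._subs[topic].append(queue)
        return queue

    async def publish(self, topic, message):
        for queue in self._subs[topic]:
            await queue.put(message)


async def downstream_workflow(name, queue, log):
    # Hypothetical xtrigger in a downstream workflow: wait for the upstream
    # workflow to announce a completed cycle instead of polling it.
    cycle_point = await queue.get()
    log.append((name, cycle_point))


async def demo():
    broker = Broker()
    log = []
    waiters = [
        asyncio.create_task(
            downstream_workflow(name, broker.subscribe("upstream/done"), log)
        )
        for name in ("wf-a", "wf-b")
    ]
    await asyncio.sleep(0.01)  # the upstream workflow finishes a cycle later
    await broker.publish("upstream/done", "20240101T00Z")
    await asyncio.gather(*waiters)
    return sorted(log)


log = asyncio.run(demo())
print(log)
```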

oliver-sanders commented 3 years ago

Discussed another potential use case here recently where an integer cycling workflow is driven by an external system.

oliver-sanders commented 1 year ago

Note: currently the only way to achieve external-event-driven cycling in Cylc is using (deprecated?) ext-triggers (not to be confused with xtriggers).

# An example of a CI-type workflow which kicks off a new cycle of tasks every time
# an external-trigger is received

# Start the workflow as normal:
# $ cylc vip <path/to/this/workflow>

# Then kick off a cycle specifying any desired environment variables e.g:
# $ ./bin/trigger <workflow-id> WORLD=earth

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    runahead limit = P5  # max number of cycles which can run in parallel
    [[special tasks]]
        # register the external trigger and tell it which task to run
        external-trigger = build("build")
    [[graph]]
        P1 = """
            build => a => b => c
        """

[runtime]
    [[build, a, b, c]]
        script = """
            echo "Hello $CYLC_TASK_NAME!"
        """

oliver-sanders commented 4 months ago

An idea I've been mulling for a while, for discussion when this issue is picked up...

Implement these async xtriggers as tasks, not xtriggers, i.e.:

[runtime]
  [[@file_watcher]]  # @ prefix not necessary
    run mode = xtrigger  # using Tim's lovely new "run mode" feature developed for skip mode
    [[[xtrigger]]]
      function = cylc.site_extensions.ops_file_watcher
      # xtrigger args
      cycle = ops/global//<cycle>
      mode = operational
    [[[environment]]]
      ANSWER = 42
    [[[outputs]]]
      file1 = file1
      file2 = file2
      file3 = file3
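A sketch of what the async body of such a task might look like, under several assumptions: each custom output is completed when a file of the same name appears in a watched directory, and the scheduler supplies a `report_output` callback to mark an output as completed. Both are hypothetical; neither the callback nor this behaviour is an existing Cylc API.

```python
import asyncio
import tempfile
from pathlib import Path


async def file_watcher(directory, outputs, report_output, interval=0.01):
    """Hypothetical async body for a ``run mode = xtrigger`` task.

    ``outputs`` maps output names to file names (assumed, for illustration);
    ``report_output`` is an assumed scheduler callback that marks a task
    output as completed.
    """
    pending = dict(outputs)
    while pending:
        for name, filename in list(pending.items()):
            if (Path(directory) / filename).exists():
                report_output(name)
                del pending[name]
        await asyncio.sleep(interval)  # poll without blocking the event loop
    return "succeeded"


async def write_files_later(directory, filenames):
    # Simulates an upstream system delivering files one by one.
    for filename in filenames:
        await asyncio.sleep(0.03)
        (Path(directory) / filename).write_text("data")


async def demo():
    completed = []
    with tempfile.TemporaryDirectory() as tmp:
        outputs = {"file1": "file1", "file2": "file2", "file3": "file3"}
        status, _ = await asyncio.gather(
            file_watcher(tmp, outputs, completed.append),
            write_files_later(tmp, ["file2", "file1", "file3"]),
        )
    return status, completed


status, completed = asyncio.run(demo())
print(status, completed)
```

Each output is reported as soon as its file appears, so downstream tasks depending on `file2` could start before `file1` or `file3` exist.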

Why:

What this doesn't solve (passively) is event-driven cycling (see 1, 2). Async xtriggers would be perfectly capable of solving this problem, which is currently only solvable via ext-triggers. A solution which leaves this door open would be highly desirable:

[scheduling]
  [[graph]]
    @file_watcher = """
      foo => bar => baz
    """

But I haven't got any good suggestions for that ATM.
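The config syntax is the open question, but the runtime behaviour being asked for can be sketched. Assuming an async iterable of events (such as a `@file_watcher`-style driver), each event starts a new integer cycle of `foo => bar => baz`, with a semaphore capping the number of active cycles in a way loosely analogous to a runahead limit. All names here are hypothetical.

```python
import asyncio


async def run_task(name, cycle, log):
    # Hypothetical stand-in for submitting a job and waiting for success.
    await asyncio.sleep(0.01)
    log.append(f"{cycle}/{name}")


async def run_cycle(cycle, limit, log):
    # foo => bar => baz: each task waits for the previous one to finish.
    async with limit:
        for name in ("foo", "bar", "baz"):
            await run_task(name, cycle, log)


async def event_driven_workflow(events, max_active_cycles, log):
    """Start a new integer cycle of foo => bar => baz for each event.

    ``events`` is an async iterable (e.g. a ``@file_watcher``-style driver);
    ``max_active_cycles`` plays a role loosely similar to a runahead limit.
    """
    limit = asyncio.Semaphore(max_active_cycles)
    cycles = []
    cycle = 0
    async for _event in events:
        cycle += 1
        cycles.append(asyncio.create_task(run_cycle(cycle, limit, log)))
    await asyncio.gather(*cycles)
    return cycle


async def three_events():
    # Hypothetical event source: three files arriving in quick succession.
    for i in range(3):
        await asyncio.sleep(0.005)
        yield f"file-{i}"


async def demo():
    log = []
    n = await event_driven_workflow(three_events(), 2, log)
    return n, log


n, log = asyncio.run(demo())
print(n, log)
```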