dabeaz / curio

Good Curio!
4.04k stars · 243 forks

Async state preserving reentrant Kernel #172

Closed imrn closed 4 years ago

imrn commented 7 years ago

Currently, Kernel.run() blocks the thread until it's done with the tasks at hand. If curio is embedded in another application, you should either:

Here we'll discuss providing semantics for:

You can also find previous discussion at #111.

dabeaz commented 7 years ago

Right now, the behavior of curio is for the Kernel.run() method to execute until all non-daemonic tasks finish executing. When run() returns, the daemonic tasks continue to persist. So, on a subsequent call to run(), they will still be there. Thus, one approach that's possible now is to write code like this:

from curio import *
from curio.socket import *

async def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = await sock.accept()
        await spawn(echo_client(client, addr), daemon=True)

async def echo_client(sock, address):
    print('Connection from', address)
    async with sock:
        while True:
            data = await sock.recv(100000)
            if not data:
                break
            await sock.sendall(b'Got:'+data)

async def start():
    await spawn(echo_server(('', 25000)), daemon=True)

async def pump():
    pass

if __name__ == '__main__':
    kernel = Kernel()
    with kernel:
        kernel.run(start())
        while True:
            kernel.run(pump())

In this code, the run() method is just repeatedly given a coroutine that does nothing. It could do more than that, but at a minimum it causes the kernel to run a complete scheduling cycle.

There are all sorts of weird things about this. For example, it's critical that every created task be marked as daemonic. Certain things like sleeping probably don't work right (they would likely sleep in the kernel for the full duration of the requested sleep time). Nevertheless, it still sort of works.

So, one possible approach on pause/restarting would be to refine this a bit. For example, perhaps the run() method is modified so that it returns as soon as its passed coroutine terminates (regardless of any other tasks that might be running). Maybe run() called with no arguments causes Curio to go through a single scheduling cycle. Perhaps I get rid of daemonic tasks altogether under this scheme (might be worth it).

The big idea though is that if you wanted the kernel to pause, you'd basically set up your tasks first in some kind of starting coroutine. After that, you'd repeatedly call Kernel.run() to drive things (either supplying a short-running coroutine or None).

Some benefits: The semantics of run() don't really have to change--it still runs the supplied coroutine and returns its value. I don't need to add a new trap for pausing.

Cons: Maybe it's weird having to repeatedly submit a short-running coroutine like that. It might not provide the level of control needed if you needed to pause at a specific point (although you could certainly coordinate it with various synchronization primitives).

imrn commented 7 years ago

Did you notice this can scale linearly from just "1 cycle" to "full run" depending on the developer's ingenuity in setting up the application structure?

The coro submitted to run() is in fact the "pause condition watchdog", which can be a no-op as above, a timer, or anything watching the app, provided the related sync primitives are laid down beforehand. Perfect..

And just to make sure, let's confirm: for the no-op coro case, "1 complete scheduling cycle" means Kernel.run() will just deal with ready tasks and exit. That is, it will NOT block. Right?

It's fine for run() with no args to step once. Supplying a dummy coro does not look good; maybe run() can supply one as a default. Eliminating daemons is also fine. It all seems good to me. Thanks for defining it fairly craftily. :)

dabeaz commented 7 years ago

For the "no-op" case, Kernel.run() would poll for I/O events, check for sleeping tasks, and run a single scheduling cycle of anything that happened to wake up as a result. I would envision it as NOT blocking under any circumstance (a slight tweak would need to be made to I/O polling so that it uses a 0-timeout value).
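The zero-timeout polling mechanism described here can be seen with the stdlib selectors module directly (an illustration of the mechanism only, not curio's actual kernel code; the socketpair stands in for real task I/O):

```python
# Illustrative sketch: a 0-timeout select() call makes an I/O poll
# non-blocking, which is the tweak described above for the "no-op" cycle.
import selectors
import socket

sel = selectors.DefaultSelector()
r, w = socket.socketpair()              # stand-in for real task I/O
r.setblocking(False)
sel.register(r, selectors.EVENT_READ)

# Nothing is ready yet: select(0) returns immediately with no events.
assert sel.select(0) == []

w.send(b'x')                            # now there is pending data
events = sel.select(0)                  # still returns immediately, with one event
assert len(events) == 1

sel.unregister(r)
r.close()
w.close()
```

Either way, the poll returns at once; the only difference is whether any ready events come back with it.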

I'm experimenting with an implementation of this at the moment. Not ready to check in yet though.

dabeaz commented 7 years ago

I have committed a highly experimental implementation of this. Here is an example on which to test:

from curio import *
from curio.socket import *

async def echo_client(sock, address):
    print('Connection from', address)
    async with sock:
        while True:
            data = await sock.recv(100000)
            if not data:
                break
            await sock.sendall(b'Got:'+data)

async def sleeper():
    while True:
        await sleep(1)
        print('sleeping', n)

async def start():
    await spawn(tcp_server('', 25000, echo_client), daemon=True)
    await spawn(sleeper(), daemon=True)

n = 0
def main():
    global n
    kernel = Kernel()
    with kernel:
        kernel.run_coro(start())
        while True:
            kernel.run_coro()
            n += 1

if __name__ == '__main__':
    main()

If you run this code, you should see a rapidly increasing counter value being printed every second. It should also allow incoming network connections. The counter value should continue to increase rapidly even when connections are made. The main() function is driving the kernel by repeatedly calling run_coro().

A few critical notes:

Consider this a starting point. Would be interested to have people play around with it.

imrn commented 7 years ago

I've replaced run() by pasting the above run_coro loop nearly as is. Running an in-house http1.x/2.0 framework with many of the curio goodies: handlers having many timeouts, spawns, locks, events, queues, processes (including hefty ones), etc. So far without any glitches. Impressed.. :)

BUT with 100% CPU core utilization. In fact this is expected for a non-blocking call in an infinite loop. Maybe an option for a blocking/non-blocking call is needed. Let's think about it...

imrn commented 7 years ago

Maybe an option for a blocking/non-blocking call is needed. Let's think about it...

Maybe not. It may be the developer's responsibility to provide a relevant coro for blocking. What do you think?

imrn commented 7 years ago

Or there may be an option for a one-shot blocking call.

run(blocking=oneshot) may work like this: if there are ready tasks, run them and exit. If not, wait for the first one, process it, and exit.

Any other suggestions for possible operation modes/params?

dabeaz commented 7 years ago

Would need to think about the oneshot option. One potential problem with I/O is that Curio could just sit there forever waiting for something to happen. That's probably not what you want if you're interfacing Curio with something else.

I could imagine a timeout option that got applied to the I/O selector perhaps. That might be more sane.

dabeaz commented 7 years ago

I've added an experimental timeout option to the run_coro() method. For example:

kernel.run_coro(timeout=0.1)

That specifies the maximum duration of I/O polling in the absence of any other work to do.

imrn commented 7 years ago

Would need to think about the oneshot option. One potential problem with I/O is that Curio could just sit there forever waiting for something to happen. That's probably not what you want if you're interfacing Curio with something else.

Well, once we step outside of the ideal world of curio, we are faced with the classical problem of "to block, or not to block, how to block, when to block, ...block ...block...".

Curio is the final result of collective iterations of searching for a solution to this. Look at the huge number of projects trying to find the right mix. Yes, the place curio is standing is the result of much sweat and blood.

In fact, curio too blocks 'hard' at run() so you don't have to worry about it again! Yet, we are here at this issue to find some reasonable balance.

Anyway, that said, there are many ways of dealing with the blocking problem. But I think we should NOT. We can always say "go and get your fancy main coordinator coroutine". If it blocks unfavorably, then the developer should deal with it in better ways.

Saying this is probably easy, because the developer who leaves child coros behind either knows what he is doing or doesn't. Either way he deserves the same answer. ;)

And blocking is a hard problem, and it can lead to "eager optimization can cause starvation" type discussions. People will come and want their blocking problems "solved"... Be warned... ;)

And as a last resort, threads are always there.

I think we should focus on providing the really needed options for cases that can NOT be covered by the coordinator coro.

Let's enumerate the really necessary cases to cover and decide on the final parameters for run().

We probably don't want to contribute to the electricity bills of people running a curio main loop whose "if conditions" evaluate to false 99.99999% of the time. ;)

dabeaz commented 7 years ago

I modified the run_coro() method so that run_coro(timeout=None) causes Curio to block as it normally would waiting for I/O or a timeout. This is basically the "oneshot" behavior.

imrn commented 7 years ago

Still runs at 100% CPU.

In fact, it starts at <1%. But after some activity it shoots to 100% and never comes back.

The commented-out line is the original code. It runs at around <1%.

async def run_co(self, servers):
    await spawn(self.runservers(servers))

def run(self, servers):
    # run(self.runservers(servers))
    kernel = Kernel()
    with kernel:
        kernel.run_coro(self.run_co(servers))
        while True:
            kernel.run_coro(timeout=None)

dabeaz commented 7 years ago

Hmmm. I'm just not seeing that behavior on my end. What operating system is this on?

imrn commented 7 years ago

64-bit Arch Linux. I can debug using Pycharm. Where/What should I look for?

imrn commented 7 years ago

Added the helper function I use. See above..

dabeaz commented 7 years ago

Not seeing 100% CPU on Linux either. So, not sure what to advise. Probably need more info.

imrn commented 7 years ago

Ok, preparing a lean setup so that the current bells and whistles will not interfere. I'll report back.

imrn commented 7 years ago

Your above example runs fine. Pointing my browser at 127.0.0.1:25000 gets the echo. No jump to 100%. Good.

Probably I should find the thing in the framework which spins the loop fast. Interestingly, there is no anomaly for the normal run(). Any possible guesses? I'm on Python 3.6.

imrn commented 7 years ago

Got something. The while loop is innocent. In fact it never runs!

kernel.run_coro(self.run_co(servers)) never returns.

That's where the 100% CPU occurs.

class Tcpd:

    @staticmethod
    async def runservers(servers):
        for s in servers:
            await spawn(tcp_server(s.ip, s.port, s.handler)) # , daemon=True. no effect.

    async def run_co(self, servers):
        await spawn(self.runservers(servers), daemon=True)

    def run(self, servers):
        # run(self.runservers(servers))
        kernel = Kernel()
        with kernel:
            kernel.run_coro(self.run_co(servers))  # <- Never returns.. 100% is here
            while True:
                kernel.run_coro(timeout=None)

imrn commented 7 years ago

What's more interesting is that all servers are operational. Applications run without any problem.

dabeaz commented 7 years ago

Oh, one thing I noticed---in order for run_coro() to return, all of the background tasks have to be daemonic. So, make sure you use spawn(self.runservers(servers), daemon=True). As for why it would be sitting there with 100% CPU, that is an interesting mystery. I can't imagine what it would be doing there.

As a note: I think there was a bug on OS-X that caused 100% CPU load under certain conditions (maybe if nothing was waiting for I/O). Pretty sure that got fixed. Maybe it's something similar here though.

imrn commented 7 years ago

All good now and at 0.1% load. Maybe that's why we should get rid of daemons.. ;)

The other finding is this: run_co -> spawns children tcp_servers

Only making run_co daemonic solves it all. Making the other children daemonic or not has no effect. See above.. Does this mean anything?

dabeaz commented 7 years ago

Daemon status is inherited when running under run_coro(). So all of those children should be daemonic even though it wasn't specified explicitly.

imrn commented 7 years ago

I only copied-pasted the loop and didn't notice the daemon thing. So the result was the last 10 messages. Sorry for that.

Well, I guess the concept for the new run() is in place. As I'm testing, the full framework is up and running with most of the curio goodies. Maybe we can go forward polishing it for a release without daemons.

What do you think?

imrn commented 7 years ago

Perhaps we can go over the run() interface. I'll suggest a convention similar to select. That is:

1) Non-blocking: run(coro, timeout=0) -> select(..., 0). Only process ready tasks. Exit.
2) Blocking (OneShot): run(coro, timeout=None) -> select(..., None). Like 1, but if none are ready, block until the next one. Process it. Exit.
3) Blocking (Hard): current run().
4) Blocking (Timeout): run(coro, timeout=5) -> select(..., 5).

An interesting result of this listing is that I think we are heading toward a "future kind of return value" from run(). Maybe the task itself.

That is, run() is a way of "injecting tasks" into the curio environment. And the caller of run() will take actions according to the "readiness" of the futures/tasks.

Now let's think about this interface. After having a reasonable consensus on it, I think we can begin to provide details for 1, 2, 3, 4, any_other?. I think they are currently in rough form.

imrn commented 7 years ago

One other thing: if we're going toward such a new paradigm of running tasks and returning values, then it will be substantially different from the current run().

Obviously we do not want to disturb current run() users. And we don't want to be slowed down by the current structure and compatibility issues.

As this is going to be curio's new gateway to the sync world, it is probably best to have the current run() and the new thing as separate interfaces.

David, you've already forked run(). I'm also dropping "run()" from the subject line. This thing will probably stay experimental and evolving for a while. Interesting days ahead. ;)

dabeaz commented 7 years ago

I don't see the current changes as anything other than very minor improvements to the current run() method. run() already preserved the state of any existing daemon tasks. I've merely tweaked it so that calling it with no coroutine makes it go through a single scheduling cycle. The optional timeout argument allows you to specify/override the maximum waiting time on the internal select() call. The experimental run_coro() currently passes all unit tests for the normal run() method.

The only additional change I'm thinking about is the run(coro, timeout=seconds) option. That's an interesting option. My inclination on that is to make it equivalent to run(timeout_after(seconds, coro)).
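As an aside, this wrapper-equivalence pattern has a direct analogue in the stdlib's asyncio, which can illustrate the proposed behavior (an analogy only; curio's real counterparts are timeout_after and TaskTimeout):

```python
# Illustration by analogy with asyncio: a timeout on the entry point behaves
# like wrapping the coroutine in a timeout combinator, which is the proposed
# equivalence of run(coro, timeout=secs) and run(timeout_after(secs, coro)).
import asyncio

async def quick():
    await asyncio.sleep(0)
    return 42

async def slow():
    await asyncio.sleep(10)

# Wrapper form: completes well within the deadline, result passes through.
result = asyncio.run(asyncio.wait_for(quick(), timeout=1))
assert result == 42

# Deadline exceeded: a timeout exception is raised
# (in curio's case it would be TaskTimeout).
try:
    asyncio.run(asyncio.wait_for(slow(), timeout=0.01))
    timed_out = False
except asyncio.TimeoutError:
    timed_out = True
assert timed_out
```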

imrn commented 7 years ago

Let me give an example of curio tasks/futures. They are effectively await/spawn for the sync world. Powered by curio..

async def someasync():
    await sleep(5)
    return 1

# This is sync spawn
task1 = kernel.run(someasync(), timeout=0)  # Non-blocking
task2 = kernel.run(someasync(), timeout=0)  # Non-blocking
kernel.run() # Blocks until all done
res1 = task1.result
res2 = task2.result

# This is sync await
task3 = kernel.run(someasync())  # Blocks until all done
res3 = task3.result

task4 = kernel.run(someasync(), timeout=0)  # Non-Blocking
task5 = kernel.run(someasync(), timeout=0)  # Non-Blocking

# This is curio driven by parent app
while not task4.terminated:
    kernel.run(timeout=None) # Oneshot
res4 = task4.result

imrn commented 7 years ago

Returning tasks from run() may disturb current structure. If it can be handled some way I'm ok with a single code base.

dabeaz commented 7 years ago

I really don't want to go in the direction of futures for the run() method.

imrn commented 7 years ago

Well, I actually mean we can already envision tasks as futures. No new future thing. Just return tasks from run() and update the result when done. That's it. Did you see above example? I'm now updating it with some more.

dabeaz commented 7 years ago

My inclination is for Curio to stay focused on the execution of coroutines. The run(coro) method currently does just that. There are many different ways by which a result could be computed and returned. For example, the above scenario is already supported in part now:

kernel = Kernel()
task1 = kernel.run_coro(spawn(someasync(), daemon=True))
task2 = kernel.run_coro(spawn(someasync(), daemon=True))

# Run until task 1 terminates
result1 = kernel.run_coro(task1.join())

# Poll until task 2 terminates
while not task2.terminated:
    kernel.run_coro(timeout=None)
    print('Other stuff...')

result2 = kernel.run_coro(task2.join())

imrn commented 7 years ago

My inclination is for Curio to stay focused on execution of coroutines.

Technically correct. However, such a bold declaration keeps sync and async code as separate islands. The approach below makes it easier and more natural to mix them, while you still stay on your focus.

The run(coro) method currently does just that.

Yeah, same thing, and complex looking. Another, more humane version is below. ;) It requires nothing more than the additions to Kernel and Task shown below. (Plus the other things for returning tasks from run(), as I proposed in the previous message.)

I'm not trying to hide asyncs. In fact, I want them mixed with sync code everywhere. ;) Just having a curio kernel makes this possible.

# Look, No asyncs!

v = kernel.await(someasync())  # Blocks until ready
print(v)

f1 = kernel.spawn(someasync()) # Will not block
f2 = kernel.spawn(someasync())
f3 = kernel.spawn(someasync())
f4 = kernel.spawn(someasync())

flist = [f1, f2, f3, f4]
shuffle(flist)     # Will work everyway.

# Async iterator without any asyncs..
for f in flist:
    print(f.val())     # May or may not block for each. See below.

class Kernel:
    def spawn(self, coro):  # Rename as getfuture if you like
        task = self.run(coro, timeout=0)  # Non-Blocking
        task.kernel = self
        return task

    def await(self, coro):  # Yeah, I know await is a keyword..
        task = self.run(coro)  # Blocks until ready
        return task.result

class Task:
    def val(self):
        # The while below may not even fire, because some other
        # kernel iteration may already have run this task to
        # completion.
        while not self.terminated:
            self.kernel.run(timeout=None)  # Oneshot
        return self.result

dabeaz commented 7 years ago

The thing is, I want the worlds of async and sync to be separate islands in Curio. Backing that statement up for a moment....

... the earliest versions of Curio had a small collection of methods on the Kernel class, including a spawn() method. However, in thinking about that, I became concerned about muddying the waters between operations performed on the Kernel by synchronous code and operations performed by asynchronous code. For example, is it legal to use Kernel.spawn() in both contexts? Can outside threads call Kernel.spawn()? I also looked at the asyncio event loop and its large zoo of functions like run_forever(), run_until_complete(), call_soon(), create_task(), and whatnot. I consciously decided to take a different direction and make the invocation of Curio as dead simple as possible: run()--that is the one and only public API function for invoking Curio from synchronous code.

Even in the proposed code above, it gets muddy real fast. For example, consider that Task.val() method. Who can call that? Synchronous code outside of Curio? A separate thread outside of Curio? An asynchronous task running within Curio? Synchronous code called by an async task currently running in Curio? Is calling the kernel.run() method legal in all of these contexts? (most likely not).

At this point, I'm not sure I want to back out of that and start adding methods to the Kernel class. If someone wants to make the kernel launch a collection of background tasks, or have it return task objects, or fool around with Futures, I feel that it can all be done in the initial coroutine given to run(). Although I'm fine with run() having a few tweaks related to scheduling/polling for the purpose of it sitting in an outside event loop, I'd like to keep everything else involving the kernel sitting on the async side of the fence.

njsmith commented 7 years ago

The problem with treating run() as a new select is the same problem that select has in the first place: it's not composable. Barring threads, your program loop can only make one blocking call, whether that's select or curio.run_with_timeout. So somehow everything blocking has to go into that one call. In this case it means that all blocking code has to go into curio, leaving only non-blocking code outside curio... but you can just as easily put non-blocking code into curio too, so what's the point of splitting it up?

The example above with spawning tasks etc could just as easily be written as a single async function passed to main. There's no additional power from writing it this way; it just makes a new redundant way of writing the same things we can already write.

The exception is that having a run_one_iteration style interface is useful to allow really janky event loop integration where you busy-poll two event loops in alternation, maybe with a small sleep. (Using a timeout argument on one of the loops to implement the sleep can improve latency a bit, but again, only for some events.) I'm not saying we shouldn't support this. But we should recognize how limited the value is.
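The alternation scheme described above can be sketched with two toy callback loops (a hedged illustration; ToyLoop, loop_a, and loop_b are hypothetical stand-ins, not real curio or asyncio objects):

```python
# Toy sketch of busy-polling two event loops in alternation: each loop gets
# one non-blocking scheduling cycle per outer iteration.
from collections import deque

class ToyLoop:
    def __init__(self):
        self.ready = deque()

    def call_soon(self, fn):
        self.ready.append(fn)

    def run_one_cycle(self):
        # Run only the callbacks that are ready right now, then return
        # immediately -- the non-blocking behaviour discussed above.
        for _ in range(len(self.ready)):
            self.ready.popleft()()

loop_a, loop_b = ToyLoop(), ToyLoop()
log = []
loop_a.call_soon(lambda: log.append('a1'))
loop_b.call_soon(lambda: log.append('b1'))
loop_a.call_soon(lambda: log.append('a2'))

# Outer driver: alternate single cycles until both loops are drained.
while loop_a.ready or loop_b.ready:
    loop_a.run_one_cycle()
    loop_b.run_one_cycle()

assert log == ['a1', 'a2', 'b1']
```

The limitation njsmith points out is visible here: neither loop can block, so the outer driver either spins or sleeps blindly.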

imrn commented 7 years ago

Ok. The points are:

1) Some revisions to run() for reentrancy: with coros, blocking/non-blocking timeouts. Select is no model, just its timeout values: 0, 5, None. Status: on track.
2) A request for another sibling of run() returning tasks. Discussed above. I'll digest it further. Closed.

So first things first. Lets polish (1) and get it to main stream.

dabeaz commented 7 years ago

I agree that it's kind of janky with respect to integrating with other stuff in this way (polling). One might be able to do something more interesting by making a custom Selector object and using that with the Kernel. However, that's not something I've spent much time exploring.
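One way the custom-Selector idea might look (a speculative sketch; CooperativeSelector and its foreign_step hook are hypothetical, not part of curio):

```python
# Hedged sketch of the "custom Selector" idea: wrap DefaultSelector so that
# every poll also gives a foreign event loop a chance to run. Curio itself
# defines no such class; this only shows the general shape.
import selectors

class CooperativeSelector(selectors.DefaultSelector):
    def __init__(self, foreign_step, max_wait=0.05):
        super().__init__()
        self.foreign_step = foreign_step   # callback driving the other loop
        self.max_wait = max_wait

    def select(self, timeout=None):
        self.foreign_step()
        # Cap the blocking time so the foreign loop is serviced regularly.
        if timeout is None or timeout > self.max_wait:
            timeout = self.max_wait
        return super().select(timeout)

ticks = []
sel = CooperativeSelector(lambda: ticks.append('tick'), max_wait=0.01)
sel.select(timeout=None)   # blocks at most ~10 ms, stepping the foreign loop first
assert ticks == ['tick']
sel.close()
```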

dabeaz commented 7 years ago

Regarding a reentrant run() method... that's not likely to happen. Too many horrible things that can go wrong with that.

imrn commented 7 years ago

Oops.. What about the proof of concept you provided and we've tested? Will it be shelved?

dabeaz commented 7 years ago

I'll probably keep the run_coro() method, but have it replace the existing run() method. It already passes all of the existing unit tests. It adds a few tweaks to current behavior that are unlikely to affect any existing Curio code. Also, it's significantly faster than run() if someone decides to use it for polling or submitting a lot of tasks to the kernel from sync code.

I'm still undecided about the fate of daemonic tasks. For now they're staying.

dabeaz commented 7 years ago

Also, the new run() method is, itself, implemented as a coroutine. So, it adds a whole extra level of meta-coroutine insanity to the whole thing which is already quite insane. Maybe that's good. Or not ;-).

imrn commented 7 years ago

So I understand that run() will grab some goodies from this discussion but will not be reentrant. Correct? What about the timeouts for the blocking/non-blocking thing?

dabeaz commented 7 years ago

It will have the current features of the run_coro() method which include the various timeout options. By re-entrant, I am referring to the problem of allowing run() to be invoked when run() is already running (e.g., recursive calls, etc.). That won't be allowed. Repeated calls to run() one after the other are fine.

imrn commented 7 years ago

Oh no!! Nobody requested calling run() recursively... Reentrancy for Kernel.run() was always "calling run() one after another", in the sense that a reentrant lock can be acquired multiple times... I'm surprised that we got this far if everyone understood it that differently... :))

Anyway, I'm now happy with run_coro(). If you can organize the timeouts similar to sockets/select, I'll be happier.

1) Non-blocking: run(coro, timeout=0) -> select(..., 0). Only process ready tasks. Exit.
2) Blocking (OneShot): run(coro, timeout=None) -> select(..., None). Like 1, but if none are ready, block until the next one. Process it. Exit.
3) Blocking (Hard): current run().
4) Blocking (Timeout): run(coro, timeout=5) -> select(..., 5).

Let's get it into the mainstream.. Thanks for the effort.

Probably I'll still be after run() returning tasks. But first let this reentrancy ;) stabilize, and let me digest your comments. Thanks again.

dabeaz commented 7 years ago

I'm keeping it, but there are probably only going to be three cases:

  1. run(coro). Runs a coroutine until it and all non-daemonic tasks it might have spawned terminate.
  2. run(timeout=val). Runs daemonic tasks through a single cycle of the scheduler. The timeout value gets passed to select(); timeout=None makes it block.
  3. run(coro, timeout=secs). If a timeout is given in combination with a coroutine, I'm going to make it apply the effect of timeout_after(secs, coro).

I'm still on the fence about the daemonic/non-daemonic task distinction. For now, I'm keeping them.

dabeaz commented 7 years ago

Just a note. run_coro() is now gone--it was always just temporary for experimentation. The normal run() method has this behavior.

imrn commented 7 years ago

Ok. Let's confirm, document, and propose the detailed behavior of this thing. This post also has the hidden agenda of removing daemons. Please provide feedback citing item #. After contributions, I'll compile a new set and post again.

1) The Kernel is a pool of tasks.
2) Kernel.run() and spawn() add tasks to this pool.
3) Kernel.run(coro=None, timeout=None) has the following properties:
4) timeout determines the maximum time for polling ready events.

5) If coro == None: run() always returns None, and (6, 7, 8) apply.
6) timeout == None: Run ready tasks. Wait for and process the next event. Return. (OneShot-Blocking)
7) timeout == 0: Only run ready tasks. Return. (Non-blocking)
8) timeout > 0: Process ready and upcoming events within x secs. (Timeout-Blocking)

9) If coro != None: (10, 11, 12) apply:
10) timeout == None: Run until the task is done. Return the result. (Hard-Blocking) Like the traditional run(), PLUS other tasks may still remain in the pool, ready to be run on the next iterations.
11) timeout == 0: Add the task to the pool. Only run ready tasks, like (7). Return None.
12) timeout > 0: Process other events while waiting for the task, and return the result if it is ready within x secs.

Proposal: Stepped OneShot driving until all done:

while kernel.tasks:
   kernel.run()

dabeaz commented 7 years ago

I'm not willing to have that much complexity built into the run() method. run() currently operates as follows:

  1. run(coro). Runs the specified coroutine to completion and returns its result.
  2. run(). Runs the kernel for a single scheduling cycle, executing work from background tasks (if any), and returns.

That's basically it. If an optional timeout is given, it imposes a deadline either by causing a TaskTimeout exception to be delivered to the supplied coroutine or by supplying the timeout argument to the select() call used for I/O polling.

I don't really want to have it do anything more than this.
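The two modes described above can be mimicked with a toy scheduler (an illustrative sketch only; curio's real kernel is far more involved, and ToyKernel/checkpoint are invented names):

```python
# Toy scheduler sketching the two run() modes: run(coro) drives a coroutine
# to completion and returns its result; run() executes a single scheduling
# cycle of background tasks and returns.
import types
from collections import deque

@types.coroutine
def checkpoint():
    yield                                  # trap back to the scheduler

class ToyKernel:
    def __init__(self):
        self.ready = deque()               # background tasks

    def spawn(self, coro):
        self.ready.append(coro)

    def _cycle(self):
        # One scheduling cycle: each currently-ready task gets one step.
        for _ in range(len(self.ready)):
            task = self.ready.popleft()
            try:
                task.send(None)
                self.ready.append(task)
            except StopIteration:
                pass                       # background task finished

    def run(self, coro=None):
        if coro is None:                   # mode 2: a single scheduling cycle
            self._cycle()
            return None
        while True:                        # mode 1: run coro to completion
            try:
                coro.send(None)
            except StopIteration as e:
                return e.value
            self._cycle()

async def background(log):
    for i in range(2):
        log.append(f'bg{i}')
        await checkpoint()

async def main(log):
    log.append('main')
    await checkpoint()
    return 'done'

log = []
kernel = ToyKernel()
kernel.spawn(background(log))
result = kernel.run(main(log))             # mode 1: returns the result
kernel.run()                               # mode 2: one more cycle for bg tasks
kernel.run()                               # ...and one to let them finish
assert result == 'done'
assert log == ['main', 'bg0', 'bg1']
```

Note how the background task persists across run() calls, which is the "state preserving" behavior this whole issue is about.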

imrn commented 7 years ago

It is not a request list. In fact, it is procedural documentation for run() as I understand it. So far I see it as an extended version of yours. (Except it has the assumption of tasks entering and leaving the pool without any 'daemon' stamp on them.)

Maybe some items are not covered. That's why I wrote it: to confirm, and possibly to talk about daemon removal. I think this could be a basis for it.

dabeaz commented 7 years ago

There are things in the list that appear to attach different semantics to different values of the timeout when, in fact, the timeout is just passed directly to select() and there's no other interpretation of it. run(coro, timeout=secs) is the same as run(timeout_after(secs, coro)).