atomgalaxy / review-executor-sendrecv

A board for reviewing the executors proposal P0443

networking client #11

Open dietmarkuehl opened 4 years ago

dietmarkuehl commented 4 years ago

As the project assignment I'm trying to create some sort of telnet-like networking client. I think I can deal with the networking stuff, although my current implementation is somewhat lacking for this (the current version is really hacked together and needs work in various areas to be reasonable).

On a high level I imagine something like

char const    hello[] = { 'h', 'e', 'l', 'l', 'o', '\n' };
char          buffer[1024];
io_context    context;
stream_socket socket(context);

auto operation
   = just(endpoint)                        // should really be resolve
   | async_connect(socket)                 // uses the result of the previous step
   | socket.async_send(hello)
   | socket.async_receive(buffer)
   | async_wait([](auto&&){ std::cout << "received response\n"; })
   ;
operation.start();
context.run();

As each step of these operations is asynchronous, I imagine chaining them together. Instead of using the cute syntax, the sequence can be built up by suitable uses of connect(), auxiliary senders, and reversing the order. I believe I can work out most of these. However, I have two problems I don't know how to solve yet:

  1. Some of these steps use the output of the previous step as input. For example, async_connect(socket) is really a function object which, when called, produces a sender (i.e., it calls socket.async_connect(endpoint)). I haven't figured out yet how to connect these together.

  2. A sequence of operations isn't really sufficient: once a response is obtained, a similar interaction can be triggered (although now the socket is, of course, already connected). I don't know how to create a repetition of the operations.

The issue I have is that I don't know where the operation states would go. It seems I'll need to connect() the various tasks, which produces an immovable operation state. The deeper problem may be that I haven't found out how to put these operation states into an object of my choice or, when I can't determine their type/size reasonably, put them on the heap. That may well be doable with guaranteed copy elision, though.
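For what it's worth, a minimal sketch of that heap idea, with every name invented for illustration: since connect() would hand back the operation state as a prvalue, guaranteed copy elision (C++17) should let new auto(...) materialize the immovable state directly into the allocation:

#include <iostream>

// sketch: an immovable operation state, materialized directly into a
// heap allocation via guaranteed copy elision (no move ever happens)
struct operation_state {
    operation_state() = default;
    operation_state(operation_state&&) = delete;   // immovable
    void start() { std::cout << "started\n"; }
};

operation_state connect() { return operation_state{}; }  // returns a prvalue

int main() {
    auto* op = new auto(connect());   // elided into place, no move needed
    op->start();
    delete op;                        // in real code: after the completion signal
}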

I guess, after having gone down all the rabbit holes I could find on the way (i.e., implementing some part of the Networking TS, concepts, and senders), I'm now lost somewhere, barking up the entirely wrong tree. It will probably take me some time until I have understood how this is properly done.

kirkshoop commented 4 years ago

Starting backwards.

The goal for operation states is that each one is stored in the operation state of its consumer. Thus, when async_connect_impl.connect() is called on the sender returned from async_connect, it can use decltype(just_sender_impl.connect()) to include space in async_connect_operation_impl to store the just_operation_impl; each consumer thereby aggregates the states into its own state. sync_wait() is able to store the state aggregation on the stack because it blocks: the lifetime created by sync_wait() works for all the aggregated operation states. In cases where the scope is not bound to the stack or to another enclosing scope (like a consuming operation state), the aggregated state can be heap-allocated and managed in some other way. We try to avoid creating scopes that are not enclosed. This requires a change in thinking from existing unstructured-concurrency patterns (like shared_ptr and other forms of GC).
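A rough sketch of that aggregation, with all names invented rather than taken from the paper: the consuming sender's operation state declares the child's operation state as a member, sized via decltype of the child's connect():

#include <iostream>
#include <utility>

// sketch only: just_sender produces a value, then_sender consumes it;
// the consumer's operation state stores the producer's operation state
// by value, so the whole chain collapses into one object of known size
template <typename T>
struct just_sender {
    T value;
    template <typename Receiver>
    struct operation {
        T        value;
        Receiver receiver;
        void start() { std::move(receiver).set_value(std::move(value)); }
    };
    template <typename Receiver>
    auto connect(Receiver receiver) {
        return operation<Receiver>{std::move(value), std::move(receiver)};
    }
};

template <typename Child, typename Fun>
struct then_sender {
    Child child;
    Fun   fun;
    template <typename Receiver>
    struct receiver_t {                  // internal receiver
        Fun      fun;
        Receiver receiver;
        template <typename V>
        void set_value(V v) { std::move(receiver).set_value(fun(std::move(v))); }
    };
    template <typename Receiver>
    struct operation {
        // decltype of the child's connect() reserves space for its state:
        decltype(std::declval<Child&>().connect(
            std::declval<receiver_t<Receiver>>())) child_op;
        void start() { child_op.start(); }
    };
    template <typename Receiver>
    auto connect(Receiver receiver) {
        return operation<Receiver>{
            child.connect(receiver_t<Receiver>{std::move(fun), std::move(receiver)})};
    }
};

struct print_receiver {
    void set_value(int v) && { std::cout << "received " << v << "\n"; }
};

int main() {
    then_sender<just_sender<int>, int (*)(int)> chain{
        just_sender<int>{41}, +[](int v) { return v + 1; }};
    auto op = chain.connect(print_receiver{});  // whole aggregation on the stack
    op.start();                                 // prints: received 42
}

The entire chain is one stack object: nothing is allocated, and the producer's state lives exactly as long as the consumer's state that contains it.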

In this case the goal seems to be to only block in io_context::run(). This pattern de-structures the concurrency, because the only signal that the operation has completed is a side effect written to stdout. We currently have sync_wait() create a scheduler and provide it to the sender's connect so that there is something to schedule tasks on. This is in addition to sync_wait() watching for the completion of the sender so that it knows when to exit (sometimes the scheduler queue can run dry before the sender completes; other times the queue might still have work after the sender completes). We have started talking about having sync_wait() take a sender and a context: rather than creating its own scheduler, it would use the scheduler from the context.
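A sketch of what that sync_wait(sender, context) overload might look like; done_flag_receiver and run_one() are assumptions for illustration, not the proposed API:

#include <utility>

// sketch: the receiver flips a flag, and sync_wait() drives the
// context's queue until the flag is set; run_one() is assumed to
// process one ready completion and return
struct done_flag_receiver {
    bool* done;
    template <typename... Vs>
    void set_value(Vs&&...) && { *done = true; }
    template <typename E>
    void set_error(E&&) && { *done = true; }
    void set_done() && { *done = true; }
};

template <typename Sender, typename Context>
void sync_wait(Sender sender, Context& context) {
    bool done = false;
    auto op = std::move(sender).connect(done_flag_receiver{&done});
    op.start();
    while (!done)
        context.run_one();   // execute work on the context until completion
}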

A pipe chains a sender on the left-hand side to a function on the right-hand side that takes a sender and returns a sender. So the chain you have supplied will work if async_connect() returns a function that takes the just_sender_impl and returns an async_connect_sender_impl, socket.async_send() returns a function that takes the async_connect_sender_impl and returns the async_send_sender_impl, etc.
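In other words, the pipe itself can be as small as this sketch:

#include <utility>

// sketch: '|' applies the right-hand adaptor function to the left-hand
// sender, so s | f | g parses as g(f(s)); async_connect(socket) would
// accordingly return a callable that takes the upstream sender and
// returns an async_connect_sender_impl wrapping it
template <typename Sender, typename Adaptor>
auto operator|(Sender&& sender, Adaptor&& adaptor)
    -> decltype(std::forward<Adaptor>(adaptor)(std::forward<Sender>(sender))) {
    return std::forward<Adaptor>(adaptor)(std::forward<Sender>(sender));
}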

However, in an asynchronous library it is better for things like the socket to be passed to the next step asynchronously. So, applying all of the above, it might look like:

char const    hello[] = { 'h', 'e', 'l', 'l', 'o', '\n' };
char          buffer[1024];
io_context    context;

sync_wait(
   just(endpoint)                               // should really be resolve
     | async_connect(stream_socket(context))    // uses the result of the previous step 
     | async_send(hello)                        // uses the result of the previous step
     | async_receive(buffer)                    // uses the result of the previous step
     | tap([](auto&&){ std::cout << "received response\n"; })
     | async_close(),                           // uses the result of the previous step
   context);

This expects that each async_ method calls set_value(socket) on the consumer, except for async_close(), which calls set_value(). tap() is used to perform side effects without changing the set_value()/set_error()/set_done() calls as they pass from the producer to the consumer.
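A sketch of the receiver side of such a tap() (names invented): the side effect observes the value, and every signal is forwarded to the consumer unchanged:

#include <utility>

// sketch: tap(f) wraps the downstream receiver with something like this,
// so f runs as a side effect while all three signals pass through as-is
template <typename Receiver, typename Fun>
struct tap_receiver {
    Receiver receiver;
    Fun      fun;
    template <typename... Vs>
    void set_value(Vs&&... vs) && {
        fun(vs...);                                              // side effect only
        std::move(receiver).set_value(std::forward<Vs>(vs)...); // value unchanged
    }
    template <typename E>
    void set_error(E&& e) && { std::move(receiver).set_error(std::forward<E>(e)); }
    void set_done() && { std::move(receiver).set_done(); }
};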

Finally, this example attempts to design a library for a stream_socket using a single-value sender. We have not yet written the paper showing the ways to extend sender/receiver to support async sequences of packets from a stream_socket. The sender/receiver design in P0443 is specifically intended to be extensible to async sequences, and we hope to show more ASAP.

dietmarkuehl commented 4 years ago

Thank you very much, Kirk! I'll need some time to digest that and apply it to my code. Actually, this sequence of calls is roughly what I can see getting to work with the current interface. However, I'm imagining a sequence of send/receive pairs: a single-value stream socket isn't going to work in general, as a long-lived stream may pass through way more data than a given node has the capacity to store. Likewise, a single chain doesn't work for a server where multiple connections can get accepted and processed interleaved. I haven't fully figured out how to make it work, as the operation state seems to prefer living on the stack. ... and your comment (and the example in libunifex) seems to imply that there is another abstraction (a stream) which could help but is currently missing.

The idea behind blocking in run() was that there is actually just one active thread which eventually poll()s (or epoll()s, or does the io_uring equivalent) and just dispatches the [small] work items to the respective callers. It didn't occur to me that sync_wait(..., context) could actually combine waiting for completion with executing work in the context.
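Something like this sketch, say, where pending_op and its complete() callback are invented stand-ins for however the operations register themselves:

#include <poll.h>
#include <cstddef>
#include <vector>

// sketch: one thread blocks in poll() and dispatches each ready
// completion to the operation that submitted it
struct pending_op {
    int fd;
    void (*complete)(pending_op*);
};

void run(std::vector<pending_op>& pending) {
    while (!pending.empty()) {
        std::vector<pollfd> fds;
        for (auto const& op : pending)
            fds.push_back(pollfd{op.fd, static_cast<short>(POLLIN | POLLOUT), 0});
        ::poll(fds.data(), fds.size(), -1);        // the one blocking point
        for (std::size_t i = 0; i != fds.size(); ++i)
            if (fds[i].revents)
                pending[i].complete(&pending[i]);  // dispatch small work item
        // completed operations would be erased from 'pending' here
    }
}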

atomgalaxy commented 4 years ago

Dietmar, did you ever complete this?