ocaml-multicore / eio

Effects-based direct-style IO for multicore OCaml

Feature request: Stream select #577

Open dermesser opened 1 year ago

dermesser commented 1 year ago

Thank you very much for your work on eio! I've thoroughly enjoyed working with it, despite being a newcomer to OCaml. Eio feels more advanced than the async frameworks in other languages I've used.

So far, I have been missing one feature: Waiting on multiple streams, which in languages like Go or Rust is called select. I've taken a look at the source code, but as a newbie I didn't want to charge ahead with some untenable solution (and besides, any possible implementation appeared non-trivial to me).

I don't have a specific API in mind, but something simple like

val select : ('a Stream.t * ('a -> 'b)) list -> 'b

to select between different channels producing items of the same type and permitting per-stream handling, or

val select2 : 'a Stream.t -> 'b Stream.t -> ('a, 'b) Either.t

for selecting between exactly two streams may already prove to be useful. I'd love to hear what you think.
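
For concreteness, a hypothetical call to the first form might look like this (the stream names and handlers are made up for illustration; both streams carry the same element type, as the signature requires):

let next_event ~log_lines ~stdin_lines =
  select
    [ (log_lines,   fun line -> `Log line);
      (stdin_lines, fun line -> `Stdin line) ]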

anmonteiro commented 1 year ago

Can you implement that in terms of Fiber.first and Stream.take?

dermesser commented 1 year ago

Can you implement that in terms of Fiber.first and Stream.take?

That's clever! I hadn't thought of this at all, but it obviously makes the implementation trivial. I will try to prepare a PR to see if it is met with interest.

Unfortunately, the select2 use case described above does not appear to be possible this way, though (it would need a combination of Fiber.pair and Fiber.first, IIUC).
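
For reference, a minimal sketch of that idea, generalised from Fiber.first to Fiber.any so it handles a whole list of (stream, handler) pairs, could look like this. As the next comment points out, it is not atomic: if more than one stream has an item ready, more than one take can succeed, and all but one of the taken items are dropped.

(* Sketch only: select via one fiber per stream, racing with Fiber.any. *)
let select handlers =
  Fiber.any
    (List.map (fun (stream, f) -> fun () -> f (Eio.Stream.take stream)) handlers)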

talex5 commented 1 year ago

Something like this needs to be atomic, which Fiber.first doesn't support.

The easy solution is to use kcas queues, which can be composed easily.
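
As a rough illustration of that composability, a two-queue select over kcas_data queues might look like the following. This is only a sketch based on one reading of the kcas / kcas_data API (it assumes Kcas_data.Queue exposes a transactional take_opt under Queue.Xt and that Kcas.Retry.later can block the transaction); the exact names may differ.

let select2 q1 q2 =
  (* Both takes happen inside a single kcas transaction, so exactly one
     item is consumed; Retry.later blocks until either queue changes. *)
  Kcas.Xt.commit { tx = fun ~xt ->
    match Kcas_data.Queue.Xt.take_opt ~xt q1 with
    | Some a -> Either.Left a
    | None ->
      (match Kcas_data.Queue.Xt.take_opt ~xt q2 with
       | Some b -> Either.Right b
       | None -> Kcas.Retry.later ()) }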

However, it would also be nice to have direct support in Eio.Stream too. The general idea is that you create an Atomic to hold the result and then CAS that to the value. There are two implementations of Stream, depending on whether it is zero-capacity or not. For the zero-capacity case (sync.ml) there is already support for rejecting a value (causing the producer to retry with the next client). For the non-zero-capacity case, you have a lock and can just decide not to pop the value, I think.

dermesser commented 1 year ago

Thank you for the quick reply! I've been looking around a bit, mostly out of interest, and some questions came up. Keep in mind that it's likely I'm just missing your point here.

The general idea is that you create an Atomic to hold the result and then CAS that to the value.

I've interpreted this to mean that, in order to stay within the existing framework, we'd still have one fiber for each stream, plus an additional atomic to make sure exactly one item is returned and no item is lost. For now, I'm only focused on the Locking implementation. So something like this?

(* WARNING: this is NOT real code, it WILL lead to deadlocks *)
  let select streams =
    let result = Atomic.make None in
    (* compare_and_set works for options as long as we only ever compare
       against the initial None, which is physically equal to itself. *)
    let place_result item = Atomic.compare_and_set result None (Some item) in
    let wait_for stream () =
      let item = take stream in
      if place_result item
      then ()  (* This channel was the first to receive an item. Return. *)
      else
        (* This channel was not first. Place the item back into the stream.
           This will block this fiber indefinitely if the stream has been
           filled up in the meantime and there are no other readers! *)
        add stream item
    in
    (* One fiber per stream; the first to finish cancels the rest. *)
    Fiber.any (List.map wait_for streams);
    Atomic.get result

For the non-zero-capacity case, you have a lock and can just decide not to pop the value, I think.

It appears that this is tricky to implement. One way to extend the code above would be to implement a peek operation for the Locking stream. However, in the case where we have to wait for an item to be added to the channel, the item skips the queue and is transmitted directly through the Waiter.t.

At that point, the item is already in our hands, and we have to do something with it. Assuming another stream has already yielded an item and called place_result above, we can't return it, nor can we rely on the stream's queue having free capacity to add it back. It seems to me that the "skipping the queue" part, transmitting the item through a Waiter.t, makes our life more difficult here.

I've skipped the Sync kind of stream for now; this is just what I learned after messing around a bit.

talex5 commented 1 year ago

I've interpreted this to mean that, in order to stay within the existing framework, we'd still have one fiber for each stream, plus an additional atomic to make sure exactly one item is returned and no item is lost.

No, there shouldn't be a need for any extra fibers. The taking fiber suspends itself and then registers with each stream being watched. When woken, it tries to store the result but if it fails, it rejects the value instead.

It appears that this is tricky to implement. One way to extend the code above would be to implement a peek operation for the Locking stream. However, in the case where we have to wait for an item to be added to the channel, the item skips the queue and is transmitted directly through the Waiter.t

Yes, I think this needs to be changed. Probably Waiters would need to be modified (e.g. maybe waiter.finished could hold the value instead? and the same waiter can be on multiple queues?).

talex5 commented 1 year ago

BTW, one reason I haven't tried this already is that we might want to replace the use of Waiters with Cells here first anyway.