Closed: aantron closed this issue 5 years ago.
I don't have the solution(s), but issues with `Lwt_stream` that come to mind are:

- In a pipeline like `Lwt_stream.map f (Lwt_stream.filter g the_stream)`, when the consumer is done it doesn't really have a way of signaling the whole pipeline and having the underlying source closed (if `the_stream` is backed by a file descriptor, say).

I think `Lwt_stream` should mostly stay as is; opam makes it easy to depend on third-party libraries that have more freedom to explore, at least for now. It's like `Stream` in the stdlib: it's almost impossible to improve in a retrocompatible way. I don't think it's time for deprecation yet, not until we have robust, well-tested alternatives in third-party libraries.
Thanks. Agreed.

> 3rd party libraries that have more freedom to explore
I would like to move this along. Of course, `Lwt_stream` will remain mostly as is for a while; we have to be conservative. But I am highly open to, for example, "blessing" an alternative library in the docs of `Lwt_stream`, if we have or develop one that addresses many of the issues people have. It's kind of sad to offer a module with these problems. If `Lwt_stream` really is like this, then it's a waste of people's time, and we should look to address it, even if only with documentation for now.
`Lwt_stream` is imperfect, but I wouldn't go as far as saying it sucks. It does serve a purpose! Both push- and pull-driven streams are useful to have. Having a finite set of more than one consumer is possible, but it takes some manual juggling to make it work, which is certainly not ideal for users. If `Lwt_stream` can be improved without breaking the Lwt-using world, then it would be good to do so.
An example of a potential `Lwt_stream.map_n`, to show one way to have a capped concurrent map over a stream:
```ocaml
open Lwt.Infix

let map_n ?(max_threads = 1) f stream =
  begin
    if max_threads <= 0 then
      Lwt.fail_invalid_arg "map_n: max_threads must be > 0"
    else
      Lwt.return_unit
  end >>= fun () ->
  let rec loop running available =
    begin
      if available > 0 then
        (* Capacity remains: keep the current set of running threads. *)
        Lwt.return (running, available)
      else
        (* At capacity: wait for at least one thread to complete. *)
        Lwt.nchoose_split running >>= fun (complete, running) ->
        Lwt.return (running, available + List.length complete)
    end >>= fun (running, available) ->
    Lwt_stream.get stream >>= function
    | None -> Lwt.join running
    | Some elt -> loop (f elt :: running) (pred available)
  in
  loop [] max_threads
```
I agree with @hcarty, it doesn't suck, but it has shortcomings.
A small suggestion on the `map_n` implementation: I tend to use `Lwt_pool` and `Lwt_list.map_p` for this kind of thing, so I wonder if it would fit streams as well.
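For concreteness, here is a minimal sketch of the `Lwt_pool` + `Lwt_list.map_p` pattern being suggested; the name `map_pooled` and the `~max_threads` label are illustrative, not from any library. A pool of unit tokens caps how many calls to `f` run at once:

```ocaml
(* Sketch: cap concurrency with Lwt_pool while mapping with Lwt_list.map_p.
   [map_pooled] and ~max_threads are hypothetical names for illustration. *)
let map_pooled ~max_threads f xs =
  (* A pool of [max_threads] unit "tokens"; Lwt_pool.use takes a token
     for the duration of each call to [f]. *)
  let pool = Lwt_pool.create max_threads (fun () -> Lwt.return_unit) in
  Lwt_list.map_p (fun x -> Lwt_pool.use pool (fun () -> f x)) xs
```

Note that `map_p` still creates one Lwt thread per list element up front; the pool only limits how many of them make progress inside `f` at a time.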
@c-cube That was my first attempt, but `map_p` concurrently pulls stream elements and spawns new Lwt threads as long as the stream has more to take. That was a problem for very large or infinite streams. `Lwt_pool` blocks within the mapping threads, but it doesn't prevent new threads from spawning.
> Please tell why Lwt_stream doesn't work for you, ideally at the level of semantics [...]
Consider a finite stream that yields `k` successive elements during a program execution. Consider `n` concurrent readers on this stream. I expect precisely one of these statements to hold, if the program runs long enough:

1. There are `n * k` dequeuing events. This means that every reader gets to see every element, which is the semantics expected of lazy lists. In terms of monad transformers, this is `ListT Lwt a`.
2. There are `k` dequeuing events. Each element is seen by only a single reader, i.e. each dequeuing event carries a unique element. With respect to uniqueness, this is what's sometimes described as a "channel", and it matches what people often expect of a "stream".

Presently, neither of these is the case. The number of dequeuing events, `x`, satisfies `k <= x <= n * k`. Worse yet, it is non-deterministic, and impossible to know statically.
Operationally, if there is an element available in the stream, the following `Lwt_stream.next` returns it and removes it from the stream. If there is no element, however, a reader blocks on the stream. Several readers can end up blocking on the same stream in this time window, and when the next element becomes available, it is pushed to all of them.

In operational terms, this makes a certain sense. Semantically, it makes no sense whatsoever, and it makes streams impossible to reason about, and plainly useless, in situations with multiple readers.
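The window described above can be sketched as follows. Whether both readers observe the same element depends on the Lwt version: the duplicated wakeup is the behavior this comment complains about, while later versions queue the blocked readers.

```ocaml
(* Two readers block on an empty stream before any element arrives. *)
let stream, push = Lwt_stream.create ()
let r1 = Lwt_stream.next stream   (* blocks: the stream is empty *)
let r2 = Lwt_stream.next stream   (* blocks on the same stream *)

(* Now elements become available. Depending on the Lwt version, r1 and
   r2 may both be woken with 1 (one element, two dequeuing events), or
   be queued so that r1 sees 1 and r2 sees 2. *)
let () = push (Some 1); push (Some 2); push None
let v1 = Lwt_main.run r1
let v2 = Lwt_main.run r2
let () = Printf.printf "r1 = %d, r2 = %d\n" v1 v2
```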
This purely semantic hitch is my greatest gripe about streams, and it's the one I complained about in #151. The fix proposed there really only addressed the semantics, but it can both lead to thread starvation and cause needless work. I have a draft of an updated stream implementation which takes care of explicitly queuing the readers. It guarantees behavior 2 without these downsides. In addition, it makes the implementation shorter. If there is any interest, I can flesh it out, benchmark it against the current version, and post it for review. Alternatively, I was playing with the idea of publishing this implementation as a separate package.
A further issue is that push-based streams are conflated with pull-based streams, and it can be argued that push-streams should be more like channels, while pull-streams should be more like lists, but I can live with the two being lumped into one as this didn't bite me in practice.
Edit: I forgot to add that the test suite passes when the concurrent-readers behavior is changed, and I could not provoke other programs into misbehaving with that code. So changing this is totally feasible.
That's an interesting analysis. I like this semantics of "dequeuing events".
Note that `lwt-pipe`, which is just a prototype so far, implements the "exactly `k` dequeuing events" semantics. No event is lost just because there is no consumer, and the pipe acts as a channel (it really is a channel).

For the `n * k` semantics, I don't see any good alternative to the (lazy) list, because there is no reliable way of knowing how many consumers are waiting for each item. A list would be a mere immutable value, with all the associated benefits of functional programming.
```ocaml
type 'a llist = 'a llist_cell Lwt.t lazy_t
and 'a llist_cell =
  | Nil
  | Cons of 'a * 'a llist
```
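To illustrate why this type gives the `n * k` semantics, here is a hedged sketch of helpers over it (`of_list` and `to_list` are hypothetical names, not from the thread): every consumer forces the same lazy thunk, so the effect behind each cell runs once, yet all `n` readers can traverse all `k` elements.

```ocaml
(* Repeating the type above so this sketch is self-contained. *)
type 'a llist = 'a llist_cell Lwt.t lazy_t
and 'a llist_cell =
  | Nil
  | Cons of 'a * 'a llist

(* Build an llist from an ordinary list; each cell is already resolved. *)
let rec of_list = function
  | [] -> lazy (Lwt.return Nil)
  | x :: tl -> lazy (Lwt.return (Cons (x, of_list tl)))

(* Traverse the whole llist. Any number of readers can call this on the
   same value: forcing a cell is idempotent, so the underlying effect
   (here trivial) runs once per cell, not once per reader. *)
let rec to_list (l : 'a llist) : 'a list Lwt.t =
  Lwt.bind (Lazy.force l) (function
    | Nil -> Lwt.return []
    | Cons (x, tl) ->
      Lwt.bind (to_list tl) (fun rest -> Lwt.return (x :: rest)))
```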
@pqwy A desire to have your second case, with `k` dequeuing events, is what prompted me to write the `map_n` implementation above.
I'd like to agree with @c-cube and @hcarty above that `Lwt_stream` doesn't "suck." It has flaws. But we do want to hear from people who think the flaws are really serious, who might not comment if they don't think we are willing to address them, even if we have to, hypothetically, replace `Lwt_stream`. In all cases, thank you to @diml for his work :) And thanks for the comments so far.
@aantron Are you interested in a PR for the `Lwt_stream.map_n` implementation above, or something close to it?
@hcarty If you could wait, and we discuss it later, that would be best. I am still pretty far off in the backlog from being able to give this topic the consideration it deserves. It wouldn't be fair to ask you to do work when I can't respond to it properly. But PRs are always appreciated in principle, so thank you for offering :)
I was experimenting with alternate implementations for streams and, after some discussions with @aantron (#257), created this:

- `Lwt_stream.mli`: https://gist.github.com/rneswold/5ace9696ec92a41825f6f08607ff5dec
- `Lwt_stream.ml`: https://gist.github.com/rneswold/1f7feccf7383fe578c6d796b62ba654c
This uses the most basic primitive of Lwt as its data structure, so I don't think there could be an implementation that uses fewer resources.
Each element of the stream is a thread obtained from `Lwt.wait`. The push function uses the wakener to provide a value and extend the "chain". An `Lwt_stream.t` (an opaque type) is a reference to a thread. Multiple threads, each with its own cloned stream, will block on the same underlying thread, so when data is added, all of them get unblocked.

When the push function goes out of scope, the final node in the stream becomes `fail End_of_stream`.
@pqwy: This implementation provides `n * k` dequeuing events if each thread owns a clone of the stream. If `Lwt_stream.next` is wrapped with `Lwt_mutex.with_lock`, threads can share a stream and you get `n` dequeues.
More complicated requirements (e.g. bounded streams) could be built from this. I find, however, the simplicity of this example appealing. As is, it appears to be a very efficient way to broadcast data to multiple threads.
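For readers following along, the chain-of-promises design described above can be sketched roughly as below. All names here are hypothetical (the gists linked above are the actual implementation), and this sketch omits the `End_of_stream` finalization when the push function is collected.

```ocaml
(* Each node pairs a value with a promise for the rest of the chain. *)
type 'a node = Cons of 'a * 'a node Lwt.t

(* A stream is a mutable cursor into the chain. *)
type 'a stream = 'a node Lwt.t ref

let create () =
  let t, w = Lwt.wait () in
  let tail = ref w in
  let push v =
    (* Make a fresh pending node, then resolve the current tail with
       the value and the new node, extending the chain. *)
    let t', w' = Lwt.wait () in
    Lwt.wakeup !tail (Cons (v, t'));
    tail := w'
  in
  (ref t, push)

(* Wait for the node under the cursor, advance the cursor, return it. *)
let next (s : 'a stream) =
  Lwt.bind !s (fun (Cons (v, rest)) -> s := rest; Lwt.return v)

(* Cloning is just copying the cursor: clones share the same chain,
   so every clone sees every element (the n * k semantics). *)
let clone s = ref !s
```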
@rneswold Just a nitpick: to me, handling non-memory resources using the GC is quite bad. If you wait until a value is finalized to close the stream (and release the underlying resources, such as file descriptors), it will probably be too late, and the program will fail with "too many file descriptors are open". Likewise, requiring the stream to be exhausted before it closes is really inefficient (what if I read the first lines of a 100 MB file and then no longer need the line iterator? Reading the whole file before closing it is quite inefficient). I'd argue that an `n`-dequeue structure (a bounded mutable queue) is more efficient in most cases, and you can build a lazy immutable stream on top if you need the `n * k` behavior, at the cost of more memory.
@c-cube In general, I agree with you. Many types of resources should be reclaimed as soon as possible. In the case of streams, however, we can make an exception:

- Put another way, if 100 threads were reading a stream, under the current implementation there would be `n` unread items (allocations) in the stream and `100 * n` threads required to read all the data. Under my example, there are `2 * n` allocations per stream item (the thread and the waker). There are no additional allocations required to block the threads reading the data.

I've been reading about ways to write software so that many runtime errors can be caught at compile time (by designing the API in such a way that it's impossible to get into the runtime error state). My stream implementation follows this in that you can only add items to the stream. When the push function goes out of scope, the stream is closed. There's no need to check at run time whether you're pushing values onto a closed stream, because it's impossible.

This stream model is very useful and efficient for broadcasting data to Lwt threads.

I think your concern is that, if we read a file and convert it to a stream of lines, the file handle is tied to the lifetime of the stream. This is a valid concern and I'm glad you brought it up. In fact, I encourage criticisms. But rather than write off this implementation, I'd like to see if there's a way to support your use cases with minimal change to the core design.
Wow. My reasoning and math sounded good until I started measuring. The current `Lwt_stream` consistently beats my implementation by 15% to 30% (using 4.02.3)! My benchmark module is here:

- `bench.ml`: https://gist.github.com/rneswold/b47e3ce141ae12e10695972de2fba68f

I set `n_stream` to 1,000,000 entries and then ran 1 to 1,000 threads reading it. I then set `n_stream` to 1,000 and ran 1,000 to 100,000 threads reading it. I'm going to play around with this some more (for instance, is it the writer or the readers that aren't scaling well?).
@rneswold I'd be interested in the addition of my lwt-pipe to your benchmark :)
I am working a lot with NodeJS and `js_of_ocaml`, and the state of tail-recursion optimisation in `js_of_ocaml` forces me to write my own stream iterators. It would be very useful to have streams as a separate library (I mean, a separate linker unit) to ease the use of a drop-in replacement for the functions it defines.
Here's an updated functional stream implementation I've been playing with:
https://gist.github.com/rneswold/0d80560a80314ce3f1aa57a64ee406dd
@aantron pointed out to me the equivalence between `IVar` and `Lwt.t`, so I based the stream contents on a chain of resolved `Lwt.t` values terminated with a `Fail`. `clone` simply becomes the identity function. The combinators `map`, `filter`, `combine`, and `append` compose nicely and, no matter how many threads read from them, each node of their content is calculated once (an earlier version of this module calculated each node `n` times for `n` consumers).
Since the implementation is based on `Lwt.t` values, any improvements made to the Lwt scheduler will automatically improve the performance of this module.

It still doesn't address @c-cube's concerns about lazy consumption. I tried writing `Lwt_fstream.of_list` to be consumer-driven, but couldn't do it without complicating the core. I'm still trying to find a solution for this use case.

I provide this example to further the discussion of this issue. At the very least, I think I'll package this up as a separate opam package once I finish the remaining functions (to make it closely compatible with `Lwt_stream`'s API).
@c-cube

> just a nitpick, to me, handling non-memory resources using the GC is quite bad. If you wait until a value is finalized to close the stream (and release the underlying resources such as file descriptors) it will probably be too late, and the program will fail with "too many file descriptors are open".
I disagree. The GC runs depending on how many resources you allocate. If you use a custom block with a finalizer, you can fine-tune how often the GC should run. For anything holding file descriptors, I would say that should be at least every 100 file descriptors. That way you can have 900 files open and reachable and still never run out of file descriptors. As soon as you get to 1000 open file descriptors, the GC would run and get you back to the 900 still-reachable ones, so you never hit the 1024 limit.

If I see a problem here, it's more that standard file descriptors are not custom blocks but simply ints. So they aren't GC'ed at all. They are not counted as a resource beyond the 4/8 bytes of memory they take up in some record or tuple. And then the program happily creates 1 million of them before the minor heap gets processed. Obviously you then run out of file descriptors. The above only works if we wrap them in a custom block at a higher level.
@mrvn I didn't know you could do such things with custom blocks. But as things stand now, the GC will not protect you against the very annoying "too many file descriptors" error, and it is not clear (to me) how Lwt can change that.
@c-cube You would have to wrap every data structure in Lwt containing a file descriptor in a custom block. `Unix.file_descr` should have been a custom block in the first place and should close itself when it becomes unreachable (unless already closed explicitly). It's the only way I know of to tell the GC about the cost of some external resource, i.e. to make it run more often.
That is why I prefer RAII-like approaches, especially `with_foo` functions.
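A minimal sketch of the `with_foo` style being advocated, using `Lwt.finalize` so the descriptor is closed as soon as the consumer finishes or fails (the name `with_input_file` is illustrative, not an existing Lwt function):

```ocaml
open Lwt.Infix

(* RAII-style helper: the channel's lifetime is the dynamic extent of
   [f]. The descriptor is closed promptly, even if [f] stops early or
   raises, instead of waiting for the GC to finalize it. *)
let with_input_file path f =
  Lwt_io.open_file ~mode:Lwt_io.Input path >>= fun ic ->
  Lwt.finalize (fun () -> f ic) (fun () -> Lwt_io.close ic)
```

Usage would look like `with_input_file "data.txt" Lwt_io.read_line`, with the file guaranteed closed when the promise resolves.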
@aantron Should `Lwt_stream` be used for production code at all? On the documentation page (http://ocsigen.org/lwt/dev/api/Lwt_stream) you suggest looking at @c-cube's lwt-pipe, but that is also marked as incomplete, although the functionality I tried seems to work fine.

I like the CML approach to concurrency, and it would be nice to have something we could rely upon with Lwt. The other aspect of channels that I like is that they enforce share-nothing, which should make it easier to write code that will run properly on multicore OCaml. What is your recommendation?
Thanks very much in advance!
A+
@kydos Yep, `Lwt_stream` can and should be used. It remains stable and maintained. If lwt-pipe suits your needs, though, we suggest you use that.

The idea of that message is to keep people from using `Lwt_stream` so much by default that we don't get enough feedback to develop lwt-pipe or other alternatives. Maybe the message in the docs should be adjusted.
Thanks for the prompt response, @aantron. I agree that the note in the documentation should be updated, as it is perhaps a bit too discouraging :-)
Keep up with the great work on Lwt!
A+
@aantron, to give you some more context, we are using OCaml and Lwt to build a secure and high-performance broker for the zenoh protocol. If you are familiar with NDN (Named Data Networking), zenoh addresses the problems of NDN while taking into account the new requirements posed by IoT.

Anyway, I'd like to keep the "impure" code well isolated and use channels to compose the different processing stages. OCaml requires some discipline to ensure that no mutable state is ever transmitted across a channel, but let's say we enforce that currently by design and careful coding.
As you can imagine, we need the channels (or streams) connecting the various stages to be quite performant. Thus I did some quick benchmarking, and my findings give the following rankings in terms of raw throughput as well as stability of the throughput:

Thus, going back to the reflection on your statement from yesterday, it looks like `Lwt_stream` is the way to go when using Lwt. That said, we want to use Lwt not only because we like it but also because of MirageOS.
If you are interested I can contribute the test code.
Ciao, @kydos
P.S. Have you done much experimenting with Lwt on ocaml-multicore? I saw it is one of the open issues. Once you have something working, if you want us to do some benchmarking on throughput, we would be happy to do so.
That's interesting. I'd be curious to know by how much `lwt_stream` is faster than `lwt-pipe`.
(Note that the goal of lwt-pipe, when I had time to work on it, was to provide a clean push-based abstraction which would play nicely with resources when readers are faster than writers. The problem is that it's still very immature.)
@kydos I've done only some rudimentary experiments with Lwt on multicore. I'll be sure to get back when there is something concrete.
@c-cube, on the in-bound processing path the writer is the transport plug-in, which may be reading data from TCP/IP, UDP/IP, raw ETH, etc., and producing protocol frames. Next is our protocol engine. Thus in this case the reader may be slower than the writer, especially on high throughput networks or with large numbers of entries on our routing tables.
In any case, the test I did was synthetic. The ultimate truth would be provided by running throughput tests on our broker and seeing what the difference really is. Perhaps I'll have somebody on our team look at this, but it probably won't be for another month. I'll let you know.
@kydos I updated lwt-pipe recently and changed some internals (along with a user contributing `read_with_timeout`). Would you be interested in taking another look? :)
@c-cube I'll run the test and will let you know what I get.
A+
Closing, as there are no plans to do anything with/about `Lwt_stream` for now.
Please tell why `Lwt_stream` doesn't work for you, ideally at the level of semantics (i.e. prefer "lazy streams aren't right for our use case because blahblah" to "`Lwt_stream` is missing a specific function that can easily be added"). Hopefully we can move `Lwt_stream` towards deprecation.

cc @c-cube @hcarty @seliopou @Drup @diml

Please pull in anyone else who has interest/expertise.

Some existing anguish: #239, #151.

EDIT: Despite the issue title, thank you to @diml.

EDIT: Note also #155.