composewell / streamly

High performance, concurrent functional programming abstractions
https://streamly.composewell.com

How to split a stream into two separate streams, to be processed separately #1701

Closed sullyj3 closed 2 years ago

sullyj3 commented 2 years ago

I'm looking for something perhaps like this:

(key -> m (Either local remote)) -> t m key -> (t m local, t m remote)

The use case is that I have a stream of keys which designate items I need to fetch. The items will come from either a local cache, or a website which I scrape, depending on how old the cached copy is.

I want to use the function from the first argument to classify each key according to whether it was cached recently. If it was, the function returns Left local, the file path for retrieving the item from the cache. Otherwise, it returns Right remote, the URL needed to scrape the data plus the file path it should be saved to.

I want the output streams to be separate, because I want them to be processed in different ways. The t m local will be used to handle loading the data from the local cache (which I want to happen serially) and the t m remote will be used to handle scraping data from the website (which I want to happen concurrently, with a Stream.delay between each request to be polite and avoid tying up server resources). After I've fetched all the items, I want to combine them back into a single stream.

Is there an existing way to achieve this? At first I thought it might be Fold.partition, but it seems like that doesn't quite do what I want, since it's a stream of pairs rather than a pair of streams.
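For comparison, here is a minimal sketch of the same split on plain lists, using only base. It is not a streamly API: the name splitWith is hypothetical, and because classification is effectful the pair of results comes back inside m rather than as a bare pair.

```haskell
import Data.Either (partitionEithers)

-- List-based analogue of the requested splitter (hypothetical name).
-- Every key is classified in order, then the Left results and the
-- Right results are collected into two separate lists.
splitWith
  :: Monad m
  => (key -> m (Either local remote))
  -> [key]
  -> m ([local], [remote])
splitWith classify keys = partitionEithers <$> traverse classify keys
```

Unlike a streaming version, this classifies every key before either output list can be consumed, so it only suits inputs that fit comfortably in memory.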

harendra-kumar commented 2 years ago

Can you try the tap/distribute functions in Streamly.Internal.Data.Stream.IsStream, especially tapAsync/K and distributeAsync_? Let me know if that does not solve your issue.

sullyj3 commented 2 years ago

Thanks, I ended up just using regular lists for this since they weren't very large, but I'll take your word that those would work and close this.
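The thread does not show the list-based code, so the following is only a sketch of what such an approach could look like with base alone, assuming the keys have already been split into a list of local items and a list of remote items. The names staggered and fetchBoth, and the loader and scraper arguments, are hypothetical. Local items are loaded serially; remote items are fetched in separate threads whose starts are spaced by a fixed gap, and the results are then concatenated.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, throwIO, try)

-- Run the action on each input in its own thread. The i-th thread waits
-- i * gapMicros microseconds before starting, so request starts are
-- spaced out. Results are returned in input order; an exception in any
-- worker is rethrown in the caller when its result is collected.
staggered :: Int -> (a -> IO b) -> [a] -> IO [b]
staggered gapMicros act xs = do
  vars <- mapM start (zip [0 ..] xs)
  mapM collect vars
  where
    start (i, x) = do
      var <- newEmptyMVar
      _ <- forkIO $ do
        threadDelay (i * gapMicros)
        r <- try (act x)
        putMVar var r
      pure var
    collect var = do
      r <- takeMVar var
      case r of
        Left e -> throwIO (e :: SomeException)
        Right v -> pure v

-- Load cached items one at a time, then fetch remote items with
-- staggered concurrency, and combine both result lists.
fetchBoth
  :: (local -> IO item)   -- hypothetical cache loader
  -> (remote -> IO item)  -- hypothetical scraper
  -> Int                  -- gap between request starts, in microseconds
  -> [local]
  -> [remote]
  -> IO [item]
fetchBoth loadLocal fetchRemote gap locals remotes = do
  cached <- traverse loadLocal locals
  scraped <- staggered gap fetchRemote remotes
  pure (cached ++ scraped)
```

In this sketch the cache loads finish before any remote request starts, and the gap spaces request starts rather than completions, so slow requests can still overlap.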