clj-commons / manifold

A compatibility layer for event-driven abstractions

Best way to do "live" batching? #190

Closed mac01021 closed 3 years ago

mac01021 commented 4 years ago

I have a use case where I want to batch the contents of a stream based on both duration and batch size, exactly as the batch function in manifold allows.

I want to write each batch out to a different file, but write each message within the batch as soon as it arrives, rather than accumulating the entire batch in memory before writing any of it. (In practice, my batches may be too large to fit in memory.)

So I think that, rather than the stream of vectors returned by batch, I want a stream of streams, where each substream is closed when the duration/size limit is hit and the next substream becomes available as soon as the previous one is closed.

Looking through the docs, it does not seem that anything like this is built in. So what is the easiest way for me to meet the needs described above? Is it to use manifold? Or to do my batching logic outside?

Thanks!

KingMob commented 3 years ago

If you're not looking to do this in parallel, one way might be to use (transform (partition-by f)) or even connect-via, and do your batching logic in the fn, but instead of using that logic to batch, use it to determine what file to append to. Then you'd pass down a map containing the original value, plus the filename, and process that. (You'd have to extract the grouping logic from consume, though).
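
A rough sketch of that idea, in case it helps. The names tag-with-batch and write-live!, the {:batch n :value v} map shape, and the batch-<n>.txt file naming are all made up for illustration; only s/transform and s/consume come from manifold. It assumes a single consumer and that checking the time limit when a message arrives is good enough:

```clojure
(require '[manifold.stream :as s])

(defn tag-with-batch
  "Hypothetical helper (not part of manifold): a stateful transducer that
  wraps each value as {:batch n, :value v}. The batch index n advances when
  the current batch already holds max-size values or is older than
  max-latency-ms. now-fn is injectable so the logic can be tested."
  ([max-size max-latency-ms]
   (tag-with-batch max-size max-latency-ms #(System/currentTimeMillis)))
  ([max-size max-latency-ms now-fn]
   (fn [rf]
     (let [state (volatile! {:batch 0, :n 0, :started nil})]
       (fn
         ([] (rf))
         ([result] (rf result))
         ([result v]
          (let [{:keys [batch n started]} @state
                now   (now-fn)
                ;; Roll over only once a batch has been started.
                roll? (and (some? started)
                           (or (>= n max-size)
                               (>= (- now started) max-latency-ms)))
                batch (if roll? (inc batch) batch)]
            (vreset! state {:batch   batch
                            :n       (if roll? 1 (inc n))
                            :started (if (or roll? (nil? started)) now started)})
            (rf result {:batch batch, :value v}))))))))

(defn write-live!
  "Hypothetical helper: appends each message to <dir>/batch-<n>.txt as soon
  as it arrives. Returns the deferred produced by s/consume."
  [dir max-size max-latency-ms source]
  (->> source
       (s/transform (tag-with-batch max-size max-latency-ms))
       (s/consume (fn [{:keys [batch value]}]
                    (spit (str dir "/batch-" batch ".txt")
                          (str value "\n")
                          :append true)))))
```

Note that, unlike batch, this only notices an expired time limit when the next message shows up, so a quiet stream never rolls over on its own. Also, spit reopens the file on every message; keeping a writer open per batch would likely be faster but needs extra bookkeeping to close it.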

Or, if you need the batch for other reasons, you could have two downstream consumers, one using consume with the fn above to write to files, and another using batch to actually group.
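
Something like the following might work for the two-consumer setup. fan-out and its argument names are hypothetical; it relies on the fact that a manifold source connected to several sinks propagates every message to each of them:

```clojure
(require '[manifold.stream :as s])

(defn fan-out
  "Hypothetical helper: sends every message from source down two paths.
  on-message sees each message as it arrives; on-batch sees the grouped
  batches produced by s/batch. Returns the two deferreds from s/consume."
  [source max-size max-latency-ms on-message on-batch]
  (let [live    (s/stream)
        batched (s/stream)]
    ;; Connect both sinks before any message is put onto source.
    (s/connect source live)
    (s/connect source batched)
    {:live    (s/consume on-message live)
     :batched (s/consume on-batch
                         (s/batch max-size max-latency-ms batched))}))
```

Because both paths are fed from the same source, backpressure means the source moves at the pace of the slower consumer.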

A stream of streams should work, too, but will be more complicated.