scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org
MIT License

More specific parallelism #86

Closed: johnpyp closed this issue 4 years ago.

johnpyp commented 4 years ago

Is your feature request related to a problem? Please describe.
Different parts of a pipeline might need different levels of parallelism. For example, you might not want to send more than 20 concurrent HTTP requests, but be fine with 100 concurrent file reads.

Describe the solution you'd like
It would be nice to have an extra parameter on .map and .flatMap (and probably other methods) that sets the level of parallelism for that specific operation.
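
To make "a level of parallelism for a specific mapping" concrete, here is a minimal sketch in plain Node.js, without Scramjet. The helper name mapWithLimit and its signature are hypothetical, not an existing Scramjet API. It keeps at most `limit` calls to the mapper in flight and yields results in input order.

```javascript
// Hypothetical helper (not a Scramjet API): maps `source` with `fn`,
// keeping at most `limit` calls to `fn` in flight, and yields the
// results in the original input order.
async function* mapWithLimit(source, fn, limit) {
  const pending = []; // result promises, in input order
  for await (const item of source) {
    const result = Promise.resolve(fn(item));
    // Mark the promise as handled so a rejection that happens while it waits
    // in the queue does not crash the process; awaiting it below still throws.
    result.catch(() => {});
    pending.push(result);
    // Window is full: wait for the oldest result before pulling more input.
    if (pending.length >= limit) yield await pending.shift();
  }
  // Source exhausted: drain whatever is still in flight, in order.
  while (pending.length > 0) yield await pending.shift();
}
```

Because the window advances only when its oldest item completes, one slow item can hold back new work even if later items have already finished. That is the price of keeping output order. An unordered variant would refill the window as soon as any call settles.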

I know Highland implements this with a .parallel() method: you .map into a stream of streams, and .parallel(n) reads up to n of them at once and merges them into a single stream, keeping their original order. A stream of streams is somewhat clunky, so accepting the same kind of return values that .flatMap already accepts would likely be a nicer implementation.
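
A rough plain-JavaScript sketch of the Highland-style behaviour follows, using async iterables in place of streams. The function name parallel is hypothetical, and this is not Highland's implementation. As a simplification, each inner stream is buffered in full before its items are emitted. Highland only buffers what it cannot emit yet.

```javascript
// Hypothetical sketch of Highland-style parallel(n): `streams` is an (async)
// iterable whose items are themselves (async) iterables. At most `n` inner
// streams are read at once, and their items are emitted in the order of the
// inner streams. Simplification: each inner stream is collected in full.
async function* parallel(streams, n) {
  const collect = async (inner) => {
    const items = [];
    for await (const item of inner) items.push(item);
    return items;
  };
  const running = []; // collected-inner-stream promises, in source order
  for await (const inner of streams) {
    const collected = collect(inner);
    collected.catch(() => {}); // errors still surface when awaited below
    running.push(collected);
    // n inner streams are being read: emit the oldest before starting another.
    if (running.length >= n) yield* await running.shift();
  }
  while (running.length > 0) yield* await running.shift();
}
```

Usage: `parallel([streamA, streamB, streamC], 2)` reads at most two of the inner streams at a time and yields all of streamA's items, then streamB's, then streamC's.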

Describe alternatives you've considered
I think it's possible to implement this as a separate module, but that's not ideal for what I think is a pretty common use case.

I've also considered batching items and mapping each batch through Promise.all, but that isn't ideal either: it forces a minimum batch size, which adds overhead in buffering or backpressure. It also seemed to block the stream and didn't work when I tried it.

MichalCz commented 4 years ago

Thanks for the proposal. Please give me a couple of days to think about it, since there are a few things to take into consideration.

MichalCz commented 4 years ago

OK, so you may not know this (it should probably be better documented, but that's an issue with the whole core of Scramjet, so sadly nothing new here), but there is already a way to do this: call .setOptions({maxParallel: 32}) in the chain. The default is 2 * vCPUs.

However, because transforms get merged and options are passed along in a somewhat opaque way, this is murky and not fully dependable.

A situation like this:

source
   .setOptions({maxParallel: 8})
   .map(someTransform)  // maxParallel is 8
   .filter(somethingElse)   // here as well
   .pipe(nonScramjetStream) // not anymore
   .pipe(new DataStream()) // and we're back to default here

can be incredibly frustrating, and no matter how good our documentation is, that alone won't be enough.

This could be solved either with a domain-style callback, for example:

source
   .withOptions({maxParallel: 8}, (stream) => stream
      .map(someTransform)       // maxParallel is 8, the default inside this callback
      .filter(somethingElse)    // here as well
      .pipe(nonScramjetStream)  // not applicable: this isn't a Scramjet stream
      .pipe(new DataStream())   // back to 8 rather than the global default
   )

The other option would be some funky proxy that keeps the chain flat, although that would mean nonScramjetStream would also need to be proxied, which could lead to unknown issues.
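
A minimal sketch of the proxy idea, using a standard JavaScript Proxy. withOptionsProxy, ToyScramjetStream and ToyPlainStream are hypothetical names, not Scramjet classes. The proxy pushes the options into every object in the chain that exposes setOptions() and wraps every object a method returns, so the plain stream returned by pipe() ends up proxied too.

```javascript
// Hypothetical: returns a proxy that pushes `options` into every object in the
// chain that has a setOptions() method and re-wraps every object returned by
// a method call, so the chain stays "flat".
function withOptionsProxy(target, options) {
  if (typeof target.setOptions === "function") target.setOptions(options);
  return new Proxy(target, {
    get(obj, prop) {
      const value = Reflect.get(obj, prop);
      if (typeof value !== "function") return value;
      return (...args) => {
        const result = value.apply(obj, args);
        // Re-wrapping also wraps objects we know nothing about, e.g. a plain
        // stream returned by pipe(): the side effect the comment warns about.
        return result !== null && typeof result === "object"
          ? withOptionsProxy(result, options)
          : result;
      };
    },
  });
}

// Toy stand-ins (assumptions, not Scramjet classes).
class ToyScramjetStream {
  constructor() { this.options = { maxParallel: 4 }; }
  setOptions(options) { Object.assign(this.options, options); return this; }
  map() { return new ToyScramjetStream(); }
  pipe(destination) { return destination; }
}
class ToyPlainStream {
  pipe(destination) { return destination; }
}
```

With real Node.js streams the caller would receive proxies instead of the original objects, and methods run with the unwrapped object as `this`. Identity checks, event listeners and internal state could therefore behave differently, which is one source of the "unknown issues".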

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.