scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org
MIT License

Iterating over each chunk with an async function and pausing the stream #121

Peterculazh closed this issue 2 years ago

Peterculazh commented 2 years ago

I want to read each row (chunk) with an async function, stopping on each chunk and waiting for the async function to resolve before resuming the stream. Instead, the async functions run in parallel; it looks like the stream is not waiting for the async function to resolve.

Reading a stream from fast-csv:

import { DataStream } from "scramjet";
import { parseStream } from "fast-csv";

// `stream` is the readable CSV source (created elsewhere)
let rowNumber = 0;
DataStream
  .from(parseStream(stream))
  .do(async (row: string[]) => {
      console.log(`Reading row - ${row}`);
      // read row
      // perform async request to another endpoint
      // store in db
      rowNumber++;
  })
  .run()

MichalCz commented 2 years ago

Hi @Peterculazh,

That's indeed how Scramjet works. Not only that: if you have a number of chained functions, they will run immediately, one after another, as soon as each chunk resolves.
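
For illustration, here's a minimal sketch (with an artificial 100 ms delay standing in for your real async work) of that default behavior - several chunks enter the callback before the earlier ones resolve, so the logs interleave:

import { DataStream } from "scramjet";

DataStream
  .from([1, 2, 3, 4])
  .do(async (n: number) => {
      console.log(`start ${n}`);
      // stand-in for a slow async operation
      await new Promise((resolve) => setTimeout(resolve, 100));
      console.log(`end ${n}`);
  })
  .run();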

In order to achieve what you want, you need to inform the framework that you want to run the function one chunk at a time, like this:

DataStream
  .from(parseStream(stream))
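  // maxParallel: 1 forces the stream to process one chunk at a time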
  .setOptions({maxParallel: 1})
  .do(async (row: string[]) => {
      console.log(`Reading row - ${row}`);
      // read row
      // perform async request to another endpoint
      // store in db
      rowNumber++;
  })
  .run()

I would, however, consider running your ops in a slightly different fashion: split up your workflow and see which steps you can run in parallel and which you shouldn't. You can gain a lot of performance this way.

DataStream
  .from(parseStream(stream))
  .setOptions({maxParallel: 32})
  .map((row: string[]) => {
    console.log(`Reading row - ${row}`);
    // read and return the mapped row
    return mapRow(row); // mapRow: placeholder for your own parsing logic
  })
  .map(async (row: MappedRow) => {
    // perform the async request to another endpoint
    // and return the row enriched with the response
    return enrich(row); // enrich: placeholder for that request
  })
  .setOptions({maxParallel: 1})
  .do(async (row: MappedRowWithExtraData) => {
    // store in db
    rowNumber++;
  })
  .run()

You could potentially use batch() for the DB writes as well.
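
Something along these lines - a rough sketch, with saveMany standing in for your actual bulk insert:

DataStream
  .from(parseStream(stream))
  .setOptions({maxParallel: 1})
  .batch(100) // group rows into arrays of 100
  .do(async (rows: string[][]) => {
    await saveMany(rows); // saveMany: hypothetical single bulk DB write
  })
  .run()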

I hope this solves your issue. Do you think there's a place for a special method in the framework for this? Maybe something like uniDo or uniMap or just uni()?
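
Off the top of my head, uniDo could be little more than sugar for the pattern above - a hypothetical sketch, since no such method exists yet:

// hypothetical - not part of the framework
uniDo(func) {
    return this.setOptions({maxParallel: 1}).do(func);
}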

Peterculazh commented 2 years ago

Thanks a lot. The first example runs perfectly, exactly as expected and required. As for parallelism, I don't need it in my case because of sensitive data, but I got it, thanks. As for the special method, I think uniDo is good.

Feel free to close the topic, my problem is solved.