spion / promise-streams

A collection of node.js streams that work well with promises (through, map, reduce, etc...)

How to continue processing on error #12

Open zam6ak opened 8 years ago

zam6ak commented 8 years ago

Hi

I have a TypeScript program that processes a CSV file. For each record, a number of awaited operations are performed (DB calls, network lookups, etc.).

The questions I have are:

import * as Promise from 'bluebird';
import * as fs from 'fs';
import * as csvParse from 'csv-parse';
const ps = require('promise-streams'); // no typings :(

const csvFile = __dirname + '\\..\\data\\sample.csv';
const csvParserOptions: csvParse.ICsvParseOpts = {
  delimiter: ',',
  rowDelimiter: '\r\n',
  quote: '"',
  columns: true,
  skip_empty_lines: true,
  trim: true,
  auto_parse: false
};

async function processCsvFile(csvFile: string): Promise<void> {
  let recordNum = 1;
  const inStream: fs.ReadStream = fs.createReadStream(csvFile);
  const csvParser: csvParse.CsvParser = csvParse(csvParserOptions);

  return ps.wait(

      inStream
      .pipe(csvParser)
      .pipe(ps.map({concurrent: 1}, (csvRecord: any) => {
        recordNum++;
        console.log(`1) Processing CSV record #: ${recordNum.toLocaleString()} , data: ${JSON.stringify(csvRecord, null, 0)}`);
        return processCsvFileRecord(csvRecord);
      }))

  ).then(() => {
    console.log(`ps.wait() Finished processing!`);
  });

}

async function processCsvFileRecord(csvRecord: any): Promise<void> {
  console.log('2) About to do some other work #1...');
  await dbOp1();
  console.log('3) About to do some other work #2...');
  await dbOp2();
  return;
}

// just a simulation via delay...
async function dbOp1(): Promise<void> {
  return Promise.delay(1000).then(() => { console.log('2-1) running dbOp1 (1 sec)...'); });
}

async function dbOp2(): Promise<void> {
  return Promise.delay(2000).then(() => { console.log('3-1) running dbOp2 (2 secs)...'); });
}

async function main(): Promise<void> {
  console.log('Begin main()....');
  await processCsvFile(csvFile);
  console.log('The end!');
}

main();

spion commented 8 years ago

I'm still working on the error semantics, which will be fixed in promise-streams 2.0.

Here is my initial idea/proposal:

https://gist.github.com/spion/ecdc92bc5de5b381da30#initial-proposed-error-semantics
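
Until those semantics are settled, one possible workaround is to make sure the mapper passed to `ps.map` never rejects, so the stream only ever sees fulfilled promises. The sketch below is not part of promise-streams: `continueOnError` and `Failure` are hypothetical names introduced here for illustration, and it assumes that skipping a failed record and collecting the error for later inspection is acceptable.

```typescript
// Hypothetical helper, NOT a promise-streams API.
// Wraps an async mapper so that a failure is recorded instead of rejecting.

// One failed record together with the error it produced.
interface Failure<T> {
  record: T;
  error: unknown;
}

function continueOnError<T, R>(
  fn: (record: T) => Promise<R>,
  failures: Failure<T>[]
): (record: T) => Promise<R | undefined> {
  return async (record: T): Promise<R | undefined> => {
    try {
      // Awaiting inside try catches both rejections and synchronous throws.
      return await fn(record);
    } catch (error) {
      // Remember what failed, then resolve so the caller keeps going.
      failures.push({ record, error });
      return undefined;
    }
  };
}
```

In the program above this would presumably be used as `ps.map({concurrent: 1}, continueOnError(processCsvFileRecord, failures))`, with `failures` checked once `ps.wait(...)` resolves. The trade-off is that errors no longer stop the pipeline at all, so anything that should still abort processing has to be rethrown explicitly.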