agmen-hu / node-datapumps

Node.js ETL (Extract, Transform, Load) toolkit for easy data import, export or transfer between systems.
MIT License
291 stars 38 forks

How to do multiple processBatch commands? #37

Closed thardyman closed 8 years ago

thardyman commented 8 years ago

I have a pump that I'd like to apply several batch processes to. I can't work out how to do this, as the last .processBatch command seems to overwrite all the preceding ones.

For example, I'd like to add the 'timestamp' attribute, perform some transformation then upsert to my SalesForce API. Here is an excerpt of my pipeline...

  etl.pump = new Pump();

  etl.pump
    .mixin(BatchMixin)
    .batchSize(50)
    .from(connections.mysql.query(etl.sourceSql).stream())
    .mixin(SalesForceMixin({connection : sfConnection}))
    .processBatch((rows) => etl.pump.addAttribute(rows, 'timestamp', batchTimestamp))
    .processBatch((rows) => etl.pump.transform(rows, transformFn))
    .processBatch((rows) => etl.pump.upsertRows(rows))
    .logErrorsToConsole()
    .run()
    .then(() => {
      console.log(etl.name + ': done');
      connections.mysql.end();
      connections.salesforce.disconnect();
    })

The transform function in my SalesForceMixin returns a promise that resolves with the transformed items...

    target.transform = (_items, transformer) => {
      return new Promise((resolve) => resolve(_items.map(transformer)));
    };

I can see that only the 3rd processBatch command is executed: etl.pump.upsertRows. Looking into the code, I think I can see why; the processBatch command overrides the _processBatch function.

Am I missing something? I don't really want to do all my transformation and loading in one function.
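
The behaviour described above can be reproduced with a toy stand-in. The sketch below is not the datapumps source: `ToyPump` and its fields are hypothetical names, and it assumes only what the question reports, namely that each `.processBatch` call stores its callback in a single `_processBatch` slot.

```javascript
// Hypothetical stand-in for a pump; NOT the datapumps implementation.
class ToyPump {
  constructor() {
    this._processBatch = null; // single slot, as the question reports
  }

  processBatch(fn) {
    this._processBatch = fn; // each call replaces the previous callback
    return this; // allow chaining
  }

  run(batch) {
    // Invoke whichever callback was registered last.
    return Promise.resolve(this._processBatch(batch));
  }
}

const calls = [];
const pump = new ToyPump()
  .processBatch((rows) => { calls.push('addAttribute'); return rows; })
  .processBatch((rows) => { calls.push('transform'); return rows; })
  .processBatch((rows) => { calls.push('upsertRows'); return rows; });

pump.run([{ id: 1 }]);
// calls now holds only 'upsertRows': the first two callbacks were replaced.
```

Under that assumption, chaining `.processBatch` three times registers one callback, not three, which matches the observation that only `upsertRows` runs.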

novaki commented 8 years ago

You have three options:

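  .processBatch((rows) => {
    // Return the whole chain so the pump waits for the batch to finish.
    return etl.pump.addAttribute(rows, 'timestamp', batchTimestamp)
      .then(() => etl.pump.transform(rows, transformFn))
      // transform (as shown in the question) resolves with a new array,
      // so pass that result on to upsertRows.
      .then((transformed) => etl.pump.upsertRows(transformed));
  })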

In the first case, I've simply turned the three .processBatch calls into a single promise chain. Note that the callback passed to .processBatch must return a Promise; otherwise flow control will not work.
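
If the steps should stay as separate functions, one way to keep them apart while still handing a single Promise-returning callback to `.processBatch` is a small composition helper. This is only a sketch and not part of datapumps: `composeBatchSteps` is a hypothetical name, the two steps are stand-ins, and it assumes each step returns (or resolves with) the rows the next step should receive.

```javascript
// Hypothetical helper (not part of datapumps): runs the steps in order and
// feeds each step's resolved value into the next one.
function composeBatchSteps(...steps) {
  return (rows) =>
    steps.reduce(
      (chain, step) => chain.then(step),
      Promise.resolve(rows)
    );
}

// Stand-in steps for illustration; real ones would call the mixin methods.
const addTimestamp = (rows) =>
  rows.map((row) => ({ ...row, timestamp: 1000 }));
const upperCaseName = (rows) =>
  Promise.resolve(rows.map((row) => ({ ...row, name: row.name.toUpperCase() })));

// A single callback that always returns a Promise, even for sync steps.
const processRows = composeBatchSteps(addTimestamp, upperCaseName);
const composed = processRows([{ name: 'acme' }]);
```

With the pipeline from the question, the composed function would be passed as the one `.processBatch(...)` argument, provided the mixin methods resolve with the updated rows. A step that rejects or throws rejects the whole chain, so the error still reaches the pump.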

thardyman commented 8 years ago

Thanks @novaki. That is really helpful. I think I'll go with your first option and see how I get on 👍