agmen-hu / node-datapumps

Node.js ETL (Extract, Transform, Load) toolkit for easy data import, export or transfer between systems.
MIT License

ETL process using the MongodbMixin works, but the node process continues to run... #6

Closed. mgarc152 closed this issue 9 years ago.

mgarc152 commented 10 years ago

Hello,

I have been working on a code snippet that uses pump groups to extract data from a REST service, transform it, and load it into MongoDB. The code below runs the ETL process successfully, and the pump group's "whenFinished" callback is invoked, but the Node process itself does not terminate; I have to call "process.exit()" to force it. I only see this behavior when using the MongodbMixin. Could this be related to an open Mongo connection, or is the pump group in a bad state? The code snippet is below. Any help is greatly appreciated. Thanks

var datapumps = require('datapumps');
var pumpGroup = datapumps.group();

//add two pumps
var pumpOne = pumpGroup.addPump('pumpOne');
var pumpTwo = pumpGroup.addPump('pumpTwo');

//configure the pumps
pumpOne
    .mixin(datapumps.mixin.RestMixin)
    .from(pumpOne.createBuffer())
    .get('http://www.comicvine.com/api/volumes/', {
        "multipart": false,
        "query": {
            "api_key": "XXXXXXXX",
            "format": "json",
            "filter": "name:spider-man",
            "field_list": "id,name,start_year,description,publisher,image"
        }
    })
    .then(function (volumes) {
        volumes.results.forEach(function (volume) {
            //write to the input buffer of pumpOne
            pumpOne.from().writeAsync(volume);
        });
    });

//pumpTwo input buffer is the output buffer of pumpOne
pumpTwo
    .mixin(datapumps.mixin.MongodbMixin('mongodb://127.0.0.1:27017/pumps'))
    .from(pumpOne.buffer())
    .useCollection('volumes')
    .process(function(volume){
        pumpTwo.insert({name: volume.name});
    });

pumpOne.process(function (volume) {
    //transformation
    volume.name = volume.name + ' copying records to MongoDB';

    //write to the output buffer of pumpOne
    this.buffer().writeAsync(volume);

    if (pumpOne.from().isEmpty()) {
        pumpOne.from().seal();
    }
});

pumpOne.whenFinished().then(function(){ console.log('Pump 1 has finished'); });

pumpTwo.whenFinished().then(function() { console.log('Pump 2 has finished'); });

pumpGroup.start().whenFinished().then(function() {
    if (!pumpOne.errorBuffer().isEmpty()) {
        console.log(pumpOne.errorBuffer().getContent());
    }
    console.log('_ENDED ETL PROCESS_');

    console.log('Output buffer for pump one', pumpOne.buffer('output'));
    console.log('Output buffer for pump two', pumpTwo.buffer('output'));

    //not sure why the process does not terminate on its own...
    process.exit(0);
});
novaki commented 9 years ago

Yes, that's because of the mongo driver. Call .close() on the database connection in the group's .whenFinished handler:

// replace process.exit(0) with:
pumpTwo.db().close();
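For background on why the process keeps running: Node exits only once no active handles remain, and an unclosed database connection is such a handle. Below is a minimal sketch of that effect using only Node's built-in child_process and net modules. It assumes a listening TCP server is a fair stand-in for the driver's open socket, and runChild is a hypothetical helper, not part of datapumps or the MongoDB driver.

```javascript
// Sketch: an active handle keeps Node's event loop (and the process) alive.
// A listening TCP server stands in for the MongoDB driver's open socket(s);
// nothing here is datapumps or MongoDB API.
const { spawnSync } = require('child_process');

// Hypothetical helper: run a tiny child script and report whether it
// exited on its own or had to be killed by the timeout.
function runChild(closeWhenDone) {
  const script = `
    const net = require('net');
    // Stand-in for the driver's open connection.
    const server = net.createServer();
    server.listen(0, '127.0.0.1', () => {
      // Stand-in for pumpGroup.whenFinished().then(...).
      Promise.resolve().then(() => {
        console.log('ETL finished');
        // Stand-in for pumpTwo.db().close().
        if (${closeWhenDone}) server.close();
      });
    });
  `;
  const result = spawnSync(process.execPath, ['-e', script], {
    encoding: 'utf8',
    timeout: 3000, // kill the child with SIGTERM if still running after 3 s
  });
  return {
    exitedOnItsOwn: result.status === 0, // status is null when killed
    output: (result.stdout || '').trim(),
  };
}

const leftOpen = runChild(false); // handle never closed: hangs until killed
const closedDb = runChild(true);  // handle closed: exits normally
console.log(leftOpen, closedDb);
```

In both runs the "finished" callback fires; only the run that closes the handle lets the process end without process.exit().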
thxmike commented 9 years ago

I ran the example provided by mgarc152 with the change novaki specified. It fails with the following error message:

Possibly unhandled MongoError: Connection Closed By Application
    at Object.toError (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/mongodb/lib/mongodb/utils.js:114:11)
    at Server.close (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/mongodb/lib/mongodb/connection/server.js:188:38)
    at Db.close (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/mongodb/lib/mongodb/db.js:359:21)
    at /Users/miker/Projects/TestProjects/data_migration/server3.js:70:22
    at tryCatch1 (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/bluebird/js/main/util.js:45:21)
    at Promise$_callHandler as _callHandler
    at Promise$_settlePromiseFromHandler as _settlePromiseFromHandler
    at Promise$_settlePromiseAt as _settlePromiseAt
    at Promise$_settlePromises as _settlePromises
    at Async$_consumeFunctionBuffer as _consumeFunctionBuffer

I also ran it through the debugger in WebStorm (with both process.exit(0) and pumpTwo.db().close(); removed), and it appears to get stuck at line 41 of the Async library.

novaki commented 9 years ago

I've spotted an error in the code: the .process() callbacks do not return a promise, which the pumps expect (see the docs):

pumpOne.process(function (volume) {
    //transformation
    volume.name = volume.name + ' copying records to MongoDB';

    if (pumpOne.from().isEmpty()) {
        pumpOne.from().seal();
    }

    //write to the output buffer of pumpOne
    return this.buffer().writeAsync(volume);
});

pumpTwo.process(function(volume){
    return pumpTwo.insert({name: volume.name});
});

Because the callbacks did not return their promises, the pumps finished before the mongo queries completed, so .whenFinished() closed the connection while a query was still in progress. Hence the error message.
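To illustrate the ordering problem, here is a sketch built on a hypothetical runPump helper (not the datapumps implementation) that, like a pump, waits only on whatever the process callback returns. fakeInsert is a hypothetical stand-in for an asynchronous MongoDB insert, and the "connection closed" entry marks the point where .whenFinished() would close the database.

```javascript
// Hypothetical mini pump, NOT datapumps' implementation: it only shows why
// the process callback must return its promise.
async function runPump(items, processFn) {
  for (const item of items) {
    // The pump can only wait for work it is handed back: a returned promise
    // is awaited, while `undefined` lets the loop move on immediately.
    await processFn(item);
  }
}

// Hypothetical stand-in for an asynchronous MongoDB insert.
function fakeInsert(log, doc) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push('inserted ' + doc.name);
      resolve(doc);
    }, 10);
  });
}

async function demo(returnPromise) {
  const log = [];
  await runPump([{ name: 'a' }, { name: 'b' }], (volume) => {
    const pending = fakeInsert(log, { name: volume.name });
    // Dropping the promise here is the bug described above.
    return returnPromise ? pending : undefined;
  });
  // Stand-in for .whenFinished() closing the connection.
  log.push('connection closed');
  // Give any still-pending inserts time to land, so the ordering is visible.
  await new Promise((resolve) => setTimeout(resolve, 50));
  return log;
}

demo(false).then((log) => console.log('without return:', log));
demo(true).then((log) => console.log('with return:', log));
```

Without the return, "connection closed" is logged before either insert lands, the same ordering that produces the "Connection Closed By Application" error; with it, both inserts complete first.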

novaki commented 9 years ago

Also, the .process callback runs in the pump's context (this refers to the pump), so you can write:

pumpTwo.process(function(volume){
    return this.insert({name: volume.name});
});
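A rough sketch of what "runs in the pump context" means, using a hypothetical MiniPump class (not datapumps' code) whose run() invokes the callback with Function.prototype.call and the pump as this; its insert() is a stand-in, not the MongodbMixin method. It also shows a consequence worth keeping in mind: an arrow function does not pick up that this, so this.insert fails there.

```javascript
// Hypothetical mini pump, NOT datapumps' code: like the pumps described
// above, it invokes the process callback with the pump itself as `this`.
class MiniPump {
  constructor() {
    this.inserted = [];
    this.processFn = null;
  }

  // Hypothetical stand-in for the MongodbMixin's insert().
  insert(doc) {
    this.inserted.push(doc);
    return Promise.resolve(doc);
  }

  process(fn) {
    this.processFn = fn;
    return this;
  }

  async run(items) {
    for (const item of items) {
      // .call(this, ...) is what makes `this` refer to the pump.
      await this.processFn.call(this, item);
    }
    return this.inserted;
  }
}

// A regular function expression receives the pump as `this`.
const pumpTwo = new MiniPump().process(function (volume) {
  return this.insert({ name: volume.name });
});

// An arrow function ignores .call()'s `this` and keeps the enclosing one,
// so `this.insert` is not the pump's method and the call throws a TypeError.
const arrowPump = new MiniPump().process((volume) => this.insert({ name: volume.name }));

pumpTwo.run([{ name: 'Spider-Man' }]).then((docs) => console.log(docs));
arrowPump.run([{ name: 'Spider-Man' }]).catch((err) => console.log(err instanceof TypeError));
```

So when relying on this inside .process, stick with function expressions as in the snippets above.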