agmen-hu / node-datapumps

Node.js ETL (Extract, Transform, Load) toolkit for easy data import, export or transfer between systems.
MIT License
291 stars 38 forks source link

Copying data between two mongo servers #32

Closed cressie176 closed 8 years ago

cressie176 commented 8 years ago

I need to copy and transform data from one mongo server to another, but I'm not sure of the best approach. It seems possible if I use two pumps, as follows:

// Single-driver approach: sourcePump reads the 'authors' collection from
// server1 and its process callback writes each record via destinationPump.
sourcePump
  .mixin(MongodbMixin('mongodb://server1/books'))
  .useCollection('authors')
  .from(sourcePump.find())
  .process(function(data) {
    // Hands back whatever insert() returns (presumably a promise — confirm).
    return destinationPump.insert(data)
  })
  .logErrorsToConsole()

// destinationPump is only configured here (mixin + collection); this snippet
// never calls run() on it and uses it solely for its insert() method.
destinationPump
  .mixin(MongodbMixin('mongodb://server2/books'))
  .useCollection('authors')
  .logErrorsToConsole()

// Only the source pump is started; "Done" is logged when its run() resolves.
sourcePump.run()
    .then(function() {
      console.log("Done");
    });

Is there a better way?

novaki commented 8 years ago

Hi,

I'd also use two pumps: one for reading and one for writing. I'd leave the .process() method of the first pump untouched (it'll copy data into its output buffer), and insert data in the second pump. E.g.:

// Reader pump: no process callback is set, so the default behavior applies
// (per the comment above, it copies data into its output buffer).
sourcePump
  .mixin(MongodbMixin('mongodb://server1/books'))
  .useCollection('authors')
  .from(sourcePump.find())
  .logErrorsToConsole()

// Writer pump: consumes the reader pump's buffer and inserts each record
// into the 'authors' collection on server2.
destinationPump
  .mixin(MongodbMixin('mongodb://server2/books'))
  .useCollection('authors')
  .from(sourcePump.buffer())
  .process(function(data) {
    // The insert() result is returned to the pump.
    return destinationPump.insert(data)
  })
  .logErrorsToConsole()

// NOTE(review): assigned without const/let/var, so this replaces the global
// Promise binding (and would throw in strict mode if Promise were undeclared).
Promise = require('bluebird');
// Both pumps are started; "Done" is logged once both run() promises resolve.
Promise.all([ sourcePump.run(), destinationPump.run() ])
    .then(function() {
      console.log("Done");
    });

You can also add the pumps to a group.

cressie176 commented 8 years ago

Thanks for the quick response. We went down the group route, but we are now getting stuck when trying to migrate multiple collections. We have a different pump per collection, but when the first pump finishes it appears to close the connection. Any ideas?

// Builds one source/destination pump pair per collection inside a single
// datapumps group, then starts the whole group.
const group = datapumps.group()

_.each(etl.collections, (collection, index) => {

    console.log('Processing collection', collection.name)
    // These two constants hold pump *names* (strings), not pump objects;
    // the pumps themselves are looked up with group.pump(name).
    const sourcePump = 'source-' + collection.name
    const destinationPump = 'destination-' + collection.name

    // Source pump: reads documents matching collection.query.
    group.addPump(sourcePump)
        .mixin(MongodbMixin(source.mongodbUrl + etl.database))
        .useCollection(collection.name)
        .from(group.pump(sourcePump).find(collection.query))

    // Destination pump: fed from the source pump, saves each document.
    group.addPump(destinationPump)
        .mixin(MongodbMixin(destination.mongodbUrl + etl.database))
        .useCollection(collection.name)
        .from(group.pump(sourcePump))
        .process(function(data) {
            // NOTE(review): the result of save() is not returned here, unlike
            // the earlier examples that `return destinationPump.insert(data)`.
            // Presumably the pump then cannot wait for pending saves before it
            // finishes — a possible cause of the "Connection Closed By
            // Application" error in the output below; confirm.
            group.pump(destinationPump).save(data)
        })
})

// Start every pump in the group and log a timestamp when the group finishes.
group
    .logErrorsToConsole()
    .start()
    .whenFinished().then(() => {
        console.log(new Date(), "done")
    })

Output

Processing collection foo
Processing collection bar
Unhandled rejection MongoError: Connection Closed By Application
    at Object.toError (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/mongodb/lib/mongodb/utils.js:114:11)
    at Server.close (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/mongodb/lib/mongodb/connection/server.js:188:38)
    at Db.close (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/mongodb/lib/mongodb/db.js:360:21)
    at /Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/lib/mixin/MongodbMixin.js:22:37
    at tryCatcher (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/util.js:26:23)
    at Promise._settlePromiseFromHandler (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/promise.js:507:31)
    at Promise._settlePromiseAt (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/promise.js:581:18)
    at Promise._settlePromises (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/promise.js:697:14)
    at Async._drainQueue (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/async.js:123:16)
    at Async._drainQueues (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/async.js:133:10)
    at Immediate.Async.drainQueues [as _onImmediate] (/Users/steve/Development/tes/miscellaneous/mongo-pump/node_modules/datapumps/node_modules/bluebird/js/main/async.js:15:14)
    at processImmediate [as _immediateCallback] (timers.js:383:17)
sophrosyne:mongo-pump steve$