googlegenomics / dockerflow

Dockerflow is a workflow runner that uses Dataflow to run a series of tasks in Docker with the Pipelines API
Apache License 2.0

Intra-pipeline command support? #10

Closed tantrev closed 7 years ago

tantrev commented 8 years ago

Just wondering if there's any easy way to add support for multiple commands within the same pipeline object.

In my case, I have many calculations that are performed on a large file. Thus, it would be ideal to have only one copy operation to the pipeline object's VM and then have the rest of the commands execute within that VM. Of course, it would also be nice if Dataflow's support for acyclic graphs could optimize the command order within that VM.

Just thought I'd ask. :P

jbingham commented 8 years ago

Each step in the pipeline consists of a single Docker image and a single script to execute on it. Files will be copied onto the local disk from Cloud Storage before running the script, and outputs copied back out to Cloud Storage after the script completes. You can run as much as you like inside that one script -- for example, it can be a multi-line script, or a single bash script that runs lots of commands.
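For instance, that single script can itself be a tiny runner that executes several commands in order; a minimal sketch in Node (the command strings are placeholders for whatever tools are baked into the step's Docker image):

var execSync = require('child_process').execSync;

// One step, one script: each command below stands in for a real tool
// installed in the step's Docker image.
var commands = [
  'echo "sort the BAM"',
  'echo "index the BAM"',
  'echo "collect statistics"'
];

commands.forEach(function (cmd) {
  console.log('Running: ' + cmd);
  execSync(cmd, { stdio: 'inherit' });
});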

Does that meet your needs?

tantrev commented 8 years ago

Right, that's exactly what I currently do (I just have a Python script that runs a simple sequential workflow with parallelized steps). The only problem is that I haven't found a smart way to schedule task execution so that new parts of the workflow start locally as soon as they're ready (consequently, many cores can sit idle while an outlier task in the workflow finishes). Something like Spotify's Luigi might be able to do the trick, but it seems a bit overkill.

The problem with the obvious alternative of running many instances with few cores really just boils down to the wasted overhead in repeatedly copying large files (and to a lesser extent, the cost of having many small instances with large disks). I tried experimenting with Cloud Storage FUSE a while ago as a potential solution, but experienced pretty low throughput.

tantrev commented 8 years ago

Should it be of any interest, it looks like Toil may be a good fix for others in the same predicament.

jbingham commented 8 years ago

Dockerflow launches new VMs for tasks only when the previous steps complete. Outliers shouldn't cost you money for idle VMs.

But it's true that file copying isn't as efficient as it could be. If you want to create a disk, download your files to it, and pass the disk from step to step in the workflow, you can, but you have to code it all up yourself.

If Toil solves the issue, that's great! There's a lot I like about Toil.

seandavi commented 8 years ago

@tantrev, within a container, take a look at systems like make, fabric, scons, or even snakemake. Toil could also work, but it might be a little heavyweight for instantiation on a single node. So an example development cycle might look like this: build the intra-step workflow with one of those tools, bake it into your Docker image, and let Dockerflow handle the step-to-step orchestration.

Now, you can execute multiple jobs in parallel, with dependencies respected, in EACH step of your dockerflow workflow.
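For example, the "parallel, with dependencies respected" part can be sketched inside a single step with Node's async library, in the spirit of what those tools do declaratively (assuming the async 1.x callback-first signatures; the echo commands are placeholders for real tools):

var async = require('async');
var exec  = require('child_process').exec;

async.auto({
  // 'sort' and 'qc' have no dependencies, so they start immediately and run in parallel.
  sort: function (callback) { exec('echo sorting the BAM', callback); },
  qc:   function (callback) { exec('echo running QC', callback); },

  // 'call_variants' starts as soon as 'sort' finishes, even if 'qc' is still running.
  call_variants: ['sort', function (callback, results) {
    exec('echo calling variants', callback);
  }]
}, function (err) {
  if (err) { return console.error(err); }
  console.log('All jobs in this step finished.');
});

make or snakemake give you the same scheduling declaratively from a dependency file, which is usually the better fit; the sketch just shows the shape of the idea.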

If I understand your problem....

pgrosu commented 8 years ago

@seandavi, @tantrev This was part of what I was trying to suggest via these posts and code:

https://github.com/googlegenomics/pipelines-api-examples/issues/13#issuecomment-194038122

https://github.com/common-workflow-language/cwljava/tree/develop/WIP/Dynamic%20Graph%20Workflows

https://groups.google.com/forum/#!topic/ga4gh-dwg-containers-workflows/WcJZKDYr5_E

The following is a small portion of something bigger I have been working on for the past few months; I'll give you the equivalent code in JavaScript, which hopefully might help:

var async = require('async');
var fs    = require('fs');

var filesToCopy = ['data1.txt', 'data2.txt', 'data3.txt'];

// Start all of the copies in parallel; async.map calls the final
// callback once every copy has finished (or one of them fails).
async.map(filesToCopy, async_copy, function (err) {
  if (err) { console.error(err); }
});

function async_copy( filename, callback ) {
    console.log("Copying " + filename );
    fs.createReadStream(filename)
      .pipe(fs.createWriteStream(filename + '_copied.log'))
      .on('finish', callback)
      .on('error', callback);
}

console.log("Proceed with next steps asynchronously...");

// Downstream steps start right away and simply retry until their input
// exists, treating each file as data that will show up eventually.
async.map( filesToCopy, async_print, function() {} );

function async_print( filename ) {
    fs.readFile(filename + '_copied.log', 'utf8', function (err, data) {
        if (err) {
            // Not copied yet -- try again.
            async_print(filename);
        } else {
            console.log(data);
        }
    });
}

The trick is to start thinking of files as distributed information that never stops flowing/streaming and that might appear at some time in the future, rather than as static data sitting in a file.

Hope it helps, ~p

tantrev commented 8 years ago

@jbingham @seandavi @pgrosu - thank you for all of the thoughtful responses. I apologize, I did a poor job of describing my use case, but @seandavi hit the nail right on the head. I indeed meant to propose marrying @jbingham's beautiful Dockerflow with another workflow management solution within a "docker step". I completely agree that Toil is probably overkill, and it admittedly looks painful to write its workflows. Snakemake looks like a great alternative - I just wish it had Toil's cache support and Nextflow's error recovery. Snakemake's support for sub-workflows is quite nice, however.

@pgrosu - that sounds really cool. If I understand correctly (which I probably don't), are you basically proposing piping between nodes? I like it, but how would you handle programs that produce multiple output files? For example, sambamba sort will build an index while it sorts, so I've assumed it doesn't lend itself well to piping. Additionally, I occasionally run operations that take advantage of indexed data - like using FreeBayes to call variants only within the mitochondrial region. Without FreeBayes being able to use the BAM's index, this otherwise very short operation takes drastically longer because the entire BAM file has to be streamed. For single-threaded tasks, would Google's 2 Gbps networking limit be easily saturated? Also, would there be a way to pipe the output of one program to multiple other programs simultaneously? (I could never figure out how to do this even locally.) Such predicaments have been the main reason I still think of pipeline inputs/outputs as "files" rather than data streams.

pgrosu commented 8 years ago

@tantrev Unfortunately most of these computer science concepts don't get much exposure in bioinformatics, even though they are fundamental to how globally distributed companies set up their cloud infrastructure. Without complicating the implementation approach, I'll first show how it is possible to stream data to multiple programs (Subscribers) at the same time from a Publisher - and then I'll answer the rest a little later:

1) Below is the Publisher (the broadcaster), which I named publisher.js:


var dgram = require('dgram');

var server = dgram.createSocket('udp4');

var node_port_by_id = {};
var node_id_by_port = {};

var port = 4000;

// Timer between broadcasts
var tick = 1000;

process.stdin.resume();

server.on('message', function(message, rinfo) {

  // Declare these locally so that an empty message cannot trigger a
  // ReferenceError below.
  var cmd, cmdArray, originating_port, verb;

  if ( message.length > 0 ) {

    cmd = message.toString().trim();
    cmdArray = cmd.split( " " );

    originating_port = cmdArray[0].toString().trim();
    cmdArray.splice(0, 1); //remove the originating port

    verb = cmdArray[0].toString().trim();
    cmdArray.splice(0, 1); //remove the verb
  }

  // Adding the new node's port information to the dictionary
  if ( verb === "add_node" ) {

    node_port = cmdArray[0];
    cmdArray.splice(0, 1); //remove the port

    node_id = cmdArray[0];
    cmdArray.splice(0, 1); //remove the node id
    if ( !(node_port === undefined) ) {
      console.log( "Adding node at port " + node_port + " for node id: " + node_id );

      node_port_by_id[  node_id  ] = node_port;
      node_id_by_port[ node_port ] = node_id;
    }

  } 

});

server.on('listening', function() {
  var address = server.address();
  console.log("The broadcaster is running...");
  console.log('');
});

server.bind(port);

// Broadcast to all connected subscribers
broadcastToAll = function() {

  for(var key in node_id_by_port) {
    message = new Buffer("Hello from the Publisher to the " + node_id_by_port[ key ] );
    server.send( message, 0, message.length, key, '127.0.0.1' );
  }

}

// Launch the ticker to periodically check the queue
setInterval( broadcastToAll , tick );

2) Below is the code for the subscriber(s), which I named subscriber.js:

var dgram = require('dgram');

var host = process.argv[2];
var port = parseInt(process.argv[3], 10);
var client = dgram.createSocket('udp4');
var id = process.argv[4];
var broadcast_port = 4000;
var this_port = "";

var this_stdout = [];
var custom_stdout_placeholder = ""; 

process.stdin.resume();

process.stdin.on('data', function(data) {

  //All messages to the Publisher add the originating port as a prefix in the message
  var originating_port_with_data = new Buffer( this_port + " " + data);
  client.send(originating_port_with_data, 0, originating_port_with_data.length, port, '127.0.0.1');

});

// Process commands
client.on('message', function(message) {

  console.log(message.toString());

});

client.on('listening', function () {

    var address = client.address();

    this_port = address.port.toString();

    // Request the node's port to be added to the Publisher's dictionary
    data = new Buffer(this_port + " add_node " + address.port.toString() + " " + id);
    client.send(data, 0, data.length, broadcast_port, '127.0.0.1');

});

// Ensure that the client.on('listening',...) function executes so that the "add_node"
// portion is processed when the node is first launched
function start() { 
  var data = new Buffer("");
    client.send(data, 0, data.length, broadcast_port, '127.0.0.1'); 
}
start();

console.log('');
console.log('Subscriber started on port ' + port);
console.log('');

3) If you first launch publisher.js like this, it will run on port 4000:

node publisher.js

Then you can launch connected subscribers like this, each with its own port above 4000 - the format is node subscriber.js local_IP subscriber_PORT subscriber_name:

node subscriber.js 127.0.0.1 4002 first_subscriber

Then you will see something like this being received - once a second - from the publisher to each specific subscriber:

Hello from the Publisher to the first_subscriber
Hello from the Publisher to the first_subscriber
...

And if you connect another subscriber like this:

node subscriber.js 127.0.0.1 4003 second_subscriber

Then the second subscriber will see this:

Hello from the Publisher to the second_subscriber

There are more complex ideas/layers that I did not expose here, to keep this simple example readable, but hopefully over time I can help introduce the bioinformatics community to these concepts and how they can be used.
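As for piping one program's output into several other programs at the same time on one machine, Node streams already allow that without any networking; a minimal sketch (cat, gzip and wc are just stand-ins for real tools):

var spawn = require('child_process').spawn;
var fs    = require('fs');

// One producer process feeding two consumer processes at once.
var producer  = spawn('cat',  ['large_input.txt']);
var consumerA = spawn('gzip', ['-c']);
var consumerB = spawn('wc',   ['-l']);

// A readable stream can be piped to more than one writable stream;
// every chunk from the producer is delivered to both consumers.
producer.stdout.pipe(consumerA.stdin);
producer.stdout.pipe(consumerB.stdin);

consumerA.stdout.pipe(fs.createWriteStream('large_input.txt.gz'));
consumerB.stdout.on('data', function (chunk) {
  console.log('line count: ' + chunk.toString().trim());
});

In spirit, that is what the Publisher/Subscriber pair above does, just across machine boundaries over UDP.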

Hope it helps, ~p

deflaux commented 8 years ago

@pgrosu Due to the rules of this project, you must sign the CLA or refrain from including code in any comments. We really value your participation and help! Can you please sign the CLA or edit your comments to remove the code?

tantrev commented 7 years ago

Thank you, everyone, for all of the help. In retrospect, my question was more or less silly to ask - it's now clear to me that there are many easy solutions already designed to solve this problem. Given that, I think it's only appropriate for me to close the issue. Thanks again.