Open kyuksel opened 5 years ago
Quick correction here (just to make sure we all have consistent terminology -- Kaan, please correct me if there's better terminology for each concept).
Task = a worker. Easiest to think of this as a single vCPU with dedicated resources, isolated from all other tasks. However, it could also be a completely isolated CPU thread on a single machine with 10 CPUs.
Each task gets a number of data shards. A single shard could be something like a single row, or could be all rows with sample_id X. Shard definition is determined by the programmer -- shard routing is handled by Dataflow.
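To make the two shard definitions above concrete, here is a minimal pure-Python sketch. The names `shard_by_row`, `shard_by_sample`, and `build_shards` are made up for illustration, and the rows are assumed to be dicts with a `sample_id` field. In a real pipeline the routing of shards to tasks would still be up to Dataflow; this only shows how the programmer's choice of key changes what counts as a shard.

```python
from collections import defaultdict


def shard_by_row(index, row):
    """One shard per row: the row's position is its shard key."""
    return index


def shard_by_sample(index, row):
    """One shard per sample: all rows sharing a sample_id land together."""
    return row["sample_id"]


def build_shards(rows, key_fn):
    """Group rows into shards according to the programmer-supplied key_fn."""
    shards = defaultdict(list)
    for index, row in enumerate(rows):
        shards[key_fn(index, row)].append(row)
    return dict(shards)


# Hypothetical example rows, not data from the actual pipeline.
rows = [
    {"sample_id": "X", "value": 1},
    {"sample_id": "Y", "value": 2},
    {"sample_id": "X", "value": 3},
]
per_row = build_shards(rows, shard_by_row)        # three shards of one row each
per_sample = build_shards(rows, shard_by_sample)  # two shards, keyed "X" and "Y"
```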
Dataflow is in charge of sending shards to tasks. We have no control over this.
For this case, the most scalable approach is for each task to instantiate a client once and only once, to be used across as many shards as it receives, as opposed to
After looking at some of the Beam/Dataflow documentation and online discussions, the following terms seem to be the most relevant:
From the Beam execution model:
Instead of processing all elements simultaneously, the elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure. For example, a streaming runner may prefer to process and commit small bundles, and a batch runner may prefer to process larger bundles.
Things get fuzzier with the Dataflow runner's dynamic work rebalancing scheme: it may send bundles to a number of workers and then, if it detects stragglers, take pieces of bundles away from them and give those pieces to workers that have already finished and/or to newly spawned workers.
TL;DR It seems like the best we can do/control is to have one bundle use a single GCS client. Most of the time, that will probably be the only client a worker uses. If a worker happens to get additional bundles to work on, it would get a new client with each of them, which people say is fine and Beam-idiomatic.
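A rough sketch of the client-per-bundle idea, using the `start_bundle` / `finish_bundle` hooks of the Python `DoFn`. `CountingClient` is a hypothetical stand-in for the real GCS client, `WritePerBundle` and `run_bundles` are names invented here, and the `try/except` import only exists so the sketch runs without Beam installed. The driver calls the hooks by hand to imitate a runner; it is not how Dataflow actually schedules work.

```python
try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:  # lets the sketch run where Beam is not installed
    _DoFnBase = object


class CountingClient:
    """Hypothetical stand-in for a GCS client; counts how many were created."""

    created = 0

    def __init__(self):
        CountingClient.created += 1
        self.written = []
        self.closed = False

    def write(self, item):
        self.written.append(item)

    def close(self):
        self.closed = True


class WritePerBundle(_DoFnBase):
    """Creates a fresh client at the start of every bundle."""

    def __init__(self, client_factory=CountingClient):
        super().__init__()
        self._client_factory = client_factory
        self._client = None

    def start_bundle(self):
        self._client = self._client_factory()

    def process(self, element):
        self._client.write(element)
        yield element

    def finish_bundle(self):
        self._client.close()
        self._client = None


def run_bundles(fn, bundles):
    """Call the bundle hooks by hand, roughly as a runner would."""
    out = []
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            out.extend(fn.process(element))
        fn.finish_bundle()
    return out
```

With two bundles sent to the same `WritePerBundle` instance, two clients get created, which matches the "new client with each additional bundle" behavior described above.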
Hi -- a single worker can get multiple bundles. Most efficient is to have one client per worker, as discussed.
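One way to approximate "one client per worker" in Python is a lazily created, process-wide client that every bundle reuses. This is only a sketch: `StubStorageClient` and `get_shared_client` are hypothetical names, and "process-wide" here means one client per Python process, which may or may not map one-to-one onto a Dataflow worker.

```python
import threading


class StubStorageClient:
    """Hypothetical stand-in for a real GCS client."""

    def __init__(self):
        self.uploads = []

    def upload(self, name, payload):
        self.uploads.append((name, payload))


_shared_client = None
_shared_client_lock = threading.Lock()


def get_shared_client(factory=StubStorageClient):
    """Return the process-wide client, creating it on first use."""
    global _shared_client
    if _shared_client is None:
        with _shared_client_lock:
            # Re-check under the lock so concurrent threads build only one.
            if _shared_client is None:
                _shared_client = factory()
    return _shared_client
```

Every caller in the same process gets the same object back, so the cost of constructing the client is paid once no matter how many bundles arrive.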
Dan's code implements per-bundle start/finish tasks.
According to Apache Beam, we could be using per-task setup and teardown functions: https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.Teardown.html https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.Setup.html
Note that these are Java, so hopefully they have made it into Python. This is a fairly critical and well-known part of the MR paradigm, so I'd be very surprised if it were not implemented.
Example things that are a good idea to do in this method: close a network connection that was opened in DoFn.Setup; shut down a helper process that was started in DoFn.Setup.
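For what it's worth, the Python SDK's `DoFn` does expose `setup` and `teardown` methods alongside `start_bundle` and `finish_bundle`. Below is a sketch of opening the client in `setup` and closing it in `teardown`, so it survives across bundles. `TrackedClient`, `WritePerInstance`, and `run_instance` are hypothetical names, the client is a fake, and the hooks are called by hand rather than by a runner. Also keep in mind that, as far as I understand the Beam docs, teardown is best-effort, so cleanup there should not be relied on for correctness.

```python
try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:  # lets the sketch run where Beam is not installed
    _DoFnBase = object


class TrackedClient:
    """Hypothetical stand-in for a GCS client; remembers every instance."""

    instances = []

    def __init__(self):
        TrackedClient.instances.append(self)
        self.written = []
        self.closed = False

    def write(self, item):
        self.written.append(item)

    def close(self):
        self.closed = True


class WritePerInstance(_DoFnBase):
    """Opens one client in setup and reuses it for every bundle."""

    def __init__(self, client_factory=TrackedClient):
        super().__init__()
        self._client_factory = client_factory
        self._client = None

    def setup(self):
        self._client = self._client_factory()

    def process(self, element):
        self._client.write(element)
        yield element

    def teardown(self):
        if self._client is not None:
            self._client.close()
            self._client = None


def run_instance(fn, bundles):
    """Call setup once, process every bundle, then call teardown once."""
    out = []
    fn.setup()
    for bundle in bundles:
        for element in bundle:
            out.extend(fn.process(element))
    fn.teardown()
    return out
```

Here two bundles share a single client, which is closed once at the end, in contrast to the per-bundle start/finish approach.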
@gpbatra is this issue sufficiently stale to be closed?
Dataflow pipeline runs write tensors to GCS buckets using GCS' Python client. From Stackdriver logs, it appears a single client ends up being used by all Dataflow workers. This can be problematic. Investigate whether we can have one client per 'task' (unit of work Dataflow sends to a worker at once) and/or assess how risky having a single client for the entire run would be in the future.