GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Reading multiple files and need the names in processing #581

Open parwaniravi248 opened 7 years ago

parwaniravi248 commented 7 years ago

Hi, I have a requirement where I read about 2,000 files and need to parse and validate them. During validation I need to know each record's source file name so that I can reject the corresponding files after validation. I currently build a list of the file names and read them in a loop, but when I submit the job, the JSON job description exceeds the 10 MB limit and the submission fails. With 800-1,000 files the job runs fine.

How would you suggest implementing this?

Do I have to split the file list and run the job multiple times over smaller batches of files?

Thanks Ravi Parwani
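For reference, the "keep the file name with each record" part of the question can be sketched outside Dataflow with plain Java IO; inside a pipeline the same (fileName, line) pairing would typically be emitted from a DoFn. This is a hypothetical standalone sketch, not SDK code, and the file names and contents are made up:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class TagWithFilename {
    // Read each file and pair every line with the name of the file it
    // came from, so a later validation step can reject by file name.
    static List<Map.Entry<String, String>> readTagged(List<Path> files) throws IOException {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Path file : files) {
            for (String line : Files.readAllLines(file)) {
                out.add(new AbstractMap.SimpleEntry<>(file.getFileName().toString(), line));
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical input files created just for this demo.
        Path dir = Files.createTempDirectory("tag");
        Path a = Files.write(dir.resolve("a.txt"), Arrays.asList("rec1", "rec2"));
        Path b = Files.write(dir.resolve("b.txt"), Arrays.asList("rec3"));
        for (Map.Entry<String, String> e : readTagged(Arrays.asList(a, b))) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In a Dataflow DoFn the same idea applies: read a file named by the input element and output each record together with that name, so the validation step downstream can group rejects by file.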

lukecwik commented 7 years ago

Until there is an implementation of SplittableDoFn that works with Dataflow, or Dataflow increases the maximum job description size, splitting the list of files and running multiple pipelines seems to be the simplest solution.

parwaniravi248 commented 7 years ago

Thanks lukecwik. Can we run the pipeline in a loop (say, 1,000 files per iteration) until all the files are consumed? Do you have a good example of running a pipeline in a loop? I appreciate your help and guidance.

lukecwik commented 7 years ago
int maxNumFiles = 1000;
List<String> files = ...;
for (int i = 0; i < files.size(); i += maxNumFiles) {
  buildAndRunPipeline(files.subList(i, Math.min(files.size(), i + maxNumFiles)));
}

void buildAndRunPipeline(List<String> files) {
  Pipeline p = ...;
  // build my pipeline over the smaller list of files

  // Either: launch the pipeline without waiting for it to finish,
  // effectively allowing you to run multiple pipelines in
  // parallel. You'll want to guard against running too
  // many pipelines because you may hit quota limits.
  p.run();

  // Or: launch and wait until each pipeline finishes before
  // launching the next one.
  p.run().waitUntilFinish();
}
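The batching arithmetic in that loop can be exercised on its own, without the Dataflow SDK. A minimal standalone sketch (the batch size and the gs:// file names are made up for the demo):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFiles {
    // Split a list of file names into batches of at most maxNumFiles,
    // mirroring the subList loop used to launch one pipeline per batch.
    static List<List<String>> batches(List<String> files, int maxNumFiles) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < files.size(); i += maxNumFiles) {
            result.add(files.subList(i, Math.min(files.size(), i + maxNumFiles)));
        }
        return result;
    }

    public static void main(String[] args) {
        // 2,500 hypothetical input files, batched 1,000 at a time.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            files.add("gs://my-bucket/file-" + i + ".csv");
        }
        List<List<String>> b = batches(files, 1000);
        System.out.println(b.size());          // number of batches
        System.out.println(b.get(2).size());   // size of the last batch
    }
}
```

With 2,500 files and batches of 1,000 this yields three batches (1,000, 1,000, 500), so each job's description stays well under the 10 MB submission limit.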