GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

GCS temp location to store temp files #541

Open demousteven opened 7 years ago

demousteven commented 7 years ago

I'm trying to run BigQueryTornadoes and keep getting this error:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project google-cloud-dataflow-java-examples-all: An exception occured while executing the Java class. null: InvocationTargetException: BigQueryIO.Read needs a GCS temp location to store temp files. -> [Help 1]

Here is my mvn invocation:

mvn compile exec:java \
  -Dexec.mainClass=com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes \
  -Dexec.args="--project=dev6celbux \
    --output=dev6celbux:dev6celbux.Count_table \
    --stagingLocation=gs://project_autocomplete/BigQueryTornadoes/staging \
    --runner=BlockingDataflowPipelineRunner"

davorbonaci commented 7 years ago

This might be an issue -- we'll look into this.

In the meantime, you can easily work around the problem by specifying --tempLocation=gs://... on the command line.
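For example, adding the flag to the invocation from the original report should do it (the .../temp path here is just an illustrative bucket path, not a required location):

mvn compile exec:java \
  -Dexec.mainClass=com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes \
  -Dexec.args="--project=dev6celbux \
    --output=dev6celbux:dev6celbux.Count_table \
    --stagingLocation=gs://project_autocomplete/BigQueryTornadoes/staging \
    --tempLocation=gs://project_autocomplete/BigQueryTornadoes/temp \
    --runner=BlockingDataflowPipelineRunner"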

kpr6 commented 7 years ago

Doesn't work; the error still persists.

kpr6 commented 7 years ago

I'm trying it this way:

mvn compile exec:java \
  -Dexec.mainClass=com.example.Template \
  -Dexec.args="--output=gs://practise-bucket/output \
    --dataflowJobFile=gs://practise-bucket/templates/DataFlowEg \
    --tempLocation=gs://practise-bucket/temp \
    --runner=TemplatingDataflowPipelineRunner"

tgroh commented 7 years ago

You're also getting the "BigQueryIO.Write needs a GCS temp location to store temp files." error message? Your choice of runner and the arguments you're passing to exec don't look like they would cause the message mentioned above.

kpr6 commented 7 years ago

Yes. This is my code. Please help:

public class Template {

  static class stringToTableRow extends DoFn<String, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
      Date dateObj = new Date();
      DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
      String record_date = df.format(dateObj);
      String[] words = c.element().split(",");
      for (int i = 0; i < words.length; i += 2) {
        TableRow row = new TableRow();
        row.set("Id", words[i]);
        row.set("Bank", words[i+1]);
        row.set("Record_Date", record_date);
        c.output(row);
      }
    }
  }

  static class stripHeader extends DoFn<String, String>{
    @Override
    public void processElement(ProcessContext c){
      String line = c.element();
      if(!line.split(",")[0].equals("Id")){
        c.output(line);
      }
    }
  }

  public interface TemplateOptions extends DataflowPipelineOptions{

    @Description("Path of the file to read from")
    @Default.String("gs://practise-bucket/Sheet1.csv")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);

    @Description("intermediate table")
    @Default.String("helloworld-166506:sampleSheets.table1$20170101")
    ValueProvider<String> getIntTableName();
    void setIntTableName(ValueProvider<String> value); 

    @Description("final staging table")
    @Default.String("helloworld-166506:sampleSheets.table4$20170102")
    ValueProvider<String> getTableName();
    void setTableName(ValueProvider<String> value);

  }

  public static void main(String[] args) {
    // Create a DataflowPipelineOptions object. This object lets us set various execution
    // options for our pipeline, such as the associated Cloud Platform project and the location
    // in Google Cloud Storage to stage files.
    TemplateOptions options = PipelineOptionsFactory.create()
      .as(TemplateOptions.class);
    //options.setRunner(TemplatingDataflowPipelineRunner.class);
    // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
    options.setProject("helloworld-166506");
    // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
    options.setStagingLocation("gs://practise-bucket/staging");

    //options.setTempLocation("gs://practise-bucket/temp");

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.named("Reading day_1").from(options.getInputFile()).withoutValidation())
     .apply(ParDo.of(new stripHeader()))
     .apply(ParDo.of(new stringToTableRow()))
     .apply(BigQueryIO.Write
                           .named("Loading day1 partition")
                           .to(options.getIntTableName())
                           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                           .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                           .withoutValidation());

     p.apply(BigQueryIO.Read.named("Reading data from intermediate table")
                            .fromQuery("SELECT Bank,Record_Date from " + options.getIntTableName())
                            .withoutValidation())
     .apply(BigQueryIO.Write
                      .named("Writing selected columns to the final table")
                      .to(options.getTableName())
                      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                      .withoutValidation());

    p.run();
  } 
}

tgroh commented 7 years ago

You need to either programmatically set the tempLocation (via options.setTempLocation(String)) or use PipelineOptionsFactory.fromArgs(args) instead of PipelineOptionsFactory.create() to receive the temp location from the command line.

You've commented out the line that sets the tempLocation field to "gs://practise-bucket/temp".
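A minimal sketch of the two alternatives, reusing the values from the code you posted (pick one; they are not meant to be combined):

// Alternative 1: keep PipelineOptionsFactory.create() and set the temp location
// programmatically, i.e. re-enable the commented-out line.
TemplateOptions options = PipelineOptionsFactory.create().as(TemplateOptions.class);
options.setProject("helloworld-166506");
options.setStagingLocation("gs://practise-bucket/staging");
options.setTempLocation("gs://practise-bucket/temp");

or

// Alternative 2: build the options from the command line, so that --tempLocation
// (along with --project, --stagingLocation, --runner, etc.) is read from -Dexec.args.
TemplateOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(TemplateOptions.class);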

kpr6 commented 7 years ago

Yeah, looks like that worked; I didn't know it was required in both places. Thanks a lot. Btw, I'm now stuck with a new error: "InvocationTargetException: java.lang.RuntimeException: Not called from a runtime context." Can you help me figure out what is causing this? I haven't been able to spot the mistake for a while.

tgroh commented 7 years ago

Can you provide more of a stack trace?

My initial assumption is that you're using a value provider that doesn't have a value assigned to it, but I would need more information to really look into it.

kpr6 commented 7 years ago

Values are assigned to the value provider at runtime, aren't they? Btw, here's the stack trace:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Not called from a runtime context.
    at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluateReadHelper(Read.java:189)
    at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluate(Read.java:168)
    at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluate(Read.java:164)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
    at com.example.Template.main(Template.java:119)
    ... 6 more
Caused by: java.lang.RuntimeException: Not called from a runtime context.
    at com.google.cloud.dataflow.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:219)
    at com.google.cloud.dataflow.sdk.io.FileBasedSource.createReader(FileBasedSource.java:376)
    at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluateReadHelper(Read.java:177)
    ... 19 more
tgroh commented 7 years ago

Values are only assigned if you're executing the pipeline with a value present in the options. That message isn't particularly useful, especially on the DirectPipelineRunner, but the issue is that there's an unset ValueProvider:

Caused by: java.lang.RuntimeException: Not called from a runtime context.
at com.google.cloud.dataflow.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:219)
at com.google.cloud.dataflow.sdk.io.FileBasedSource.createReader(FileBasedSource.java:376)
at com.google.cloud.dataflow.sdk.io.Read$Bounded$1.evaluateReadHelper(Read.java:177)
kpr6 commented 7 years ago

This particular error occurs when I'm trying to create the template for the first time, and I can't find any unset value providers in my code, as you can see in the code I've shared above.

tgroh commented 7 years ago

You're executing via the DirectPipelineRunner, but you need to use the TemplatingDataflowPipelineRunner (which is also commented out).
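A minimal sketch of the programmatic fix inside the main method above (equivalently, keep --runner=TemplatingDataflowPipelineRunner in -Dexec.args and build the options with PipelineOptionsFactory.fromArgs(args) so the flag takes effect):

// Create a template instead of executing the pipeline on the DirectPipelineRunner.
options.setRunner(TemplatingDataflowPipelineRunner.class);
options.setTempLocation("gs://practise-bucket/temp");
// Note: the template runner also needs a --dataflowJobFile location,
// which you already pass in -Dexec.args above.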

cobookman commented 7 years ago

I got this error when I set just gcpTempLocation, and it was resolved when setting both gcpTempLocation and tempLocation. According to the Beam docs, gcpTempLocation should set tempLocation as well.
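Roughly what I mean by setting both, as a sketch (placeholder bucket path; the GcpOptions cast is just one way to reach gcpTempLocation programmatically, my actual code differs):

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setTempLocation("gs://my-bucket/temp");
options.as(GcpOptions.class).setGcpTempLocation("gs://my-bucket/temp");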

Should I just set tempLocation and not set gcpTempLocation?

yiu31802 commented 6 years ago

Same as @cobookman: I only needed to call setTempLocation to remove the corresponding error.

my code: https://github.com/yiu31802/gcp-project/blob/5673ac91

xMTinkerer commented 6 years ago

Bringing this back to the original issue... I ran into the error

BigQueryIO.Read needs a GCS temp location to store temp files

and using --tempLocation instead of --stagingLocation as a command-line argument fixed it for me.
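In other words, in the -Dexec.args string from the first post, something along these lines (with your own bucket path) was enough for me:

--tempLocation=gs://<your-bucket>/temp

in place of

--stagingLocation=gs://<your-bucket>/staging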