GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

DirectPipelineRunner doesn't support Standard SQL with BigQueryIO.Read #539

Closed: torbjornvatn closed this issue 7 years ago

torbjornvatn commented 7 years ago

We write all our BigQuery queries with the Standard SQL option enabled in the web console. When I tried to run a query in that syntax from a Dataflow pipeline running locally, with usingStandardSql() enabled, I ran into this error:

SEVERE: Error when trying to dry run query SELECT * from `bigquery-public-data.samples.shakespeare` LIMIT 100.
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "location" : "`bigquery-public-data:samples.shakespeare`",
    "locationType" : "other",
    "message" : "Invalid table name: `bigquery-public-data:samples.shakespeare`",
    "reason" : "invalid"
  } ],
  "message" : "Invalid table name: `bigquery-public-data:samples.shakespeare`"
}

Here is an example that triggers the error. If I switch to DataflowPipelineRunner, it works like a charm.

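import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

public class StandardSql {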

    private static String gsLocation = "gs://hallois/";
    private static String project = "uc-prox-development";

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);

//        options.setRunner(DataflowPipelineRunner.class);
        options.setRunner(DirectPipelineRunner.class);

        options.setProject(project);

        options.setTempLocation(gsLocation + "jars");

        Pipeline p = Pipeline.create(options);

        p.apply(BigQueryIO.Read
                .fromQuery("SELECT * from `bigquery-public-data.samples.shakespeare` LIMIT 100")
                .usingStandardSql())
                .apply(MapElements.via((TableRow tr) -> (String) tr.get("word")).withOutputType(new TypeDescriptor<String>() {}))
                .apply(TextIO.Write.to(gsLocation + "shakespeare"));

        p.run();
    }
}

dhalperi commented 7 years ago

Thanks for the report, and sorry for the issue.

Can you please check whether this works using the InProcessPipelineRunner? If you are using an SDK newer than 1.6.0, you may want to switch. From the release notes for 1.6.0:

Added InProcessPipelineRunner, an improvement over the DirectPipelineRunner that better implements the Dataflow model. InProcessPipelineRunner runs on a user's local machine and supports multithreaded execution, unbounded PCollections, and triggers for speculative and late outputs.

In Dataflow 2.x, we've removed the old DirectPipelineRunner and replaced it with the InProcessPipelineRunner.

torbjornvatn commented 7 years ago

I tried switching to InProcessPipelineRunner, but I got the exact same error, I'm afraid.
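
For anyone comparing the two syntaxes: legacy SQL writes a table as [project:dataset.table], while Standard SQL writes it as `project.dataset.table`. The name quoted in the error message above has backticks together with a colon, which is neither form. The sketch below only illustrates the two naming conventions. BigQueryTableNames and its methods are hypothetical helpers, not part of the Dataflow SDK or the BigQuery client, and the patterns assume a plain project ID (no domain-scoped projects such as example.com:project).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Dataflow SDK
// or the BigQuery client library.
final class BigQueryTableNames {

    // Legacy SQL table reference: [project:dataset.table]
    private static final Pattern LEGACY =
            Pattern.compile("\\[([^:\\[\\]]+):([^.\\[\\]]+)\\.([^.\\[\\]]+)\\]");

    // Standard SQL table reference: `project.dataset.table`
    // Assumption: the project ID contains no dots or colons.
    private static final Pattern STANDARD =
            Pattern.compile("`([^.:`]+)\\.([^.:`]+)\\.([^.:`]+)`");

    private BigQueryTableNames() {}

    // True if the reference is a well-formed legacy SQL table name.
    static boolean isLegacy(String ref) {
        return LEGACY.matcher(ref).matches();
    }

    // True if the reference is a well-formed Standard SQL table name.
    static boolean isStandard(String ref) {
        return STANDARD.matcher(ref).matches();
    }

    // Converts [project:dataset.table] to `project.dataset.table`.
    static String legacyToStandard(String legacy) {
        Matcher m = LEGACY.matcher(legacy);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a legacy SQL table name: " + legacy);
        }
        return "`" + m.group(1) + "." + m.group(2) + "." + m.group(3) + "`";
    }

    // Converts `project.dataset.table` to [project:dataset.table].
    static String standardToLegacy(String standard) {
        Matcher m = STANDARD.matcher(standard);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a Standard SQL table name: " + standard);
        }
        return "[" + m.group(1) + ":" + m.group(2) + "." + m.group(3) + "]";
    }
}
```

With this sketch, the table name from the query passes isStandard, while the name echoed back in the 400 response passes neither check.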