GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Conversion of PCollection<TableSchema> to TableSchema object #580

Closed: kpr6 closed this issue 6 years ago

kpr6 commented 7 years ago

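So, I'm trying to read the schema file from a bucket into a PCollection using TextIO, build a TableSchema object from it, and pass that to BigQueryIO.Write.withSchema(). Here's a bit of the code:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import java.util.ArrayList;
import java.util.List;

// Main output: the parsed schema. Side output: the name of the first column.
final TupleTag<TableSchema> tableSchemaTag = new TupleTag<TableSchema>(){};
final TupleTag<String> schemaStartTag = new TupleTag<String>(){};
PCollectionTuple schemaFields = p
    .apply(TextIO.Read.named("Reading file schema").from(options.getSchema()).withoutValidation())
    .apply(ParDo.named("Generating Table schema")
        .withOutputTags(tableSchemaTag, TupleTagList.of(schemaStartTag))
        .of(new DoFn<String, TableSchema>() {
          @Override
          public void processElement(ProcessContext c) {
            // Each line is expected to look like "name1:TYPE1,name2:TYPE2,...".
            List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
            for (String schemaWord : c.element().split(",")) {
              String[] schemaKV = schemaWord.split(":");
              fields.add(new TableFieldSchema().setName(schemaKV[0]).setType(schemaKV[1]));
            }
            TableSchema schema = new TableSchema().setFields(fields);
            c.output(schema);
            // Emit the first column's name on the side output.
            String[] firstColumn = c.element().split(":", 2);
            c.sideOutput(schemaStartTag, firstColumn[0]);
          }
        }));

PCollection<TableSchema> tableSchema = schemaFields.get(tableSchemaTag);
```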

Now, when I pass this `tableSchema` to `.withSchema()`, I get the error below, and I know this is because it accepts only a `TableSchema` object. Is there any way around this?

`incompatible types: com.google.cloud.dataflow.sdk.values.PCollection<com.google.api.services.bigquery.model.TableSchema> cannot be converted to com.google.api.services.bigquery.model.TableSchema`
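One possible way around this, offered as a sketch under stated assumptions rather than an official answer: a `PCollection`'s contents only exist once the pipeline runs, but `withSchema()` needs a `TableSchema` value while the pipeline is being constructed. The schema file can instead be read and parsed in the driver program before the pipeline is built. The sketch below assumes the schema file is a single line in the same `name:TYPE,name:TYPE` format used in the `DoFn` above and that it is readable through a local `Path`; reading directly from a GCS bucket would need a storage client, which is not shown. `SchemaFileParser`, `FieldDef`, `parseLine` and `readSchemaFile` are hypothetical names for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: parses a "name:TYPE,name:TYPE" schema line in the driver,
 * before pipeline construction, so the result can become a plain TableSchema.
 *
 * Conversion in the driver (kept as a comment because it needs the BigQuery
 * model classes, which this self-contained sketch does not import):
 *   List<TableFieldSchema> bqFields = new ArrayList<>();
 *   for (SchemaFileParser.FieldDef f : SchemaFileParser.readSchemaFile(localPath)) {
 *     bqFields.add(new TableFieldSchema().setName(f.name).setType(f.type));
 *   }
 *   TableSchema schema = new TableSchema().setFields(bqFields);
 *   // ...then pass `schema` (a value, not a PCollection) to withSchema(schema).
 */
final class SchemaFileParser {

    /** One column definition: a field name and its BigQuery type name. */
    static final class FieldDef {
        final String name;
        final String type;

        FieldDef(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Parses a line such as "id:INTEGER,name:STRING"; whitespace around entries is trimmed. */
    static List<FieldDef> parseLine(String line) {
        List<FieldDef> fields = new ArrayList<>();
        for (String schemaWord : line.split(",")) {
            // Limit 2 so only the first ':' separates the name from the type.
            String[] schemaKV = schemaWord.trim().split(":", 2);
            if (schemaKV.length != 2) {
                throw new IllegalArgumentException("Expected name:TYPE but got: " + schemaWord);
            }
            fields.add(new FieldDef(schemaKV[0].trim(), schemaKV[1].trim()));
        }
        return fields;
    }

    /** Reads the first line of a local schema file and parses it. */
    static List<FieldDef> readSchemaFile(Path path) throws IOException {
        List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
        if (lines.isEmpty()) {
            throw new IllegalArgumentException("Schema file is empty: " + path);
        }
        return parseLine(lines.get(0));
    }
}
```

Parsing in the driver keeps the schema a plain value at construction time; the trade-off is that the schema file must be reachable from the machine that launches the pipeline.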

aaltay commented 7 years ago

cc: @reuvenlax