Here's the destination's implementation:

public TableDestination apply(ValueInSingleWindow<TableRow> input) {
  // Format the element's timestamp as a yyyyMMdd (UTC) partition suffix.
  String partition = timestampExtractor.apply(input.getValue())
      .toString(DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC());
  // Build the per-partition table reference: <project>:<dataset>.<table>_<yyyyMMdd>.
  TableReference tableReference = new TableReference();
  tableReference.setDatasetId(dataset);
  tableReference.setProjectId(projectId);
  tableReference.setTableId(String.format("%s_%s", table, partition));
  log.debug("Will write to BigQuery table: {}", tableReference);
  return new TableDestination(tableReference, null);
}
When the Dataflow job tries to write to this table, I see the following error:
"errors" : [ {
"domain" : "global",
"message" : "Cannot read partition information from a table that is not partitioned: <project_id>:<dataset>.<table>$19730522",
"reason" : "invalid"
} ]
So it looks like the table isn't being created as a partitioned table in the first place?
Following is the code that writes to BigQuery:
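A minimal sketch of how a destination function like the one above is typically handed to BigQueryIO in Beam 2.2.0; the names rows, tableSchema, and PartitionedTableDestinationFn are placeholders rather than the original pipeline code.

// Sketch only: assumes the apply() above belongs to a
// SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>
// called PartitionedTableDestinationFn, and that rows is a PCollection<TableRow>.
rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to(new PartitionedTableDestinationFn())
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

With CREATE_IF_NEEDED, BigQueryIO creates any missing destination table from the supplied schema, which is where the partitioning of the created table would be decided.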
Apache Beam version: 2.2.0