GoogleCloudDataproc / flink-bigquery-connector

BigQuery integration to Apache Flink's Table API
Apache License 2.0

question: BigQuery's datetime type is not supported #162

Closed caicancai closed 1 week ago

caicancai commented 1 week ago

A sample value from the failing DATETIME column: 0001-01-01 08:05:43.000000

Code:

    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Read-side configuration (connection options elided here).
        BigQueryTableConfig tableConfig = BigQueryReadTableConfig.newBuilder()
                ...
                .build();

        tEnv.createTable(
                "bigQuerySourceTable",
                BigQueryTableSchemaProvider.getTableDescriptor(tableConfig));
        Table sourceTable = tEnv.from("bigQuerySourceTable");
        sourceTable = sourceTable.select($("datetime"));
        sourceTable.execute().print();

        // Sink-side configuration (connection options elided here).
        BigQueryTableConfig sinkTableConfig = BigQuerySinkTableConfig.newBuilder()
                ...
                .build();

        tEnv.createTable(
                "bigQuerySinkTable",
                BigQueryTableSchemaProvider.getTableDescriptor(sinkTableConfig));

        // Copying the single DATETIME column from source to sink triggers the exception below.
        TableResult res = sourceTable.executeInsert("bigQuerySinkTable");
        res.await();
    }
Caused by: com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException: com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException: Error while writing to BigQuery
    at com.google.cloud.flink.bigquery.sink.writer.BaseWriter.logAndThrowFatalException(BaseWriter.java:248)
    at com.google.cloud.flink.bigquery.sink.writer.BigQueryDefaultWriter.validateAppendResponse(BigQueryDefaultWriter.java:110)
    at com.google.cloud.flink.bigquery.sink.writer.BaseWriter.validateAppendResponses(BaseWriter.java:239)
    at com.google.cloud.flink.bigquery.sink.writer.BaseWriter.flush(BaseWriter.java:118)
    at com.google.cloud.flink.bigquery.sink.writer.BigQueryDefaultWriter.flush(BigQueryDefaultWriter.java:49)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.endInput(SinkWriterOperator.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:606)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:565)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/aftership-dev/datasets/temporary/tables/flink_bigquery_test_date/streams/_default
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:592)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:571)
    at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
    at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:67)
    at com.google.cloud.flink.bigquery.sink.writer.BigQueryDefaultWriter.validateAppendResponse(BigQueryDefaultWriter.java:103)
    ... 20 more
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/aftership-dev/datasets/temporary/tables/flink_bigquery_test_date/streams/_default
    at com.google.cloud.bigquery.storage.v1.ConnectionWorker.lambda$requestCallback$1(ConnectionWorker.java:1123)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
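For anyone debugging the same INVALID_ARGUMENT failure: the Storage Write API attaches per-row detail to the AppendSerializationError that the "row_errors" hint in the message refers to. Below is a minimal sketch of how one might dump that detail, assuming the connector leaves the original exception in the cause chain (which the stack trace above suggests); the RowErrorDump class name is hypothetical.

    import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;

    import java.util.Map;

    public class RowErrorDump {
        // Walk the cause chain and print the per-row serialization errors
        // referenced by the "row_errors" hint in the INVALID_ARGUMENT message.
        public static void printRowErrors(Throwable t) {
            for (Throwable cause = t; cause != null; cause = cause.getCause()) {
                if (cause instanceof AppendSerializationError) {
                    Map<Integer, String> rowErrors =
                            ((AppendSerializationError) cause).getRowIndexToErrorMessage();
                    rowErrors.forEach((index, message) ->
                            System.err.println("row " + index + ": " + message));
                    return;
                }
            }
            System.err.println("No AppendSerializationError found in the cause chain.");
        }
    }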
caicancai commented 1 week ago

@prashastia Could you help with this question if you have time?

prashastia commented 1 week ago

Could you also share the schemas for the source and sink tables?

caicancai commented 1 week ago

Could you also share the schemas for the source and sink tables?

Source schema: [create_time: TIMESTAMP_LTZ(6)]
Sink schema:   [create_time: TIMESTAMP_LTZ(6)]
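
Side note: since both Flink schemas show TIMESTAMP_LTZ(6) for a column that is DATETIME in BigQuery, it may help to print the schemas the connector actually resolves. A minimal sketch using Flink's Table.printSchema(), continuing from the snippet above (so tEnv and the registered table names are assumed):

    // After registering the tables as in the snippet above:
    tEnv.from("bigQuerySourceTable").printSchema();
    tEnv.from("bigQuerySinkTable").printSchema();
    // BigQuery DATETIME has no time zone, so it surfacing as TIMESTAMP_LTZ(6)
    // rather than TIMESTAMP(6) would point at a type-mapping gap in the write path.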
prashastia commented 1 week ago

What about the BigQuery table schemas for the source and sink tables?

caicancai commented 1 week ago

What about the BigQuery table schemas for the source and sink tables?

I don’t understand what you mean. My source table originally had many field types, but I eventually determined that a write fails when the field type is DATETIME. To verify this, I created a test source table and a test sink table; the result is the exception above.

Both the source table and the sink table have only one field, which is of DATETIME type.

They even contain only three rows of data.

P.S. I just started using BigQuery, so please correct me if anything is wrong.
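
One workaround that may be worth testing while this is investigated (a suggestion, not a documented connector feature): cast the DATETIME column to STRING on the Flink side, since the Storage Write API accepts DATETIME values in their canonical string form. A sketch, using the create_time column name from the schemas above; whether the connector then maps the STRING onto the DATETIME destination column is exactly what would need verifying.

    import static org.apache.flink.table.api.Expressions.$;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Table;

    public class DatetimeWorkaround {
        // Project the problematic column to STRING before the sink sees it;
        // BigQuery's canonical DATETIME literal looks like "2024-01-01 08:05:43.000000".
        public static Table castDatetimeToString(Table source) {
            return source.select(
                    $("create_time").cast(DataTypes.STRING()).as("create_time"));
        }
    }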

caicancai commented 1 week ago

Alright, the connector would write to an existing BQ table. What is the schema of the destination BigQuery table, and where are you reading the records from? What is the schema of that source? In other words, what is the schema of the flink_bigquery_test_date BQ table?

Source: BigQuery, sink: BigQuery.

I will test with source: Pub/Sub, sink: BigQuery later.

prashastia commented 1 week ago

Alright, the connector would write to an existing BQ table. What is the schema of the destination BigQuery table, and where are you reading the records from? What is the schema of that source? In other words, what is the schema of the flink_bigquery_test_date BQ table?

Source: BigQuery, sink: BigQuery.

I will test with source: Pub/Sub, sink: BigQuery later.

What about their respective schemas, for the source BigQuery table and the sink BigQuery table?

caicancai commented 1 week ago

What about their respective schemas, for the source BigQuery table and the sink BigQuery table?

[image: screenshot of the BigQuery table schemas]

The schema types of the sink and source should be consistent.

prashastia commented 1 week ago

Yep, and those are all the fields in the table, right? No other columns exist?

caicancai commented 1 week ago

Yep, and those are all the fields in the table, right? No other columns exist?

Yes. I synchronized all other fields successfully; only this field failed to synchronize.

prashastia commented 1 week ago
sourceTable = sourceTable.select($("datetime"));

This is created_at in your code, right?

caicancai commented 1 week ago
sourceTable = sourceTable.select($("datetime"));

This is created_at in your code, right?

yep

prashastia commented 1 week ago
sourceTable = sourceTable.select($("datetime"));

This is created_at in your code, right?

yep

And are you still facing this error after changing the field name?

caicancai commented 1 week ago

Yep. If you can't reproduce it, I'll suspect a problem with my environment and close this issue.

caicancai commented 1 week ago

There is nothing else at the moment. I think you can delete the information related to my company. Actually, I don’t want to disclose it on GitHub. Sorry.