Closed: cgokce closed this issue 3 years ago
Hi @cgokce--it looks like the problem here is that the keys the connector is seeing aren't instances of Struct or Map (it's likely that the values are fine, but it's hard to say for sure without a stack trace). Giving the Datagen connector a quick look, it seems like the userid field in the users quickstart is a String, so the HoistField SMT may be warranted here to turn the key from a String into a Map (if schemas are disabled) or a Struct (if schemas are enabled). You can find docs on the SMTs provided by Kafka Connect out of the box here: https://kafka.apache.org/27/documentation.html#connect_transforms
Hope this helps! If you run into more failures, feel free to post here and, if possible, include a full stack trace and the version of the connector, as that makes it much easier to debug things.
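For illustration, a minimal sketch of what that key-side transform could look like in the sink connector config; the transform alias hoistKey and the field name userid are assumptions for this example:
"transforms": "hoistKey",
"transforms.hoistKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.hoistKey.field": "userid"
With the StringConverter still handling keys, the key carries a string schema, so this should wrap it into a single-field Struct named userid before it reaches the connector.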
@C0urante, thanks for providing a detailed response and pointing out the exact source of the error. My problem is actually much simpler, and since the detail you gave is beyond my understanding, I realized I should rather ask you about my current use case.
Problem: I want to build a simple pipeline that applies both update and insert operations (upsert) to BigQuery, just to have a working example of the sink. I currently do not have any data source for testing, so I opted to use Confluent's DataGen connector. Data format and serialization are not important to me, as I only want a simple working example.
If you can share a working example config, I'd be glad to use it, and my problem will be solved. Otherwise, I'll provide my configs below in detail so you can investigate my error.
I have managed to run an insert pipeline successfully on v1.10, but when I use v2.00 with upsert enabled, it does not work.
Environment Details
Environment: Confluent ee docker-compose all-in-one
Installation: confluent-hub install --no-prompt wepay/kafka-connect-bigquery:version
Datagen Connector
ConfigFile:
{
  "name": "users-datagen",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "users",
    "max.interval": "10000",
    "quickstart": "users",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
ValueSchema (Format: AVRO):
{
  "connect.name": "ksql.users",
  "fields": [
    { "name": "registertime", "type": "long" },
    { "name": "userid", "type": "string" },
    { "name": "regionid", "type": "string" },
    { "name": "gender", "type": "string" }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}
KeySchema: Not set.
KCBQ Version 1.10: I managed the insert operation and successfully exported the data to the BigQuery dataset.
Configfile:
{
  "name": "bq-sink-users",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "users",
    "project": "-----",
    "datasets": ".*=-----",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
    "keyfile": "-----",
    "autoCreateTables": "true",
    "schemaRegistryLocation": "http://schema-registry:8081"
  }
}
KCBQ Version 2.00: I enabled upsert and got this error. One major change is the new schema retriever, since the other one is no longer available ("schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever").
Configfile:
{
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "name": "kcbqv22",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "topics": ["users"],
  "errors.deadletterqueue.topic.name": "users_err",
  "project": "-----",
  "defaultDataset": "-----",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "keyfile": "****",
  "keySource": "FILE",
  "kafkaKeyFieldName": "userid",
  "kafkaDataFieldName": "",
  "autoCreateTables": "true",
  "upsertEnabled": "true",
  "deleteEnabled": "false",
  "intermediateTableSuffix": "_intermediate"
}
Full Stack Trace
[2021-02-04 22:09:23,253] INFO ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [broker:29092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = confluent.monitoring.interceptor.connector-consumer-kcbqv22-0 compression.type = lz4 connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 500 max.block.ms = 60000 max.in.flight.requests.per.connection = 1 max.request.size = 10485760 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 10 retry.backoff.ms = 500 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer (org.apache.kafka.clients.producer.ProducerConfig) [2021-02-04 22:09:23,255] INFO Kafka version: 6.0.1-ce (org.apache.kafka.common.utils.AppInfoParser) [2021-02-04 22:09:23,255] INFO Kafka commitId: f75f566c7a4b38d8 (org.apache.kafka.common.utils.AppInfoParser) [2021-02-04 22:09:23,255] INFO Kafka startTimeMs: 1612476563255 (org.apache.kafka.common.utils.AppInfoParser) [2021-02-04 22:09:23,255] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-kcbqv22-0 created for client_id=connector-consumer-kcbqv22-0 client_type=CONSUMER session= cluster=1cjVmIGNRPaorRUQ5xxlTA group=connect-kcbqv22 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor) [2021-02-04 22:09:23,257] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-kcbqv22-0] Cluster ID: 1cjVmIGNRPaorRUQ5xxlTA (org.apache.kafka.clients.Metadata) [2021-02-04 22:09:23,344] ERROR WorkerSinkTask{id=kcbqv22-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. 
Error: Top-level Kafka Connect schema must be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask) com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct' at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88) at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49) at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getUpsertDeleteRow(SinkRecordConverter.java:104) at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:73) at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:195) at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:253) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) [2021-02-04 22:09:23,344] ERROR WorkerSinkTask{id=kcbqv22-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. 
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct' at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88) at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49) at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getUpsertDeleteRow(SinkRecordConverter.java:104) at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:73) at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:195) at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:253) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563) ... 10 more [2021-02-04 22:09:23,344] ERROR WorkerSinkTask{id=kcbqv22-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [2021-02-04 22:09:24,324] INFO [Consumer clientId=connector-consumer-kcbqv22-0, groupId=connect-kcbqv22] Revoke previously assigned partitions users-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2021-02-04 22:09:24,324] INFO [Consumer clientId=connector-consumer-kcbqv22-0, groupId=connect-kcbqv22] Member connector-consumer-kcbqv22-0-bc5025b6-550e-4139-a2cb-8105e9f0d099 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2021-02-04 22:09:24,330] INFO Publish thread interrupted for client_id=connector-consumer-kcbqv22-0 client_type=CONSUMER session= cluster=1cjVmIGNRPaorRUQ5xxlTA group=connect-kcbqv22 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor) [2021-02-04 22:09:24,337] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-kcbqv22-0 client_type=CONSUMER session= cluster=1cjVmIGNRPaorRUQ5xxlTA group=connect-kcbqv22 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor) [2021-02-04 22:09:24,337] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-kcbqv22-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
(org.apache.kafka.clients.producer.KafkaProducer) [2021-02-04 22:09:24,340] INFO Closed monitoring interceptor for client_id=connector-consumer-kcbqv22-0 client_type=CONSUMER session= cluster=1cjVmIGNRPaorRUQ5xxlTA group=connect-kcbqv22 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor) [2021-02-04 22:09:24,341] INFO [Producer clientId=connector-dlq-producer-kcbqv22-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
Addressed in issue #89. Thank you, I'm closing this one.
@cgokce sorry for the delay in response. Were you able to get the connector working in your testing environment with upsert/delete enabled? If not, I'm happy to reopen this issue and lend a hand.
Is it possible to reopen the issue? I have the same error, @cgokce.
Infra
Connectors installation:
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
confluent-hub install --no-prompt wepay/kafka-connect-bigquery:2.1.0
Connector Source:
{
"name": "users-datagen",
"config": {
"connector.class": "DatagenConnector",
"tasks.max": "1",
"kafka.topic": "users",
"max.interval": "10000",
"quickstart": "users",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Connector Sink:
{
"name": "bq-sink-users",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "users",
"project": "test",
"defaultDataset": "test",
"kafkaKeyFieldName": "userid",
"kafkaDataFieldName": "",
"upsertEnabled": "true",
"deleteEnabled": "false",
"keyfile": "/etc/config/secret/serviceaccount.json",
"autoCreateTables": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Schema from schema-registry users-value:
{
"type": "record",
"name": "users",
"namespace": "ksql",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"connect.name": "ksql.users"
}
The error:
[2021-04-02 19:15:50,048] ERROR WorkerSinkTask{id=bq-sink-users-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getUpsertDeleteRow(SinkRecordConverter.java:104)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:73)
at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:195)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:269)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Is it possible to explain or document why it is necessary to have a STRUCT schema? What is a STRUCT schema? Why is my simple AVRO schema not working? Is it possible to provide a real-world example with datagen?
Thanks for taking the time to help me.
Sorry, I pinged the wrong guy: @C0urante
Do I need to open a new issue?
Even if I use HoistField, I really don't understand how the upsert can work. Does it only work in JSON with the JsonConverter?
{
"name": "bq-sink-users",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "users",
"project": "test",
"defaultDataset": "test",
"kafkaKeyFieldName": "userid",
"upsertEnabled": "true",
"deleteEnabled": "false",
"keyfile": "/etc/config/secret/serviceaccount.json",
"autoCreateTables": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "userid"
}
}
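The config above applies HoistField to the record value, which is already a Struct coming out of the AvroConverter; the conversion that fails appears to be on the key side, where the StringConverter produces a plain string. A hypothetical variant of the same transform aimed at the key instead (the field name userid is an assumption):
"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField.field": "userid"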
@awattez With upsert/delete enabled, a few things have to happen; among other things, record keys must be Struct types.
I finally succeeded in making it work :)
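As a sketch of one way to satisfy the Struct-key requirement, assuming the key is actually produced as an Avro record rather than a plain string, the sink could read it with the Avro converter so it arrives as a Struct (the schema registry URL is the one used elsewhere in this thread):
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"kafkaKeyFieldName": "userid",
"upsertEnabled": "true"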
Thanks @C0urante
I will write a doc on how to perform upsert/delete with datagen ;)
Glad to hear it @awattez!
Hi @awattez,
Could you please share the working config for the upsert operation? I experienced the same error. Thank you.
Hey @C0urante, I also receive the same error. Could you please assist?
[2021-08-19 10:36:36,051] INFO Attempting to create intermediate table `bla_mysql`.`blaMysql_bla_fWith_tmp_0_1f82d90c_e526_463f_bcf3_685b6ac4cee2_1629369393666` with schema Schema{fields=[Field{name=value, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=key, type=RECORD, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=i, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=partitionTime, type=TIMESTAMP, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=batchNumber, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:36,626] INFO Attempting to create intermediate table `bla_mysql`.`blaMysql_bla_manualWith_tmp_0_1f82d90c_e526_463f_bcf3_685b6ac4cee2_1629369393666` with schema Schema{fields=[Field{name=value, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=key, type=RECORD, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=i, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=partitionTime, type=TIMESTAMP, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=batchNumber, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:36,714] INFO Attempting to create intermediate table `bla_mysql`.`blaMysql_bla_fDep_tmp_0_1f82d90c_e526_463f_bcf3_685b6ac4cee2_1629369393666` with schema Schema{fields=[Field{name=value, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=key, type=RECORD, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=i, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=partitionTime, type=TIMESTAMP, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=batchNumber, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:36,726] INFO Attempting to create intermediate table `bla_mysql`.`blaMysql_bla_l_tmp_0_1f82d90c_e526_463f_bcf3_685b6ac4cee2_1629369393666` with schema Schema{fields=[Field{name=value, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=key, type=RECORD, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=i, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=partitionTime, type=TIMESTAMP, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=batchNumber, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:36,743] INFO Attempting to create intermediate table `bla_mysql`.`blaMysql_bla_settings_tmp_0_1f82d90c_e526_463f_bcf3_685b6ac4cee2_1629369393666` with schema Schema{fields=[Field{name=value, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=key, type=RECORD, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=i, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=partitionTime, type=TIMESTAMP, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=batchNumber, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:37,121] INFO Attempting to create intermediate table `bla_mysql`.`blaMysql_bla_trans2_tmp_0_1f82d90c_e526_463f_bcf3_685b6ac4cee2_1629369393666` with schema Schema{fields=[Field{name=value, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=key, type=RECORD, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=i, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=partitionTime, type=TIMESTAMP, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=batchNumber, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:37,515] INFO Attempting to create table `bla_mysql`.`blaMysql_bla_transactions2` with schema Schema{fields=[Field{name=id, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=guid, type=STRING, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=timestamp, type=TIMESTAMP, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=type, type=STRING, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=currency, type=STRING, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=amount, type=BIGNUMERIC, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=referenceId, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=data, type=STRING, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=completed, type=INTEGER, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=__timestamp, type=INTEGER, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=__deleted, type=STRING, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}, Field{name=__kafkaKey, type=RECORD, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)
[2021-08-19 10:36:41,477] INFO Exported 225661 of 648685 records for table 'bla.userId' after 13:49:39.145 (io.debezium.relational.RelationalSnapshotChangeEventSource)
[2021-08-19 10:36:46,187] ERROR WorkerSinkTask{id=BigQueryBlaMysql-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Only Map objects supported in absence of schema for record conversion to BigQuery format. (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Only Map objects supported in absence of schema for record conversion to BigQuery format.
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:84)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getUpsertDeleteRow(SinkRecordConverter.java:104)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:73)
at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:195)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:269)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-08-19 10:36:46,189] ERROR WorkerSinkTask{id=BigQueryBlaMysql-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
Finally, the BQ upsert operation is running.
@addiebaratha how did you get it to work?
@guyromb You need to set mergeRecordsThreshold (100 in my case).
One more thing: I used a struct as the key.
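For reference, a minimal sketch of the sink settings that comment describes; the threshold of 100 comes from the comment above, and the rest of the config is assumed to stay as in the earlier examples:
"upsertEnabled": "true",
"mergeRecordsThreshold": "100",
"kafkaKeyFieldName": "userid"
This only helps once the record key itself reaches the connector as a struct (for example an Avro record key) rather than a plain string.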
Hi, I made a complete working sample with a local stack: https://github.com/awattez/kafka-connect-bigquery-exploration/tree/master/full-stack-local
@C0urante, I am writing an example with a Confluent Cloud cluster; how can I contribute to the documentation?
Maybe here: https://docs.confluent.io/kafka-connect-bigquery/current/ or here: https://docs.confluent.io/cloud/current/connectors/cc-gcp-bigquery-sink.html
Hi,
I tried to run a sample insert/update pipeline using the DataGen connector data and KCBQ v2. The sink connector only accepts JSON data; otherwise it throws the "Top-level Kafka Connect schema must be of type 'struct'" error. When I use JSON data, it expects a map object: "Error: Only Map objects supported in absence of schema for record conversion to BigQuery format." What kind of map object do I need to provide? Is there any way to use the Avro format for easier mapping? If so, can you provide an example config file? Thank you. I provide my config files for both the DataGen and KCBQ v2 connectors below (running on a local Docker environment).
DataGen Config:
Bigquery Sink Config:
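On the "Only Map objects supported" error: with the JsonConverter and schemas disabled, the converter hands the connector whatever the JSON deserializes to, so a bare-string key fails while a JSON object comes through as a Map and converts. A sketch of the converter settings and the kind of key payload that implies (the field name userid is an assumption):
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
with keys published as JSON objects such as {"userid": "User_1"} rather than bare strings like "User_1". Alternatively, Avro record keys read with io.confluent.connect.avro.AvroConverter arrive as Structs and should also satisfy the converter.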