confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0
1 stars 1 forks source link

Bigquery kafka sink connect not accepting boolean values after SMT processing #176

Closed saumyasuhagiya closed 2 years ago

saumyasuhagiya commented 2 years ago

Schemaless Bigquery kafka sink connector SMT not able to save data to bigquery on boolean.

MapsUtil.debugPrint on recordValue before returning from apply(R record).

active = true java.lang.String Schema definition

{
    "mode": "NULLABLE",
    "name": "active",
    "type": "BOOLEAN"
  }

Deserialiser

public class BooleanDeserialiser extends JsonDeserializer<Boolean> {

@Override
public Boolean deserialize(JsonParser parser, DeserializationContext context)
        throws IOException {
    return !"0".equals(parser.getText());
}

Serialiser

public class BooleanSerialiser extends JsonSerializer<Boolean> {

@Override
public void serialize(Boolean value, JsonGenerator gen, SerializerProvider serializers)
        throws IOException {
    gen.writeString(value ? "true" : "false");
}

Error

[row index 76]: invalid: Cannot convert string value to boolean: 1
at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
C0urante commented 2 years ago

The connector does very little for schemaless data before passing it to the BigQuery client API. It's difficult to tell from this description but it seems like the issue is that the upstream active field is a string but the corresponding downstream column is a boolean.

It's not impossible to implement some more sophisticated logic that first retrieves the schema of the table and then examines the content of each incoming sink record to make sure that it adheres to that schema (and, if it doesn't, tries to modify those contents so that they do), but this would likely incur a performance penalty and be fairly complicated to implement for relatively little benefit.

This should instead be addressed with upstream changes. Some options include:

@saumyasuhagiya If I've misunderstood the issue here let me know, otherwise I'll close as won't fix.

saumyasuhagiya commented 2 years ago

Thanks for the details. This might be a straightforward thing to do. However, is there any chance we can improve the logging to provide the exact field name in error? @C0urante This might be helpful for debugging issues faster as well.

I should take schema and compare it, however when I used same payload via command bq insert dev.table_name data.json and it worked fine. It had active as string "true".

C0urante commented 2 years ago

However, is there any chance we can improve the logging to provide the exact field name in error?

I don't think so; we already log the exact error messages that BigQuery gives us in the response from our call to the insert all API. You might consider reaching out to the BigQuery team and seeing if they would consider adding this information to the error messages they send out in the insert all API.

I used same payload via command bq insert dev.table_name data.json and it worked fine. It had active as string "true".

The bq insert docs explain why:

Data types are converted to match the column types of the destination table. This command is intended for testing purposes only. To stream data into BigQuery, use the insertAll API method.

Going to close this as won't fix.