databricks / iceberg-kafka-connect

Apache License 2.0

DebeziumTransform fails on mongodb debezium records. #217

Open MikhailGolubtsov94 opened 8 months ago

MikhailGolubtsov94 commented 8 months ago

The "after"/"before" fields of records produced by the MongoDB source connector are of type "string" (see the Debezium documentation). However, the source code of io.tabular.iceberg.connect.transforms.DebeziumTransform assumes that the "after"/"before" fields are of type "struct". This causes the following error for MongoDB CDC Debezium records:

```
Caused by: org.apache.kafka.connect.errors.DataException: Field 'before' is not of type STRUCT
at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:263)
at org.apache.kafka.connect.data.Struct.getStruct(Struct.java:191)
at io.tabular.iceberg.connect.transforms.DebeziumTransform.applyWithSchema(DebeziumTransform.java:83)
at io.tabular.iceberg.connect.transforms.DebeziumTransform.apply(DebeziumTransform.java:71)
at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 13 more
```
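To illustrate the mismatch: `getStruct("before")` throws as soon as the field's value is not a Connect Struct, which is always the case for the MongoDB connector, where "before"/"after" carry extended-JSON strings. A defensive transform would have to branch on the field's runtime type first. A minimal self-contained sketch (not the library's actual code; `describePayload` is a hypothetical helper):

```java
// Sketch of the type guard DebeziumTransform would need for MongoDB records.
// Relational Debezium connectors put a Struct in "before"/"after"; the
// MongoDB connector puts an extended-JSON String there instead.
public class EnvelopeGuard {

    // Hypothetical helper: classify the payload instead of blindly
    // calling getStruct(), which throws DataException on a String.
    static String describePayload(Object fieldValue) {
        if (fieldValue == null) {
            return "absent";                         // e.g. "before" on an insert
        } else if (fieldValue instanceof String) {
            return "json-string: " + fieldValue;     // MongoDB source connector
        } else {
            return "struct: " + fieldValue;          // relational connectors
        }
    }

    public static void main(String[] args) {
        System.out.println(describePayload("{\"_id\": 1}")); // json-string: {"_id": 1}
        System.out.println(describePayload(null));           // absent
    }
}
```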

Possible solution: use a different record format for MongoDB records, e.g. keep the CDC metadata and the "after" field at the same hierarchy level:

```
{
  // actual record
  "after": "{...}",
  // cdc metadata
  "source" : "...",
  "offset": "...",
  ...
}
```
tabmatfournier commented 8 months ago

See: https://github.com/tabular-io/iceberg-kafka-connect/pull/204

You can either run it as-is (it gives you op, before, and after fields, unlike the Debezium-supplied Mongo transform) and write the output directly, or chain it ahead of the DebeziumTransform.
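Chaining the two would look roughly like the connector config below. This is a sketch: the class name `MongoDebeziumTransform` is assumed from the PR title and may not match the merged code, so check #204 for the actual class before using it.

```properties
# Hypothetical config: run the Mongo-aware transform from #204 first,
# so DebeziumTransform receives struct-typed before/after fields.
transforms=mongo,dms
transforms.mongo.type=io.tabular.iceberg.connect.transforms.MongoDebeziumTransform
transforms.dms.type=io.tabular.iceberg.connect.transforms.DebeziumTransform
```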