getindata / kafka-connect-iceberg-sink


DELETE cmd in POSTGRES results in a crashed Iceberg connector #39

Closed RajotyaGautam closed 1 year ago

RajotyaGautam commented 1 year ago

I am running a Postgres, Kafka, Kafka Connect setup, with both the Postgres connector and the Iceberg connector installed. INSERT and UPDATE commands run fine, resulting in Iceberg files exported to S3. However, a DELETE command results in an error in the Iceberg sink container. Let me know what needs to be done.

RajotyaGautam commented 1 year ago

Connector config used:

```json
{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics": "postgres.public.dbz_test",
    "upsert": true,
    "upsert.keep-deletes": true,
    "table.auto-create": true,
    "table.write-format": "parquet",
    "table.namespace": "gid_streaminglabs_us_east_1dbz",
    "table.prefix": "debeziumcdc",
    "iceberg.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.warehouse": "s3a://gid-streaminglabs-us-east-1/dbz_iceberg/gl_test",
    "iceberg.fs.defaultFS": "s3a://gid-streaminglabs-us-east-1/dbz_iceberg/gl_test",
    "iceberg.com.amazonaws.services.s3.enableV4": true,
    "iceberg.com.amazonaws.services.s3a.enableV4": true,
    "iceberg.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "iceberg.fs.s3a.path.style.access": true,
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "errors.log.enable": true
  }
}
```

clazalde commented 1 year ago

@RajotyaGautam I think you are getting a null message (tombstone) on the deletes. What is your config in the Postgres connector? Are you using Debezium?
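For context: with the default Debezium settings, a DELETE produces two records on the topic. Roughly like this (the table columns and values below are made up for illustration):

```
# record 1 - the Debezium delete event: "op" is "d" and "after" is null
{"before": {"id": 1, "name": "foo"}, "after": null, "op": "d", "ts_ms": 1680000000000}

# record 2 - the tombstone: same key as record 1, but the whole value is null.
# This null value is what a sink expecting a struct can crash on.
null
```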

RajotyaGautam commented 1 year ago

Yes, I am using Debezium and a Postgres connector. This is what the compose file looks like:

```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true

  postgres:
    image: postgres
    ports:
```

RajotyaGautam commented 1 year ago

Actually, I was following the article https://getindata.com/blog/real-time-ingestion-iceberg-kafka-connect-apache-iceberg-sink/ posted by you guys, but DELETE doesn't seem to work.

clazalde commented 1 year ago

It happened to me as well, but I solved it by setting drop.tombstones to true so I don't get that null message. I also added the ExtractNewRecordState transformation:

"transforms" : "unwrap"
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
"transforms.unwrap.add.fields":"op,table,lsn,source.ts_ms,db"
"transforms.unwrap.drop.tombstones":"true"
"transforms.unwrap.delete.handling.mode":"rewrite"

Try adding this to your Postgres connector config and tell me if it works.
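For reference, a minimal Debezium Postgres source config with the transformation wired in could look like the sketch below. The connection details and connector name are placeholders, and `database.server.name` is the pre-2.0 Debezium name for the topic prefix (renamed `topic.prefix` in Debezium 2.x); only the `transforms.*` lines come from the snippet above:

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "postgres",
    "table.include.list": "public.dbz_test",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms,db",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}
```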

RajotyaGautam commented 1 year ago

Great... It worked mate... thanks a ton