Open muhammadali321 opened 1 year ago
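Debezium represents DATE and DATETIME values as epoch-based numbers in UTC (for a DATE column, the number of days since 1970-01-01), which is why you get an int instead of a date. Correct me if I'm wrong.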
Where do we need to transform this type?
It depends on where you need the converted data. If you need it in the Kafka topic, apply the transform on the source connector. If you only need it in the sink database, apply it on the sink connector.
In either case, you need both transforms listed in the config:
transforms=unwrap,ConvertTimestamp
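As a rough sketch of what "both transforms" could look like on the sink side, the snippet below renders the relevant properties lines. The alias names `unwrap` and `ConvertTimestamp` come from the line above. The helper `sink_transform_properties` is hypothetical, and the choice of `target.type=string` with `yyyy-MM-dd` simply mirrors the question's own attempt. It assumes the stock Kafka Connect `TimestampConverter`, whose settings are `field`, `target.type`, and `format`, and that the field still carries its Connect Date schema when it reaches the transform.

```python
# Hypothetical helper: renders sink-side transform settings as .properties lines.
# Assumption: the column to convert is a top-level field after "unwrap" runs.
def sink_transform_properties(field: str, fmt: str = "yyyy-MM-dd") -> str:
    settings = {
        # Order matters: unwrap flattens the Debezium envelope first,
        # then ConvertTimestamp sees the plain row fields.
        "transforms": "unwrap,ConvertTimestamp",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.ConvertTimestamp.field": field,
        "transforms.ConvertTimestamp.target.type": "string",
        "transforms.ConvertTimestamp.format": fmt,
    }
    return "\n".join(f"{key}={value}" for key, value in settings.items())


print(sink_transform_properties("enroll_date"))
```

The printed lines would go into the sink connector's properties file in place of its existing `transforms=...` entries. Whether a string or a date target type is the better fit depends on the column type wanted in the sink table.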
Hello team, I'm pretty new to Kafka. My use case is to stream real-time data from MySQL to Greenplum using Kafka.
We have a table in the MySQL replicadb database.
Table structure on the MySQL side:
CREATE TABLE student_enroll_custom1 (
  student_id int(11) DEFAULT NULL,
  enroll_date date default '2021-01-01 00:00:00'
) ENGINE=InnoDB DEFAULT CHARSET=utf8
MySQL Debezium CDC connector configuration
name=mysql-con-1
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=10.223.0.123
database.port=3306
database.user=debezium
database.password=debezium
database.server.id=1
database.server.name=mysql-svr
database.connectionTimeZone=America/New_York
database.include.list=replicadb
database.history.kafka.bootstrap.servers=afi03:9092
database.history.kafka.topic=schema_chn.replicadb
include.schema.changes=true
time.precision.mode=connect
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\.([^.]+)\.([^.]+)
transforms.route.replacement=$3
transforms.timestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter.format=YYYY-MM-dd
Connect-standalone properties
bootstrap.servers=afi03:9092
key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://afi03:9081
value.converter.schema.registry.url=http://afi03:9081
offset.storage.file.filename=/home/kafka/standalone/standalone.log
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/usr/share/java/debezium-connector-mysql,/usr/share/java/confluentinc-kafka-connect-jdbc-10.6.0,/usr/share/java/TimestampConverter-1.2.4-SNAPSHOT.jar
JDBC Sink Connector
name=test-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=4
connection.url=jdbc:postgresql://10.321.61.12:5432/gpafiniti?currentSchema=replicadb&user=gpadmin&password=pivotal@1234#
topics=student_enroll_custom1
dialect.name=PostgreSqlDatabaseDialect
auto.create=true
insert.mode=insert
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
schemas.enable=false
For the date datatype
time.precision.mode=connect
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format=yyyy-MM-dd
transforms.TimestampConverter.target.type=string
transforms.TimestampConverter.target.field=enroll_date
On the topic student_enroll_custom1, the enroll_date column is stored as an integer:
{"before":null,"after":{"student_id":1,"enroll_date":18993},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-svr","ts_ms":1669844712677,"snapshot":"true","db":"replicadb","sequence":null,"table":"student_enroll_custom1","server_id":0,"gtid":null,"file":"mysql-bin.000014","pos":6877932,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669844712678,"transaction":null}
{"before":null,"after":{"student_id":1,"enroll_date":18993},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-svr","ts_ms":1669844712678,"snapshot":"true","db":"replicadb","sequence":null,"table":"student_enroll_custom1","server_id":0,"gtid":null,"file":"mysql-bin.000014","pos":6877932,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669844712678,"transaction":null}
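To make the integer readable: with `time.precision.mode=connect`, a DATE column is emitted as the number of days since 1970-01-01. The following is a minimal sketch of decoding it by hand, using a trimmed copy of the record above. The helper name `epoch_days_to_date` is made up for illustration.

```python
import json
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def epoch_days_to_date(days: int) -> date:
    """Convert a days-since-epoch integer into a calendar date."""
    return EPOCH + timedelta(days=days)


# Trimmed copy of the topic record; only the fields needed here are kept.
raw = '{"before": null, "after": {"student_id": 1, "enroll_date": 18993}, "op": "r"}'
record = json.loads(raw)

enroll_date = epoch_days_to_date(record["after"]["enroll_date"])
print(enroll_date.isoformat())  # 2022-01-01
```

So 18993 is not a corrupted value. It is the date 2022-01-01 in its days-since-epoch encoding.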
Now the questions are:
1: Why are we getting an int for date columns? Is there any way to change it to a string or date on the source side?
2: Where do we need to transform this type: on the source connector (Debezium CDC) or on the sink connector (PostgreSQL sink connector)?
3: How do we transform this enroll_date field? I have used the TimestampConverter configuration but had no luck. Maybe I'm not providing the correct configuration for the timestamp. Can you please briefly explain where and how to use the timestamp transformation here?
Thanks