apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Feature][Kafka Connector enhancement] Support more deserialization mode in Kafka Source Connector #6462

Open GangLiCN opened 6 months ago

GangLiCN commented 6 months ago

Search before asking

Description

When testing data sync from Kafka to any sink database (e.g. MySQL, PostgreSQL, ClickHouse, etc.), I found an "issue" that may need enhancement. The issue is: the default key/value deserializer used when consuming Kafka records is `ByteArrayDeserializer`, which means that whether you override `kafka.config` or use the default properties in your job configuration file, the returned data is a byte-array (key, value) pair.
This implementation is a bit inconvenient, because a byte array is not a standard data type in most popular databases, so the user must convert it before writing data into the database.
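To illustrate the conversion burden described above, here is a minimal sketch (not the connector's actual code) of what a user currently has to do downstream: take the raw `byte[]` value that `ByteArrayDeserializer` hands back and decode it into a UTF-8 string before the fields can be mapped to sink columns. The helper name `decodeValue` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class DecodeExample {
    // Hypothetical helper: decode the raw byte[] value returned by
    // ByteArrayDeserializer into a UTF-8 string, so it can then be
    // parsed as JSON and mapped to the sink table's columns.
    static String decodeValue(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A record value matching the schema in the example job config below.
        byte[] raw = "{\"id\":\"s-1\",\"timestamp\":1710000000,\"temperature_c\":21.5}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeValue(raw));
    }
}
```

If the connector honored `key.deserializer`/`value.deserializer` from `kafka.config`, this decoding step would not be the user's responsibility.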

Usage Scenario

A simple example configuration file is as below:

```
env {
  execution.parallelism = 4
  job.mode = "STREAMING"
}

source {
  Kafka {
    format = json
    topic = "sensor_data"
    bootstrap.servers = "localhost:9092"
    result_table_name = "middle_sensor_tbl"
    schema = {
      fields {
        id = "string"
        timestamp = "bigint"
        temperature_c = "decimal(38,20)"
      }
    }
    start_mode = "earliest"
    kafka.config = {
      auto.offset.reset = "earliest"
      enable.auto.commit = "true"
      max.poll.records = "1000"
      session.timeout.ms = "60000"
      key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
    }
  }
}

sink {
  jdbc {
    source_table_name = "middle_sensor_tbl"
    url = "jdbc:postgresql://localhost:5433/seatunnel_test"
    driver = "org.postgresql.Driver"
    database = "seatunnel_test"
    user = "seatunnel_test"
    password = "seatunnel_test_123"
    table = "sensor_data"
    batch_size = 10000
    primary_keys = ["id"]
    query = "insert into sensor_data(id, timestamp, temperature_c) values(?,?,?)"
  }
}
```

Related issues

No

Are you willing to submit a PR?

Code of Conduct

GangLiCN commented 6 months ago

More information: whether or not you override the property "key.deserializer" (or "value.deserializer") in the job configuration file, the Kafka Source Connector uses "ByteArrayDeserializer" as the default deserializer (it is defined in the source code), which introduces a new question: how and when should this data be converted in the "transform" layer before writing to the sink database?
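For reference, here is a sketch of the consumer properties that the `kafka.config` block in the example job is presumably expected to map onto, using the standard deserializer class names shipped with kafka-clients. This is an assumption about the intended behavior, not the connector's actual implementation; `buildConsumerProps` is a hypothetical helper.

```java
import java.util.Properties;

public class ConsumerProps {
    // Hypothetical sketch: the consumer properties a user would expect the
    // kafka.config block to translate into, with the deserializer overrides
    // taking effect instead of the hard-coded ByteArrayDeserializer.
    static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConsumerProps().getProperty("value.deserializer"));
    }
}
```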

GangLiCN commented 6 months ago

Is there any other solution to transform data from Kafka to a database?

github-actions[bot] commented 5 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.