lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

AWS S3 sink connect surrounds messages with quotes #874

Open aidan-melen opened 2 years ago

aidan-melen commented 2 years ago

Issue Guidelines

How can I configure the S3 sink to write messages to S3 objects without surrounding quotes? When a topic is round-tripped through the sink and source connectors, I would expect the messages in source_topic and target_topic to be identical.

I am following this AWS blog post, more or less.

What version of the Stream Reactor are you reporting this issue for?

I built a fat jar from the master branch. The 3.0.1 release won't work for me because my bucket is in us-west-2, and I found that only the latest build supports the connect.s3.aws.client=AWS property.

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

yes

Have you read the docs?

yes

What is the expected behaviour?

The S3 source connector loads messages from S3 objects into Kafka without surrounding quotes.

What was observed?

[Screenshot: Screen Shot 2022-07-18 at 9 45 15 PM]

What is your Connect cluster configuration (s3-sink-connector.properties)?

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
key.converter.schemas.enable=false
connect.s3.kcql=INSERT INTO mkc-tutorial-destination-bucket:tutorial SELECT * FROM source_topic WITH_FLUSH_INTERVAL = 60 WITH_FLUSH_COUNT = 100
tasks.max=2
topics=source_topic
connect.s3.aws.client=AWS
connect.s3.aws.region=us-west-2
schema.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter

This is what the bucket objects contain:

"asdlkjfasdfkjlfsda"
"jhasdfhiughsdafouih"
"igsdfiuhiudyfs"
"ajshdfoifasdoij"
"ijfhiudhhufdshu"
"oihsdfasdfasdf"
"gfahfahafhaf"
"asdfreatsafh"
""
"asdtadsreatreg"
"a"
"aa"
"aaa"
"aaa"
"aaaa"
"aaaaa"
"aaaaa"
"a"
"a"
"aasdasda"
"asdasd"
"asd"
"asdasdasd"
"asdasdasdsad"
"asdasd"
"adsdasd"
"asda"
"sdasdasd"
""
"asdasd"
"asdasd"
"asd"
"asdasd"
"asd"
"a"
"dasda"
"d"
"asd"
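
The quoted lines above look like JSON string literals. One plausible explanation, not confirmed anywhere in this thread, is that the sink serializes each plain string value as a JSON document before writing it. The sketch below is not connector code. It only uses Python's standard json module, with a few sample messages taken from the listing, to show that JSON-encoding a bare string produces exactly this kind of output, including `""` for an empty message.

```python
import json

# Sample message values, copied from the bucket listing above (without quotes).
messages = ["asdlkjfasdfkjlfsda", "", "a"]

# JSON-encoding a bare string wraps it in double quotes.
# Assumption: the sink does something equivalent when it writes each record.
lines = [json.dumps(m) for m in messages]

for line in lines:
    print(line)
```

If that assumption holds, the quotes are part of the stored format rather than part of the message, so a reader has to decode each line as JSON to get the original text back.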

What is your connector properties configuration (s3-source-connector.properties)?

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
key.converter.schemas.enable=false
connect.s3.kcql=INSERT INTO target_topic SELECT * FROM mkc-tutorial-destination-bucket:tutorial WITH_FLUSH_INTERVAL = 60 WITH_FLUSH_COUNT = 100
tasks.max=2
topics=target_topic
connect.s3.aws.client=AWS
connect.s3.aws.region=us-west-2
schema.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter

iosifnicolae2 commented 2 years ago

We've implemented a solution here: https://github.com/iosifnicolae2/stream-reactor and use the following KCQL query for the source connector (note STOREAS JSON):

INSERT INTO target_topic SELECT * FROM mkc-tutorial-destination-bucket:tutorial STOREAS `JSON` WITH_FLUSH_INTERVAL = 60 WITH_FLUSH_COUNT = 100
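
Why reading the objects as JSON would remove the quotes can be sketched as follows. This is an illustration only: it does not reproduce the code in the fork, and `decode_json_lines` is a hypothetical helper name. It assumes each line of an S3 object is one JSON string literal, as in the bucket contents shown in the issue.

```python
import json


def decode_json_lines(text):
    """Decode each non-blank line as a JSON value (hypothetical helper)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# A few lines in the same shape as the bucket objects from the issue.
stored = '"asdlkjfasdfkjlfsda"\n""\n"a"\n'

# Decoding as JSON yields the original strings without surrounding quotes.
restored = decode_json_lines(stored)
print(restored)
```

Reading the same lines as plain text would instead keep the quote characters in every message, which matches the behaviour reported above.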