lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

S3 Sink connector reports consumer lag of 1 despite processing all records #1132

Open jamielwhite opened 6 months ago

jamielwhite commented 6 months ago

Issue Guidelines

What version of the Stream Reactor are you reporting this issue for?

6.3.0

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes (Kafka 3.6.0, Confluent 7.6.0)

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Yes (S3 sink 6.3.0)

Have you read the docs?

Yes

What is the expected behaviour?

I expect the Kafka consumer group to report a lag of 0 once it has processed all records in the topic.

What was observed?

The consumer group lag remained at 1 after the connector had caught up with new messages. When I read the files from S3, the latest message on the topic had been written. So the connector appears to be processing all of the messages, but not committing the offsets as I'd expect.

➜  ~ kafka-consumer-groups --bootstrap-server localhost:19092 --describe --group connect-backup-s3-sink

GROUP                  TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        
connect-backup-s3-sink test_backup_topic_2 0          131             132             1            
connect-backup-s3-sink test_backup_topic_1 0          199             200             1  
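
For reference, the LAG column is LOG-END-OFFSET minus CURRENT-OFFSET. Below is a minimal Python sketch of that arithmetic, using the output above as sample data. `PartitionLag` and `parse_describe_output` are hypothetical helper names made up for this illustration; they are not part of Kafka or Stream Reactor.

```python
from typing import List, NamedTuple


class PartitionLag(NamedTuple):
    """One row of `kafka-consumer-groups --describe` output (hypothetical helper)."""
    group: str
    topic: str
    partition: int
    current_offset: int   # committed offset for the group
    log_end_offset: int   # offset the next produced record will receive

    @property
    def lag(self) -> int:
        # Lag is the distance between the log end and the committed offset.
        return self.log_end_offset - self.current_offset


def parse_describe_output(text: str) -> List[PartitionLag]:
    """Parse whitespace-separated rows, skipping the header and incomplete rows."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        if len(fields) < 5 or fields[0] == "GROUP":
            continue  # blank line or header row
        if not (fields[2].isdigit() and fields[3].isdigit() and fields[4].isdigit()):
            continue  # e.g. "-" when no offset has been committed yet
        rows.append(
            PartitionLag(fields[0], fields[1], int(fields[2]), int(fields[3]), int(fields[4]))
        )
    return rows


# Sample data copied from the output in this report.
SAMPLE = """\
GROUP                  TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
connect-backup-s3-sink test_backup_topic_2 0          131             132             1
connect-backup-s3-sink test_backup_topic_1 0          199             200             1
"""

if __name__ == "__main__":
    for row in parse_describe_output(SAMPLE):
        print(row.topic, row.partition, row.lag)
```

By Kafka convention, the committed offset is the offset of the next record to consume, so a fully caught-up group normally shows CURRENT-OFFSET equal to LOG-END-OFFSET. The values above (131 vs 132 and 199 vs 200) are consistent with the committed offset pointing at the last written record rather than one past it, though that is only one possible reading of the numbers.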

What is your Connect cluster configuration (connect-avro-distributed.properties)?

group.id=test-kafka-connect
status.storage.replication.factor=1
key.converter=io.confluent.connect.avro.AvroConverter
config.storage.topic=connect-config
offset.storage.replication.factor=1
plugin.path=/usr/share/java/plugins
offset.storage.topic=connect-offsets
bootstrap.servers=kafka:9092
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
rest.advertised.host.name=localhost
rest.port=8083
status.storage.topic=connect-status
value.converter.schema.registry.url=http://schema-registry:8081
config.storage.replication.factor=1

What is your connector properties configuration (my-connector.properties)?

{
  "name": "backup-s3-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "connect.s3.kcql": "INSERT INTO my-bucket-name SELECT * FROM `*` STOREAS `AVRO` WITH_FLUSH_INTERVAL = 30 PROPERTIES('store.envelope'=true)",
    "connect.s3.custom.endpoint": "http://localstack:4566",
    "connect.s3.vhost.bucket": true,
    "topics": "test_backup_topic_1,test_backup_topic_2"
  }
}

brandon-powers commented 6 months ago

+1, I'm seeing this as well on 6.3.0.

JKCai commented 6 months ago

Seeing the same behaviour too on version 6.3.0.