Open tawabocah opened 2 years ago
I encountered the same issue when I upgraded Confluent from 6.0.1 to 7.0.1; everything was fine in the previous version. I even tested with the Sleepy policy, the move cleanup policy, and the fixed-width reader, and the issue still exists.
I suspect it is related to this change in the Kafka Connect offset-commit logic: https://github.com/apache/kafka/pull/11323 However, there is no obvious issue when I use other Kafka connector plugins.
Below are the TRACE logs for your information. I added some notes starting with # to make the investigation easier.
# offset commit interval, 10s each
[2022-03-30 13:21:33,982] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:21:33,982] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:21:33,983] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished offset commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:539)
# first cycle, execution number [31], 60s each
[2022-03-30 13:21:43,392] TRACE [FsSourceConnector-Test|task-0] FixedWidthFileReader Initialized file reader with batch size [0] for file [file:/placeholder.gz]. (com.github.mmolimar.kafka.connect.fs.file.reader.FixedWidthFileReader:41)
[2022-03-30 13:21:43,424] INFO [FsSourceConnector-Test|task-0] FsSourceTask Processing records for file [path = file:/placeholder.gz, length = 90, blocks = [[offset = 0, length = 90, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
[2022-03-30 13:21:43,448] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Read [2] records from file [file:/placeholder.gz]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:106)
[2022-03-30 13:21:43,448] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Returning [2] records in execution number [31] for policy [com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:111)
[2022-03-30 13:21:43,448] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} About to send 2 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:271)
[2022-03-30 13:21:43,462] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Appending record to the topic test-topic with key null, value Struct{k1=v1,k2=v2,k3=v3} (org.apache.kafka.connect.runtime.WorkerSourceTask:375)
[2022-03-30 13:21:43,464] TRACE [FsSourceConnector-Test|task-0] Topic creation by the connector is disabled or the topic test-topic was previously created.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
[2022-03-30 13:21:43,485] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Appending record to the topic test-topic with key null, value Struct{k1=v1,k2=v2,k3=v3} (org.apache.kafka.connect.runtime.WorkerSourceTask:375)
[2022-03-30 13:21:43,485] TRACE [FsSourceConnector-Test|task-0] Topic creation by the connector is disabled or the topic test-topic was previously created.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
[2022-03-30 13:21:43,485] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:261)
[2022-03-30 13:21:43,485] TRACE [FsSourceConnector-Test|task-0] FsSourceTask Polling for new data... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:79)
[2022-03-30 13:21:43,506] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Wrote record successfully: topic test-topic partition 0 offset 27 (org.apache.kafka.connect.runtime.WorkerSourceTask:390)
[2022-03-30 13:21:43,507] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Wrote record successfully: topic test-topic partition 0 offset 28 (org.apache.kafka.connect.runtime.WorkerSourceTask:390)
# issue: the first cycle's record offsets have not been committed yet
[2022-03-30 13:21:43,983] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:21:43,983] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:21:43,983] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:539)
# ...skipped 5 offset-commit intervals...
# second cycle, execution number [32]
# issue: duplicate records will be submitted because the first cycle's offsets were not committed in time
[2022-03-30 13:22:43,532] TRACE [FsSourceConnector-Test|task-0] FixedWidthFileReader Initialized file reader with batch size [0] for file [file:/placeholder.gz]. (com.github.mmolimar.kafka.connect.fs.file.reader.FixedWidthFileReader:41)
[2022-03-30 13:22:43,534] INFO [FsSourceConnector-Test|task-0] FsSourceTask Processing records for file [path = file:/placeholder.gz, length = 90, blocks = [[offset = 0, length = 90, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
[2022-03-30 13:22:43,535] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Read [2] records from file [file:/placeholder.gz]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:106)
[2022-03-30 13:22:43,535] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Returning [2] records in execution number [32] for policy [com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:111)
[2022-03-30 13:22:43,535] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} About to send 2 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:271)
[2022-03-30 13:22:43,536] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Appending record to the topic test-topic with key null, value Struct{k1=v1,k2=v2,k3=v3} (org.apache.kafka.connect.runtime.WorkerSourceTask:375)
[2022-03-30 13:22:43,536] TRACE [FsSourceConnector-Test|task-0] Topic creation by the connector is disabled or the topic test-topic was previously created.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
[2022-03-30 13:22:43,537] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Appending record to the topic test-topic with key null, value Struct{k1=v1,k2=v2,k3=v3} (org.apache.kafka.connect.runtime.WorkerSourceTask:375)
[2022-03-30 13:22:43,537] TRACE [FsSourceConnector-Test|task-0] Topic creation by the connector is disabled or the topic test-topic was previously created.If auto.create.topics.enable is enabled on the broker, the topic will be created with default settings (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
[2022-03-30 13:22:43,537] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:261)
[2022-03-30 13:22:43,537] TRACE [FsSourceConnector-Test|task-0] FsSourceTask Polling for new data... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:79)
[2022-03-30 13:22:43,550] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Wrote record successfully: topic test-topic partition 0 offset 29 (org.apache.kafka.connect.runtime.WorkerSourceTask:390)
[2022-03-30 13:22:43,550] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Wrote record successfully: topic test-topic partition 0 offset 30 (org.apache.kafka.connect.runtime.WorkerSourceTask:390)
# issue: the first cycle's offsets are committed here, only after the second cycle has completed
[2022-03-30 13:22:44,002] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:22:44,002] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:22:44,007] DEBUG [FsSourceConnector-Test|task-0|offsets] Submitting 1 entries to backing store. The offsets are: {{path=file:/placeholder.gz}={offset=2, file-size=90, eof=true}} (org.apache.kafka.connect.storage.OffsetStorageWriter:165)
[2022-03-30 13:22:44,034] TRACE WorkerSourceTask{id=FsSourceConnector-Test-0} Finished flushing offsets to storage (org.apache.kafka.connect.runtime.WorkerSourceTask:551)
[2022-03-30 13:22:44,035] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished commitOffsets successfully in 33 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
[2022-03-30 13:22:54,036] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:22:54,036] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:22:54,036] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:539)
# ...skipped 4 offset-commit intervals...
# third cycle, execution number [33]
[2022-03-30 13:23:43,577] INFO [FsSourceConnector-Test|task-0] SleepyPolicy Moving file [file:/placeholder.gz] to [/done/placeholder.gz] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:290)
[2022-03-30 13:23:43,609] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Read [0] records from file [file:/placeholder.gz]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:106)
[2022-03-30 13:23:43,609] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Returning [0] records in execution number [33] for policy [com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:111)
[2022-03-30 13:23:43,609] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:271)
[2022-03-30 13:23:43,609] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:261)
[2022-03-30 13:23:43,609] TRACE [FsSourceConnector-Test|task-0] FsSourceTask Polling for new data... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:79)
# issue: the second cycle's offsets are committed here, only after the third cycle has completed
[2022-03-30 13:23:44,059] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:23:44,059] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:23:44,060] DEBUG [FsSourceConnector-Test|task-0|offsets] Submitting 1 entries to backing store. The offsets are: {{path=file:/placeholder.gz}={offset=2, file-size=90, eof=true}} (org.apache.kafka.connect.storage.OffsetStorageWriter:165)
[2022-03-30 13:23:44,066] TRACE WorkerSourceTask{id=FsSourceConnector-Test-0} Finished flushing offsets to storage (org.apache.kafka.connect.runtime.WorkerSourceTask:551)
[2022-03-30 13:23:44,066] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
[2022-03-30 13:23:54,067] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:23:54,067] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:23:54,067] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:539)
# ...skipped 4 offset-commit intervals...
# execution number [34]
[2022-03-30 13:24:43,643] DEBUG [FsSourceConnector-Test|task-0] FsSourceTask Returning [0] records in execution number [34] for policy [com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy]. (com.github.mmolimar.kafka.connect.fs.FsSourceTask:111)
[2022-03-30 13:24:43,643] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:271)
[2022-03-30 13:24:43,644] TRACE [FsSourceConnector-Test|task-0] WorkerSourceTask{id=FsSourceConnector-Test-0} Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:261)
[2022-03-30 13:24:43,644] TRACE [FsSourceConnector-Test|task-0] FsSourceTask Polling for new data... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:79)
[2022-03-30 13:24:44,070] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:491)
[2022-03-30 13:24:44,071] INFO [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-03-30 13:24:44,071] DEBUG [FsSourceConnector-Test|task-0|offsets] WorkerSourceTask{id=FsSourceConnector-Test-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:539)
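The timing in the trace above can be reduced to simple arithmetic. Below is an illustrative Python sketch (my own model, not the actual WorkerSourceTask code) of the behavior visible in the log: records produced in one poll cycle only have their offsets flushed after the next poll cycle returns, no matter how many 10 s commit ticks fire in between.

```python
# Illustrative model only -- NOT the actual Kafka Connect commit logic.
# Mirrors the TRACE log above: offsets flush one full poll cycle late.

COMMIT_INTERVAL_S = 10   # offset.flush.interval.ms = 10000 in this setup
POLL_CYCLE_S = 60        # policy.sleepy.sleep = 60000 in this trace

def offset_flush_time(produce_t, poll_cycle=POLL_CYCLE_S,
                      commit=COMMIT_INTERVAL_S):
    """First commit tick at which offsets for records produced at
    produce_t actually flush, assuming one poll cycle of lag."""
    ready = produce_t + poll_cycle          # offsets become flushable here
    return -(-ready // commit) * commit     # round up to the next tick

# Cycle 31 sends its records at ~t=43 s; under this model their offsets
# flush at t=110 s, after cycle 32 (~t=103 s) has already re-read and
# re-sent the same file.
```

Under this model a shorter commit interval cannot help, because the bottleneck is the poll-cycle lag, not the commit timer.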
I'm also experiencing the same issue (a file being processed multiple times) when using the "sleepy" policy com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy as well as the CronPolicy.
Closer inspection of the offsets stored in my Kafka Connect offset storage topic shows that the file which keeps being reprocessed has several offset events with "eof":false in the value for that file's key, whereas files which FsSourceConnector correctly recognizes as finished processing have "eof":true.
Screenshot from Conduktor analyzing one source file's offsets in the Connect offset storage topic to show what I'm talking about:
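The same check can be done without Conduktor by looking at the JSON values of a file's offset events. A minimal sketch (plain JSON parsing; fetching the events from the offsets topic with a consumer is left out):

```python
import json

def file_finished(offset_values):
    """Given the JSON values of one file key's offset events, oldest
    first, return True if the latest event has "eof": true -- i.e. the
    connector should treat the file as fully processed."""
    if not offset_values:
        return False
    last = json.loads(offset_values[-1])
    return bool(last.get("eof"))

# Example values shaped like the ones in the screenshot/description:
events = [
    '{"offset":426,"file-size":696277,"eof":false}',
    '{"offset":847,"file-size":696277,"eof":false}',
    '{"offset":2322,"file-size":696277,"eof":true}',
]
```

A file whose newest event still has "eof":false is the one that keeps getting reprocessed.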
It looks like what happened was the following. (For reference, the file in question is a CSV file with 2,323 lines including the header row.)
The connector saw a new file, processed rows 0 through 426, and committed an offset of {"offset":426,"file-size":696277,"eof":false}. (Ignore the second offset of 1 in my example; that was due to an exception handling a NULL value in the file the first time, because I had file_reader.delimited.settings.null_value set wrong.)
I paused the connector between 08:13 and 13:57 because I was troubleshooting/researching this issue.
At 13:57:00 (18:57:00 in the log timestamps), the connector "saw" the file again and began reprocessing it from offset 1:
[2022-09-21 18:57:00,200] INFO CronPolicy Seeking to offset [1] for file [s3a://<redacted>_20220921_000]. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy)
[2022-09-21 18:57:00,273] INFO FsSourceTask Processing records for file [path = s3a://<redacted>_20220921_000, length = 696277, blocks = [[offset = 0, length = 696277, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
One minute later at 18:58:00 (per the cron schedule), it "saw" the file again, and resumed reading it from offset 426:
[2022-09-21 18:58:00,134] INFO CronPolicy Seeking to offset [426] for file [s3a://<redacted>_20220921_000]. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy)
[2022-09-21 18:58:00,187] INFO FsSourceTask Processing records for file [path = s3a://<redacted>_20220921_000, length = 696277, blocks = [[offset = 0, length = 696277, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
However, it looks like on the previous execution of the CronPolicy at 18:57:00, it had already successfully produced all 2323 rows of the file into Kafka. This execution at 18:58:00 caused the rows 427-2323 to be produced to the topic a second time.
At 18:59:00, the CronPolicy fires again, and the connector processes the same file again, this time resuming from offset 847:
[2022-09-21 18:59:00,141] INFO CronPolicy Seeking to offset [847] for file [s3a://<redacted>_20220921_000]. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy)
[2022-09-21 18:59:00,197] INFO FsSourceTask Processing records for file [path = s3a://<redacted>_20220921_000, length = 696277, blocks = [[offset = 0, length = 696277, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
Once more, the connector produced duplicate data to Kafka for all the CSV rows from line 848 onward.
This pattern continues repeatedly until eventually it "thinks" it has reached the end of the file and commits offset 2322 with "eof":true at 18:33:14 (15:33:14 CDT in my Conduktor screenshot above).
On subsequent executions of the CronPolicy, it recognizes that the file has already been processed, and skips it as expected:
[2022-09-21 20:34:00,146] INFO CronPolicy Skipping file [s3a://<redacted>_20220921_000] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy)
The end result is that I have many duplicate events in the target Kafka topic, and I have more duplicate events for rows further down in the CSV file.
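The duplicate counts follow directly from the stale resume offsets. A quick back-of-envelope sketch (using the 2,322 data rows and the resume offsets from the description above):

```python
TOTAL_ROWS = 2322  # data rows in the file (2,323 lines incl. header)

def duplicates(resume_offsets, total=TOTAL_ROWS):
    """Rows re-produced when each stale resume re-reads to EOF."""
    return {off: total - off for off in resume_offsets}

def copies_of_row(row, resume_offsets, first_pass=1):
    """How many times a given row ends up in the topic: the first full
    pass plus one copy per resume that started at or before it."""
    return first_pass + sum(1 for off in resume_offsets if off <= row)

# Resumes observed in the logs above started at offsets 426 and 847
# (after the full re-read from offset 1), so rows near the end of the
# file accumulate the most copies -- matching the observation that
# duplicates increase further down the CSV.
```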
Here's my connector config, for reference:
{
  "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
  "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
  "file_reader.delimited.settings.format.delimiter": ",",
  "file_reader.delimited.settings.header": true,
  "file_reader.delimited.settings.allow_nulls": true,
  "fs.uris": "s3a://<redacted>",
  "name": "<redacted>",
  "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.CronPolicy",
  "policy.fs.fs.s3a.access.key": "<redacted>",
  "policy.fs.fs.s3a.secret.key": "<redacted>",
  "policy.regexp": "<redacted>",
  "policy.cron.expression": "0 0/1 * ? * *",
  "topic": "<redacted>",
  "file_reader.delimited.settings.null_value": "",
  "file_reader.delimited.settings.empty_value": "",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schemaregistry.<redacted>.com:8081",
  "transforms": "SetValueSchema",
  "transforms.SetValueSchema.schema.name": "<redacted>",
  "transforms.SetValueSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value"
}
@mmolimar I humbly beseech you to read the above and provide some feedback/insight.
I'm struggling with the same issue on 7.3.0. I tried rolling back to 6.0.1 and encountered it there too. It happens with every policy; the file gets processed multiple times.
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.2.3
    container_name: kafka-connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 1000
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt mmolimar/kafka-connect-fs:1.3.0
        /etc/confluent/docker/run &
        sleep infinity
    volumes:
      - /tmp:/data
Setting the sleep timer to a higher value seems to reduce the number of times each file gets reprocessed, but it doesn't stop it. Even if I set it to several minutes, the file is still processed 2 or 3 times before it's moved.
curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/xml-filesystem-00/config \
-d '{
"connector.class":"com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
"tasks.max": 1,
"fs.uris":"file:///data/unprocessed",
"topic":"xml-ingest",
"policy.class":"com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy",
"policy.sleepy.sleep":"10000",
"policy.recursive":"true",
"policy.regexp":".*\\.xml$",
"policy.batch_size":"-1",
"policy.cleanup":"move",
"policy.cleanup.move":"file:///data/processed",
"file_reader.class":"com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader",
"file_reader.batch_size":"-1"
}'
kafka-connect | [2023-02-01 16:18:54,427] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test.xml, length = 24, blocks = [[offset = 0, length = 24, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:18:54,437] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test2.xml, length = 9, blocks = [[offset = 0, length = 9, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:18:54,440] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test3.xml, length = 19, blocks = [[offset = 0, length = 19, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:18:54,444] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test4.xml, length = 280, blocks = [[offset = 0, length = 280, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:18:54,447] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test5.xml, length = 19, blocks = [[offset = 0, length = 19, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:18:54,450] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test6.xml, length = 12, blocks = [[offset = 0, length = 12, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:18:54,459] INFO [Producer clientId=connector-producer-source-xml-filesystem-00-0] Resetting the last seen epoch of partition xml-ingest-0 to 0 since the associated topicId changed from null to B_WprujwS8aE5VHedkX7BQ (org.apache.kafka.clients.Metadata)
kafka-connect | [2023-02-01 16:19:04,521] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test.xml, length = 24, blocks = [[offset = 0, length = 24, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:19:04,523] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test2.xml, length = 9, blocks = [[offset = 0, length = 9, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:19:04,525] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test3.xml, length = 19, blocks = [[offset = 0, length = 19, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:19:04,528] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test4.xml, length = 280, blocks = [[offset = 0, length = 280, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:19:04,529] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test5.xml, length = 19, blocks = [[offset = 0, length = 19, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:19:04,533] INFO FsSourceTask Processing records for file [path = file:/data/unprocessed/test6.xml, length = 12, blocks = [[offset = 0, length = 12, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
kafka-connect | [2023-02-01 16:19:04,866] INFO [Producer clientId=producer-1] Resetting the last seen epoch of partition docker-connect-offsets-0 to 0 since the associated topicId changed from null to DWF138FARpqXLsWwspFMXA (org.apache.kafka.clients.Metadata)
kafka-connect | ... (identical "Resetting the last seen epoch" INFO lines for the remaining docker-connect-offsets partitions omitted)
kafka-connect | [2023-02-01 16:19:14,588] INFO SleepyPolicy Moving file [file:/data/unprocessed/test.xml] to [file:/data/processed/test.xml] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
kafka-connect | [2023-02-01 16:19:14,640] INFO SleepyPolicy Moving file [file:/data/unprocessed/test2.xml] to [file:/data/processed/test2.xml] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
kafka-connect | [2023-02-01 16:19:14,666] INFO SleepyPolicy Moving file [file:/data/unprocessed/test3.xml] to [file:/data/processed/test3.xml] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
kafka-connect | [2023-02-01 16:19:14,696] INFO SleepyPolicy Moving file [file:/data/unprocessed/test4.xml] to [file:/data/processed/test4.xml] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
kafka-connect | [2023-02-01 16:19:14,729] INFO SleepyPolicy Moving file [file:/data/unprocessed/test5.xml] to [file:/data/processed/test5.xml] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
kafka-connect | [2023-02-01 16:19:14,754] INFO SleepyPolicy Moving file [file:/data/unprocessed/test6.xml] to [file:/data/processed/test6.xml] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy)
This might be the same as #89, but I've tried several cron expressions and nothing works. I have not changed offset.flush.interval.ms, so it uses the default 60000 ms (as stated here). This might be a bug in the deletion process, since I've tried various cron expressions.
Using Kafka from Docker: docker.io/bitnami/kafka:3, with Kafka Connect: confluentinc/cp-kafka-connect:7.0.1.
To replicate:
Expected:
Actual:
So the Kafka consumer produces output like this:
Every minute (1 0/1 * ? * * *): the file is processed twice before being deleted.
Kafka consumer:
Kafka Connect log:
Every 30 seconds (0/30 * * ? * * *): the file is processed 3 times before being deleted.
Kafka consumer:
Kafka Connect log:
Every 5 minutes (1 0/5 * ? * * *): the file is processed twice before being deleted.
Kafka consumer:
Kafka Connect log:
No error/warn logs on the Kafka broker.