streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

Kafka Connect File Pulse Not Processing Updates in Existing Files #654

Open goyaltu-deshaw opened 4 months ago

goyaltu-deshaw commented 4 months ago

Provide details of the setup you're running

I am running Kafka Connect File Pulse version 2.14.1 on a Linux-based operating system.

Outline your question

I am using the following configuration to deploy the connector. It successfully picks up newly added files under fs.listing.directory.path, but it does not handle files that are already present and are continuously updated: new records appended to those existing files never reach the Kafka topic. I can't find a specific configuration option to address this.
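To make the symptom concrete, here is a minimal reproduction sketch. The directory path is a placeholder, not the one from the setup above:

```shell
# Reproduce the scenario described above.
# A brand-new file in the listing directory is picked up on the next
# fs.listing.interval.ms scan; rows appended to an already-ingested
# file are not streamed to the topic.
DATA_DIR=/tmp/filepulse-csv   # placeholder for fs.listing.directory.path
mkdir -p "$DATA_DIR"

# New file: ingested by the connector as expected.
printf 'id,value\n1,foo\n' > "$DATA_DIR/new.csv"

# Append to the same, already-ingested file: with the configuration
# below, these rows never appear in the Kafka topic.
printf '2,bar\n' >> "$DATA_DIR/new.csv"
```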

Thanks!

Configuration

            "goyaltu-file-pulse-source-connector-2": {
                "connector.name": "filepulse-source-connector",
                "transforms.AlignSchemaWithRegistry.schema.registry.urls": "<>",
                "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
                "tasks.max": "1",
                "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
                "topic": "raft.public.goyaltu.example_app.filepulse2",
                "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
                "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
                "fs.listing.directory.path": "/codemill/goyaltu/example_streaming_webapp/csvfiles",
                "fs.listing.interval.ms": "10000",
                "file.filter.regex.pattern": ".*\\.csv",
                "offset.strategy": "name + size + lastmodified",
                "file.input.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
                "filters": "ParseCSVLine",
                "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
                "filters.ParseCSVLine.auto.generate.column.names": "true",
                "filters.ParseCSVLine.trim.column": "true",
                "filters.ParseCSVLine.separator": ",",
                "tasks.file.status.storage.bootstrap.servers": "<>",
                "tasks.file.status.storage.topic": "raft.public.goyaltu.connect-file-pulse-status-2",
                "tasks.file.status.storage.producer.security.protocol": "SASL_PLAINTEXT",
                "tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
                "tasks.file.status.storage.producer.request.timeout.ms": "20000",
                "tasks.file.status.storage.consumer.security.protocol": "SASL_PLAINTEXT",
                "tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
                "tasks.file.status.storage.consumer.request.timeout.ms": "20000"
            }

goyaltu-deshaw commented 4 months ago

@fhussonnois