streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

Clarification on Topic Creation and Timeout Issue in Kafka Connect File Pulse Connector #653

Open goyaltu-deshaw opened 1 month ago

Provide details of the setup you're running

I am running Kafka Connect File Pulse version 2.14.1 on a Linux-based operating system.

Outline your question

I am using the following configuration to deploy the connector:

{
    "goyaltu-file-pulse-source-connector-2": {
        "connector.name": "filepulse-source-connector",
        "transforms.AlignSchemaWithRegistry.schema.registry.urls": "<>",
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "tasks.max": "1",
        "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
        "topic": "raft.public.goyaltu.example_app.filepulse2",
        "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
        "fs.listing.directory.path": "/codemill/goyaltu/example_streaming_webapp/csvfiles",
        "fs.listing.interval.ms": "10000",
        "file.filter.regex.pattern": ".*\\.csv",
        "offset.strategy": "name + size + lastmodified",
        "file.input.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "filters": "ParseCSVLine",
        "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
        "filters.ParseCSVLine.extract.column.name": "headers",
        "filters.ParseCSVLine.trim.column": "true",
        "filters.ParseCSVLine.separator": ";",
        "tasks.file.status.storage.bootstrap.servers": "<>",
        "tasks.file.status.storage.topic": "raft.public.goyaltu.connect-file-pulse-status-2",
        "tasks.file.status.storage.producer.security.protocol": "SASL_PLAINTEXT",
        "tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
        "tasks.file.status.storage.producer.request.timeout.ms": "20000",
        "tasks.file.status.storage.consumer.security.protocol": "SASL_PLAINTEXT",
        "tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
        "tasks.file.status.storage.consumer.request.timeout.ms": "20000"
    }
}

It is not clear from the documentation (https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/) whether the internal status topic and the output topic need to be pre-created, or whether the connector will create them automatically.
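In case they do need to exist up front, below is the kind of plain kafka-clients AdminClient sketch I would use to pre-create both topics, reusing the topic names from my config. The partition count, replication factor, Kerberos service name, keytab path and principal are placeholders rather than values from my real setup; the cleanup.policy=compact setting for the status topic is taken from the WARN log further down.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.SaslConfigs;

public class PreCreateFilePulseTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker list; same cluster as tasks.file.status.storage.bootstrap.servers
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        // Placeholder Kerberos settings, not my real keytab/principal
        props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true keyTab=\"/path/to/client.keytab\" principal=\"client@EXAMPLE.COM\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // Internal status topic: the WARN below shows the connector wants cleanup.policy=compact
            NewTopic statusTopic = new NewTopic("raft.public.goyaltu.connect-file-pulse-status-2", 1, (short) 3)
                    .configs(Map.of("cleanup.policy", "compact"));
            // Output topic from the "topic" property
            NewTopic outputTopic = new NewTopic("raft.public.goyaltu.example_app.filepulse2", 1, (short) 3);
            admin.createTopics(List.of(statusTopic, outputTopic)).all().get();
        }
    }
}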

Additionally, I frequently receive the following WARN log; the connector stays in the RUNNING state, but the internal status topic is never created:

[2024-07-23T03:11:23,582 [connector-thread-goyaltu-file-pulse-source-connector-2] io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.createTopic():78 WARN ]: Failed to create topic '(name=raft.public.goyaltu.connect-file-pulse-status-2, numPartitions=default, replicationFactor=default, replicasAssignments=null, configs={cleanup.policy=compact})'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[kafka-clients-3.1.2.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.createTopic(KafkaFileObjectStateBackingStore.java:72) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.configure(KafkaFileObjectStateBackingStore.java:62) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[kafka-clients-3.1.2.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[kafka-clients-3.1.2.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[kafka-clients-3.1.2.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig.getStateBackingStore(CommonSourceConfig.java:296) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess.lambda$initSharedStateBackingStore$0(StateBackingStoreAccess.java:46) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStoreManager.getOrCreateSharedStore(FileObjectStateBackingStoreManager.java:58) [kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess.initSharedStateBackingStore(StateBackingStoreAccess.java:43) [kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess.<init>(StateBackingStoreAccess.java:33) [kafka-connect-filepulse-plugin-2.14.1.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:97) [kafka-connect-filepulse-plugin-2.14.1.jar:2.14.1]
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184) [connect-runtime-3.1.2.jar:?]
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209) [connect-runtime-3.1.2.jar:?]
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348) [connect-runtime-3.1.2.jar:?]
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331) [connect-runtime-3.1.2.jar:?]
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140) [connect-runtime-3.1.2.jar:?]
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117) [connect-runtime-3.1.2.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics

Could you please clarify whether the internal topic and the output topic need to be pre-created, or whether the connector should create them automatically? Any guidance on resolving the timeout issue would also be greatly appreciated.
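For what it's worth, this is the standalone connectivity check I intend to run, to see whether an AdminClient configured with the same SASL_PLAINTEXT/GSSAPI settings can reach the brokers at all (bootstrap servers, Kerberos service name and JAAS values are again placeholders):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.config.SaslConfigs;

public class CheckAdminConnectivity {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka"); // placeholder
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true keyTab=\"/path/to/client.keytab\" principal=\"client@EXAMPLE.COM\";"); // placeholder
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000");

        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            // If these calls also time out, the problem would seem to be broker
            // reachability / authentication for the admin client, not the connector itself.
            System.out.println("Cluster id: " + cluster.clusterId().get(30, TimeUnit.SECONDS));
            System.out.println("Nodes: " + cluster.nodes().get(30, TimeUnit.SECONDS));
        }
    }
}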

Thank you!


goyaltu-deshaw commented 1 month ago

I would also like to understand the reason for the error below:

[2024-07-23T03:11:23,595 [KafkaBasedLog Work Thread - raft.public.goyaltu.connect-file-pulse-status-2] io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog$WorkThread.run():337 ERROR ]: Unexpected error in Thread[KafkaBasedLog Work Thread - raft.public.goyaltu.connect-file-pulse-status-2,5,main]
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223) ~[kafka-clients-3.1.2.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.2.jar:?]
        at io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog.poll(KafkaBasedLog.java:259) ~[kafka-connect-filepulse-api-2.14.1.jar:2.14.1]
        at io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:330) [kafka-connect-filepulse-api-2.14.1.jar:2.14.1]
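For my own understanding, the minimal snippet below (placeholder broker address) reproduces the same IllegalStateException with plain kafka-clients when poll() is called on a consumer that was never subscribed or assigned; I assume the KafkaBasedLog work thread ends up in that situation because the status topic still does not exist, but please correct me if that reading is wrong.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollWithoutAssignment {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // No subscribe()/assign() before poll(): this throws
            // IllegalStateException("Consumer is not subscribed to any topics
            // or assigned any partitions"), the same message as in the log above.
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}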