streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

FilePulse source connector 2.10 class could not be found #591

Closed Shykeng closed 9 months ago

Shykeng commented 9 months ago

Hello everyone,

I'm new to Kafka and data streaming architecture, and I'm currently running into an issue.

I'm looking for a way to ingest data from a log file (first locally, and later remotely with the SftpFilesystemListing class as suggested here) into a Kafka topic through Confluent Platform, using the File Pulse source connector 2.10 downloaded via the Confluent Hub CLI.

However, I'm unable to access the required classes when trying to set up the connector through the Connect API:

 curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/home/aurelien/logs/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "fs.listing.class":"io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
    "fs.cleanup.policy.class":"io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "file.filter.regex.pattern":".*\\.log$",
        "task.reader.class":"io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"enel",
        "internal.kafka.reporter.bootstrap.servers":"broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "tasks.max": 1
    }'

It returns:

{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):\nMissing required configuration \"tasks.reader.class\" which has no default value.\nInvalid value io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy for configuration fs.cleanup.policy.class: Class io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy could not be found.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
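
The error message also points at the validate endpoint, which checks a config without creating the connector. A minimal sketch, assuming jq is installed (the short name FilePulseSourceConnector stands in for the full connector class):

    # Validate a candidate config without creating the connector;
    # the response reports how many fields are in error.
    curl -s -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connector-plugins/FilePulseSourceConnector/config/validate \
        -d '{
            "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
            "topic":"enel",
            "tasks.max":"1"
        }' | jq '.error_count'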

And running it directly through the connector wizard in Confluent Control Center doesn't help either.

My plugin.path is set to /opt/confluent/share/confluent-hub-components, where I have three connectors (see the check after this list for how to confirm they are loaded):

confluentinc-kafka-connect-hdfs3
streamthoughts-kafka-connect-file-pulse
jcustenborder-kafka-connect-spooldir
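
To confirm that Connect actually loaded these plugins (and which versions), the REST API can list them; a quick check, assuming jq is available:

    # List every connector class Connect picked up from plugin.path.
    curl -s http://localhost:8083/connector-plugins | jq '.[] | {class, version}'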

Running Confluent Platform 7.5 with OpenJDK 11 on Ubuntu 22.04.3 LTS.

Can anyone help me?

Thank you.

Shykeng commented 9 months ago

Fixed it by updating the following fields:

This "fs.cleanup.policy.class":"io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy" to this "fs.cleanup.policy.class":"io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy"

And this "task.reader.class":"io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader" to this "tasks.reader.class":"io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader"

Be careful: not many tutorials online are up to date with the current plugin version, so class names may differ!

Which gives us:

 curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/connectors/source-log-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/home/aurelien/logs/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "fs.listing.class":"io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
        "fs.cleanup.policy.class":"io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
        "file.filter.regex.pattern":".*\\.log$",
        "tasks.reader.class":"io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"enel",
        "internal.kafka.reporter.bootstrap.servers":"broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "tasks.max": 1
    }'
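
Once the PUT succeeds, it's worth verifying that the connector and its task actually start. A quick follow-up, assuming the same connector name and that jq is installed:

    # Expect "RUNNING" for both the connector and its single task.
    curl -s http://localhost:8083/connectors/source-log-filepulse-00/status \
        | jq '{connector: .connector.state, tasks: [.tasks[].state]}'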

Links to the config properties I resolved: tasks.reader.class and fs.cleanup.policy.class. I hope this helps someone facing the same issues!