streamthoughts / kafka-connect-file-pulse

🔗 A multipurpose Kafka Connect connector that makes it easy to parse, transform and stream any file, in any format, into Apache Kafka
https://streamthoughts.github.io/kafka-connect-file-pulse/
Apache License 2.0

Challenges with File Pulse Connector for Real-Time Log Streaming from Kubernetes: Meeting Requirements for Incremental Updates and Dynamic File Monitoring #661

Open max533 opened 3 months ago

max533 commented 3 months ago

Provide details of the setup you're running

File Pulse Connector version: 2.14.1
Docker version: 20.10.13

Outline your question

I am currently working on using the File Pulse connector to achieve real-time log streaming from Kubernetes container applications. However, I am having difficulty meeting the requirements below simultaneously; it appears that a very large number of Kafka Connect workers may be needed to achieve this.

I have also reviewed related discussions on Stack Overflow and GitHub regarding these issues:

But to ensure I’m not making any configuration errors due to unfamiliarity, I’d like to consult with an expert to confirm whether these requirements can be met or if Kafka Connect might not be suitable for this use case.

Here are the requirements I need to meet:

  1. Log files must be continuously monitored. When new entries are added to the log files—at irregular intervals (e.g., once a day or not at all)—the connector should only send the newly added portions to the Kafka topic, rather than re-sending the entire file content.
  2. The number of log files may increase significantly, potentially reaching hundreds or thousands. The system should be able to detect new files and automatically start monitoring them (the configuration settings that I believe relate to these two points are sketched just after this list).
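
For reference, the settings in my configuration below that, as far as I understand the documentation, relate most directly to these two requirements are the following (values copied from the full config further down; my reading of their semantics may well be wrong, which is part of what I would like to confirm):

{
  "fs.listing.interval.ms": "10000",
  "read.max.wait.ms": "86400000",
  "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "hash",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy"
}

My understanding is that fs.listing.interval.ms controls how often the directory is re-scanned (which should cover requirement 2, picking up new files), read.max.wait.ms controls how long a reader keeps waiting for new rows in an already opened file before the file is treated as completed, and the offset policy determines how a file is identified so that reading resumes from the last committed position instead of re-sending the whole content (requirement 1).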

Below is my Kafka Connect File Pulse connector configuration:

{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "filters": "GroupMultilineException, ParseLog4jLog, AppendNode",
  "filters.GroupMultilineException.negate": "false",
  "filters.GroupMultilineException.pattern": "^[\\t]",
  "filters.GroupMultilineException.type": "io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter",
  "filters.ParseLog4jLog.pattern": "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}",
  "filters.ParseLog4jLog.overwrite": "message",
  "filters.ParseLog4jLog.source": "message",
  "filters.ParseLog4jLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
  "filters.ParseLog4jLog.ignoreFailure": "true",
  "filters.AppendNode.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.AppendNode.field": "$.nodeName",
  "filters.AppendNode.value": "{{ extract_array( split($metadata.path, '/'), 3) }}",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on":"COMMITTED",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/mnt/log",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.listing.interval.ms": "10000",
  "file.filter.regex.pattern":".*\\.log$",
  "offset.policy.class":"io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "hash",
  "read.max.wait.ms": "86400000",
  "ignore.committed.offsets": "false",
  "topic": "connect-file-pulse-quickstart-log4j",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "tasks.file.status.storage.topic.partitions": 10,
  "tasks.file.status.storage.topic.replication.factor": 1,
  "tasks.max": 1
}
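
On the point about possibly needing a very large number of workers: my working assumption (please correct me if this is wrong) is that the connector distributes the files returned by the listing across its tasks, so before adding workers I would first try raising tasks.max on the existing worker, keeping everything else above unchanged, e.g.:

{
  "tasks.max": 4
}

The value 4 here is only an illustrative number, not something I have validated at the scale of hundreds or thousands of files.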
OneCricketeer commented 3 weeks ago

Is there a specific reason to use a JVM (Kafka Connect) for this vs. something slimmer like Promtail, Elastic Filebeat, Fluentbit, vector.dev, Splunk/Graylog, etc.?

More specifically, why are the pods writing logs to files instead of to their output streams?