Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Challenges with File Pulse Connector for Real-Time Log Streaming from Kubernetes: Meeting Requirements for Incremental Updates and Dynamic File Monitoring #661

Open
max533 opened this issue Aug 6, 2024 · 1 comment
Labels
question Further information is requested

Comments

@max533
Copy link

max533 commented Aug 6, 2024

Provide details of the setup you're running

File Pulse Connector Version 2.14.1
Docker Version 20.10.13

Outline your question

I am currently working on using File Pulse Connector to achieve real-time log streaming from Kubernetes container applications. However, I am encountering difficulties in meeting the following requirements simultaneously, or it appears that a very large number of Kafka Connect workers may be needed to achieve this:

I have also reviewed related discussions on Stack Overflow and GitHub regarding these issues:

But to ensure I’m not making any configuration errors due to unfamiliarity, I’d like to consult with an expert to confirm whether these requirements can be met or if Kafka Connect might not be suitable for this use case.

Here are the requirements I need to achieve:

  1. Log files must be continuously monitored. When new entries are added to the log files—at irregular intervals (e.g., once a day or not at all)—the connector should only send the newly added portions to the Kafka topic, rather than re-sending the entire file content.
  2. The number of log files may increase significantly, potentially reaching hundreds or thousands. The system should be able to detect new files and automatically start monitoring them.

Below is my Kafka Connect file pulse connector configuration file

{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "filters": "GroupMultilineException, ParseLog4jLog, AppendNode",
  "filters.GroupMultilineException.negate": "false",
  "filters.GroupMultilineException.pattern": "^[\\t]",
  "filters.GroupMultilineException.type": "io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter",
  "filters.ParseLog4jLog.pattern": "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}",
  "filters.ParseLog4jLog.overwrite": "message",
  "filters.ParseLog4jLog.source": "message",
  "filters.ParseLog4jLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
  "filters.ParseLog4jLog.ignoreFailure": "true",
  "filters.AppendNode.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.AppendNode.field": "$.nodeName",
  "filters.AppendNode.value": "{{ extract_array( split($metadata.path, '/'), 3) }}",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on":"COMMITTED",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/mnt/log",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.listing.interval.ms": "10000",
  "file.filter.regex.pattern":".*\\.log$",
  "offset.policy.class":"io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "hash",
  "read.max.wait.ms": "86400000",
  "ignore.committed.offsets": "false",
  "topic": "connect-file-pulse-quickstart-log4j",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "tasks.file.status.storage.topic.partitions": 10,
  "tasks.file.status.storage.topic.replication.factor": 1,
  "tasks.max": 1
}
@max533 max533 added the question Further information is requested label Aug 6, 2024
@OneCricketeer
Copy link

OneCricketeer commented Oct 28, 2024

Is there a specific reason to use a JVM (Kafka Connect) for this vs something more slim like Promtail, Elastic Filebeat, Fluentbit, vector.dev, Splunk/Graylog, etc?

More specifically why are pods generating logs as files instead of output streams?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants