Open leandrojmp opened 1 year ago
It is possible to consume data with an input integration (like kafka), and then via a custom ingest pipeline use some attribute in your kafka data to route it to the appropriate integration. This routing is done by the reroute processor.
For example, let's say you have nginx and apache logs in kafka. You could add a custom pipeline with reroute processors and use the data_stream.dataset field to apply the routing. Now when the kafka input reads data it will hit your custom pipeline and then be routed to the appropriate data stream (and get processed by its default pipeline).
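As a rough sketch (the pipeline name and conditions below are only illustrative, and how the pipeline gets attached to the kafka integration is not shown here), the custom pipeline could hold one reroute processor per target dataset:

PUT _ingest/pipeline/kafka-routing-example
{
  "processors": [
    {
      "reroute": {
        "if": "ctx.data_stream?.dataset == 'nginx.access'",
        "dataset": "nginx.access"
      }
    },
    {
      "reroute": {
        "if": "ctx.data_stream?.dataset == 'apache.access'",
        "dataset": "apache.access"
      }
    }
  ]
}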
Hello @andrewkroh,
So I just need a custom pipeline on the Kafka integration with a reroute processor?
For the Fortigate example, I would just need to add something like this:
{
"reroute": {
"dataset": "fortinet_fortigate.log"
}
}
And the log would be rerouted to the fortinet_fortigate.log ingestion pipeline? But how will the ingestion pipeline be triggered? Will Elasticsearch internally add the name of the ingest pipeline to the request?
reroute supports mustache templates, so it is possible to reference data from the event. This is relevant if the data was pushed by Elastic Agent, because you could refer to {{{data_stream.dataset}}}.
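A minimal sketch of that, assuming the events already carry a data_stream.dataset value matching the target integration's dataset (the pipeline name is made up):

PUT _ingest/pipeline/kafka-routing-example
{
  "processors": [
    {
      "reroute": {
        "dataset": "{{{data_stream.dataset}}}"
      }
    }
  ]
}

This collapses the per-dataset conditions shown earlier into a single processor.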
But how will the ingestion pipeline be triggered? Will Elasticsearch internally add the name of the ingest pipeline to the request?
Both data streams and regular indices have the concept of a default pipeline (see index.default_pipeline in docs). So for example if you directly sent data to the _bulk API and only specified a Fleet data stream, then that data would be passed through the integration's default pipeline before landing in the backing index. Fleet will always add the integration's pipeline as the default_pipeline.
So in the case of reroute it will honor this index.default_pipeline setting too, and that is how the pipeline is executed.
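As an illustration, that setting can be inspected on a backing index of the target data stream; the data stream name and pipeline version below are placeholders, not actual values:

GET logs-fortinet_fortigate.log-default/_settings?filter_path=*.settings.index.default_pipeline

which should return something shaped like this:

{
  ".ds-logs-fortinet_fortigate.log-default-2024.05.01-000001": {
    "settings": {
      "index": {
        "default_pipeline": "logs-fortinet_fortigate.log-1.0.0"
      }
    }
  }
}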
One of the things I was concerned about was making sure that if you used kafka_logs it would have the permissions to write to other arbitrary data streams. I checked, and the integration is set up to allow writes to logs-*. So as long as you only reroute to other log integrations, the permissions aspect is OK.
Thanks @andrewkroh!
We were planning to use the Kafka Custom Log integration, but saw that the kind of reroute we need will only work on 8.9+; we are still on 8.8.1, but an upgrade is planned for next month.
Another question: the documentation does not list the exported fields for this integration, but looking at the sample_event.json file, it seems that some kafka fields are exported.
"kafka": {
"headers": [],
"key": "",
"offset": 0,
"partition": 0,
"topic": "testTopic"
}
Is this correct? We will need something to filter on for the reroute, for example the kafka topic.
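Assuming those kafka fields are present on the event, the reroute condition could key on the topic, along these lines (the topic value and pipeline name are placeholders):

PUT _ingest/pipeline/kafka-topic-routing-example
{
  "processors": [
    {
      "reroute": {
        "if": "ctx.kafka?.topic == 'fortigate'",
        "dataset": "fortinet_fortigate.log"
      }
    }
  ]
}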
Hi! We just realized that we haven't looked into this issue in a while. We're sorry! We're labeling this issue as Stale to make it hit our filters and make sure we get back to it as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1:. Thank you for your contribution!
👍
Hello, is it possible for this to be looked at?
While it is possible to use the Kafka Custom Log integration, it would be better to have the kafka input available directly in the integrations.
Hello,
Currently the Elastic Agent supports many inputs like file, tcp, udp, httpjson, s3, event hub, etc., but some integrations are limited to just one or a couple of those inputs.
One of the main issues for us in adopting the integrations is that the majority of our data is in Kafka clusters; our firewall logs, cloudtrail logs, cloudflare logs and many others are on Kafka clusters.
Until recently it was not clear whether the Elastic Agent would support a Kafka input or not, but looking at the available integrations I saw that there is an integration named Kafka Custom Logs that indeed uses the kafka input of filebeat.
But this is the only integration that has this input for now.
It would be nice if every other integration had the option to add a custom input to collect the data and then use the pipelines of the integration to parse it.
For example, if the Fortigate integration had this option we would be able to use the Elastic Agent integration to consume our firewall logs and stop using Logstash for this case.
Feature Request:
Add the option to use a custom input on an integration, allowing the data to be consumed when the source is not one of the inputs hard-coded in the integration.