influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf

Kafka Producer & Consumer - Topic based routing based on criteria specified in an expression #4170

Closed edbernier closed 4 years ago

edbernier commented 6 years ago

Feature Request

Proposal:

Wayfair is requesting this capability to enable smarter routing of Kafka traffic to various Topics. Telegraf Kafka Consumer could then direct the data to different InfluxDB instances based on the Topic.

Below is an example of what this would look like. We are looking to have a flexible Kafka routing engine for both the Telegraf Kafka consumer and producer.

For Kafka Producer…

We are looking for something that works sort of like a map data structure (or something like a case statement). Imagine the following…

TOPIC_ROUTES:
    CRITERIA_SET_1:
        TOPIC(S)
    CRITERIA_SET_2:
        TOPIC(S)
    CRITERIA_SET_3:
        TOPIC(S)
    DEFAULT:
        TOPIC(S)

NOTE: we want the option of using measurement-name-based or tag-based criteria sets. We may also want the flexibility of replicating data to more than one topic.

For the Kafka Consumer, we are looking for a routing scheme that will enable us to specify a specific cluster and DB destination per topic set (not necessarily a v1 feature).

NOTE: How can we combine topic-based and measurement/tag-based filtering?

DB_ROUTES:
    TOPIC_AND_MEASUREMENT_TAG_SET_1:
        DB(s)
    TOPIC_AND_MEASUREMENT_TAG_SET_2:
        DB(s)
    DEFAULT:
        DB(s)

Note that we may want the flexibility of replicating data to more than one DB (not necessarily a v1 feature).

This would allow Telegraf to direct data to the appropriate InfluxDB cluster or instance, a very powerful feature that would greatly simplify scaling up or segmenting workloads.
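
To make this concrete, here is a purely hypothetical sketch of what such a routing table could look like in Telegraf's TOML config (none of these option names exist in Telegraf today; criteria and topic names are illustrative only):

[[outputs.kafka]]
  brokers = ["localhost:9092"]

  # Hypothetical routing table: first matching criteria set wins.
  [[outputs.kafka.topic_route]]
    namepass = ["cpu", "mem"]                        # criteria: measurement names
    topics = ["infra-metrics"]
  [[outputs.kafka.topic_route]]
    tagpass = { datacenter = ["us-east-*"] }         # criteria: tags
    topics = ["us-east-metrics", "us-east-replica"]  # replicate to two topics
  [[outputs.kafka.topic_route]]
    default = true                                   # fallback when nothing matches
    topics = ["telegraf-default"]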

danielnelson commented 6 years ago

On the Kafka output, have you tried using the topic_suffix option? It allows you to set the topic using the measurement name or a set of tags. This can be combined with measurement filtering and multiple Kafka outputs for a fair bit of flexibility.

For the Kafka input, you can use the tags option on the input to set static tags and combine this with multiple inputs. These tags can be used for routing using measurement filtering options, and removed before output using tagexclude.

Take a look at these and let me know what cases they are not handling.
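
A minimal sketch of that input-side approach, assuming placeholder broker addresses, topics, URLs, and tag values:

# One kafka_consumer per topic set, each adding a static routing tag.
[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["metrics-critical"]
  [inputs.kafka_consumer.tags]
    route = "critical"

[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["metrics-default"]
  [inputs.kafka_consumer.tags]
    route = "default"

# Route on the tag, then drop it before writing.
[[outputs.influxdb]]
  urls = ["http://influx-critical:8086"]
  tagexclude = ["route"]
  [outputs.influxdb.tagpass]
    route = ["critical"]

[[outputs.influxdb]]
  urls = ["http://influx-default:8086"]
  tagexclude = ["route"]
  [outputs.influxdb.tagpass]
    route = ["default"]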

JimHagan commented 6 years ago

@danielnelson Topic suffix tags do not meet our needs for the Kafka output plugin. We have tried them, and we are looking for more flexibility because we:

  1. Cannot always control what tags metrics will contain
  2. Need a fallback topic, which the current capability does not support, since we cannot guarantee a given topic will always exist

The suggestion for the Kafka input may be workable. I will look into it.

danielnelson commented 6 years ago

Here is a good example of tag-based routing; it is not as direct as what is suggested in that issue, but it ends up being very flexible.

As a potential workaround for topic selection until we can add something more elegant, you could try using the measurement filtering and multiple kafka outputs:

[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"

  [outputs.kafka.topic_suffix]
    method = "tags"
    keys = ["foo", "bar"]
    separator = "_"

  # must have foo and bar tags
  [outputs.kafka.tagpass]
    foo = ["*"]
    bar = ["*"]

[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"

  # must not have foo and bar tags
  [outputs.kafka.tagdrop]
    foo = ["*"]
    bar = ["*"]

sgtsquiggs commented 5 years ago

@danielnelson your example is invalid (maybe it wasn't at the time?) because tagpass/tagdrop is OR and not AND according to the docs at https://docs.influxdata.com/telegraf/v1.9/administration/configuration/#aggregator-configuration-examples

danielnelson commented 5 years ago

Yes, that's a good point; the comments should read "must have foo or bar tags".

JimHagan commented 5 years ago

I was the originator of the requested use case. We are going to be posting a pull request soon that has a somewhat generic implementation where we use a measurement-name regex as the criteria to drive the mapping to a topic name. @scottgrover will be working on it for us.

danielnelson commented 5 years ago

Sounds good. Update us with a sample of what the configuration design could be before getting too deep into implementation.

scottgrover commented 5 years ago

Hi there,

After reviewing how the kafka output plugin works, I was thinking about adding the logic Jim and I discussed in the GetTopicName function.

We want to add something similar to the topic_suffix logic, possibly named topic_routing to avoid any naming confusion. It's possible this could grow to support multiple types of routing, but we'd like to start with regex-based routing.

Here's an example of what the configuration could look like. Let me know your thoughts.

[outputs.kafka.topic_routing]
  method = "regex"

  [[outputs.kafka.topic_routing.route]]
    # Route measurement names ending in a single digit to topic1
    routeregex = 'measurement_\d{1}\b'
    topicname = "topic1"

  [[outputs.kafka.topic_routing.route]]
    # Route measurement names ending in double digits to topic2
    routeregex = 'measurement_\d{2}\b'
    topicname = "topic2"

  [[outputs.kafka.topic_routing.route]]
    # Route measurement names starting with three a's to topic3
    routeregex = '^[a]{3}.*'
    topicname = "topic3"

Scott

danielnelson commented 5 years ago

This type of functionality, routing based on the metric data, is useful in many or even most of the outputs. This makes me think that whatever solution we adopt should be pulled back into the filtering options and/or processors. I feel the same way about the topic_suffix work from before as well.

In these examples, I notice that the topic is static, so it should be possible to do something today like:

[[outputs.kafka]]
  topic = "topic1"
  namepass = ["measurement_[0-9]"]
[[outputs.kafka]]
  topic = "topic2"
  namepass = ["measurement_[0-9][0-9]"]
[[outputs.kafka]]
  topic = "topic3"
  namepass = ["aaa*"]

Of course these are just examples, and obviously the glob syntax is not as advanced as regular expressions. Can you describe some cases where you are routing off of the measurement but globs are not powerful enough?

Generally, I think of the measurement name as being similar to the name of a class or struct, so it wouldn't have a ton of variation. Could you give me some more information on how the names are being assigned?

scottgrover commented 5 years ago

I agree with what you say about the measurement name. We don't have too much variation. Most of our measurement names are straightforward, consisting of strings separated by any character Influx allows, for example _, - or .

Some example topics are: Wayfair.TRT.Airship, Loyalty.airtime_procedures, kltk.rest.services.resources.supplier_shipment, shipment-center

Some questions:

  1. If we were to use namepass as above, what is the relative overhead of using multiple output pipelines?
  2. When using multiple plugins, is there a single plugin instance that handles all the events, or does each defined output plugin run its own instance concurrently? In other words, using your example above, would three plugin instances be created?

We're under the impression that multiple Kafka output pipelines, with namepass used to forward metrics to them, will have more overhead than a single output plugin with the routing logic discussed above embedded into it, especially if we move towards pulling 50+ metrics out of our default topic and routing them to their own Kafka topics.

At the same time, when I look at the code, my understanding is that once the namepass evaluation passes and no topic suffix is configured, a metric hits the default case in GetTopicName and is routed to the topic configured for that output with minimal overhead. The only reason I doubt this is that I have yet to dig much deeper into how Telegraf initializes these output plugins, especially when multiple output plugins are defined.

If there is large overhead with multiple output pipelines, we'll move forward with developing routing within the Kafka output plugin, since we're looking to move a large number of metrics (50+) into their own topics.

danielnelson commented 5 years ago

This does define multiple independent outputs, and there are some consequences from moving to multiple plugins. The biggest one is that each output plugin has its own metric buffer.

Separate metric buffers mean you can set the buffer limit and batch size separately for each plugin if needed. Memory use would potentially be higher, since the sum of the buffer limits would probably be larger than a single shared buffer. When used with the kafka_consumer, though, the number of metrics read on the input would stay the same, limited by max_undelivered_messages.

If you have output plugins that handle very few messages, they could potentially not fill their batch as quickly, which would cause them to wait until their next flush interval. Recall that output plugins write after they receive metric_batch_size new metrics or when the flush interval is reached, whichever comes first. For the best throughput, you may need to increase max_undelivered_messages on the input to ensure writes are always occurring.
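
For reference, a rough sketch of the settings being discussed (values here are illustrative only, not recommendations):

[agent]
  # defaults that every output inherits unless overridden per plugin
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  flush_interval = "10s"

[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["telegraf"]
  # cap on metrics in flight; raise it so slow-filling outputs keep writing
  max_undelivered_messages = 1000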

You would also have connections to Kafka from each output plugin; in this example there would be three times the connections.

Each output does create an extra goroutine, but the extra overhead of this is very minor, as is the extra memory used by the plugins. These are not worth considering compared to the overall cost of the system. Namepass also uses globs which are faster than regular expressions in Go, but I think this would also be a minor difference.

If nothing above sounds too problematic, I suggest trying out this method first on a few topics, since support for it already exists. If it doesn't work well for some reason and we can't think of an easy resolution, I think it would make sense for us to design a processor that adds a tag to a metric based on a pattern, and then add a topic_tag option to the kafka output that looks up the value of that tag to use as the topic name and optionally removes the tag before writing.
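
A rough sketch of that idea; processors.override with namepass stands in here for the pattern-matching processor, and topic_tag is the proposed option, not something the kafka output supports today:

# Tag metrics with their destination topic based on measurement name.
[[processors.override]]
  namepass = ["measurement_[0-9]"]
  [processors.override.tags]
    kafka_topic = "topic1"

[[processors.override]]
  namepass = ["measurement_[0-9][0-9]"]
  [processors.override.tags]
    kafka_topic = "topic2"

# Proposed: look up the topic from the tag, drop the tag before writing.
[[outputs.kafka]]
  brokers = ["localhost:9092"]
  topic = "telegraf"          # fallback when the tag is missing
  topic_tag = "kafka_topic"   # proposed option, not yet in Telegraf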

scottgrover commented 5 years ago

I think we can deal with the memory increase and the metric buffer issue you mentioned. The metrics we're going to be pushing into separate topics are the ones that send at a ridiculously high rate, so we won't be waiting for the metric buffer to fill.

I don't believe adding more processor plugins into our pipeline will work for our use case. We have a huge Influx pipeline. @JimHagan, keep me honest, but I don't think it'd be much of a stretch to say it's one of the largest.

I've forked Telegraf so I can push my code publicly, and I've pushed a small change to the kafka output plugin that enables the functionality we're looking for. It's in its infancy, as we're still testing it on our pipeline and looking to add to its current design, but @danielnelson, I'd love your perspective on how we're approaching it. I still need to write the unit test for my addition, but I should be able to get that pushed tomorrow or early Thursday.

Let me know your thoughts: https://github.com/scottgrover/telegraf

danielnelson commented 5 years ago

Code looks good; my only suggestion is that you will want to use metric.GetTag() instead of metric.Tags(), since the latter is quite expensive and will likely be removed at some point.