nats-io / nats-kafka

NATS to Kafka Bridging
Apache License 2.0
131 stars 32 forks source link

Add Function To Get Value From Kafka Header With Given Key #106

Closed chenmx98 closed 2 months ago

chenmx98 commented 2 months ago

What motivated this proposal?

The current nats-kafka-bridge is missing the capability to demux messages from kafka to nats based on the kafka messages' header values. With this change, we can use nats-kafka-bridge as a demuxer by further specifying the patterns or criterions in the kafka message header.

What is the proposed change?

In server/core/connector.go:553 add:

        "getKafkaHeaderValue": func(key string, headers []sarama.RecordHeader)
         string {
            for _, header := range(headers) {
                if string(header.Key) == key {
                    return string(header.Value)
                }
            }
            return ""
        },

Who benefits from this change?

Anyone who is using nats-kafka-bridge as a kafka message demuxer to many nats subjects with similar patterns.

Example

connect: [
  {
      type: "KafkaToNATS",
      brokers: ["localhost:9092"]
      id: "foo",
      topic: "bar",
      subject: "Subject-{{ .Headers | getKafkaHeaderValue \"baz\"}}",
  },
]

No response

chenmx98 commented 2 months ago

opened a pr #107