nats-io / nats-kafka

NATS to Kafka Bridging
Apache License 2.0
131 stars 32 forks source link

JetStreamToKafka keytype subject is using subject from config instead of message #108

Open pmysinski opened 1 month ago

pmysinski commented 1 month ago

What version were you using?

nats-kafka: 1.4.2 nats-server: v2.9.7

What environment was the server running in?

NATS docker image: nats:2.9.7-alpine3.16 Nats-kafka docker image: natsio/nats-kafka:1.4.2

Is this defect reproducible?

Using configuration of connector:

# The interval to wait to reconnect
reconnectinterval: 5000,

# The global connection timeout
connecttimeout: 5000,

nats: {
  Servers: ["nats:4222"],
  ConnectTimeout: 5000,
  MaxReconnects: 120,
  ReconnectWait: 5000,
}

connect: [
 {
      type: "JetStreamToKafka",
      brokers: ["kafka:29092"],
      durablename: "JetStreamToKafka_durablename",
      topic: "test-topic",
      subject: "test-subject.*.*.params",
      keytype: "subject",
  },
],

Publish 1 message to NATS jetstream on subject "test-subject.test.123.params" We can observe message on Kafka topic with key: "test-subject...params"

Given the capability you are leveraging, describe your expectation?

I expect that messages published to Kafka topic have key equal to NATS message subject. In the example if I publish to "test-subject.test.123.params" I expect kafka message key to be the same. and not just copy of subject that it is subcribed to (which can be a wildcard)

Given the expectation, what is the defect you are observing?

As the doc says about keytype:

subject - the subject the incoming NATS message was sent on is used as the key

It is a defect on line: https://github.com/nats-io/nats-kafka/blob/a2281b503cc56e439f85e6a1105506c8f1e88b76/server/core/connector.go#L400