nats-io / nats-kafka

NATS to Kafka Bridging

Cannot create queue subscribers when connecting to JetStream. Does JetStreamToKafka not support it? #88

Closed aonurdemir closed 7 months ago

aonurdemir commented 1 year ago

I am using this connector config:

{
  "type": "JetStreamToKafka",
  "brokers": [
    "kafka01:9092",
    "kafka02:9092",
    "kafka03:9092",
    "kafka04:9092"
  ],
  "id": "id-test-4",
  "topic": "cdc_test_4",
  "subject": "cdc_test_4.*",
  "durablename": "durable_cdc_test_4",
  "queuename": "durable_cdc_test_4_queue"
}

When I start 3 bridge containers via Docker, every message published to JetStream shows up three times in Kafka. I tried two different JetStream stream configurations, shown below:

// Attempt 1: default retention (limits policy).
js.AddStream(
    &nats.StreamConfig{
        Name:     opts.StreamName,
        Subjects: []string{fmt.Sprintf("%s.*", opts.StreamName)},
        Storage:  nats.FileStorage,
    },
)

// Attempt 2: same stream, but with work-queue retention.
js.AddStream(
    &nats.StreamConfig{
        Name:      opts.StreamName,
        Subjects:  []string{fmt.Sprintf("%s.*", opts.StreamName)},
        Storage:   nats.FileStorage,
        Retention: nats.WorkQueuePolicy,
    },
)

The result did not change with either configuration.

However, in another setup where I don't use JetStream and rely on plain pub-sub, NATSToKafka works fine with the config below:

{
  "type": "NATSToKafka",
  "brokers": [
    "kafka01:9092",
    "kafka02:9092",
    "kafka03:9092",
    "kafka04:9092"
  ],
  "id": "nats_user_network_quality",
  "topic": "nats_user_network_quality",
  "subject": "data.user_network_quality.subject",
  "queuename": "data.user_network_quality.subject.q1"
}

I want to keep messages in JetStream in case all consumers fail. When they come back, they should be able to continue where they left off (I am hoping durable consumers provide this). I also want fault tolerance, so I want to run 3 bridge containers on 3 different machines, making 9 client consumers. Each message in JetStream should be delivered to only one of them, in a round-robin fashion that NATS manages.
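To make the expected behaviour concrete, here is a toy model of the two delivery modes. It does not use nats.go at all; `deliver` and `total` are hypothetical helpers, and the strict round-robin is a simplification (NATS decides how messages are spread within a queue group).

```go
package main

// deliver is a toy model, not nats.go code. It returns, per bridge instance,
// the messages that instance would receive and forward to Kafka.
// With queueGroup=false every subscriber gets every message (what I observe).
// With queueGroup=true each message goes to exactly one member; round-robin
// is used here only for simplicity.
func deliver(msgs []string, subscribers int, queueGroup bool) [][]string {
	if subscribers <= 0 {
		return nil
	}
	received := make([][]string, subscribers)
	for i, m := range msgs {
		if queueGroup {
			k := i % subscribers
			received[k] = append(received[k], m)
			continue
		}
		for k := range received {
			received[k] = append(received[k], m)
		}
	}
	return received
}

// total counts the messages written to Kafka across all bridge instances.
func total(received [][]string) int {
	n := 0
	for _, r := range received {
		n += len(r)
	}
	return n
}
```

With 3 messages and 3 bridges, `total(deliver(msgs, 3, false))` models the tripled output I see, while `total(deliver(msgs, 3, true))` models the one-copy-per-message behaviour I am after.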

Could you please help with the configuration? Or does this project not support this kind of setup?

As far as I can tell from the code, there is only one way the bridge subscribes to JetStream:

    return conn.bridge.JetStream().Subscribe(subject, callback, options...)

So it does not seem possible, since the queue name is never passed along. Is that right?
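For comparison, the nats.go JetStream context offers a `QueueSubscribe` method next to `Subscribe`. Below is a rough sketch of the dispatch I would have expected, not the bridge's actual code. To keep it dependency-free, `handler`, `jsSubscriber`, `subscribeJetStream` and `recorder` are hypothetical names, and the interface is a simplified stand-in that drops the subscription options and the returned subscription.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for nats.MsgHandler.
type handler func(data []byte)

// jsSubscriber is a simplified, hypothetical mirror of the Subscribe and
// QueueSubscribe methods on the nats.go JetStream context.
type jsSubscriber interface {
	Subscribe(subject string, cb handler) error
	QueueSubscribe(subject, queue string, cb handler) error
}

// subscribeJetStream uses the queue variant whenever "queuename" is set,
// and falls back to a plain subscription otherwise.
func subscribeJetStream(js jsSubscriber, subject, queue string, cb handler) error {
	if queue == "" {
		return js.Subscribe(subject, cb)
	}
	return js.QueueSubscribe(subject, queue, cb)
}

// recorder is a fake that only records which method was called.
type recorder struct{ calls []string }

func (r *recorder) Subscribe(subject string, cb handler) error {
	r.calls = append(r.calls, fmt.Sprintf("Subscribe(%s)", subject))
	return nil
}

func (r *recorder) QueueSubscribe(subject, queue string, cb handler) error {
	r.calls = append(r.calls, fmt.Sprintf("QueueSubscribe(%s, %s)", subject, queue))
	return nil
}
```

Calling `subscribeJetStream` on a `recorder` with the `subject` and `queuename` from my config records a `QueueSubscribe` call; with an empty queue name it records a plain `Subscribe`.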