redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

nats input: acknowledging messages breaks other jetstream clients #1772

Open pcosquer opened 1 year ago

pcosquer commented 1 year ago

Hello, I'm running NATS in a container with JetStream enabled:

docker run -p 4222:4222 -p 8222:8222 -p 6222:6222 --rm -ti nats:2.9.0 -js

Benthos is subscribed to all subjects (">"):

input:
  nats:
    urls:
      - nats://localhost:4222
    subject: ">"
pipeline:
  processors:
    - bloblang: |
        root.event = this.catch(deleted())
        root.subject = meta("nats_subject")
output:
  label: ""
  stdout:
    codec: lines

While Benthos is running, it breaks other JetStream clients.

For example, this Go program

package main

import (
    "fmt"
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Create and delete the same stream repeatedly. Each call is a request on a
    // $JS.API.> subject, which Benthos (subscribed to ">") also receives.
    for i := 0; i < 100; i++ {
        _, err := js.AddStream(&nats.StreamConfig{
            Name:     "ORDERS",
            Subjects: []string{"ORDERS.*"},
        })
        if err != nil {
            fmt.Printf("Error while adding stream %d: %s\n", i, err)
        }
        err = js.DeleteStream("ORDERS")
        if err != nil {
            fmt.Printf("Error while deleting stream %d: %s\n", i, err)
        }
    }
    nc.Drain()

    nc.Close()
}

will print some errors:

Error while deleting stream 0: invalid character '+' looking for beginning of value
Error while deleting stream 1: invalid character '+' looking for beginning of value
Error while deleting stream 2: invalid character '+' looking for beginning of value
Error while adding stream 3: invalid character '+' looking for beginning of value
Error while deleting stream 4: invalid character '+' looking for beginning of value
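
The "+" in these errors is the first character of the payload "+ACK": the client is trying to decode it as the JSON API response. A minimal standard-library sketch reproduces the same message, assuming the first reply the client reads is the "+ACK" rather than the server's JSON (decodeAPIResponse is a hypothetical helper, not a nats.go function):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeAPIResponse mimics what a JetStream client does with the reply that
// arrives on its _INBOX subject: it expects a JSON document from the server.
// Hypothetical helper for illustration only.
func decodeAPIResponse(payload []byte) error {
	var resp map[string]any
	return json.Unmarshal(payload, &resp)
}

func main() {
	// Payload observed in the nats-sub dump: the core NATS ack body.
	err := decodeAPIResponse([]byte("+ACK"))
	fmt.Println(err)
	// Prints: invalid character '+' looking for beginning of value
}
```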

With nats-py, the error is different:

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost")
    js = nc.jetstream()

    for i in range(10000):
        await js.add_stream(name=f'test{i}', subjects=[f'hello{i}'])
        await js.delete_stream(f'test{i}')

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

It fails with:

Traceback (most recent call last):
  File "/home/pcosquer/.config/JetBrains/GoLand2022.3/scratches/scratch_6.py", line 16, in <module>
    asyncio.run(main())
  File "/usr/lib64/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/pcosquer/.config/JetBrains/GoLand2022.3/scratches/scratch_6.py", line 10, in main
    await js.delete_stream(f'test{i}')
  File "/home/pcosquer/.cache/pypoetry/virtualenvs/bootstrapper-e4v8o1B6-py3.9/lib/python3.9/site-packages/nats/js/manager.py", line 120, in delete_stream
    resp = await self._api_request(
  File "/home/pcosquer/.cache/pypoetry/virtualenvs/bootstrapper-e4v8o1B6-py3.9/lib/python3.9/site-packages/nats/js/manager.py", line 356, in _api_request
    resp = json.loads(msg.data)
  File "/usr/lib64/python3.9/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python3.9/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python3.9/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

If I dump all messages with nats-sub, I can see that Benthos sends ACK messages to other clients' inboxes:

[#81997] Received on "$JS.API.STREAM.DELETE.test0" with reply "_INBOX.u1c8utbNOdyURPyyDk0s3K.u1c8utbNOdyURPyyDk0s8W9557"

[#81998] Received on "$JS.EVENT.ADVISORY.STREAM.DELETED.test0"
{"type":"io.nats.jetstream.advisory.v1.stream_action","id":"GvcWqh1s3Bpf7owyQ9Ws27","timestamp":"2023-02-23T14:03:51.28633808Z","stream":"test0","action":"delete"}

[#81999] Received on "_INBOX.u1c8utbNOdyURPyyDk0s3K.u1c8utbNOdyURPyyDk0s8W9557"
+ACK

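The problem occurs when Benthos acknowledges the request before the NATS server replies: the client takes the first message on its inbox, "+ACK", as the API response and fails to parse it as JSON. I don't know why I only see ACKs on _INBOX subjects.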

I'm using the branch from https://github.com/benthosdev/benthos/pull/1769, as it fixes the issue with the flood of stream requests.

A workaround is to subscribe to a more specific subject than ">", so that Benthos no longer receives the $JS.API requests.
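
The workaround works because of how NATS subject wildcards match: "*" matches exactly one token and ">" matches one or more trailing tokens, so ">" also catches $JS.API.STREAM.DELETE.test0. The sketch below is a simplified matcher written for illustration (not the server's implementation); "ORDERS.*" is just an example of a narrower subject, borrowed from the Go program above:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a NATS subject matches a subscription
// pattern: "*" matches exactly one token, ">" matches one or more trailing
// tokens. Simplified illustration; it does not validate the pattern.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subjects := []string{"$JS.API.STREAM.DELETE.test0", "ORDERS.new"}
	for _, pattern := range []string{">", "ORDERS.*"} {
		for _, subj := range subjects {
			fmt.Printf("%-9s %-28s %v\n", pattern, subj, subjectMatches(pattern, subj))
		}
	}
}
```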

codegangsta commented 1 year ago

Hey, I think I know what's going on here. I'll put up a fix sometime today.

The core NATS client input and output shouldn't be sending ACKs, since core NATS is a fire-and-forget messaging protocol. Only the JetStream-based components should be sending ACKs in this case.
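
One way to express that rule is to look at the reply subject: JetStream deliveries from a consumer carry a reply subject starting with "$JS.ACK.", while the requests in the dump above carry an _INBOX reply that the requester is waiting on. This is a sketch of the idea only, not Benthos's actual code; shouldAck is a hypothetical name and the consumer name "worker" in the example subject is made up:

```go
package main

import (
	"fmt"
	"strings"
)

// jsAckPrefix is the reply-subject prefix JetStream uses when it delivers a
// message from a consumer; acks for such messages go back to the server.
const jsAckPrefix = "$JS.ACK."

// shouldAck (hypothetical) returns true only for JetStream deliveries. A core
// NATS message with an _INBOX reply is a request waiting for an answer, so a
// "+ACK" published there would be read as the response.
func shouldAck(reply string) bool {
	return strings.HasPrefix(reply, jsAckPrefix)
}

func main() {
	fmt.Println(shouldAck("_INBOX.u1c8utbNOdyURPyyDk0s3K.u1c8utbNOdyURPyyDk0s8W9557")) // false
	fmt.Println(shouldAck("$JS.ACK.ORDERS.worker.1.42.42.1677160000000000000.0"))     // true
	fmt.Println(shouldAck(""))                                                          // false
}
```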

gedw99 commented 1 year ago

Thanks, I had the same problem a while back.

codegangsta commented 1 year ago

So, after digging into this a bit today, including the history of why this was added, it may make sense to either keep the behavior or evolve it into something configuration-driven. We probably can't break existing clients that rely on it, so maybe we add a flag to disable replies on a NATS input.
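
A minimal sketch of what such a flag could look like, assuming it defaults to the current behavior so existing users are not broken. The natsInputConfig type and its SendAck field are hypothetical and do not exist in Benthos:

```go
package main

import "fmt"

// natsInputConfig is a hypothetical slice of the nats input config; SendAck
// stands in for the proposed flag and is not a real Benthos field.
type natsInputConfig struct {
	Subject string
	SendAck bool // defaults to true to keep today's behavior
}

func defaultConfig() natsInputConfig {
	return natsInputConfig{Subject: ">", SendAck: true}
}

// ackPayload returns the bytes to publish on the message's reply subject when
// the pipeline acks it, or nil when nothing should be sent.
func ackPayload(cfg natsInputConfig, reply string) []byte {
	if !cfg.SendAck || reply == "" {
		return nil
	}
	return []byte("+ACK")
}

func main() {
	cfg := defaultConfig()
	fmt.Printf("%q\n", ackPayload(cfg, "_INBOX.abc")) // "+ACK": existing behavior
	cfg.SendAck = false
	fmt.Printf("%q\n", ackPayload(cfg, "_INBOX.abc")) // "": replies disabled
}
```

Defaulting to true keeps the change opt-in; flipping the default later would be the breaking step.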

gedw99 commented 1 year ago

I was thinking of a flag too.

Would we also have to change the Benthos syntax for using the nats plugin?

Kimamisa commented 1 year ago

Hi, as has been said, according to the spec no ACK should be sent in the core NATS protocol. For the existing client, maybe a two-step approach, starting with a warning asking them to upgrade their connector to the JetStream one, could be an option?
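
The first step of such a migration could keep acking when the user has not made a choice, but log a warning. A sketch under that assumption: the pointer distinguishes "flag not set" from an explicit false, and resolveSendAck and the warning text are hypothetical:

```go
package main

import (
	"fmt"
	"log"
)

// resolveSendAck (hypothetical) decides whether a core NATS input replies
// with "+ACK". sendAck is nil when the user has not set the flag: in that case
// the old behavior is kept and a deprecation warning is returned.
func resolveSendAck(sendAck *bool) (bool, string) {
	if sendAck == nil {
		return true, "nats input: replying with +ACK on core NATS is deprecated; " +
			"set the ack flag explicitly or switch to the JetStream input"
	}
	return *sendAck, ""
}

func main() {
	ack, warning := resolveSendAck(nil)
	if warning != "" {
		log.Println(warning)
	}
	fmt.Println(ack) // true: old behavior kept

	off := false
	ack, _ = resolveSendAck(&off)
	fmt.Println(ack) // false: user opted out
}
```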

gedw99 commented 1 year ago

Presuming you mean the new NATS client API? github.com/nats-io/nats.go/jetstream at https://github.com/nats-io/nats.go/tree/main/jetstream

Kimamisa commented 1 year ago

Sorry, typo: I meant clientS, as in users.

tommylp commented 3 months ago

@gedw99 What do you mean by changing the syntax?