benthosdev / benthos

Fancy stream processing made operationally mundane
https://www.benthos.dev
MIT License
7.68k stars 752 forks source link

Add nats processor that uses request reply #2504

Closed hooksie1 closed 3 weeks ago

hooksie1 commented 1 month ago

This is related to #2501. It adds a processor for NATS request/reply similar to how the HTTP processor works.

tagging @codegangsta

Jeffail commented 1 month ago

Hey @hooksie1, is this implementing similar behaviour as https://github.com/benthosdev/benthos/pull/2354?

hooksie1 commented 1 month ago

Ah yeah looks like it. I think neither me nor @codegangsta nor @mihaitodor realized that one existed. I can close in favor of that PR if that's what's wanted.

mihaitodor commented 1 month ago

Oops, my bad. Completely forgot there was another PR that's similar. Sorry about that!

hooksie1 commented 1 month ago

No it's fine. I can move the testing over to that PR since it doesn't have any. Let me know what you want me to do. The other might need updated since it's a few months old

Jeffail commented 1 month ago

@hooksie1 yeah I've merged the original PR, it'd be good to get some tests and general improvements on top of it, are you happy to rebase and adapt your PR?

hooksie1 commented 1 month ago

Yeah I can do that.

hooksie1 commented 1 month ago

ok, I think I got it. I had to modify the request reply processor a tiny bit. The way it was previously, I was getting a result store not found within batch context error from the processor. I switched from using that convert message function to just copying the message and then returning that in the messageBatch. Let me know if that's wrong, but I couldn't get it to work the other way. Fwiw this is the config I was using:

input:
  http_server:
    path: /
    allowed_verbs:
      - POST
    timeout: 5s
pipeline:
  processors:
    - switch:
      - check: '@http_server_request_path == "/test"'
        processors:
          - mapping: |
              root = json()
              root.title = "test"
          - nats_request_reply:
              urls: ["nats://localhost:4222"]
              subject: "test.thing"
output:
  sync_response: {}
hooksie1 commented 4 weeks ago

@Jeffail let me know if you need me to do anything else!