redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Redis Streams - Input #1888

Closed lefig closed 1 year ago

lefig commented 1 year ago

Hi Benthos Community,

I have a question because I am stumped on how to get Redis streams working as an input feed and I can't help thinking that I am doing something silly.

Redis pubsub works fine and I am running on Windows.

input:
  redis_streams:
    url: tcp://localhost:6379
    consumer_group: avg
    start_from_oldest: true
    streams: [telemetry]
    client_id: my_consumer
    body_key: data
    create_streams: true

pipeline:
  processors:
    # Config fields, showing default values
    - log:
        level: DEBUG
        fields_mapping: ""
        message: "Received input message"

output:
  stdout:
    codec: lines

logger:
  level: DEBUG
  format: logfmt
  add_timestamp: true
  static_fields:
    '@service': redis
  file:
    path: "C:\Work\Temp\logs\log.txt"

I would appreciate any suggestions or guidance.

Thanks

mihaitodor commented 1 year ago

Hey @lefig, thanks for reaching out! It's unclear from your description what issue you're seeing with redis_streams. Are you getting any errors? Do you have any logs that you can share?

lefig commented 1 year ago

Sure thing :) and thank you for taking a look. The baffling thing is that I don't see any errors in the logs, and it's as if the streams are not being received.

time="2023-05-07T14:39:16+01:00" level=info msg="Running main config from specified file" @service=redis path="c:\Work\Tools\Benthos\redis\config-redis.yaml"
time="2023-05-07T14:39:16+01:00" level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=redis
time="2023-05-07T14:39:16+01:00" level=info msg="Launching a benthos instance, use CTRL+C to close" @service=redis
time="2023-05-07T14:39:16+01:00" level=info msg="Receiving messages from Redis streams: [telemetry]" @service=redis label="" path=root.input
time="2023-05-07T14:39:49+01:00" level=info msg="Received SIGINT, the service is closing" @service=redis
time="2023-05-07T14:39:49+01:00" level=debug msg="Waiting for pending acks to resolve before shutting down." @service=redis label="" path=root.input
time="2023-05-07T14:39:49+01:00" level=debug msg="Pending acks resolved." @service=redis label="" path=root.input

mihaitodor commented 1 year ago

From that log, all I can say is that it connects to Redis Streams just fine, but it doesn't receive any messages from it. If you take a look here at the Benthos integration test for this input (and corresponding output), you'll see that it works fine with a Docker container that gets spun up via https://github.com/ory/dockertest. Maybe the tool you're using to push messages to Redis isn't working correctly, not sure.

lefig commented 1 year ago

So what I have been doing is checking that the streams are being pushed to Redis, using a C# receiver app to log them, and I double-check in the GUI.

image

I wonder if this is perhaps related to Windows security. I don't know. But Redis pubsub works fine, which is odd. My setup is the Redis Mod Docker container, and I will check its logs.

Thanks for mentioning the integration test - I will investigate further. :)

lefig commented 1 year ago

Fabulous, I have found the problem and it works like a treat! The body key setting was invalid.

And thank you for the kind support.
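
For anyone landing here with the same symptom (the input connects, logs no errors, and delivers no messages), here is a minimal sketch of the kind of fix described above. As I understand it, body_key names the field of each stream entry that becomes the message body, with the remaining fields stored as metadata, so it has to match the field name the producer actually writes. The field name `payload` and the sample XADD command are assumptions for illustration only; the thread does not say which field name was really in use.

```yaml
# Hypothetical producer, shown only to illustrate the field name:
#   XADD telemetry * payload '{"temp": 21.5}'
# The payload lives in a field called "payload", not "data".
input:
  redis_streams:
    url: tcp://localhost:6379
    streams: [telemetry]
    consumer_group: avg
    client_id: my_consumer
    start_from_oldest: true
    create_streams: true
    body_key: payload  # assumed name; must match the field the producer sets
```

Inspecting a few entries in the stream (for example in the Redis GUI mentioned earlier) should show which field names the producer uses.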