warpstreamlabs / bento

Fancy stream processing made operationally mundane. This repository is a fork of the original project before the license was changed.
https://warpstreamlabs.github.io/bento/

Bug streams mode API #69

Closed emmanuelbertho closed 2 months ago

emmanuelbertho commented 3 months ago

Hello,

I noticed a bug when using the API in streams mode. I created a stream with a POST to the `/streams/{id}` endpoint. The stream contains a broker output with two outputs, one of which is in an error state. If I then call PUT `/streams/{id}` or DELETE `/streams/{id}`, the call hangs forever. Cancelling and retrying the call a second time works. If both outputs in the broker are up and not in error, the problem does not appear.

Example of the stream:

```json
{
  "active": true,
  "uptime": 6.083234123,
  "uptime_str": "6.083234192s",
  "config": {
    "buffer": { "none": {} },
    "input": {
      "redis_streams": {
        "body_key": "data",
        "streams": [ "value-change-Emmanuel" ],
        "url": "redis://redis:6379"
      }
    },
    "output": {
      "broker": {
        "outputs": [
          {
            "sql_insert": {
              "args_mapping": "[ this.timestamp, this.data.string() ]",
              "columns": [ "timestamp", "data" ],
              "driver": "postgres",
              "dsn": "postgres://postgres:admin@timescaledb:5432/?database=test&sslmode=disable",
              "init_statement": "CREATE TABLE IF NOT EXISTS rere (\n timestamp TIMESTAMPTZ NOT NULL,\n data jsonb NOT NULL);\nSELECT create_hypertable('rere', by_range('timestamp'));",
              "table": "rere"
            }
          },
          {
            "sql_insert": {
              "args_mapping": "[ this.timestamp, this.data.string() ]",
              "columns": [ "timestamp", "data" ],
              "driver": "postgres",
              "dsn": "postgres://postgres:admin@localhost:5432/?database=test&sslmode=disable",
              "init_statement": "CREATE TABLE IF NOT EXISTS rere (\n timestamp TIMESTAMPTZ NOT NULL,\n data jsonb NOT NULL);\nSELECT create_hypertable('rere', by_range('timestamp'));",
              "table": "rere"
            }
          }
        ],
        "pattern": "fan_out"
      }
    },
    "pipeline": {
      "processors": [
        {
          "mapping": "\r\n root.data = this\r\n root.data.\"\" = deleted()\r\n root.timestamp = now()"
        }
      ]
    }
  }
}
```
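For anyone trying to reproduce this, the essential shape of the config above, rewritten as a minimal Bento YAML sketch (trimmed; DSNs copied from the JSON, and note the second output's `localhost` DSN is unreachable from inside a container, which matches the single errored output):

```yaml
input:
  redis_streams:
    url: redis://redis:6379
    body_key: data
    streams: [ value-change-Emmanuel ]

pipeline:
  processors:
    - mapping: |
        root.data = this
        root.timestamp = now()

output:
  broker:
    pattern: fan_out
    outputs:
      - sql_insert:
          driver: postgres
          dsn: postgres://postgres:admin@timescaledb:5432/?database=test&sslmode=disable
          table: rere
          columns: [ timestamp, data ]
          args_mapping: '[ this.timestamp, this.data.string() ]'
      # this DSN points at localhost, so from a container it fails and the output errors
      - sql_insert:
          driver: postgres
          dsn: postgres://postgres:admin@localhost:5432/?database=test&sslmode=disable
          table: rere
          columns: [ timestamp, data ]
          args_mapping: '[ this.timestamp, this.data.string() ]'
```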

Stats for the stream:

```json
{
  "input_connection_failed{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 0,
  "input_connection_lost{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 0,
  "input_connection_up{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 1,
  "input_latency_ns{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": { "p50": 0, "p90": 0, "p99": 0 },
  "input_received{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 9,
  "output_batch_sent{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": 3,
  "output_batch_sent{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": 0,
  "output_connection_failed{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": 0,
  "output_connection_failed{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": 0,
  "output_connection_lost{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": 0,
  "output_connection_lost{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": 0,
  "output_connection_up{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": 1,
  "output_connection_up{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": 1,
  "output_error{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": 0,
  "output_error{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": 26,
  "output_latency_ns{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": { "p50": 1919244, "p90": 4668954, "p99": 4668954 },
  "output_latency_ns{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": { "p50": 0, "p90": 0, "p99": 0 },
  "output_sent{label=\"\",path=\"root.output.broker.outputs.0\",stream=\"Emmanuel\"}": 3,
  "output_sent{label=\"\",path=\"root.output.broker.outputs.1\",stream=\"Emmanuel\"}": 0,
  "processor_batch_received{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 8,
  "processor_batch_sent{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 8,
  "processor_error{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 0,
  "processor_latency_ns{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": { "p50": 45404.5, "p90": 65636, "p99": 65636 },
  "processor_received{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 8,
  "processor_sent{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 8,
  "uptime_ns": 27242374385
}
```

gregfurman commented 2 months ago

Hi Emmanuel. Thanks for raising this! I'm going to try to recreate it locally and update the issue here with some findings.

gregfurman commented 2 months ago

Alright, I can't seem to recreate this issue locally. I have a couple of questions:

> If I then call PUT `/streams/{id}` or DELETE `/streams/{id}`, the call hangs forever.

So you're creating that stream with a POST (redis -> bloblang processing -> a broker of 2 postgres outputs) and then any subsequent PUT or DELETE request to the same stream ends up hanging forever. I imagine you're using `curl` for this. Any chance you can run `curl -v` and paste the logs here?

I think I may understand what is happening here. The stream API docs for PUT say:

> The previous stream will be shut down before and a new stream will take its place.

I think there could be an issue with closing the database client for those failed/errored components. Perhaps when the client fails to start up correctly there could be an indefinitely blocking call. Not entirely sure though.

Snippet from the documentation of the sql driver's `Close()` function:

> Close closes the database and prevents new queries from starting. Close then waits for all queries that have started processing on the server to finish.

emmanuelbertho commented 2 months ago

I have trouble reproducing it now, though I managed to reproduce it at least 10 times yesterday. I'll send another message if I can find a clear way to reproduce it. Thank you for investigating.

gregfurman commented 2 months ago

Closing with https://github.com/warpstreamlabs/bento/pull/71.

Feel free to re-open if you manage to reproduce it after updating to v1.1.0 of Bento 🍱