risingwavelabs / risingwave


Imbalanced kafka source actors' throughput when running nexmark benchmark #5214

Open lmatz opened 2 years ago

lmatz commented 2 years ago

This is a simple Nexmark q10 query with parallelism 12 on a single compute node with 16 CPUs.

The number of records in each Kafka partition: (screenshot)

The throughput of each source connector: (screenshot)

If the source connectors do not consume at matching speeds, and the job uses event time with a watermark, the job will probably discard more events than usual: a fast partition can make the watermark progress quickly, so events from the slower partitions are discarded more aggressively.
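
As a rough illustration of the effect (a standalone sketch, not RisingWave code; the watermark policy and the numbers are assumed for the example):

```rust
// Minimal illustration: if the watermark is driven by the fastest partition,
// events that the slower partition has not yet delivered fall behind it and
// get dropped as "late".
fn main() {
    // Max event timestamp observed so far per partition (ms) -- assumed values.
    let fast_partition_max_ts: i64 = 10_000;
    let _slow_partition_max_ts: i64 = 4_000;
    let allowed_lateness_ms: i64 = 1_000;

    // A watermark that tracks the fastest partition.
    let watermark = fast_partition_max_ts - allowed_lateness_ms; // 9_000

    // Events still in flight from the slow partition.
    let pending = [4_100_i64, 4_200, 4_300];
    let dropped = pending.iter().filter(|&&ts| ts < watermark).count();

    // All three events are already behind the watermark and would be discarded.
    assert_eq!(dropped, 3);
    println!("watermark = {watermark}, dropped = {dropped}");
}
```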

Some random ideas: throttle some source actors by considering both the current throughput and the value of the event timestamp.
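
For instance, a heuristic along these lines could decide when to pause an actor (purely a sketch; `SourceActorStats`, `should_throttle`, and the thresholds are hypothetical names and values, not an existing RisingWave API):

```rust
// Hypothetical throttling heuristic: pause a source actor when it is both
// reading noticeably faster than its peers and running ahead in event time.
struct SourceActorStats {
    rows_per_sec: f64,
    max_event_ts_ms: i64,
}

/// Returns true if this actor should be throttled for the next interval.
fn should_throttle(me: &SourceActorStats, peers: &[SourceActorStats]) -> bool {
    if peers.is_empty() {
        return false;
    }
    let avg_tput: f64 =
        peers.iter().map(|p| p.rows_per_sec).sum::<f64>() / peers.len() as f64;
    let min_event_ts = peers
        .iter()
        .map(|p| p.max_event_ts_ms)
        .min()
        .unwrap_or(me.max_event_ts_ms);

    // Illustrative thresholds: 20% faster than the average peer AND more than
    // 5 seconds ahead of the slowest peer in event time.
    me.rows_per_sec > avg_tput * 1.2 && me.max_event_ts_ms > min_event_ts + 5_000
}

fn main() {
    let me = SourceActorStats { rows_per_sec: 120_000.0, max_event_ts_ms: 20_000 };
    let peers = vec![
        SourceActorStats { rows_per_sec: 80_000.0, max_event_ts_ms: 12_000 },
        SourceActorStats { rows_per_sec: 85_000.0, max_event_ts_ms: 13_000 },
    ];
    assert!(should_throttle(&me, &peers));
}
```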

BugenZhao commented 2 years ago

Can the throughput be automatically synced by back pressure? Note that the current bound of the connector message buffer, 512 chunks, is too large.

https://github.com/risingwavelabs/risingwave/blob/761b1ba2eb5c1e76e97b9b98447e6dc6f545e80d/src/source/src/connector_source.rs#L69
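
For reference, the back pressure a bounded buffer provides can be sketched with a plain `tokio` bounded channel standing in for the connector message buffer (illustrative only, assuming a `tokio` dependency; only the capacity of 512 is taken from the linked line):

```rust
// Stand-in sketch (not the actual connector code): once `CAPACITY` chunks are
// queued, `send().await` blocks, which is the back pressure. With CAPACITY =
// 512 the reader can run far ahead before it is slowed down; a smaller bound
// would tie the source rate to downstream progress sooner.
use tokio::sync::mpsc;

const CAPACITY: usize = 512; // the bound referenced in connector_source.rs

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(CAPACITY);

    // Producer: stands in for the connector reading from one Kafka split.
    let producer = tokio::spawn(async move {
        for i in 0..2_000u32 {
            // Suspends once the buffer already holds CAPACITY chunks.
            tx.send(i.to_le_bytes().to_vec()).await.unwrap();
        }
    });

    // Consumer: stands in for a slower downstream executor.
    let mut received = 0;
    while let Some(_chunk) = rx.recv().await {
        tokio::time::sleep(std::time::Duration::from_micros(50)).await;
        received += 1;
    }

    producer.await.unwrap();
    assert_eq!(received, 2_000);
}
```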

lmatz commented 2 years ago

Some more context. The plan is:

StreamMaterialize { columns: [auction, bidder, price, date_time, date, time, _row_id(hidden)], pk_columns: [_row_id] }
   StreamExchange { dist: HashShard(_row_id) }
     StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 1:Int32), Field(bid, 2:Int32), Field(bid, 5:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), ToChar(Field(bid, 5:Int32), 'HH:MI':Varchar), _row_id] }
       StreamSource { source: "nexmark_source", columns: [event_type, person, auction, bid, _row_id] }

But both StreamMaterialize and StreamExchange do nothing in this case, as their data-chunk processing code is commented out.

Every actor (with its corresponding source) should therefore be running on its own.

fuyufjh commented 2 years ago

> throttle some source actors by considering both the current throughput and the value of the event timestamp.

Should it be applied to NexMark source only or all kinds of sources?

lmatz commented 2 years ago

> > throttle some source actors by considering both the current throughput and the value of the event timestamp.
>
> Should it be applied to NexMark source only or all kinds of sources?

Currently it does not block benchmarking on NexMark sources, since the total throughput aggregated from all the source actors is normal. I feel it should be applied to all kinds of sources, as long as the job involves a watermark?

lmatz commented 2 years ago

Another example: (screenshot)

Whether an actor's throughput is high or low, it stays quite stable over time.

github-actions[bot] commented 1 year ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

BugenZhao commented 1 year ago

Will the situation be better after #6170? I'd like to try it out.

BugenZhao commented 1 year ago

The throughputs are at least really balanced when 3 nodes are running on the same host. 🤔

(screenshot)

lmatz commented 1 year ago

The setting I tested is a single-node setting, so there should be no remote exchange, only local exchange. I am not sure whether #6170 is expected to solve this.

BugenZhao commented 1 year ago

> The setting I tested is a single-node setting

That's weird. 😕

lmatz commented 1 year ago

> > The setting I tested is a single-node setting
>
> That's weird. 😕

No worries, let me track this issue manually when we have the performance dashboard.

So let's push this to the release where the performance dashboard is in place.

BugenZhao commented 1 year ago

Related: https://github.com/risingwavelabs/risingwave/issues/8451#issuecomment-1475819678

lmatz commented 1 year ago

(screenshot)

Grafana: https://g-2927a1b4d9.grafana-workspace.us-east-1.amazonaws.com/d/EpkBw5W4k/risingwave-bench-dashboard?orgId=1&from=1679413185000&to=1679414780000&to=&var-namespace=nexmark-bs-0-1-2-3-4-daily-20230321

lmatz commented 1 year ago

(screenshot)

It seems pretty easy to reproduce; it has happened almost every day recently.

BugenZhao commented 1 year ago

Per some discussions with @liurenjie1024 and @hzxa21, I tend to think that this is an issue with Kafka (server or client; both are possible) when the workload is heavy. 😟

I'll keep investigating if I come up with a new idea, but I guess we need some metrics from the Kafka cluster to make things clearer. 🤔

lmatz commented 1 year ago

The EBS bandwidth of the Kafka machine is likely limited. (screenshots)

The maximum bandwidth of GP2 and the default bandwidth of GP3 are both smaller than the 298MB/s shown in the figure.

Regarding why the throughput does not go back up during the second half: probably there were some extra I/O credits at the beginning, and then we had to pay them back. This is just a guess.

BugenZhao commented 1 year ago

Yes. I also suspected that the bottleneck between Kafka and the source leads to this issue. But there's still the question of why the performance is imbalanced across partitions when disk throughput is the limit. 😄 I guess it's caused by the internal implementation of Kafka, which needs further investigation.

lmatz commented 1 year ago

But the weird thing is that, looking at the history of q2, the imbalance is less severe than for q0; q2 does more filtering than q0.

The good news is that q2 can achieve 1M rows/s if this phenomenon does not happen.

The Kafka client is another suspect, since Flink does not seem to suffer from the same problem: Flink uses the native Java client, while rdkafka is an independent implementation.

BugenZhao commented 1 year ago

> But the weird thing is that, looking at the history of q2, the imbalance is less severe than for q0; q2 does more filtering than q0.

I took a quick look and found that this happens randomly for all queries that reach a throughput of around 800k rows/s, such as q0, q1, q3, q10, and q22. All the imbalances occur when the bytes per second are under 300 MB/s.

shanicky commented 1 year ago

Can we get the split distribution of each actor? Maybe we can find that information in the logs.

BugenZhao commented 1 year ago

> Can we get the split distribution of each actor? Maybe we can find that information in the logs.

The splits are assigned equally. See the legends of "Source Throughput Per Partition". 🤔