risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

scheduler: determine the source parallelism according to the connector split num #8451

Open BugenZhao opened 1 year ago

BugenZhao commented 1 year ago

When creating a streaming job, we can let the number of connector splits discovered at that time determine the parallelism of the source operator.

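// Number of splits discovered from the connector at job creation time.
let split_num = source.discovered_splits().len();
// Fall back to the number of parallel units if no parallelism was specified.
let default_parallelism = specified_parallelism.unwrap_or_else(|| cluster.parallel_units().len());

// Never run more source actors than there are splits to read.
let source_parallelism = std::cmp::min(split_num, default_parallelism);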
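
The rule above can be written as a small standalone function. This is only a sketch: `source_parallelism` and its plain `usize` parameters are hypothetical stand-ins for the scheduler's real types, which are not shown in this issue.

```rust
/// Hypothetical helper: pick the parallelism of a source operator.
///
/// `split_num` is the number of splits discovered when the job is created,
/// `specified` is the user-specified parallelism (if any), and
/// `parallel_units` is the number of parallel units in the cluster.
fn source_parallelism(split_num: usize, specified: Option<usize>, parallel_units: usize) -> usize {
    let default_parallelism = specified.unwrap_or(parallel_units);
    // More actors than splits would leave some source actors idle.
    split_num.min(default_parallelism)
}
```

For example, with 3 splits, no specified parallelism, and 8 parallel units, `source_parallelism(3, None, 8)` yields 3, so every source actor owns a split.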

As long as there is no later split change or scaling, this prevents "idle sources" from existing, which is crucial for the global watermark alignment to behave properly. Recall that if one arm of the merge input never produces an update, the merge yields no new watermark message. (https://github.com/risingwavelabs/rfcs/blob/51ae8d99fe1522252e03dbe61302410bf33da4cb/rfcs/0016-watermark-operators-explained.md?plain=1#L77-L79)
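
To illustrate why an idle arm stalls the watermark, here is a minimal model of min-based alignment in a merge. It is a sketch, not the actual merge executor: `WatermarkAligner` is a hypothetical name, watermarks are modeled as `u64`, and each input is assumed to report non-decreasing watermarks.

```rust
/// Hypothetical model of watermark alignment in a merge with several inputs.
struct WatermarkAligner {
    /// Latest watermark received from each input; `None` until the first one arrives.
    latest: Vec<Option<u64>>,
    /// Last watermark emitted downstream.
    emitted: Option<u64>,
}

impl WatermarkAligner {
    fn new(inputs: usize) -> Self {
        Self { latest: vec![None; inputs], emitted: None }
    }

    /// Record a watermark from `input` and return a new aligned watermark, if any.
    fn on_watermark(&mut self, input: usize, watermark: u64) -> Option<u64> {
        self.latest[input] = Some(watermark);
        // The aligned watermark is the minimum across all inputs. It is
        // undefined while any input has not reported a watermark yet.
        let aligned = self
            .latest
            .iter()
            .copied()
            .collect::<Option<Vec<u64>>>()?
            .into_iter()
            .min()?;
        // Only emit when the aligned watermark advances.
        if self.emitted.map_or(true, |e| aligned > e) {
            self.emitted = Some(aligned);
            Some(aligned)
        } else {
            None
        }
    }
}
```

In this model, an aligner with two inputs returns `None` for every watermark from input 0 until input 1 reports one, which is exactly the stall an idle source would cause.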

Note that it's also possible that a split has no data, but we can treat it as an external issue.

Further: This does not resolve the root problem. We may...

st1page commented 1 year ago

LGTM. To improve on this: when the external source scales out (a split is added), we also need to scale out the source executor's actors if the new split number is less than the number of cluster.parallel_units().

BugenZhao commented 1 year ago

Back to the issue of idle sources:

By generating (local) watermarks for each actor, we're actually making the filter less strict than it would be in the sequential manner. So there's no correctness issue if we simply exclude some upstreams in the merger when aligning the watermarks: even if new data arrives from an excluded upstream after a while, we're free to filter it based on the watermark merged from the other upstreams, as a message with a larger timestamp has indeed already come from this source.

So what we might need is to allow a second-phase filter in (or after) the merger. 🤔
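
A rough sketch of that idea, under several assumptions: `FilteringMerger` and all of its methods are hypothetical, timestamps and watermarks are modeled as `u64`, and a row counts as late when its timestamp is below the merged watermark. How an upstream gets marked as excluded is left open here.

```rust
/// Hypothetical merger that ignores excluded (idle) upstreams when aligning
/// watermarks and re-filters rows against the merged watermark.
struct FilteringMerger {
    /// Latest watermark received from each upstream.
    latest: Vec<Option<u64>>,
    /// Whether each upstream is excluded from watermark alignment.
    excluded: Vec<bool>,
}

impl FilteringMerger {
    fn new(inputs: usize) -> Self {
        Self { latest: vec![None; inputs], excluded: vec![false; inputs] }
    }

    /// Stop waiting for `input` when aligning watermarks.
    fn exclude(&mut self, input: usize) {
        self.excluded[input] = true;
    }

    fn on_watermark(&mut self, input: usize, watermark: u64) {
        self.latest[input] = Some(watermark);
    }

    /// Minimum watermark over the non-excluded upstreams, available once
    /// all of them have reported one.
    fn merged_watermark(&self) -> Option<u64> {
        self.latest
            .iter()
            .zip(&self.excluded)
            .filter(|(_, excluded)| !**excluded)
            .map(|(w, _)| *w)
            .collect::<Option<Vec<u64>>>()?
            .into_iter()
            .min()
    }

    /// Second-phase filter: keep a row from any upstream (excluded or not)
    /// only if its timestamp is not below the merged watermark.
    fn accept_row(&self, timestamp: u64) -> bool {
        self.merged_watermark().map_or(true, |w| timestamp >= w)
    }
}
```

With two upstreams where only upstream 0 has reported watermark 100, excluding upstream 1 lets the merged watermark advance to 100, and a row with timestamp 90 that later arrives from upstream 1 is dropped by `accept_row`.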
