
Discussion: Handling inconsistent streams #14031

Closed. kwannoel closed this issue 2 months ago.

kwannoel commented 9 months ago

Problems

Solutions

  1. Introduce a testing feature that adds an extra executor between every pair of executors. This extra executor stores the state of the stream passing through it and checks for inconsistencies. Use it in fuzzing tests, e.g. sqlsmith, so we can find inconsistency bugs. This cannot cover the production case, though. (A sketch of such a check follows below.)
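
A minimal sketch of the kind of check such an extra executor could perform, assuming a simplified stream model where every operation is an insert or delete keyed by the row's primary key (the `Op` enum and `ConsistencyChecker` type here are hypothetical, not RisingWave's real executor API):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Op {
    Insert,
    Delete,
}

/// Tracks the multiplicity of each row key flowing through the stream and
/// flags operations that violate the expected insert/delete pairing.
#[derive(Default)]
struct ConsistencyChecker {
    counts: HashMap<String, i64>,
}

impl ConsistencyChecker {
    /// Feed one (op, key) pair from the stream; returns a description of the
    /// inconsistency, if any.
    fn check(&mut self, op: Op, key: &str) -> Result<(), String> {
        let count = self.counts.entry(key.to_owned()).or_insert(0);
        match op {
            Op::Insert => {
                if *count > 0 {
                    return Err(format!("double insert of key {key}"));
                }
                *count += 1;
            }
            Op::Delete => {
                if *count <= 0 {
                    return Err(format!("delete of non-existent key {key}"));
                }
                *count -= 1;
            }
        }
        Ok(())
    }
}

fn main() {
    let mut checker = ConsistencyChecker::default();
    assert!(checker.check(Op::Insert, "k1").is_ok());
    assert!(checker.check(Op::Delete, "k1").is_ok());
    // A second delete for the same key is exactly the kind of bug the extra
    // executor would surface during fuzzing.
    assert!(checker.check(Op::Delete, "k1").is_err());
}
```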

Any other ideas?

BugenZhao commented 9 months ago

BTW, the internal state / cache inconsistency could also be caused by data corruption in the storage layer, which is even harder to debug. 😕 The factor can be eliminated by using an in-memory storage backend under the testing feature. However, this also implies that it cannot be covered in production.

kwannoel commented 9 months ago

The factor can be eliminated by using an in-memory storage backend under the testing feature.

Can you elaborate on how this can catch data corruption in the storage layer? Or what kinds of data corruption it can catch?

BugenZhao commented 9 months ago

The factor can be eliminated by using an in-memory storage backend under the testing feature.

Can you elaborate on how this can catch data corruption in the storage layer? Or what kinds of data corruption it can catch?

IIRC, we once encountered an issue where the file cache was not correctly invalidated after a node restart, which caused reads to return totally irrelevant results. As the in-memory state backend is simple enough, I guess we can assume that there's no such issue.
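
To illustrate the point, here is a minimal sketch of swapping in an in-memory backend behind a key-value trait for tests; the `StateBackend` trait and `InMemoryBackend` type are hypothetical simplifications, not the actual state-store interface:

```rust
use std::collections::BTreeMap;

/// Hypothetical, heavily simplified key-value interface standing in for the
/// state-store abstraction.
trait StateBackend {
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>);
    fn get(&self, key: &[u8]) -> Option<&[u8]>;
}

/// Test-only backend: a plain in-memory map, with no files and no caches that
/// could be stale after a restart, so storage-layer corruption is ruled out.
#[derive(Default)]
struct InMemoryBackend {
    map: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl StateBackend for InMemoryBackend {
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.map.insert(key, value);
    }

    fn get(&self, key: &[u8]) -> Option<&[u8]> {
        self.map.get(key).map(Vec::as_slice)
    }
}

fn main() {
    let mut store = InMemoryBackend::default();
    store.put(b"pk1".to_vec(), b"row1".to_vec());
    assert_eq!(store.get(b"pk1"), Some(b"row1".as_slice()));
}
```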

kwannoel commented 9 months ago

Adding another reason for inconsistent streams: backward incompatibility.

If the user is only using stable features, it's likely that the bug is triggered by a backward-compatibility issue after an upgrade.

kwannoel commented 9 months ago

Another case occurred recently that caused the hash join state to become inconsistent.

A complementary approach was suggested by @fuyufjh: first, tolerate the inconsistency to prevent the cluster from crashing, but emit an error log.

For example, if a row is to be inserted into the cache but it already exists, it means this row has been seen before, and we should skip over it. This should still never happen, though, so we should log an error.
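
A minimal sketch of this tolerate-and-log behavior, assuming a simplified join cache keyed by primary key; the `JoinCache` type and its `strict` flag are hypothetical, and `eprintln!` stands in for real error logging:

```rust
use std::collections::HashMap;

/// Hypothetical simplified join-state cache, keyed by the row's primary key.
struct JoinCache {
    rows: HashMap<String, Vec<i64>>,
    /// true: panic on inconsistency (testing); false: log and continue (production).
    strict: bool,
}

impl JoinCache {
    fn insert(&mut self, key: String, row: Vec<i64>) {
        if self.rows.contains_key(&key) {
            if self.strict {
                panic!("row {key} already exists in cache");
            }
            // Non-strict: this should never happen, so record the fact in the
            // logs, then skip the insert to keep the cluster alive.
            eprintln!("inconsistency: row {key} already exists in cache, skipping insert");
            return;
        }
        self.rows.insert(key, row);
    }
}

fn main() {
    let mut cache = JoinCache { rows: HashMap::new(), strict: false };
    cache.insert("pk1".to_string(), vec![1, 2, 3]);
    // With strict = false this only logs an error instead of panicking.
    cache.insert("pk1".to_string(), vec![1, 2, 3]);
}
```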

fuyufjh commented 9 months ago

+1. I have discussed this with multiple people recently, and at least we all agree that we can hardly do anything when encountering this problem.

kwannoel commented 9 months ago

I will work on the testing part: fuzzing with checks on the data stream.

fuyufjh commented 9 months ago

When talking with @stdrc yesterday, we think we can provide an option like "non-strict mode" to warn about these inconsistency problems instead of panicking. With this option, we will always use "strict mode" for testing, while for production deployments we can decide whether to enable or disable it case by case.

Some known places that need to be refactored:

  • HashAgg
    src/stream/src/executor/aggregation/agg_group.rs:320:13: row count should be non-negative
  • HashJoin
    src/stream/src/executor/managed_state/join/join_entry_state.rs:43:44 unwrap()
    src/stream/src/executor/managed_state/join/join_entry_state.rs:51:13 pk [...] should be in the cache

There should be more cases... Need to review code to find out.
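
As an illustration of how one of these checks could be relaxed, here is a hedged sketch of the HashAgg row-count case: strict mode keeps the panic, while non-strict mode clamps to zero and reports. This is illustrative pseudologic only, not the actual agg_group.rs code:

```rust
/// Illustrative only: apply a row-count delta with either strict (panic) or
/// non-strict (clamp and warn) handling of a negative result.
fn apply_row_count_delta(row_count: i64, delta: i64, strict: bool) -> i64 {
    let new_count = row_count + delta;
    if new_count < 0 {
        if strict {
            panic!("row count should be non-negative, got {new_count}");
        }
        eprintln!("inconsistency: row count became negative ({new_count}), clamping to 0");
        return 0;
    }
    new_count
}

fn main() {
    // A double delete drives the count below zero; non-strict mode tolerates it.
    assert_eq!(apply_row_count_delta(0, -1, false), 0);
    assert_eq!(apply_row_count_delta(2, -1, false), 1);
}
```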

hzxa21 commented 9 months ago

When talking with @stdrc yesterday, we think we can provide an option like "non-strict mode" to warn about these inconsistency problems instead of panicking. With this option, we will always use "strict mode" for testing, while for production deployments we can decide whether to enable or disable it case by case. Some known places that need to be refactored:

  • HashAgg
    src/stream/src/executor/aggregation/agg_group.rs:320:13: row count should be non-negative
  • HashJoin
    src/stream/src/executor/managed_state/join/join_entry_state.rs:43:44 unwrap()
    src/stream/src/executor/managed_state/join/join_entry_state.rs:51:13 pk [...] should be in the cache

There should be more cases... Need to review code to find out.

Let me share more info about the three occurrences of the panics:

  1. join_entry_state.rs:51:13 pk [...] should be in the cache

    • image: nightly-20231123
    • stateful operators involved (no agg):

      • inner join
      • left outer join
      • temporal filter
  2. agg_group.rs:320:13: row count should be non-negative

    • image v1.5.0
    • stateful operators involved

      • inner join
      • left outer join
      • temporal filter
      • agg
  3. join_entry_state.rs:43:44 unwrap()

    • image v1.4.0
    • stateful operators involved

      • left outer join
      • agg

~1 and 2 seem to be double deletes, while 3 seems to be a double insert. I wonder whether it could be caused by the temporal filter emitting previously emitted rows downstream in some corner cases.~

Updated: there is no temporal filter involved in 3. The only common stateful operator seems to be left outer join.

Suspicious join-related PR introduced since v1.4.0: https://github.com/risingwavelabs/risingwave/pull/13214

st1page commented 9 months ago

Could be because of #13351; I will do some investigations.

It doesn't seem so... https://github.com/risingwavelabs/risingwave/pull/14166

lmatz commented 9 months ago

One production case of inconsistency: #14197

kwannoel commented 9 months ago

Another idea I have: we can make the stream consistency check toggleable at runtime. This means that inconsistency bugs which trigger crash loops can be caught and debugged in the user's cluster.

And it will catch most cases.

It can be supported in the same way as described in the issue description, by adding an additional executor and using a system variable to toggle it.

Edit: @fuyufjh raised a good point, which is that historical data would then need to be stored all the way back in order to detect it. So it seems like it can only be used in testing again.

Edit 2: From an offline discussion with @st1page, @TennyZhuang proposed storing the most recent operations on a log / local disk, bounded by some window, e.g. the 10,000 most recent operations; we can use those to check.

So it can work, limited to the window of recent operations (a sketch follows below).

Edit 3: Typically these bugs are triggered by very large data volumes.
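
A sketch of the bounded-history idea, assuming a simple ring buffer over the N most recent operations; the `RecentOps` and `LoggedOp` types are hypothetical, and a real implementation would persist the window to a log or local disk as proposed:

```rust
use std::collections::VecDeque;

#[derive(Clone, Debug)]
struct LoggedOp {
    op: &'static str, // "insert" or "delete"
    key: String,
}

/// Keeps only the most recent `capacity` operations so the checking overhead
/// stays bounded even on large production streams.
struct RecentOps {
    capacity: usize,
    buf: VecDeque<LoggedOp>,
}

impl RecentOps {
    fn new(capacity: usize) -> Self {
        Self { capacity, buf: VecDeque::with_capacity(capacity) }
    }

    fn record(&mut self, op: &'static str, key: String) {
        if self.buf.len() == self.capacity {
            // Drop the oldest operation once the bound is reached.
            self.buf.pop_front();
        }
        self.buf.push_back(LoggedOp { op, key });
    }

    /// Best-effort check: was an insert for this key seen within the window?
    fn recently_inserted(&self, key: &str) -> bool {
        self.buf.iter().any(|o| o.op == "insert" && o.key == key)
    }
}

fn main() {
    let mut ops = RecentOps::new(10_000);
    ops.record("insert", "pk1".to_string());
    // A second insert for the same key inside the window is suspicious and can
    // be reported without keeping the full history.
    assert!(ops.recently_inserted("pk1"));
}
```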

github-actions[bot] commented 3 months ago

This issue has been open for 60 days with no activity.

If you think it is still relevant today, and needs to be done in the near future, you can comment to update the status, or just manually remove the no-issue-activity label.

You can also confidently close this issue as not planned to keep our backlog clean. Don't worry if you think the issue is still valuable to continue in the future. It's searchable and can be reopened when it's time. 😄

stdrc commented 2 months ago

After some work in the previous quarter, we already have a non-strict mode that allows inconsistent streams in our system. And we now have a description of the related config item in https://docs.risingwave.com/docs/dev/node-specific-configurations/#streaming-configurations. So I guess for now we can close this issue as completed?

That said, we have to acknowledge that the non-strict mode just ignores inconsistency; we still don't have a better way to identify it. We do have a lot of code in several executors that checks the Ops and data in the input chunks, but not all executors are covered, so it's still hard to backtrack from a panic to find its origin in strict mode.

If anyone has any other thoughts, please feel free to reopen this issue.