risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0
6.98k stars 575 forks source link

stream group topn panics when group by timestamp #7276

Closed st1page closed 1 year ago

st1page commented 1 year ago

Describe the bug

No response

To Reproduce

CREATE TABLE bid (
        "auction" BIGINT,
        "bidder" BIGINT,
        "price" BIGINT,
        "channel" VARCHAR,
        "url" VARCHAR,
        "date_time" TIMESTAMP,
        "extra" VARCHAR
    )with (appendonly = true);

    create materialized view mv as SELECT
      B.auction,
      B.price,
      B.bidder,
      B.date_time
    FROM (
      SELECT
        auction,
        price,
        bidder,
        date_time,
        /*use rank here to express top-N with ties*/
        rank() over (partition by window_end order by price desc) as price_rank
      FROM
        TUMBLE(bid, date_time, INTERVAL '10' SECOND)
    ) B
    WHERE price_rank <=1;

INSERT INTO bid (
    auction,
    bidder,
    price,
    channel,
    url,
    date_time,
    extra
) VALUES
(1000, 1001, 73134520, 'channel-7568', 'https://www.nexmark.com/rswp/ygi/_gwv/item.htm?query=1&channel_id=163053568', '2015-07-15 00:00:01', 'bxkfohfuvlkvjarjgrngycoibaooinpatxmrmhotgsqtdarhxlbrgroteageapilufrwznnvea');

Expected behavior

No response

Additional context

thread 'risingwave-streaming-actor' panicked at 'assertion failed: self.left_size() >= ret.len()', /home/sp/risingwave/src/common/src/hash/key.rs:527:17
stack backtrace:
   0: rust_begin_unwind
             at /rustc/bdb07a8ec8e77aa10fb84fae1d4ff71c21180bb4/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/bdb07a8ec8e77aa10fb84fae1d4ff71c21180bb4/library/core/src/panicking.rs:64:14
   2: core::panicking::panic
             at /rustc/bdb07a8ec8e77aa10fb84fae1d4ff71c21180bb4/library/core/src/panicking.rs:111:5
   3: <risingwave_common::hash::key::FixedSizeKeySerializer<_> as risingwave_common::hash::key::HashKeySerializer>::append
             at ./src/common/src/hash/key.rs:527:17
   4: risingwave_common::hash::key::serialize_array_to_hash_key
             at ./src/common/src/hash/key.rs:622:9
   5: risingwave_common::hash::key::<impl risingwave_common::array::ArrayImpl>::serialize_to_hash_key
             at ./src/common/src/hash/key.rs:648:9
   6: risingwave_common::hash::key::HashKey::build_from_hash_code
             at ./src/common/src/hash/key.rs:130:13
   7: risingwave_common::hash::key::HashKey::build
             at ./src/common/src/hash/key.rs:112:12
   8: <risingwave_stream::executor::top_n::group_top_n::InnerGroupTopNExecutorNew<K,S,_> as risingwave_stream::executor::top_n::utils::TopNExecutorBase>::apply_chunk::{{closure}}
             at ./src/stream/src/executor/top_n/group_top_n.rs:173:20
   9: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/bdb07a8ec8e77aa10fb84fae1d4ff71c21180bb4/library/core/src/future/future.rs:124:9
  10: risingwave_stream::executor::top_n::utils::TopNExecutorWrapper<E>::top_n_executor_execute::{{closure}}
             at ./src/stream/src/executor/top_n/utils.rs:128:93
  11: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  12: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  13: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  14: futures_util::stream::stream::StreamExt::poll_next_unpin
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:1626:9
  15: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/next.rs:32:9
  16: <minitrace::future::InSpan<T> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/minitrace-0.4.0/src/future.rs:118:19
  17: risingwave_stream::executor::wrapper::trace::trace::{{closure}}
             at ./src/stream/src/executor/wrapper/trace.rs:53:60
  18: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  19: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  20: futures_util::stream::stream::StreamExt::poll_next_unpin
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:1626:9
  21: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/next.rs:32:9
  22: <async_stack_trace::StackTraced<F,_> as core::future::future::Future>::poll
             at ./src/utils/async_stack_trace/src/lib.rs:182:23
  23: risingwave_stream::executor::wrapper::trace::stack_trace::{{closure}}
             at ./src/stream/src/executor/wrapper/trace.rs:120:70
  24: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  25: risingwave_stream::executor::wrapper::schema_check::schema_check::{{closure}}
             at ./src/stream/src/executor/wrapper/schema_check.rs:24:1
  26: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  27: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  28: futures_util::stream::stream::StreamExt::poll_next_unpin
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:1626:9
  29: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/next.rs:32:9
  30: risingwave_stream::executor::wrapper::epoch_check::epoch_check::{{closure}}
             at ./src/stream/src/executor/wrapper/epoch_check.rs:31:44
  31: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  32: risingwave_stream::executor::wrapper::update_check::update_check::{{closure}}
             at ./src/stream/src/executor/wrapper/update_check.rs:27:1
  33: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  34: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  35: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  36: risingwave_stream::executor::simple::SimpleExecutorWrapper<E>::execute_inner::{{closure}}
             at ./src/stream/src/executor/simple.rs:68:5
  37: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  38: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  39: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  40: futures_util::stream::stream::StreamExt::poll_next_unpin
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:1626:9
  41: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/next.rs:32:9
  42: <minitrace::future::InSpan<T> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/minitrace-0.4.0/src/future.rs:118:19
  43: risingwave_stream::executor::wrapper::trace::trace::{{closure}}
             at ./src/stream/src/executor/wrapper/trace.rs:53:60
  44: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  45: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  46: futures_util::stream::stream::StreamExt::poll_next_unpin
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:1626:9
  47: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/next.rs:32:9
  48: <async_stack_trace::StackTraced<F,_> as core::future::future::Future>::poll
             at ./src/utils/async_stack_trace/src/lib.rs:182:23
  49: risingwave_stream::executor::wrapper::trace::stack_trace::{{closure}}
             at ./src/stream/src/executor/wrapper/trace.rs:120:70
  50: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  51: risingwave_stream::executor::wrapper::schema_check::schema_check::{{closure}}
             at ./src/stream/src/executor/wrapper/schema_check.rs:24:1
  52: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  53: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  54: futures_util::stream::stream::StreamExt::poll_next_unpin
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/mod.rs:1626:9
  55: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/stream/stream/next.rs:32:9
  56: risingwave_stream::executor::wrapper::epoch_check::epoch_check::{{closure}}
             at ./src/stream/src/executor/wrapper/epoch_check.rs:31:44
  57: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  58: risingwave_stream::executor::wrapper::update_check::update_check::{{closure}}
             at ./src/stream/src/executor/wrapper/update_check.rs:27:1
  59: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  60: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  61: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  62: <risingwave_stream::executor::dispatch::DispatchExecutor as risingwave_stream::executor::StreamConsumer>::execute::{{closure}}
             at ./src/stream/src/executor/dispatch.rs:263:9
  63: <futures_async_stream::try_stream::from_generator::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.5/src/lib.rs:430:31
  64: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:120:9
  65: <&mut S as futures_core::stream::Stream>::poll_next
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/stream.rs:104:9
  66: <tokio_stream::stream_ext::next::Next<St> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-stream-0.1.11/src/stream_ext/next.rs:42:9
  67: <minitrace::future::InSpan<T> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/minitrace-0.4.0/src/future.rs:118:19
  68: <async_stack_trace::StackTraced<F,_> as core::future::future::Future>::poll
             at ./src/utils/async_stack_trace/src/lib.rs:182:23
  69: risingwave_stream::executor::actor::Actor<C>::run_consumer::{{closure}}
             at ./src/stream/src/executor/actor.rs:128:13
  70: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/future/maybe_done.rs:68:48
  71: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}::{{closure}}
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/macros/join.rs:126:24
  72: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/future/poll_fn.rs:58:9
  73: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}
             at ./src/stream/src/executor/actor.rs:95:9
  74: risingwave_stream::task::stream_manager::LocalStreamManagerCore::build_actors::{{closure}}::{{closure}}
             at ./src/stream/src/task/stream_manager.rs:650:50
  75: async_stack_trace::manager::TraceReporter::trace::{{closure}}::{{closure}}::{{closure}}
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/macros/select.rs:517:49
  76: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/future/poll_fn.rs:58:9
  77: async_stack_trace::manager::TraceReporter::trace::{{closure}}::{{closure}}
             at ./src/utils/async_stack_trace/src/manager.rs:124:21
  78: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/task/task_local.rs:348:35
  79: tokio::task::task_local::LocalKey<T>::scope_inner
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/task/task_local.rs:233:19
  80: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/task/task_local.rs:345:13
  81: async_stack_trace::manager::TraceReporter::trace::{{closure}}
             at ./src/utils/async_stack_trace/src/manager.rs:131:13
  82: <futures_util::future::either::Either<A,B> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/either.rs:89:32
  83: <tokio_metrics::task::Instrumented<T> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-metrics-0.1.0/src/task.rs:2365:19
  84: task_stats_alloc::allocation_stat::{{closure}}::{{closure}}::{{closure}}
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/macros/select.rs:517:49
  85: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/future/poll_fn.rs:58:9
  86: task_stats_alloc::allocation_stat::{{closure}}::{{closure}}
             at ./src/utils/task_stats_alloc/src/lib.rs:124:26
  87: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at /home/sp/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.22.0/src/task/task_local.rs:348:35
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
xxchan commented 1 year ago

minimum reproduce

CREATE TABLE t (
        x int,
        ts TIMESTAMP
    );

    create materialized view mv as SELECT
      1 as x
    FROM (
      SELECT
        row_number() over (partition by ts order by x) as rank
      FROM
        t
    )
    WHERE rank <=1;

INSERT INTO t VALUES
(1, '2015-07-15 00:00:01');