risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

Cluster entering a recovery loop, possibly caused by Kafka sink QueueFull (Local: Queue full) #16640

Open ly9chee opened 5 months ago

ly9chee commented 5 months ago

Describe the bug

Description

The cluster entered a recovery loop after a Kafka sink was created from an MV (about 4 million records). Once the problematic sink was dropped, the system returned to normal.

create sink sink_1 from mv1
WITH (
    connector='kafka',
    primary_key='***',
    topic='***',
    properties.bootstrap.server='***',
    properties.compression.codec='zstd'
) FORMAT UPSERT ENCODE JSON

20 minutes later, the sink had been successfully recreated.

image

Other observed phenomena

Kafka itself appears to be working fine: during the recovery period, 140 million records were written to the topic, even though the upstream MV has only about 4 million records.

image

I'm not sure whether increasing properties.retry.max would solve this issue.

Error message/log

2024-05-08T02:50:36.677896483Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: producing message (key Some([*, *, *])) to topic *** failed error=Message production error: QueueFull (Local: Queue full)
2024-05-08T02:50:36.677906392Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: Producer queue full. Delivery future buffer size=100029. Await and retry #1
2024-05-08T02:50:36.677909398Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: producing message (key Some([*, *, *])) to topic *** error=Message production error: QueueFull (Local: Queue full)
2024-05-08T02:50:36.677920839Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: Producer queue full. Delivery future buffer size=100028. Await and retry #2
2024-05-08T02:50:37.377364973Z ERROR risingwave_stream::task::stream_manager: actor exit with error actor_id=4490 error=Executor error: Sink error: Kafka error: Message production error: QueueFull (Local: Queue full)

Backtrace:
   0: <thiserror_ext::backtrace::MaybeBacktrace as thiserror_ext::backtrace::WithBacktrace>::capture
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/thiserror-ext-0.0.11/src/backtrace.rs:30:18
   1: thiserror_ext::ptr::ErrorBox<T,B>::new
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/thiserror-ext-0.0.11/src/ptr.rs:40:33
   2: <risingwave_stream::executor::error::StreamExecutorError as core::convert::From<E>>::from
             at ./risingwave/src/stream/src/executor/error.rs:35:35
   3: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/result.rs:1959:27
   4: risingwave_stream::executor::sink::SinkExecutor<F>::execute_consume_log::{{closure}}
             at ./risingwave/src/stream/src/executor/sink.rs:430:13
   5: <futures_util::stream::once::Once<Fut> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/once.rs:46:33
   6: <futures_util::future::future::IntoStream<F> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs:102:13
   7: futures_util::stream::select_with_strategy::poll_side
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select_with_strategy.rs:219:27
   8: futures_util::stream::select_with_strategy::poll_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select_with_strategy.rs:234:28
   9: <futures_util::stream::select_with_strategy::SelectWithStrategy<St1,St2,Clos,State> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select_with_strategy.rs:270:17
  10: <futures_util::stream::select::Select<St1,St2> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select.rs:115:9
  11: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  12: <futures_util::stream::stream::flatten::Flatten<St,<St as futures_core::stream::Stream>::Item> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/flatten.rs:50:44
  13: <futures_util::stream::stream::Flatten<St> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs:102:13
  14: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  15: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  16: futures_util::stream::stream::StreamExt::poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs:1638:9
  17: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs:32:9
  18: <await_tree::future::Instrumented<F,_> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/await-tree-0.1.1/src/future.rs:124:23
  19: risingwave_stream::executor::wrapper::trace::instrument_await_tree::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/trace.rs:115:10
  20: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  21: risingwave_stream::executor::wrapper::schema_check::schema_check::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/schema_check.rs:24:1
  22: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  23: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  24: futures_util::stream::stream::StreamExt::poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs:1638:9
  25: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs:32:9
  26: risingwave_stream::executor::wrapper::epoch_check::epoch_check::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/epoch_check.rs:31:44
  27: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  28: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  29: <S as futures_core::stream::TryStream>::try_poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:196:9
  30: futures_util::stream::try_stream::TryStreamExt::try_poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/try_stream/mod.rs:1131:9
  31: <futures_util::stream::try_stream::try_next::TryNext<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/try_stream/try_next.rs:32:9
  32: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  33: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  34: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:343:19
  35: risingwave_common::util::epoch::task_local::scope::{{closure}}
             at ./risingwave/src/common/src/util/epoch.rs:244:47
  36: risingwave_stream::executor::wrapper::epoch_provide::epoch_provide::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/epoch_provide.rs:31:59
  37: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  38: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  39: futures_util::stream::stream::StreamExt::poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs:1638:9
  40: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs:32:9
  41: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tracing-0.1.40/src/instrument.rs:321:9
  42: risingwave_stream::executor::wrapper::trace::trace::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/trace.rs:48:69
  43: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  44: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  45: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  46: <risingwave_stream::executor::dispatch::DispatchExecutor as risingwave_stream::executor::StreamConsumer>::execute::{{closure}}
             at ./risingwave/src/stream/src/executor/dispatch.rs:382:9
  47: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  48: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  49: <&mut S as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:104:9
  50: <tokio_stream::stream_ext::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/git/checkouts/tokio-968c02b7a1a41bea/fe39bb8/tokio-stream/src/stream_ext/next.rs:42:9
  51: <tokio_stream::stream_ext::try_next::TryNext<St> as core::future::future::Future>::poll
             at ./root/.cargo/git/checkouts/tokio-968c02b7a1a41bea/fe39bb8/tokio-stream/src/stream_ext/try_next.rs:43:9
  52: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tracing-0.1.40/src/instrument.rs:321:9
  53: <await_tree::future::Instrumented<F,_> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/await-tree-0.1.1/src/future.rs:124:23
  54: risingwave_stream::executor::actor::Actor<C>::run_consumer::{{closure}}
             at ./risingwave/src/stream/src/executor/actor.rs:206:18
  55: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/future/maybe_done.rs:68:48
  56: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}::{{closure}}::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/macros/join.rs:126:24
  57: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/future/poll_fn.rs:58:9
  58: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}::{{closure}}
             at ./risingwave/src/stream/src/executor/actor.rs:162:17
  59: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  60: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  61: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:343:19
  62: risingwave_expr::expr_context::expr_context_scope::{{closure}}
             at ./risingwave/src/expr/core/src/expr_context.rs:35:65
  63: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  64: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  65: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:343:19
  66: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}
             at ./risingwave/src/stream/src/executor/actor.rs:170:10
  67: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/future/map.rs:55:37
  68: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs:91:13
  69: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  70: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  71: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
  ...
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/harness.rs:208:27
 114: tokio::runtime::task::harness::Harness<T,S>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/harness.rs:153:15
 115: tokio::runtime::task::raw::RawTask::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/raw.rs:200:18
 116: tokio::runtime::task::UnownedTask<S>::run
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/mod.rs:437:9
 117: tokio::runtime::blocking::pool::Task::run
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/blocking/pool.rs:159:9
 118: tokio::runtime::blocking::pool::Inner::run
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/blocking/pool.rs:513:17
 119: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/blocking/pool.rs:471:13
 120: std::sys_common::backtrace::__rust_begin_short_backtrace
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys_common/backtrace.rs:155:18
 121: std::thread::Builder::spawn_unchecked_::{{closure}}::{{closure}}
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/thread/mod.rs:529:17
 122: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panic/unwind_safe.rs:272:9
 123: std::panicking::try::do_call
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panicking.rs:552:40
 124: std::panicking::try
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panicking.rs:516:19
 125: std::panic::catch_unwind
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panic.rs:142:14
 126: std::thread::Builder::spawn_unchecked_::{{closure}}
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/thread/mod.rs:528:30
 127: core::ops::function::FnOnce::call_once{{vtable.shim}}
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/ops/function.rs:250:5
 128: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/alloc/src/boxed.rs:2015:9
 129: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/alloc/src/boxed.rs:2015:9
 130: std::sys::unix::thread::Thread::new::thread_start
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys/unix/thread.rs:108:17
 131: start_thread
             at ./nptl/pthread_create.c:442:8
 132: __GI___clone
             at ./misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:100

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

v1.8.2

Additional context

No response

xiangjinwu commented 5 months ago

@tabVersion Any recommendations / thoughts on this?

tabVersion commented 5 months ago

Seeing the log here

ERROR risingwave_stream::task::stream_manager: actor exit with error actor_id=4490 error=Executor error: Sink error: Kafka error: Message production error: QueueFull (Local: Queue full)

This indicates that the batching is too small, which triggers a small failover that re-ingests the data within the same epoch over and over again. I'd suggest increasing properties.queue.buffering.max.ms and `` to allow a larger batching cache and reduce the per-batch overhead. Related issue: https://github.com/confluentinc/librdkafka/issues/2247

ly9chee commented 4 months ago

@tabVersion Thanks for the suggestions, but I'm concerned about how to set those properties well. In practice, the upstream throughput can change frequently: a setting may work well at 1k/s but send the cluster into a recovery loop when the upstream throughput gets high (100k/s) or Kafka is under heavy load. And when we hit this error, the only thing we can do is drop the sink to stop the cluster from crashing repeatedly.
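To make the concern concrete, here is a rough back-of-the-envelope sketch of how quickly a bounded producer queue fills at the two throughputs mentioned above. The function name `seconds_until_full`, the capacity of 100,000 slots (chosen to roughly match the buffer size in the log above), and the broker drain rate of 50,000 msg/s are all assumptions for illustration, not measured values.

```rust
/// Seconds until a bounded producer queue fills, or `None` if it never does.
/// All rates are in messages per second. Hypothetical helper for illustration.
fn seconds_until_full(capacity: u64, produce_rate: u64, drain_rate: u64) -> Option<f64> {
    if produce_rate <= drain_rate {
        // The queue drains at least as fast as it fills.
        return None;
    }
    Some(capacity as f64 / (produce_rate - drain_rate) as f64)
}

fn main() {
    // Assumption: about 100_000 slots, close to the buffer size seen in the log.
    let capacity = 100_000;
    // Assumption: the broker acknowledges 50_000 msg/s (made-up figure).
    let drain_rate = 50_000;

    for produce_rate in [1_000u64, 100_000] {
        match seconds_until_full(capacity, produce_rate, drain_rate) {
            Some(s) => println!("{produce_rate} msg/s: queue full after {s:.1} s"),
            None => println!("{produce_rate} msg/s: queue never fills"),
        }
    }
}
```

Under these assumed numbers, a fixed setting that is comfortable at 1k/s is exhausted within seconds at 100k/s, which is why static tuning alone feels fragile.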

It seems that in the KafkaPayloadWriter implementation, when a QueueFull error is encountered, we await only one delivery and then immediately create a new delivery future: https://github.com/risingwavelabs/risingwave/blob/91b7ee29ce4d846f9c2ee6d9f56264bab414250a/src/connector/src/sink/kafka.rs#L511-L522 In this case, I think we should either wait for all in-flight deliveries to drain or wait a sufficient time before retrying; otherwise the producer queue may keep filling up.
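The difference between the two retry policies can be sketched with a toy model. This is not the RisingWave code or the rdkafka API: `ToyProducer`, `OnFull`, and `send_all` are hypothetical names, and "awaiting a delivery" is modeled simply as freeing one slot in a bounded buffer that nothing else drains (a worst case where the broker is slower than the producer).

```rust
use std::collections::VecDeque;

/// Toy stand-in for a bounded producer queue (hypothetical, not rdkafka).
struct ToyProducer {
    capacity: usize,
    in_flight: VecDeque<u64>,
}

#[derive(Debug, PartialEq)]
struct QueueFull;

impl ToyProducer {
    fn new(capacity: usize) -> Self {
        Self { capacity, in_flight: VecDeque::new() }
    }

    /// Enqueue a message, or fail if the buffer is at capacity.
    fn try_send(&mut self, msg: u64) -> Result<(), QueueFull> {
        if self.in_flight.len() >= self.capacity {
            return Err(QueueFull);
        }
        self.in_flight.push_back(msg);
        Ok(())
    }

    /// Model waiting for the oldest delivery to complete.
    fn await_one(&mut self) {
        self.in_flight.pop_front();
    }

    /// Model waiting for every in-flight delivery to complete.
    fn await_all(&mut self) {
        self.in_flight.clear();
    }
}

/// What to do when the queue is full.
#[derive(Clone, Copy)]
enum OnFull {
    AwaitOne,
    DrainAll,
}

/// Sends `total` messages and returns how many QueueFull errors were hit.
fn send_all(total: u64, capacity: usize, policy: OnFull) -> u64 {
    let mut producer = ToyProducer::new(capacity);
    let mut full_errors = 0;
    for msg in 0..total {
        while producer.try_send(msg).is_err() {
            full_errors += 1;
            match policy {
                OnFull::AwaitOne => producer.await_one(),
                OnFull::DrainAll => producer.await_all(),
            }
        }
    }
    full_errors
}

fn main() {
    println!("await one: {} errors", send_all(20, 4, OnFull::AwaitOne));
    println!("drain all: {} errors", send_all(20, 4, OnFull::DrainAll));
}
```

In this model, the await-one policy keeps the buffer pinned at capacity, so every send after saturation hits QueueFull again, whereas drain-all hits it only once per `capacity` sends. A real fix would also have to account for latency and for librdkafka's own internal queue, which this sketch ignores.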

ly9chee commented 4 months ago

It seems this issue is not urgent, because I can't reproduce it.😅

tabVersion commented 2 months ago

Removed from the milestone; keeping this open for tracking.