Closed tabVersion closed 1 year ago
@shanicky any idea?
Note that #7500 by @tabVersion enables non-transactional writes, which bypass this problem at the cost of making the sink at-least-once instead of exactly-once.
This issue remains.
Did not reproduce when writing to MSK.
Got the same error when providing the wrong Kafka address. Giving the user a whole pile of error stacks is not OK. The error message should be simple and should clearly tell users to check their Kafka service.
dev=> CREATE SINK s1 FROM mv WITH (
connector = 'kafka',
format = 'append_only',
properties.bootstrap.server='localhost:19092',
topic='test',
force_append_only='true',scan.startup.mode = 'earliest');
ERROR: QueryError: internal error: Rpc error: gRPC error (Internal error): Actor 49 exit unexpectedly: Executor error: Sink error: Kafka error: Transaction error: Failed to initialize Producer ID: Local: Timed out
backtrace of `StreamError`:
0: std::backtrace_rs::backtrace::libunwind::trace
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
1: std::backtrace_rs::backtrace::trace_unsynchronized
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
2: std::backtrace::Backtrace::create
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/backtrace.rs:332:13
3: <risingwave_stream::error::StreamError as core::convert::From<risingwave_stream::error::Inner>>::from
at ./src/stream/src/error.rs:47:10
4: <T as core::convert::Into<U>>::into
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/core/src/convert/mod.rs:726:9
5: <risingwave_stream::error::StreamError as core::convert::From<risingwave_stream::executor::error::StreamExecutorError>>::from
at ./src/stream/src/error.rs:94:9
6: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/core/src/result.rs:2107:27
7: <risingwave_stream::executor::dispatch::DispatchExecutor as risingwave_stream::executor::StreamConsumer>::execute::{{closure}}
at ./src/stream/src/executor/dispatch.rs:264:36
8: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-async-stream-0.2.6/src/lib.rs:463:33
9: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.26/src/stream.rs:120:9
10: <&mut S as futures_core::stream::Stream>::poll_next
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.26/src/stream.rs:104:9
11: <tokio_stream::stream_ext::next::Next<St> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-stream-0.1.11/src/stream_ext/next.rs:42:9
12: <minitrace::future::InSpan<T> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/minitrace-0.4.0/src/future.rs:118:19
13: <async_stack_trace::StackTraced<F,_> as core::future::future::Future>::poll
at ./src/utils/async_stack_trace/src/lib.rs:182:23
14: risingwave_stream::executor::actor::Actor<C>::run_consumer::{{closure}}
at ./src/stream/src/executor/actor.rs:164:13
15: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/future/maybe_done.rs:68:48
16: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/macros/join.rs:126:24
17: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/future/poll_fn.rs:58:9
18: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}
at ./src/stream/src/executor/actor.rs:131:9
19: risingwave_stream::task::stream_manager::LocalStreamManagerCore::build_actors::{{closure}}::{{closure}}
at ./src/stream/src/task/stream_manager.rs:653:50
20: async_stack_trace::manager::TraceReporter::trace::{{closure}}::{{closure}}::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/macros/select.rs:517:49
21: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/future/poll_fn.rs:58:9
22: async_stack_trace::manager::TraceReporter::trace::{{closure}}::{{closure}}
at ./src/utils/async_stack_trace/src/manager.rs:124:21
23: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/task_local.rs:348:35
24: tokio::task::task_local::LocalKey<T>::scope_inner
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/task_local.rs:233:19
25: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/task_local.rs:345:13
26: async_stack_trace::manager::TraceReporter::trace::{{closure}}
at ./src/utils/async_stack_trace/src/manager.rs:131:13
27: <futures_util::future::either::Either<A,B> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.26/src/future/either.rs:109:32
28: <tokio_metrics::task::Instrumented<T> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-metrics-0.1.0/src/task.rs:2365:19
29: task_stats_alloc::allocation_stat::{{closure}}::{{closure}}::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/macros/select.rs:517:49
30: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/future/poll_fn.rs:58:9
31: task_stats_alloc::allocation_stat::{{closure}}::{{closure}}
at ./src/utils/task_stats_alloc/src/lib.rs:124:26
32: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/task_local.rs:348:35
33: tokio::task::task_local::LocalKey<T>::scope_inner
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/task_local.rs:233:19
34: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/task_local.rs:345:13
35: task_stats_alloc::allocation_stat::{{closure}}
at ./src/utils/task_stats_alloc/src/lib.rs:131:9
36: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.37/src/instrument.rs:272:9
37: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/core.rs:223:17
38: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/loom/std/unsafe_cell.rs:14:9
39: tokio::runtime::task::core::Core<T,S>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/core.rs:212:13
40: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:476:19
41: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/core/src/panic/unwind_safe.rs:271:9
42: std::panicking::try::do_call
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panicking.rs:483:40
43: ___rust_try
44: std::panicking::try
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panicking.rs:447:19
45: std::panic::catch_unwind
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panic.rs:140:14
46: tokio::runtime::task::harness::poll_future
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:464:18
47: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:198:27
48: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:152:15
49: tokio::runtime::task::raw::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/raw.rs:255:5
50: tokio::runtime::task::raw::RawTask::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/raw.rs:200:18
51: tokio::runtime::task::LocalNotified<S>::run
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/mod.rs:394:9
52: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/scheduler/multi_thread/worker.rs:464:13
53: tokio::runtime::coop::with_budget
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/coop.rs:102:5
54: tokio::runtime::coop::budget
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/coop.rs:68:5
55: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/scheduler/multi_thread/worker.rs:463:9
56: tokio::runtime::scheduler::multi_thread::worker::Context::run
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/scheduler/multi_thread/worker.rs:426:24
57: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/scheduler/multi_thread/worker.rs:406:17
58: tokio::macros::scoped_tls::ScopedKey<T>::set
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/macros/scoped_tls.rs:61:9
59: tokio::runtime::scheduler::multi_thread::worker::run
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/scheduler/multi_thread/worker.rs:403:5
60: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/scheduler/multi_thread/worker.rs:365:45
61: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/blocking/task.rs:42:21
62: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.37/src/instrument.rs:272:9
63: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/core.rs:223:17
64: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/loom/std/unsafe_cell.rs:14:9
65: tokio::runtime::task::core::Core<T,S>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/core.rs:212:13
66: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:476:19
67: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/core/src/panic/unwind_safe.rs:271:9
68: std::panicking::try::do_call
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panicking.rs:483:40
69: ___rust_try
70: std::panicking::try
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panicking.rs:447:19
71: std::panic::catch_unwind
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panic.rs:140:14
72: tokio::runtime::task::harness::poll_future
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:464:18
73: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:198:27
74: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/harness.rs:152:15
75: tokio::runtime::task::raw::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/raw.rs:255:5
76: tokio::runtime::task::raw::RawTask::poll
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/raw.rs:200:18
77: tokio::runtime::task::UnownedTask<S>::run
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/task/mod.rs:431:9
78: tokio::runtime::blocking::pool::Task::run
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/blocking/pool.rs:159:9
79: tokio::runtime::blocking::pool::Inner::run
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/blocking/pool.rs:511:17
80: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
at /Users/liuchengyou/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/runtime/blocking/pool.rs:469:13
81: std::sys_common::backtrace::__rust_begin_short_backtrace
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/sys_common/backtrace.rs:121:18
82: std::thread::Builder::spawn_unchecked_::{{closure}}::{{closure}}
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/thread/mod.rs:558:17
83: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/core/src/panic/unwind_safe.rs:271:9
84: std::panicking::try::do_call
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panicking.rs:483:40
85: ___rust_try
86: std::panicking::try
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panicking.rs:447:19
87: std::panic::catch_unwind
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/panic.rs:140:14
88: std::thread::Builder::spawn_unchecked_::{{closure}}
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/thread/mod.rs:557:30
89: core::ops::function::FnOnce::call_once{{vtable.shim}}
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/core/src/ops/function.rs:250:5
90: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/alloc/src/boxed.rs:1988:9
91: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/alloc/src/boxed.rs:1988:9
92: std::sys::unix::thread::Thread::new::thread_start
at /rustc/3984bc5833db8bfb0acc522c9775383e4171f3de/library/std/src/sys/unix/thread.rs:108:17
93: __pthread_deallocate
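The request above for a short, clear message could be sketched roughly as follows. This is only an illustration, not RisingWave's actual error handling: `user_facing_message` is a hypothetical helper, and matching on the literal `Kafka error: ` marker is an assumption taken from the error text pasted in this thread.

```rust
/// Hypothetical helper (not part of RisingWave): reduce a raw error dump
/// to a single actionable line for the user.
pub fn user_facing_message(raw: &str) -> String {
    // Keep only the first line, which drops any backtrace that follows.
    let first_line = raw.lines().next().unwrap_or("").trim();

    // Assumption: Kafka failures carry the literal marker "Kafka error: ",
    // as in the message pasted above. Everything after it is the cause.
    match first_line.split_once("Kafka error: ") {
        Some((_, cause)) => format!(
            "Kafka sink error: {cause}. Please check that the Kafka service \
             is running and that the broker address is correct."
        ),
        // Not a Kafka error: still return only the first line.
        None => first_line.to_string(),
    }
}
```

With the error from this thread as input, the result is one line that names the cause (`Transaction error: Failed to initialize Producer ID: Local: Timed out`) and points the user at the Kafka service, with no backtrace.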
Regarding "Got the same error while providing the wrong kafka address":
I don't think it is the same problem. In the previous case, the error only occurs with transactions; if we forbid writing via transactions, everything works.
According to the MSK and Kafka docs, the above error is related to the following parameters.
property | default | note |
---|---|---|
transaction.max.timeout.ms | 900000 (15 minutes) | Broker setting. The maximum timeout allowed for transactions. If a producer requests a transaction timeout larger than this value, the broker returns an error in InitProducerIdRequest. |
transaction.timeout.ms | 60000 (1 minute) | Producer setting. The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. |
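The relation between these two settings can be sketched as a small pre-flight check. In Kafka, a producer's `transaction.timeout.ms` must not exceed the broker's `transaction.max.timeout.ms`. The function and type names below are hypothetical, and the thread does not establish that this relation caused the `Local: Timed out` error reported here.

```rust
/// Default values as listed in the table above.
pub const DEFAULT_TRANSACTION_TIMEOUT_MS: u64 = 60_000;
pub const DEFAULT_TRANSACTION_MAX_TIMEOUT_MS: u64 = 900_000;

/// Outcome of the hypothetical pre-flight check.
#[derive(Debug, PartialEq, Eq)]
pub enum TimeoutCheck {
    Ok,
    ExceedsBrokerMax { requested_ms: u64, broker_max_ms: u64 },
}

/// Hypothetical helper: compare the producer's requested transaction
/// timeout against the broker's configured maximum.
pub fn check_transaction_timeout(requested_ms: u64, broker_max_ms: u64) -> TimeoutCheck {
    if requested_ms > broker_max_ms {
        TimeoutCheck::ExceedsBrokerMax { requested_ms, broker_max_ms }
    } else {
        TimeoutCheck::Ok
    }
}
```

With both defaults the check passes, since 60000 is well below 900000. It only fails if the producer timeout is raised above the broker maximum.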
Note that https://github.com/risingwavelabs/risingwave/pull/7500 by @tabVersion enables non-transactional writes, which bypass this problem at the cost of making the sink at-least-once instead of exactly-once. This issue remains.
My understanding was wrong. Exactly-once delivery cannot be achieved by using Kafka's transactions alone. It has to be redesigned around a different mechanism, e.g. 2PC(?). See flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/#end-to-end-exactly-once-applications-with-apache-flink
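To make the 2PC idea concrete, here is a minimal in-memory sketch of the two-phase shape described in the linked Flink article: records are written into an open transaction, pre-committed at a checkpoint, and only made visible once the checkpoint is confirmed. All names here (`TwoPhaseSink`, `Phase`, and so on) are hypothetical. This is not RisingWave's design and it does not talk to Kafka. It only illustrates the state transitions.

```rust
/// Phase of the current transaction in this toy model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Open,
    PreCommitted,
}

/// Hypothetical in-memory sink following a two-phase commit protocol.
pub struct TwoPhaseSink {
    phase: Phase,
    pending: Vec<String>, // records in the current, uncommitted transaction
    visible: Vec<String>, // records that downstream readers can see
}

impl TwoPhaseSink {
    pub fn new() -> Self {
        Self { phase: Phase::Open, pending: Vec::new(), visible: Vec::new() }
    }

    /// Buffer a record; only allowed while the transaction is open.
    pub fn write(&mut self, record: &str) -> Result<(), &'static str> {
        if self.phase != Phase::Open {
            return Err("transaction is already pre-committed");
        }
        self.pending.push(record.to_string());
        Ok(())
    }

    /// Phase 1: called when a checkpoint starts; no more writes are accepted.
    pub fn pre_commit(&mut self) -> Result<(), &'static str> {
        if self.phase != Phase::Open {
            return Err("transaction is already pre-committed");
        }
        self.phase = Phase::PreCommitted;
        Ok(())
    }

    /// Phase 2: called once the checkpoint is confirmed; publish the records
    /// and open the next transaction.
    pub fn commit(&mut self) -> Result<(), &'static str> {
        if self.phase != Phase::PreCommitted {
            return Err("commit requires a prior pre-commit");
        }
        self.visible.append(&mut self.pending);
        self.phase = Phase::Open;
        Ok(())
    }

    /// Called on failure: discard uncommitted records and start over.
    pub fn abort(&mut self) {
        self.pending.clear();
        self.phase = Phase::Open;
    }

    pub fn visible(&self) -> &[String] {
        &self.visible
    }
}
```

In this model a record becomes visible only after both phases succeed, and an abort before commit discards it. That is the property the sink transaction alone does not give without coordination with checkpoints.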
Since transactional writes do not actually provide exactly-once semantics, we are removing them.
Closing, as support for Kafka sink transactions is being removed temporarily.