Closed mr-swifter closed 1 year ago
This is reproducible on 22.2.6.
Create topics:
export REDPANDA_BROKERS="linux:9092"
export REDPANDA_API_ADMIN_ADDRS="linux:9644"
rpk cluster config set enable_transactions true
rpk topic create txn_input --partitions 6 #-replicas 3
rpk topic create txn_output --partitions 6 #-replicas 3
Build librdkafka and apply transaction_id.patch.txt :
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
patch < transaction_id.patch.txt
./configure && make
A small wrapper for transactions to save logs:
cat run-transaction.sh
#!/bin/bash
if [ "$#" -ne 1 ]; then
echo "usage: $0 transcation_id"
exit -1
fi
transaction_id="${1}"
log_file="transactions_${transaction_id}_$(date +'%Y%m%d_%H%M')"
export TZ=UTC
./examples/transactions $REDPANDA_BROKERS txn_input txn_output ${transaction_id} 2>&1 | tee -a ${log_file}
#| perl -pe 's/([\d]{10})/localtime $1/eg;' | tee -a ${log_file}
Produce data into processing topic:
./examples/rdkafka_performance -b ${REDPANDA_BROKERS} -P -t txn_input -m "1 " -s 1000 -r 1000
Launch single transaction processor:
bash run-transaction.sh id_first_$$
Once data has been started to be processed, start the 2nd instance which will cause the first instance to abort.
bash run-transaction.sh id_second_$$
Trace logs from Redpanda:
TRACE 2022-10-27 16:03:48,789 [shard 0] kafka - request_context.h:160 - [192.168.66.1:53790] sending 1:fetch response {throttle_time_ms=0 error_code={ error_code: none [0] } session_id=0 topics={{name={txn_input} partitions={{partition_index=4 error_code={ error_code: none [0] } high_watermark=54737 last_stable_offset=54737 log_start_offset=0 aborted={nullopt} preferred_read_replica=-1 records={{size 1602327}}}, {partition_index=5 error_code={ error_code: none [0] } high_watermark=53383 last_stable_offset=53383 log_start_offset=0 aborted={nullopt} preferred_read_replica=-1 records={{size 1606371}}}, {partition_index=3 error_code={ error_code: none [0] } high_watermark=53148 last_stable_offset=53148 log_start_offset=0 aborted={nullopt} preferred_read_replica=-1 records={{size 1589182}}}}}}}
TRACE 2022-10-27 16:03:48,791 [shard 0] tx - rm_group_frontend.cc:263 - sending name:begin_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=11004, epoch=0}, tx_seq:1884, ec:cluster::tx_errc:0, etag:25
TRACE 2022-10-27 16:03:48,792 [shard 0] tx - tx_gateway_frontend.cc:49 - released_lock name:add_offsets_to_tx, tx_id:{id_first_52628}
TRACE 2022-10-27 16:03:48,792 [shard 0] kafka - request_context.h:160 - [192.168.66.1:53786] sending 25:add_offsets_to_txn response {throttle_time_ms=0 error_code={ error_code: none [0] }}
TRACE 2022-10-27 16:03:48,796 [shard 0] kafka - requests.cc:91 - [192.168.66.1:53786] processing name:add_partitions_to_txn, key:24, version:0 for rdkafka, mem_units: 8134
TRACE 2022-10-27 16:03:48,796 [shard 0] tx - tx_gateway_frontend.cc:45 - got_lock name:add_partition_to_tx, tx_id:{id_first_52628}
TRACE 2022-10-27 16:03:48,796 [shard 0] tx - rm_partition_frontend.cc:247 - processing name:begin_tx, ntp:{kafka/txn_output/5}, pid:{producer_identity: id=11004, epoch=0}, tx_seq:1884
ERROR 2022-10-27 16:03:48,796 [shard 0] cluster - rm_stm.cc:364 - there is already an ongoing transaction within {producer_identity: id=11004, epoch=0} session
WARN 2022-10-27 16:03:48,796 [shard 0] tx - rm_partition_frontend.cc:303 - rm_stm::begin_tx({kafka/txn_output/5},...) failed with cluster::tx_errc:16
TRACE 2022-10-27 16:03:48,796 [shard 0] tx - rm_partition_frontend.cc:256 - sending name:begin_tx, ntp:{kafka/txn_output/5}, pid:{producer_identity: id=11004, epoch=0}, tx_seq:1884, ec:cluster::tx_errc:16, etag:-9223372036854775808
WARN 2022-10-27 16:03:48,796 [shard 0] tx - rm_partition_frontend.cc:149 - local execution of begin_tx({kafka/txn_output/5},...) failed with cluster::tx_errc:16
WARN 2022-10-27 16:03:48,796 [shard 0] tx - tx_gateway_frontend.cc:981 - begin_tx({kafka/txn_output/5},...) failed with cluster::tx_errc:16
TRACE 2022-10-27 16:03:48,796 [shard 0] tx - tx_gateway_frontend.cc:49 - released_lock name:add_partition_to_tx, tx_id:{id_first_52628}
TRACE 2022-10-27 16:03:48,796 [shard 0] kafka - request_context.h:160 - [192.168.66.1:53786] sending 24:add_partitions_to_txn response {throttle_time_ms=0 results={{name={txn_output} results={{partition_index=5 error_code={ error_code: invalid_txn_state [48] }}}}}}
TRACE 2022-10-27 16:03:48,797 [shard 0] kafka - requests.cc:91 - [192.168.66.1:53791] processing name:txn_offset_commit, key:28, version:3 for rdkafka, mem_units: 8326
TRACE 2022-10-27 16:03:48,797 [shard 0] kafka - txn_offset_commit.cc:86 - Handling request {transactional_id=id_first_52628 group_id={librdkafka_transactions_example_group} producer_id=11004 producer_epoch=0 generation_id=88 member_id={rdkafka-6970344e-9228-4f0c-bf63-adcc3373d66b} group_instance_id={nullopt} topics={{name={txn_input} partitions={{partition_index=3 committed_offset=49979 committed_leader_epoch=-1 committed_metadata={}}}}}}
TRACE 2022-10-27 16:03:48,797 [shard 0] kafka - request_context.h:160 - [192.168.66.1:53791] sending 28:txn_offset_commit response {throttle_time_ms=0 topics={{name={txn_input} partitions={{partition_index=3 error_code={ error_code: none [0] }}}}}}
TRACE 2022-10-27 16:03:48,811 [shard 0] exception - Throw exception at:
0x43c1827 0x41089f3 /opt/redpanda/lib/libc++abi.so.1+0x2ca97 0x41f10d3 0x41f11b7 0x41c44cb 0x41c732f 0x41c5417 0x40f85db 0x40f6dd7 0x17c662f 0x4451567 /opt/redpanda/lib/libc.so.6+0x24a9b 0x17c3637
--------
seastar::continuation<seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >, seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34, seastar::future<seastar::temporary_buffer<char> > seastar::future<void>::then_impl_nrvo<seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34, seastar::future<seastar::temporary_buffer<char> > >(seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34&&)::'lambda'(seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >&&, seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34&, seastar::future_state<seastar::internal::monostate>&&), void>
--------
seastar::continuation<seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >, seastar::net::posix_data_source_impl::get()::$_2, seastar::future<seastar::temporary_buffer<char> > seastar::future<seastar::temporary_buffer<char> >::then_impl_nrvo<seastar::net::posix_data_source_impl::get()::$_2, seastar::future<seastar::temporary_buffer<char> > >(seastar::net::posix_data_source_impl::get()::$_2&&)::'lambda'(seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >&&, seastar::net::posix_data_source_impl::get()::$_2&, seastar::future_state<seastar::temporary_buffer<char> >&&), seastar::temporary_buffer<char> >
--------
seastar::continuation<seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >, seastar::input_stream<char>::read_exactly(unsigned long)::'lambda'(auto), seastar::future<seastar::temporary_buffer<char> > seastar::future<seastar::temporary_buffer<char> >::then_impl_nrvo<seastar::input_stream<char>::read_exactly(unsigned long)::'lambda'(auto), seastar::future<seastar::temporary_buffer<char> > >(auto&&)::'lambda'(seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >&&, seastar::input_stream<char>::read_exactly(unsigned long)::'lambda'(auto)&, seastar::future_state<seastar::temporary_buffer<char> >&&), seastar::temporary_buffer<char> >
--------
seastar::internal::coroutine_traits_base<std::__1::optional<unsigned long> >::promise_type
--------
seastar::continuation<seastar::internal::promise_base_with_type<void>, kafka::connection_context::process_one_request()::$_0, seastar::future<void> seastar::future<std::__1::optional<unsigned long> >::then_impl_nrvo<kafka::connection_context::process_one_request()::$_0, seastar::future<void> >(kafka::connection_context::process_one_request()::$_0&&)::'lambda'(seastar::internal::promise_base_with_type<void>&&, kafka::connection_context::process_one_request()::$_0&, seastar::future_state<std::__1::optional<unsigned long> >&&), std::__1::optional<unsigned long> >
--------
seastar::internal::do_until_state<kafka::protocol::apply(net::server::resources)::$_4, kafka::protocol::apply(net::server::resources)::$_3>
--------
seastar::continuation<seastar::internal::promise_base_with_type<void>, seastar::future<void> seastar::future<void>::handle_exception<kafka::protocol::apply(net::server::resources)::$_5>(kafka::protocol::apply(net::server::resources)::$_5&&)::'lambda'(kafka::protocol::apply(net::server::resources)::$_5&&), seastar::futurize<kafka::protocol::apply(net::server::resources)::$_5>::type seastar::future<void>::then_wrapped_nrvo<seastar::future<void>, seastar::future<void> seastar::future<void>::handle_exception<kafka::protocol::apply(net::server::resources)::$_5>(kafka::protocol::apply(net::server::resources)::$_5&&)::'lambda'(kafka::protocol::apply(net::server::resources)::$_5&&)>(seastar::future<void> seastar::future<void>::handle_exception<kafka::protocol::apply(net::server::resources)::$_5>(kafka::protoco:
Full trace log: rpk_debug.txt.gz
thanks for the report @mr-swifter and @freef4ll . looking
I'll dig into this sometime this week, thanks for the nice repro.
I had a chance to try out your steps, I managed to repro it once in 3/4 runs on 22.2.7.
So the issue here seems to be a race of group offset commit (AddOffsetsToTxn + EndTxn) and a group rebalance operation. In this particular version, if there is a group re-balance is in progress, we give up and return an unknown_server_error
that throws off the client and is an unrecoverable error. This rebalance is triggered when the second producer joins the same group.
We already made a lot of improvements in this area last week, we made these operations idempotent and added retries. Since this rebalancing state is transient, retries should be able to handle it for the most part (code changes are already in redpanda-nightly:latest
docker). Also we plan to revisit the unknown_server_error
(in #6978) and only return it in most exceptional circumstances in the code.
Would you be able to repeat your experiment with redpanda-nightly:latest
? I did a few times and I couldn't reproduce with latest changes, can you confirm too?
Thanks @bharathv !
On the nightly 20221029git3362da8 (rev 3362da8)
, still able to reproduce:
transactions_id_first_31821_20221031_0956.txt.gz :
%7|Mon Oct 31 06:56:23 2022.005|PARTITIONER|rdkafka#producer-1| [thrd:app]: txn_output [4] is the new sticky partition
%7|Mon Oct 31 06:56:23 2022.005|ADDPARTS|rdkafka#producer-1| [thrd:app]: Marked txn_output [4] as part of transaction: scheduling registration
%7|Mon Oct 31 06:56:23 2022.005|TXNAPI|rdkafka#producer-1| [thrd:app]: Transactional API called: send_offsets_to_transaction (in txn state InTransaction, idemp state Assigned)
%7|Mon Oct 31 06:56:23 2022.005|SEND|rdkafka#producer-1| [thrd:TxnCoordinator]: TxnCoordinator/0: Sent AddOffsetsToTxnRequest (v0, 86 bytes @ 0, CorrId 2623)
%7|Mon Oct 31 06:56:23 2022.006|ADDPARTS|rdkafka#producer-1| [thrd:main]: TxnCoordinator/0: Registering partitions with transaction
%7|Mon Oct 31 06:56:23 2022.006|SEND|rdkafka#producer-1| [thrd:TxnCoordinator]: TxnCoordinator/0: Sent AddPartitionsToTxnRequest (v0, 71 bytes @ 0, CorrId 2624)
%7|Mon Oct 31 06:56:23 2022.012|RECV|rdkafka#producer-1| [thrd:TxnCoordinator]: TxnCoordinator/0: Received AddOffsetsToTxnResponse (v0, 6 bytes, CorrId 2623, rtt 6.79ms)
%7|Mon Oct 31 06:56:23 2022.012|ADDOFFSETS|rdkafka#producer-1| [thrd:main]: AddOffsetsToTxn response from TxnCoordinator/0: NO_ERROR ()
%7|Mon Oct 31 06:56:23 2022.012|SEND|rdkafka#producer-1| [thrd:192.168.66.7:9092/0]: 192.168.66.7:9092/0: Sent TxnOffsetCommitRequest (v3, 167 bytes @ 0, CorrId 1748)
%7|Mon Oct 31 06:56:23 2022.018|RECV|rdkafka#producer-1| [thrd:TxnCoordinator]: TxnCoordinator/0: Received AddPartitionsToTxnResponse (v0, 30 bytes, CorrId 2624, rtt 12.35ms)
%3|Mon Oct 31 06:56:23 2022.018|ADDPARTS|rdkafka#producer-1| [thrd:main]: TxnCoordinator/0: Failed to add partition "txn_output" [4] to transaction: Broker: Producer attempted a transactional operation in an invalid state
%1|Mon Oct 31 06:56:23 2022.018|TXNERR|rdkafka#producer-1| [thrd:main]: Fatal transaction error: Failed to add partitions to transaction: Broker: Producer attempted a transactional operation in an invalid state (INVALID_TXN_STATE)
%7|Mon Oct 31 06:56:23 2022.018|RECV|rdkafka#producer-1| [thrd:192.168.66.7:9092/0]: 192.168.66.7:9092/0: Received TxnOffsetCommitResponse (v3, 26 bytes, CorrId 1748, rtt 6.39ms)
%7|Mon Oct 31 06:56:23 2022.018|FATAL|rdkafka#producer-1| [thrd:main]: Fatal error: Broker: Producer attempted a transactional operation in an invalid state: Failed to add partitions to transaction: Broker: Producer attempted a transactional operation in an invalid state
%3|Mon Oct 31 06:56:23 2022.018|ERROR|rdkafka#producer-1| [thrd:main]: Fatal error: Broker: Producer attempted a transactional operation in an invalid state: Failed to add partitions to transaction: Broker: Producer attempted a transactional operation in an invalid state
%7|Mon Oct 31 06:56:23 2022.018|TXNSTATE|rdkafka#producer-1| [thrd:main]: Transaction state change InTransaction -> FatalError
TRACE 2022-10-31 06:56:23,059 [shard 0] tx - rm_group_frontend.cc:263 - sending name:begin_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142, ec:tx_errc::none, etag:31
TRACE 2022-10-31 06:56:23,059 [shard 0] tx - tx_gateway_frontend.cc:51 - released_lock name:add_offsets_to_tx, tx_id:{id_first_31821}
TRACE 2022-10-31 06:56:23,059 [shard 0] tx - tx_gateway_frontend.cc:100 - leaving tm_stm's gate
TRACE 2022-10-31 06:56:23,059 [shard 0] kafka - request_context.h:168 - [192.168.66.1:58252] sending 25:add_offsets_to_txn for {rdkafka}, response {throttle_time_ms=0 error_code={ error_code: none [0] }}
TRACE 2022-10-31 06:56:23,062 [shard 0] kafka - requests.cc:91 - [192.168.66.1:58252] processing name:add_partitions_to_txn, key:24, version:0 for rdkafka, mem_units: 8134
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - tx_gateway_frontend.cc:98 - entered tm_stm's gate
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - tx_gateway_frontend.cc:47 - got_lock name:add_partition_to_tx, tx_id:{id_first_31821}
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:253 - processing name:begin_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142
WARN 2022-10-31 06:56:23,062 [shard 0] tx - [{kafka/txn_output/4}] - rm_stm.cc:381 - can't begin a tx {producer_identity: id=12001, epoch=3} with tx_seq 13142: a producer id is already involved in a tx with tx_seq 13141
WARN 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:309 - rm_stm::begin_tx({kafka/txn_output/4},...) failed with tx_errc::unknown_server_error
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:262 - sending name:begin_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142, ec:tx_errc::unknown_server_error, etag:-9223372036854775808
WARN 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:155 - local execution of begin_tx pid:{producer_identity: id=12001, epoch=3} tx_seq:13142 failed with tx_errc::unknown_server_error
WARN 2022-10-31 06:56:23,062 [shard 0] tx - tx_gateway_frontend.cc:1132 - begin_tx request to {kafka/txn_output/4} failed with tx_errc::unknown_server_error
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - tx_gateway_frontend.cc:51 - released_lock name:add_partition_to_tx, tx_id:{id_first_31821}
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - tx_gateway_frontend.cc:100 - leaving tm_stm's gate
TRACE 2022-10-31 06:56:23,062 [shard 0] kafka - request_context.h:168 - [192.168.66.1:58252] sending 24:add_partitions_to_txn for {rdkafka}, response {throttle_time_ms=0 results={{name={txn_output} results={{partition_index=4 error_code={ error_code: invalid_txn_state [48] }}}}}}
TRACE 2022-10-31 06:56:23,062 [shard 0] kafka - requests.cc:91 - [192.168.66.1:58257] processing name:txn_offset_commit, key:28, version:3 for rdkafka, mem_units: 8326
TRACE 2022-10-31 06:56:23,062 [shard 0] kafka - txn_offset_commit.cc:86 - Handling request {transactional_id=id_first_31821 group_id={librdkafka_transactions_example_group} producer_id=12001 producer_epoch=3 generation_id=100 member_id={rdkafka-a242d32e-0262-41c6-b7a5-0d3f52a77c52} group_instance_id={nullopt} topics={{name={txn_input} partitions={{partition_index=3 committed_offset=1156 committed_leader_epoch=-1 committed_metadata={}}}}}}
TRACE 2022-10-31 06:56:23,062 [shard 0] raft - [group_id:143, {kafka/__consumer_offsets/1}] replicate_entries_stm.cc:186 - Self append entries - {raft_group:{143}, commit_index:{6298628}, term:{31}, prev_log_index:{6298628}, prev_log_term:{31}, last_visible_index:{6298628}}
TRACE 2022-10-31 06:56:23,062 [shard 0] storage - storage_resources.cc:215 - stm_take_bytes 12918357 += 177 (current 10609119237)
TRACE 2022-10-31 06:56:23,062 [shard 0] storage - storage_resources.cc:189 - offset_translator_take_bytes 40979848 (current 177)
TRACE 2022-10-31 06:56:23,062 [shard 0] storage - storage_resources.cc:202 - configuration_manager_take_bytes 12918357 += 177 (current 10609119237)
TRACE 2022-10-31 06:56:23,062 [shard 0] raft - [group_id:143, {kafka/__consumer_offsets/1}] replicate_entries_stm.cc:196 - Leader append result: {append_time:9403373, base_offset:{6298629}, last_offset:{6298629}, byte_size:177}
TRACE 2022-10-31 06:56:23,062 [shard 0] io - dev 0 : req 0xe0000959b380 queue len 4096 ticket 128:1024
TRACE 2022-10-31 06:56:23,062 [shard 0] io - dev 0 : req 0xe0000959b380 submit
TRACE 2022-10-31 06:56:23,064 [shard 0] io - dev 0 : req 0xe0000959b380 complete
TRACE 2022-10-31 06:56:23,064 [shard 0] raft - [group_id:143, {kafka/__consumer_offsets/1}] consensus.cc:2247 - flushed offset updated: 6298629
TRACE 2022-10-31 06:56:23,064 [shard 0] raft - [group_id:143, {kafka/__consumer_offsets/1}] consensus.cc:2561 - Leader commit index updated 6298629
TRACE 2022-10-31 06:56:23,064 [shard 0] raft - [group_id:143, {kafka/__consumer_offsets/1}] replicate_entries_stm.cc:374 - Replication result [offset: 6298629, term: 31, commit_idx: 6298629, current_term: 31]
TRACE 2022-10-31 06:56:23,064 [shard 0] raft - [group_id:143, {kafka/__consumer_offsets/1}] replicate_entries_stm.cc:407 - Replication success, last offset: 6298629, term: 31
TRACE 2022-10-31 06:56:23,064 [shard 0] kafka - request_context.h:168 - [192.168.66.1:58257] sending 28:txn_offset_commit for {rdkafka}, response {throttle_time_ms=0 topics={{name={txn_input} partitions={{partition_index=3 error_code={ error_code: none [0] }}}}}}
TRACE 2022-10-31 06:56:23,070 [shard 0] exception - Throw exception at:
0x4883b2f 0x45baaeb /opt/redpanda/lib/libc++abi.so.1+0x2d53b 0x46a1f83 0x46a2067 0x46755d7 0x467843f 0x4676523 0x45aa58b 0x45a8d87 0x195e88b 0x4914c1f /opt/redpanda/lib/libc.so.6+0x2b1c7 /opt/redpanda/lib/libc.so.6+0x2b29f 0x1959f6f
--------
seastar::continuation<seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >, seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34, seastar::future<seastar::temporary_buffer<char> > seastar::future<void>::then_impl_nrvo<seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34, seastar::future<seastar::temporary_buffer<char> > >(seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34&&)::'lambda'(seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >&&, seastar::reactor::do_read_some(seastar::pollable_fd_state&, seastar::internal::buffer_allocator*)::$_34&, seastar::future_state<seastar::internal::monostate>&&), void>
--------
seastar::continuation<seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >, seastar::net::posix_data_source_impl::get()::$_2, seastar::future<seastar::temporary_buffer<char> > seastar::future<seastar::temporary_buffer<char> >::then_impl_nrvo<seastar::net::posix_data_source_impl::get()::$_2, seastar::future<seastar::temporary_buffer<char> > >(seastar::net::posix_data_source_impl::get()::$_2&&)::'lambda'(seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >&&, seastar::net::posix_data_source_impl::get()::$_2&, seastar::future_state<seastar::temporary_buffer<char> >&&), seastar::temporary_buffer<char> >
--------
seastar::continuation<seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >, seastar::input_stream<char>::read_exactly(unsigned long)::'lambda'(auto), seastar::future<seastar::temporary_buffer<char> > seastar::future<seastar::temporary_buffer<char> >::then_impl_nrvo<seastar::input_stream<char>::read_exactly(unsigned long)::'lambda'(auto), seastar::future<seastar::temporary_buffer<char> > >(auto&&)::'lambda'(seastar::internal::promise_base_with_type<seastar::temporary_buffer<char> >&&, seastar::input_stream<char>::read_exactly(unsigned long)::'lambda'(auto)&, seastar::future_state<seastar::temporary_buffer<char> >&&), seastar::temporary_buffer<char> >
--------
seastar::internal::coroutine_traits_base<std::__1::optional<unsigned long> >::promise_type
--------
This looks like a new bug, particularly with tracking of sequence numbers. Looks like an abort is not cleaning up the state correctly resulting in next begin throwing an invalid_tx_state.
@rystsov This is related to your recent addition of tx_seq in fence batches, mind taking a look? I filtered the logs to below, look at tx_seq 13141
, seems like abort not reset-ing the seq state? I stared at the code for bit but couldn't spot an obvious bug, does this ring a bell?
TRACE 2022-10-31 06:56:21,990 [shard 0] tx - rm_group_frontend.cc:253 - processing name:begin_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141
TRACE 2022-10-31 06:56:21,991 [shard 0] tx - rm_group_frontend.cc:263 - sending name:begin_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141, ec:tx_errc::none, etag:31
TRACE 2022-10-31 06:56:21,992 [shard 0] tx - rm_partition_frontend.cc:253 - processing name:begin_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141
TRACE 2022-10-31 06:56:21,993 [shard 0] tx - rm_partition_frontend.cc:262 - sending name:begin_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141, ec:tx_errc::none, etag:10
TRACE 2022-10-31 06:56:21,995 [shard 0] tx - rm_partition_frontend.cc:703 - processing name:abort_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141
TRACE 2022-10-31 06:56:21,995 [shard 0] tx - rm_partition_frontend.cc:711 - sending name:abort_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141, ec:tx_errc::none
TRACE 2022-10-31 06:56:21,995 [shard 0] tx - rm_group_frontend.cc:607 - processing name:abort_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141
TRACE 2022-10-31 06:56:21,995 [shard 0] tx - rm_group_frontend.cc:615 - sending name:abort_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13141, ec:tx_errc::none
TRACE 2022-10-31 06:56:23,054 [shard 0] tx - rm_group_frontend.cc:253 - processing name:begin_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142
TRACE 2022-10-31 06:56:23,059 [shard 0] tx - rm_group_frontend.cc:263 - sending name:begin_group_tx, group_id:{librdkafka_transactions_example_group}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142, ec:tx_errc::none, etag:31
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:253 - processing name:begin_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142
WARN 2022-10-31 06:56:23,062 [shard 0] tx - [{kafka/txn_output/4}] - rm_stm.cc:381 - can't begin a tx {producer_identity: id=12001, epoch=3} with tx_seq 13142: a producer id is already involved in a tx with tx_seq 13141
TRACE 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:262 - sending name:begin_tx, ntp:{kafka/txn_output/4}, pid:{producer_identity: id=12001, epoch=3}, tx_seq:13142, ec:tx_errc::unknown_server_error, etag:-9223372036854775808
WARN 2022-10-31 06:56:23,062 [shard 0] tx - rm_partition_frontend.cc:155 - local execution of begin_tx pid:{producer_identity: id=12001, epoch=3} tx_seq:13142 failed with tx_errc::unknown_server_error
@bharathv yeah, I think I know what's happening; abort is idempotent, basically if it can't find an ongoing tx it thinks that it was already aborted and returns success without doing anything. the problem is that when we check for an ongoing tx we check if a partition already got any data but ignores if a transaction started but hasn't received any data. it should be simple fix, I'll send a PR today
Ya that makes sense, we need to tighten that check.
nice finds @bharathv and @rystsov
Keeping it open before we confirm that the bug is gone
Thanks @rystsov and @bharathv!
I think it looks much better now, but I had to switch to a different hardware/environment as the nightly arm64 containers are lagging.
On a stress test where several of the transaction generators are continuously are being killed, I have been able to reproduce twice https://github.com/redpanda-data/redpanda/issues/7072, but didn't have tx
trace logging enabled and will update that case when this race reproduces.
@freef4ll, thank you very much for helping out here. Yes, we are aware of the arm64 containers missing for the latest release candidate. We merged a fix that should help with arm64 containers being built (we plan to have another release-candidate on Friday, 4th Nov by 9pm PST).
@freef4ll by the way there is no excuse in seeing the fatal error during the regular testing and we're working on fixing it but in general they are unavoidable (and in kafka too) so if you're planning to use transactions in prod we recommend to expect fatal errors and add code to handle them.
Why the fatals are unavoidable? Imagine that the connection between your app and redpanda is damaged just during the commit so the app never got an ack. In this case the client eventually gets timeout and the true status of the tx (committed / aborted) is unknown. Semantically this situation isn't different from the fatals so you anyway need to handle it.
Guys, thanks for fixing this!
Version & Environment
Redpanda version: Redpanda v22.2.2 - b6d1d461c81519a97b5ad52292fd52e796f05816 Operating System: MacOS 12.5 librdkafka v/1.9.1 / v.1.9.2 Docker: Desktop 4.11.0, Engine: 20.10.17
What went wrong?
I have two applications, consuming data within the same consumer group from topic A (6 partitions) and write data to topic B. All done in transactions (committing offset to A + writing data to B).
I start first application and it works fine, it gets all partitions assigned to the itself and process data normally. When I start second application in actually kicks out the first one. What happens is first app gets abortable transaction error, so it tries to abort transaction. But transaction abort return unknown fatal error.
librdkafka logs from first application: first-app-with-redpanda.txt
transactional.id
s are different for sure.What should have happened instead?
I tested the same with Kafka (v.2.13-3.2.1) and it seems it works normally, so transaction was successfully aborted. first-app-with-kafka.txt
How to reproduce the issue?
It can be reproduced with librdkafka transaction example code. A few changes require:
This basically to increate rate of transactions, so second app most probably will come when first app in. the middle of transaction
Additional information
I found the https://github.com/redpanda-data/redpanda/issues/3142, but I'm not sure it's the same, since leadership seems is not changing at that time, only rebalancing when new consumer joins.