shotover / shotover-proxy

L7 data-layer proxy
https://docs.shotover.io
Apache License 2.0

Fix SinkConnection race condition #1711

Closed rukai closed 1 month ago

rukai commented 1 month ago

This PR fixes a race condition I discovered via an intermittent CI failure.

Unexpected event 08:35:15.868972Z PANIC panicked at shotover/src/connection.rs:90:64:
called `Result::unwrap()` on an `Err` value: Empty
   0: rust_begin_unwind
             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panicking.rs:652:5
   1: core::panicking::panic_fmt
             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/panicking.rs:72:14
   2: core::result::unwrap_failed
             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/result.rs:1679:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/result.rs:1102:23
      shotover::connection::SinkConnection::set_get_error
             at /home/runner/work/shotover-proxy/shotover-proxy/shotover/src/connection.rs:90:27
   4: shotover::connection::SinkConnection::handle_disconnect_on_recv
             at /home/runner/work/shotover-proxy/shotover-proxy/shotover/src/connection.rs:186:19
   5: shotover::connection::SinkConnection::try_recv_into
             at /home/runner/work/shotover-proxy/shotover-proxy/shotover/src/connection.rs:166:32
   6: shotover::transforms::kafka::sink_cluster::KafkaSinkCluster::recv_responses
             at /home/runner/work/shotover-proxy/shotover-proxy/shotover/src/transforms/kafka/sink_cluster/mod.rs:1048:29
   7: <shotover::transforms::kafka::sink_cluster::KafkaSinkCluster as shotover::transforms::Transform>::transform::{{closure}}
             at /home/runner/work/shotover-proxy/shotover-proxy/shotover/src/transforms/kafka/sink_cluster/mod.rs:345:13
   truncated for brevity...

The issue is a panic in this method: https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L85-L88

self.connection_closed_rx.try_recv() returns Err, which is then unwrapped. We expected a message to be immediately available in the channel, but at the time we checked there was none. This channel is used to receive error messages back from the connection so that they can be reported to the sink transform. It is correct for this line to unwrap, since it's an invariant that an error has been sent every time this function is called. Usually this line does succeed, so there must be some kind of race condition on the sending side of this channel.
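To make the failure mode concrete, here is a minimal sketch of how a non-blocking try_recv behaves before and after a message is sent. It uses std::sync::mpsc as a stand-in, which is an assumption: the real channel type in connection.rs may differ, and the function name and error string are made up for illustration.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Hypothetical stand-in for the channel behind `connection_closed_rx`.
/// Returns what `try_recv` reports before and after an error is sent.
pub fn try_recv_before_and_after_send() -> (
    Result<String, TryRecvError>,
    Result<String, TryRecvError>,
) {
    let (connection_closed_tx, connection_closed_rx) = channel::<String>();

    // Nothing has been sent yet: try_recv does not wait, it reports Empty.
    // Unwrapping this result is what produces the panic in the stack trace.
    let before = connection_closed_rx.try_recv();

    // Once the sender has run, the same call succeeds.
    connection_closed_tx
        .send("connection reset".to_string())
        .unwrap();
    let after = connection_closed_rx.try_recv();

    (before, after)
}
```

The unwrap is only safe if the send is guaranteed to have happened before the receiver checks.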

If we zoom out a bit we can see the function calling set_get_error as per the stacktrace: https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L176-L192

And then, even more importantly, the function calling that function: https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L159-L163

We can see that set_get_error is called in response to TryRecvError::Disconnected on the in_rx channel. When the in_rx channel is disconnected (also called closed) we need to be able to assume that the error has already been sent over connection_closed. However, as the panic shows, this clearly isn't true. To figure out where this race condition is, we'll need to explore the other side of these channels.

The other side of these channels is the read and write tasks. https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L320

https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L395

These functions are called from https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L233, which runs each of them in its own isolated tokio task. Running them in their own tasks allows them to run in parallel (not just concurrently) with each other and with the rest of shotover.

Anyway, you can see that in spawn_read_write_tasks the spawned task for reader_task will handle errors by sending the error down connection_closed_tx2 and then allowing the task to complete. https://github.com/shotover/shotover-proxy/blob/9dd04c1f982a4fd1ffe7e565aea0514d295d9b6f/shotover/src/connection.rs#L286-L305

This connection_closed_tx2 is the send side of the channel that set_get_error was panicking on because the channel was empty. When the reader_task function returns an error, that error is sent down connection_closed_tx2. So the issue must be that we are sending the error after closing in_tx, which allows a race condition.

For example:

  1. reader task - in_tx is closed
  2. transform task - observes that in_rx is closed and runs set_get_error
  3. transform task - set_get_error panics due to missing error
  4. reader task - error is sent down connection_closed_tx2
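
The interleaving above can be replayed deterministically on a single thread. This is only a sketch: it uses std::sync::mpsc instead of the channel types in connection.rs, and the function name, message types and error string are hypothetical.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Replays the four steps in a fixed order on one thread.
/// Returns what the transform side observes in steps 2 and 3.
pub fn replay_race() -> (TryRecvError, TryRecvError) {
    let (in_tx, in_rx) = channel::<u8>();
    let (connection_closed_tx, connection_closed_rx) = channel::<String>();

    // 1. reader task: in_tx is dropped, which closes the channel.
    drop(in_tx);

    // 2. transform task: observes that in_rx is closed.
    let in_state = in_rx.try_recv().unwrap_err();

    // 3. transform task: looks for the error, but none has been sent yet.
    //    The real code unwraps here, which is the panic.
    let closed_state = connection_closed_rx.try_recv().unwrap_err();

    // 4. reader task: the error is sent, too late to be seen in step 3.
    connection_closed_tx
        .send("read failed".to_string())
        .unwrap();

    (in_state, closed_state)
}
```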

But why is this happening?

When we call reader_task, in_tx is moved into reader_task and the function now has ownership of in_tx. This means that when reader_task completes, in_tx is dropped, which closes the channel before the error handling in the spawned task has had a chance to send the error.
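
As a rough illustration of that ownership behaviour, the sketch below passes a sender by value into a function and checks the channel afterwards. The function reader_task_owned is a hypothetical stand-in, not the real reader_task, and std::sync::mpsc is used in place of the actual channel type.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};

/// Hypothetical reader task that takes ownership of the sender.
fn reader_task_owned(in_tx: Sender<u8>) -> Result<(), String> {
    // Forward one message, then simulate a read failure.
    in_tx.send(1).map_err(|e| e.to_string())?;
    Err("read failed".to_string())
    // in_tx goes out of scope here, so the channel closes before the
    // caller has even seen the returned error.
}

pub fn channel_state_after_owned_task() -> (
    Result<(), String>,
    Result<u8, TryRecvError>,
    Result<u8, TryRecvError>,
) {
    let (in_tx, in_rx) = channel::<u8>();
    let result = reader_task_owned(in_tx);

    // The buffered message is still delivered, then the channel reports closed.
    let first = in_rx.try_recv();
    let second = in_rx.try_recv();
    (result, first, second)
}
```

By the time the caller holds the Err value, the receiver can already observe Disconnected, which is the window the race needs.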

To fix this we need to pass in_tx to reader_task by reference only, so that the error handling retains ownership of in_tx and can drop it after the error has been sent, which avoids the race condition.
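
A minimal sketch of that ordering, again with std::sync::mpsc standing in for the real channels. The reader_task body, the function names and the error string are assumptions for illustration and are not the code from this PR.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};

/// Hypothetical reader task that only borrows the sender, so returning
/// from it does not close the channel.
fn reader_task(in_tx: &Sender<u8>) -> Result<(), String> {
    // Forward one message, then simulate a read failure.
    in_tx.send(1).map_err(|e| e.to_string())?;
    Err("read failed".to_string())
}

pub fn run_fixed_ordering() -> (
    Result<u8, TryRecvError>,
    Result<u8, TryRecvError>,
    Result<String, TryRecvError>,
) {
    let (in_tx, in_rx) = channel::<u8>();
    let (connection_closed_tx, connection_closed_rx) = channel::<String>();

    // The caller keeps ownership of in_tx while the task runs.
    if let Err(err) = reader_task(&in_tx) {
        // Send the error first...
        connection_closed_tx.send(err).unwrap();
    }
    // ...and only then close in_tx.
    drop(in_tx);

    // Transform side: once Disconnected is observed, the error is
    // guaranteed to be waiting in the other channel.
    let first = in_rx.try_recv();
    let second = in_rx.try_recv();
    let error = connection_closed_rx.try_recv();
    (first, second, error)
}
```

Because the drop happens strictly after the send, a receiver that sees the closed channel can unwrap the error without racing the sender.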

The same problem exists for the write task too, so we have made similar changes there.

codspeed-hq[bot] commented 1 month ago

CodSpeed Performance Report

Merging #1711 will not alter performance

Comparing rukai:fix_sink_connection_race_condition (65c8585) with main (0d3ca8a)

Summary

✅ 39 untouched benchmarks

justinweng-instaclustr commented 1 month ago

Thanks for the comprehensive explanation! Great work!