Sink transforms rely on SinkConnection calling force_run_chain.notify_one() to trigger a new chain run, so that responses do not get stuck in the SinkConnection until the client sends another request.
However, this approach currently has a race condition that allows messages to get stuck.
Consider this scenario:
SinkConnection read_task: in_tx.send(messages)
SinkConnection read_task: force_run_chain.notify_one()
SinkConnection read_task: in_tx.send(messages)
SinkConnection read_task: force_run_chain.notify_one() - Has no effect since there is already a notification.
At this point we have 2 message batches pending in the read channel.
server.rs in main task: force_run_chain.notified() triggers, causing a chain run to occur.
SinkConnection try_recv in main task: in_rx.try_recv() - Receives only the first message batch.
At this point, there is still one message batch in the read channel.
It will be stuck there until either another response comes back or the client sends another request.
This PR fixes the issue by looping in SinkConnection::try_recv so that the channel is completely drained after a force_run_chain.notified() wake-up.
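The race and the fix can be replayed with a small std-only model. This is a sketch under stated assumptions, not the shotover code: `SinglePermit` is a hypothetical stand-in for tokio's `Notify` (which stores at most one pending permit), and `recv_once`, `recv_drain` and `replay` are illustrative names. Only `in_tx`, `in_rx` and `force_run_chain` are taken from the description above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for a notifier that stores at most one pending permit.
struct SinglePermit(AtomicBool);

impl SinglePermit {
    fn new() -> Self {
        SinglePermit(AtomicBool::new(false))
    }

    /// Stores a permit. Calling this again before the permit is consumed has no effect.
    fn notify_one(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Consumes the stored permit, returning true if a wake-up was pending.
    fn take(&self) -> bool {
        self.0.swap(false, Ordering::SeqCst)
    }
}

/// Models the old behaviour: one try_recv per wake-up.
fn recv_once(in_rx: &Receiver<Vec<u32>>, out: &mut Vec<u32>) {
    if let Ok(batch) = in_rx.try_recv() {
        out.extend(batch);
    }
}

/// Models the fix: keep calling try_recv until the channel is empty.
fn recv_drain(in_rx: &Receiver<Vec<u32>>, out: &mut Vec<u32>) {
    while let Ok(batch) = in_rx.try_recv() {
        out.extend(batch);
    }
}

/// Replays the scenario and returns the received messages plus
/// whether another wake-up is still pending afterwards.
fn replay(drain: bool) -> (Vec<u32>, bool) {
    let (in_tx, in_rx) = channel::<Vec<u32>>();
    let force_run_chain = SinglePermit::new();

    // read_task: two batches arrive before the main task gets to run.
    in_tx.send(vec![1, 2]).unwrap();
    force_run_chain.notify_one();
    in_tx.send(vec![3]).unwrap();
    force_run_chain.notify_one(); // no effect, a permit is already stored

    // main task: exactly one wake-up is observed, so one chain run occurs.
    let mut received = Vec::new();
    if force_run_chain.take() {
        if drain {
            recv_drain(&in_rx, &mut received);
        } else {
            recv_once(&in_rx, &mut received);
        }
    }
    (received, force_run_chain.take())
}
```

In this model, `replay(false)` returns only the first batch and reports no pending wake-up, so the second batch stays in the channel. `replay(true)` returns all three messages from the single wake-up.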
Pulled out of https://github.com/shotover/shotover-proxy/pull/1553 where this is required for tests to pass.