Motivation
Some of our bidirectional streaming tests were failing intermittently. When they failed, the symptom was that the end user (in the test) received more bytes than the server sent.
For example, in the test testStreamingDownload_1kChunk_10kChunks_100BDownloadWatermark, we expect:
✅ The server sends 10,000 chunks of 1024 bytes:
❯ cat repro.txt | grep -i "server sent body chunk" | head -3
Server sent body chunk 1/10000 of 1024
Server sent body chunk 2/10000 of 1024
Server sent body chunk 3/10000 of 1024
❯ cat repro.txt | grep -i "server sent body chunk" | wc -l
10000
✅ The URLSession didReceive data callback is called a non-deterministic number of times because URLSession may re-chunk the bytes internally, but the total number of bytes delivered through the delegate calls is 10,240,000:
❌ The response body chunks emitted by the AsyncBackpressuredStream to the user (in the test) match 1:1 those received by the delegate callback:
❯ cat repro.txt | grep "Client received some response body bytes" | head -3
Client received some response body bytes (numBytes: 1024)
Client received some response body bytes (numBytes: 2048)
Client received some response body bytes (numBytes: 1024)
❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | wc -l
333
❯ cat repro.txt | grep "Client received some response body bytes" | wc -l
334
❌ The total number of bytes emitted by the AsyncBackpressuredStream to the user (in the test) is 10,240,000, from which it can then reconstruct 10,000 chunks of 1024 bytes to match what the server sent:
❯ cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \) | paste -sd+ | bc -l
10280960
❯ cat repro.txt | grep "Client reconstructing" | tail -3
Client reconstructing and verifying chunk 10038/10000
Client reconstructing and verifying chunk 10039/10000
Client reconstructing and verifying chunk 10040/10000
So one more element was emitted from the AsyncBackpressuredStream than the delegate callback wrote: the stream duplicated a 40,960-byte element along the way. This meant the test saw 40,960 more bytes than it expected and consequently reconstructed an additional 40 chunks of size 1024 over what the server sent.
After some investigation, it turned out there was a missing transition in the state machine that underlies the AsyncBackpressuredStream. When suspendNext was called while there were buffered elements but we were above the watermark, we popped the first item from the buffer and returned it without updating the state to hold the new buffer (the one without the popped element).
Consequently, the next call to next() would return the same element again.
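In simplified form, the bug looks like this (a minimal sketch with illustrative names and an Int element type; the real state machine's states and types differ):

```swift
// Minimal sketch of the missing state transition; `StateMachine`,
// `suspendNextBuggy`, and `suspendNextFixed` are illustrative names,
// not the actual AsyncBackpressuredStream source.
struct StateMachine {
    // Buffered elements waiting for a consumer (we are above the watermark).
    var buffer: [Int]

    // Buggy version: pops an element from a local copy of the buffer
    // but never stores the shortened buffer back into the state.
    mutating func suspendNextBuggy() -> Int? {
        guard !buffer.isEmpty else { return nil }
        var copy = buffer
        let element = copy.removeFirst()
        // Missing transition: self.buffer = copy
        return element
    }

    // Fixed version: the stored buffer is updated, so each element
    // is emitted exactly once.
    mutating func suspendNextFixed() -> Int? {
        guard !buffer.isEmpty else { return nil }
        return buffer.removeFirst()
    }
}
```

With the buggy version, two consecutive calls return the same element; with the fix, they return consecutive elements.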
Modifications
The following modifications have been made in separate commits to aid review:
- Add debug logging to the state machine functions, logging the old state, event, new state, and resulting action.
- Add two tests which reproduce this error.
- Add the missing state transition (which causes the newly added tests to reliably pass).
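The debug logging is along these lines (a hypothetical sketch; the real states, events, and actions are the state machine's own enum cases, not strings):

```swift
// Hypothetical shape of the state-machine debug logging; the values
// passed in would be the stream's own state/event/action descriptions.
func logTransition(old: String, event: String, new: String, action: String) -> String {
    let line = "AsyncBackpressuredStream: \(old) --[\(event)]--> \(new), action: \(action)"
    print(line)
    return line
}
```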
Result
Duplicate elements are no longer emitted from the response body.
Test Plan
Unit tests were added that fail without the fix and now pass reliably with it.
Additional notes
The implementation we are using for AsyncBackpressuredStream was taken from an early draft of SE-0406. We should probably move to something closer to the version in the current PR to the Swift tree, or the one used by swift-grpc, which has also adopted this code and cleaned it up to remove the dependencies on standard library internals. Notably, that implementation does not have this missing state transition, and it also adds an intermediate state to the state machine to avoid unintended copy-on-write.
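The intermediate-state trick mentioned above looks roughly like this (a hedged sketch with illustrative names; the actual states differ): while the buffer is mutated, the enum temporarily holds a cheap placeholder case, so the array extracted from the state is uniquely referenced and mutates in place instead of triggering a copy-on-write.

```swift
// Illustrative sketch of avoiding copy-on-write with an intermediate
// state; `StreamState`, `Machine`, and the Int element type are
// hypothetical names for illustration.
enum StreamState {
    case buffering([Int])
    case modifying  // placeholder held while the buffer is being mutated
}

struct Machine {
    var state: StreamState = .buffering([])

    mutating func append(_ element: Int) {
        switch state {
        case .buffering(var buffer):
            state = .modifying         // drop the enum's reference to the array
            buffer.append(element)     // uniquely referenced: mutates in place
            state = .buffering(buffer)
        case .modifying:
            fatalError("invalid reentrant mutation")
        }
    }
}
```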