Closed kristofka closed 2 years ago
This specific race condition is not possible because of the guaranteed ordering of signals between two Erlang processes:
> The only signal ordering guarantee given is the following: if an entity sends multiple signals to the same destination entity, the order is preserved; that is, if A sends a signal S1 to B, and later sends signal S2 to B, S1 is guaranteed not to arrive after S2. Note that S1 may, or may not have been lost.
https://www.erlang.org/doc/reference_manual/processes.html#signal-delivery (also see https://www.erlang.org/blog/message-passing/)
If the ack message signal arrives at all, it is guaranteed to arrive before the down signal and associated info message caused by the subscriber terminating.
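A minimal sketch of that ordering guarantee, in plain Elixir with no Spear or Mint involved. The {:ack, :event_1} message shape is made up for illustration. A child process sends one message to its parent and exits immediately; since the message signal and the down signal both come from the same sender, the parent should see the message first:

```elixir
parent = self()

# The child sends a single message to the parent and then terminates.
# spawn_monitor/1 sets up the monitor atomically with the spawn.
{pid, ref} = spawn_monitor(fn -> send(parent, {:ack, :event_1}) end)

# Take the next two messages in mailbox order (no selective matching).
first =
  receive do
    msg -> msg
  after
    1_000 -> :timeout
  end

second =
  receive do
    msg -> msg
  after
    1_000 -> :timeout
  end

# The message sent by the child is expected before the :DOWN caused by its exit.
IO.inspect(first, label: "first")
IO.inspect(second, label: "second")
```

This only illustrates ordering between two local processes. It says nothing about whether a packet written to a socket afterwards is handled by the remote server.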
We could add a blocking ack that returns when Mint.HTTP2.stream_request_body/3 is done sending the packet, but this would also be susceptible to failures: a network partition could cause the ack message to be lost since the network is unreliable. Even if EventStoreDB had an ack for the ack message, it would turn into a Two Generals' Problem. It's not really possible for Spear.ack/3 to be reliable.
I am curious.
Such an assumption needs to be carefully considered and kept in mind because it may be the case that both messages were indeed in order. However, the Selective Receive feature would still backfire on such an assumption since you could potentially process the messages out of order.
I am not saying this is the case here, but just pointing out the nuances just in case.
No?
Yeah, a selective receive could be a footgun. Even with the signals arriving in the right order and the messages being in the mailbox in the right order, you could skip past the {:sub, ref, ack} message and handle the {:DOWN, _, _, _, _} first if you had two separate receives.
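A small sketch of that footgun, assuming illustrative message shapes (the tuples below are stand-ins, not Spear's actual internal messages). The mailbox is filled in the correct order by hand, yet two separate selective receives handle the messages in the opposite order:

```elixir
ref = make_ref()

# Simulate a mailbox that already holds both messages in the correct order:
# first the ack, then the :DOWN notification.
send(self(), {:sub, ref, :ack})
send(self(), {:DOWN, ref, :process, self(), :normal})

# This receive only matches :DOWN, so it skips over the earlier ack message.
handled_first =
  receive do
    {:DOWN, ^ref, :process, _pid, _reason} -> :down
  after
    1_000 -> :timeout
  end

# The ack is still in the mailbox and is only picked up afterwards.
handled_second =
  receive do
    {:sub, ^ref, :ack} -> :ack
  after
    1_000 -> :timeout
  end

IO.inspect({handled_first, handled_second})
```

Handling both patterns in a single receive (or in GenServer handle_info/2 clauses, which take messages in mailbox order) avoids the reordering.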
> This specific race condition is not possible because of the guaranteed ordering of signals between two
I'm sorry, I've expressed the issue poorly. The issue is not that the connection doesn't ack before closing, but that acking may not be handled by the server. Also, I haven't provided enough details.
Suppose you start EventStoreDB like this:
docker run --name esdb-node --rm -it -p 2113:2113 -p 1113:1113 \
eventstore/eventstore:latest --insecure --run-projections=All --enable-atom-pub-over-http --start-standard-projections=true
If you run mix test test/event_store/subscription_test.exs:570 in the failing_test branch of this fork: https://github.com/kristofka/commanded-spear-adapter/tree/failing_test, it will run this test and fail most of the time.
If you do the same on the succeeding_test branch, it'll work, most of the time. The only difference between the two is that the subscriber process sleeps for a second before dying.
You can see what happens in the following wireshark captures (sorry, only screenshots, but I can provide the full captures privately):
Frame 72: 1011 bytes on wire (8088 bits), 1011 bytes captured (8088 bits) on interface lo0, id 0
Null/Loopback
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 2113, Dst Port: 65394, Seq: 1301, Ack: 1891, Len: 955
Source Port: 2113
Destination Port: 65394
[Stream index: 3]
[Conversation completeness: Complete, WITH_DATA (31)]
[TCP Segment Len: 955]
Sequence Number: 1301 (relative sequence number)
Sequence Number (raw): 1802594998
[Next Sequence Number: 2256 (relative sequence number)]
Acknowledgment Number: 1891 (relative ack number)
Acknowledgment number (raw): 1663577776
1000 .... = Header Length: 32 bytes (8)
Flags: 0x018 (PSH, ACK)
Window: 6350
[Calculated window size: 406400]
[Window size scaling factor: 64]
Checksum: 0x01e4 [unverified]
[Checksum Status: Unverified]
Urgent Pointer: 0
Options: (12 bytes), No-Operation (NOP), No-Operation (NOP), Timestamps
[Timestamps]
[SEQ/ACK analysis]
[iRTT: 0.000094000 seconds]
[Bytes in flight: 955]
[Bytes sent since last PSH flag: 955]
TCP payload (955 bytes)
[PDU Size: 955]
HyperText Transfer Protocol 2
Stream: DATA, Stream ID: 3, Length 946
Length: 946
Type: DATA (0)
Flags: 0x00
0... .... .... .... .... .... .... .... = Reserved: 0x0
.000 0000 0000 0000 0000 0000 0000 0011 = Stream Identifier: 3
[Pad Length: 0]
DATA payload (946 bytes)
GRPC Message: /event_store.client.streams.Streams/Read, Response
0... .... = Frame Type: Data (0)
.... ...0 = Compressed Flag: Not Compressed (0)
Message Length: 941
Message Data: 941 bytes
Protocol Buffers: /event_store.client.streams.Streams/Read,response
Message: event_store.client.streams.ReadResp
Field(1): event (message)
Message: event_store.client.streams.ReadResp.ReadEvent
Field(1): event (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent
Field(1): id (message)
Message: event_store.client.UUID
Field(2): string = 65d700fd-ea13-4074-b9dd-88157d5b17b7 (string)
Field(2): stream_identifier (message)
Message: event_store.client.StreamIdentifier
Field(3): stream_name (bytes)
Value: 636f6d6d616e6465647465737431356131663965325f313434615f343634325f38383332…
Field(4): prepare_position = 18446744073709551615 (uint64)
Field(5): commit_position = 18446744073709551615 (uint64)
Field(6): metadata (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent.metadataMapEntry
Field(1): key = type (string)
Field(2): value = Elixir.Commanded.EventStore.Adapters.Spear.SubscriptionTest.BankAccountOpened (string)
Field(6): metadata (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent.metadataMapEntry
Field(1): key = created (string)
Field(2): value = 16667845514256531 (string)
Field(6): metadata (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent.metadataMapEntry
Field(1): key = content-type (string)
Field(2): value = application/json (string)
Field(7): custom_metadata (bytes)
Value: 7b2224636175736174696f6e4964223a2230316430666632632d363237652d346239612d…
Field(8): data (bytes)
Value: 7b226163636f756e745f6e756d626572223a312c22696e697469616c5f62616c616e6365…
Field(2): link (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent
Field(1): id (message)
Message: event_store.client.UUID
Field(2): string = 9a672d1a-3221-4ec8-996a-5e78db621b18 (string)
Field(2): stream_identifier (message)
Message: event_store.client.StreamIdentifier
Field(3): stream_name (bytes)
Value: 2463652d636f6d6d616e6465647465737431356131663965325f313434615f343634325f…
Field(4): prepare_position = 18446744073709551615 (uint64)
Field(5): commit_position = 18446744073709551615 (uint64)
Field(6): metadata (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent.metadataMapEntry
Field(1): key = type (string)
Field(2): value = $> (string)
Field(6): metadata (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent.metadataMapEntry
Field(1): key = created (string)
Field(2): value = 16667845514345097 (string)
Field(6): metadata (message)
Message: event_store.client.streams.ReadResp.ReadEvent.RecordedEvent.metadataMapEntry
Field(1): key = content-type (string)
Field(2): value = application/octet-stream (string)
Field(7): custom_metadata (bytes)
Value: 7b222476223a22333a2d313a313a34222c222463223a3931353339392c222470223a3931…
Field(8): data (bytes)
Value: 3040636f6d6d616e6465647465737431356131663965325f313434615f343634325f3838…
Field(4): no_position (message)
Message: event_store.client.Empty
Protocol Buffers (as JSON Mapping View)
{
"event": {
"event": {
"id": {
"string": "65d700fd-ea13-4074-b9dd-88157d5b17b7"
},
"stream_identifier": {
"stream_name": "Y29tbWFuZGVkdGVzdDE1YTFmOWUyXzE0NGFfNDY0Ml84ODMyX2MyNzRmOWRlOTI3Zi1zdHJlYW0x"
},
"prepare_position": "18446744073709551615",
"commit_position": "18446744073709551615",
"metadata": [
{
"key": "type",
"value": "Elixir.Commanded.EventStore.Adapters.Spear.SubscriptionTest.BankAccountOpened"
},
{
"key": "created",
"value": "16667845514256531"
},
{
"key": "content-type",
"value": "application/json"
}
],
"custom_metadata": "eyIkY2F1c2F0aW9uSWQiOiIwMWQwZmYyYy02MjdlLTRiOWEtYTVlNy00MTNkNzJiMTNiZjEiLCIkY29ycmVsYXRpb25JZCI6IjVmMzc0MGZkLTAzZGQtNGQzNS05NDljLWJmZDIzYjFmN2YzMCIsInVzZXJfaWQiOiJ0ZXN0In0=",
"data": "eyJhY2NvdW50X251bWJlciI6MSwiaW5pdGlhbF9iYWxhbmNlIjoxMDAwfQ=="
},
"link": {
"id": {
"string": "9a672d1a-3221-4ec8-996a-5e78db621b18"
},
"stream_identifier": {
"stream_name": "JGNlLWNvbW1hbmRlZHRlc3QxNWExZjllMl8xNDRhXzQ2NDJfODgzMl9jMjc0ZjlkZTkyN2Y="
},
"prepare_position": "18446744073709551615",
"commit_position": "18446744073709551615",
"metadata": [
{
"key": "type",
"value": "$>"
},
{
"key": "created",
"value": "16667845514345097"
},
{
"key": "content-type",
"value": "application/octet-stream"
}
],
[truncated] "custom_metadata": "eyIkdiI6IjM6LTE6MTo0IiwiJGMiOjkxNTM5OSwiJHAiOjkxNTM5OSwiJG8iOiJjb21tYW5kZWR0ZXN0MTVhMWY5ZTJfMTQ0YV80NjQyXzg4MzJfYzI3NGY5ZGU5MjdmLXN0cmVhbTEiLCIkY2F1c2VkQnkiOiI2NWQ3MDBmZC1lYTEzLTQwNzQtYjlkZC04ODE1N2Q1
"data": "MEBjb21tYW5kZWR0ZXN0MTVhMWY5ZTJfMTQ0YV80NjQyXzg4MzJfYzI3NGY5ZGU5MjdmLXN0cmVhbTE="
},
"no_position": {}
}
}
The ack message is sent on line 69 and the reset message is sent on line 71. We didn't get the response, and the message is not acknowledged (contrary to what the client may believe).
A blocking ack would alleviate this issue, returning :ok iff the server did take the ack into account.
Thanks again for your attention
> A blocking ack would alleviate this issue, returning :ok iff the server did take the ack into account.
We don't have enough information to block until the server handles the ack: in the protocol the ack is a one way notification from the client to the EventStoreDB with no reply.
Well, it does reply with an empty event which Spear ignores. I agree though, this should be brought up with the event-store folks. If there are no guarantees with acking, then the whole concept of persistent subscription might as well be dropped. By the way, every single client example does await in the official documentation. I think it means acking cannot fail.
BTW your reply is hardly acceptable. First you said that no race condition could happen, then I went and proved they could. Now your answer is that you cannot respect a strong invariant?
You don’t owe me anything, I’ll hard fork this project and fix this as soon as I can. However you shouldn’t lie to your users.
(Also, I’ve spent a lot of time zeroing on your bug, I’ve provided ways to reproduce, and wireshark trace. Your answer is not only wrong, it’s downright insulting).
> Well, it does reply with an empty event which Spear ignores.
That ReadResp is from a different subscription. You can see in the wireshark screenshots that it's an event_store.client.streams.ReadResp, which is a separate RPC from event_store.client.persistent_subscription.ReadResp. It's also on a different HTTP/2 stream: you can see DATA [3] (stream ID 3) for that ReadResp while all of the persistent subscription messages are on stream ID 7. Finally, you can verify all messages written in the Persistent Subscriptions Read RPC by scanning the source of that RPC for calls to WriteAsync: there is no response for acks or nacks.
> If there are no guarantees with acking, then the whole concept of persistent subscription might as well be dropped.
Persistent subscriptions only offer at-least-once delivery (docs) since acks may be lost or messages may time out while being handled and be retried. It's up to the application using the persistent subscriptions to handle cases of multiple deliveries.
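One way an application might cope with redelivery is to remember which event IDs it has already handled. The sketch below is an assumption about how a consumer could do that, not something Spear provides: DedupSketch and handle/3 are hypothetical names, and a real consumer would probably persist the set rather than keep it in memory.

```elixir
defmodule DedupSketch do
  # Hypothetical helper: `seen` is a MapSet of event IDs already handled.
  # Runs `fun` only for IDs that have not been seen before.
  def handle(seen, event_id, fun) do
    if MapSet.member?(seen, event_id) do
      {:duplicate, seen}
    else
      fun.()
      {:handled, MapSet.put(seen, event_id)}
    end
  end
end

event_id = "65d700fd-ea13-4074-b9dd-88157d5b17b7"
seen = MapSet.new()

# First delivery: the side effect runs and the ID is recorded.
{first_result, seen} = DedupSketch.handle(seen, event_id, fn -> :ok end)

# Redelivery (for example after a lost ack): the side effect is skipped.
{second_result, seen} = DedupSketch.handle(seen, event_id, fn -> :ok end)

IO.inspect({first_result, second_result, MapSet.size(seen)})
```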
> By the way, every single client example does await in the official documentation.
I hadn't seen this and I was curious what each of the official clients is doing that needs to be awaited. Looking at their code:
- The .awaits bubble up to tokio::sync::mpsc::Sender::send on a bounded multi-producer-single-consumer ("MPSC") queue. It's awaiting capacity in the send queue and not a response.
- backpressuredWrite also awaits capacity in a send queue and not a response.
- _call.RequestStream.WriteAsync is defined here as an async write. From the "Only one write can be pending at a time" note, this also looks like a queue but with a capacity of one. It's also one-directional.
None are waiting on responses to the ack; they're just using queues to provide backpressure. We could do something similar in Spear with the following patch:
But it doesn't fix the test-case. It isn't really necessary to provide backpressure on acks anyways since persistent subscriptions have a backpressure mechanism built-in: the in-flight message buffer.
The screenshots of the wireshark sessions you provided - even the failure case - show the ack ReadReq being sent before the RST_STREAM, meaning that Spear is sending messages in the correct order.
The tricky part is that the gRPC spec says that servers should treat RST_STREAM as an "immediate full-closure" of the stream (docs) with the potential for all currently buffered messages to be corrupted, so EventStore's treatment of RST_STREAM is correct here.
So this seems to be an unfortunate design flaw in using HTTP/2 stream closures as the mechanism for terminating a subscription. This could be fixed upstream in the server by introducing a new ReadResp and ReadReq for terminating a subscription gracefully. You may want to report this upstream. The only thing I can recommend to make that test pass is the Process.sleep/1 you're already using.
> BTW your reply is hardly acceptable.
Apologies, that reply was pretty terse. I should've mentioned that I was still intending to look into your reproduction case, I was just low on time at the moment.
> First you said that no race condition could happen, then I went and proved they could.
You showed a different race condition - one not in Spear or OTP.
> Now your answer is that you cannot respect a strong invariant?
I can't make any change in Spear that actually helps this case for the reasons described above. It's an issue with the protocol that would need to be solved in the server.
> You don’t owe me anything, I’ll hard fork this project and fix this as soon as I can. However you shouldn’t lie to your users. ... Also, I’ve spent a lot of time zeroing on your bug, I’ve provided ways to reproduce, and wireshark trace. Your answer is not only wrong, it’s downright insulting
I don't appreciate this tone at all. If the level of support that I provide for free is not to your satisfaction then I agree that you should hard-fork.
I'm closing this as not-planned since it can't be solved in Spear.
Sorry for the tone, I got, perhaps unduly, frustrated by the previous terse answer. I agree the issue can be closed. I think the issue should be added to the documentation. Again, sorry for the tone. And thank you for the thorough answer. My bad.
💜 We are in it together, so make sure to take ownership of acting out of love as much as possible. It is hard, but try your best 💜
@the-mikedavis thank you for your fantastic work, I learned a lot from this thread!
No worries :)
And good call, I've added some notes to the docs for connect_to_persistent_subscription on delivery guarantees and this case: https://github.com/NFIBrokerage/spear/commit/e68905129264b153f9ad99839cab863ad802fe51
Couple of things, while I’ve apologized for my nasty comments, I’d like to make clear that I was VERY wrong. I’ve thought about the issue, and I’ve played with the Java client. Still very wrong. @the-mikedavis has been patient, and led me to the conclusion that I was wrong. The only solution to that issue is to track, client side, the acked events. Finally, after a pull request, I have to say, the standards here are really high. And @the-mikedavis is super helpful.
Hi, since acknowledging a message is done asynchronously, it is possible that a subscriber would ack an event and then immediately quit. In this case, Mint.HTTP2.cancel_request/2 would be invoked, potentially preventing the ack from being sent to the db. Would it be possible to add synchronous acking/nacking? I would gladly work on a pull request if you are interested; however, since I'm not that familiar with Mint, nor the records that should be returned, a few pointers would be appreciated. Thank you for the very well documented code.