Closed dvic closed 1 year ago
One thing I'm unsure about: we might want to introduce a more advanced Serializer behaviour, so that the user can use different content types for different events. In that case, the part that decodes system events might also change.
I think this is nice to have, but also a rare use-case so I wouldn't introduce it without a user requesting it.
I implemented your comments. However, I just had a flaky test fail locally:
1) test resume subscription should resume from checkpoint (Commanded.EventStore.Adapters.Spear.SubscriptionTest)
test/event_store/subscription_test.exs:648
Assertion with == failed
code: assert Enum.map(received_events, & &1.stream_id) == ["stream2"]
left: ["stream1"]
right: ["stream2"]
stacktrace:
test/event_store/subscription_test.exs:666: (test)
The following output was logged:
09:28:09.271 [debug] Spear event store attempting to append to stream "test_resume_subscription_should_resume_from_checkpoint-stream1" 1 event(s)
09:28:09.309 [debug] Spear event store attempting to append to stream "test_resume_subscription_should_resume_from_checkpoint-stream2" 1 event(s)
09:28:09.314 [debug] Spear event store subscription "subscriber" to stream: "$ce-test_resume_subscription_should_resume_from_checkpoint", start from: :origin
09:28:09.331 [debug] Spear event store subscription "subscriber" received event: %Spear.Event{id: "eea2d6d1-c8a4-460d-8aae-1c2eff445dd1", type: "Elixir.Commanded.EventStore.Adapters.Spear.SubscriptionTest.BankAccountOpened", body: "{\"account_number\":1,\"initial_balance\":1000}", link: %Spear.Event{id: "ef903a4e-cdf0-4601-9b72-4d27838ae999", type: "$>", body: "0@test_resume_subscription_should_resume_from_checkpoint-stream1", link: nil, metadata: %{commit_position: 123004, content_type: "application/octet-stream", created: ~U[2023-03-17 08:28:09.313495Z], custom_metadata: "{\"$v\":\"3:-1:1:4\",\"$c\":120179,\"$p\":120179,\"$o\":\"test_resume_subscription_should_resume_from_checkpoint-stream1\",\"$causedBy\":\"eea2d6d1-c8a4-460d-8aae-1c2eff445dd1\",\"$correlationId\":\"dba65d67-0310-4dd8-aebc-fe5e84042217\"}", prepare_position: 123004, stream_name: "$ce-test_resume_subscription_should_resume_from_checkpoint", stream_revision: 0}}, metadata: %{commit_position: 123004, content_type: "application/json", created: ~U[2023-03-17 08:28:09.298337Z], custom_metadata: "{\"$causationId\":\"1f41c197-b408-4dff-89ca-1ec326186d04\",\"$correlationId\":\"dba65d67-0310-4dd8-aebc-fe5e84042217\",\"user_id\":\"test\"}", prepare_position: 123004, stream_name: "test_resume_subscription_should_resume_from_checkpoint-stream1", stream_revision: 0}}
09:28:09.334 [debug] Spear event store subscription "subscriber" ack event: 123004 "ef903a4e-cdf0-4601-9b72-4d27838ae999"
09:28:09.349 [debug] Spear event store subscription "subscriber" received event: %Spear.Event{id: "eea2d6d1-c8a4-460d-8aae-1c2eff445dd1", type: "Elixir.Commanded.EventStore.Adapters.Spear.SubscriptionTest.BankAccountOpened", body: "{\"account_number\":1,\"initial_balance\":1000}", link: %Spear.Event{id: "ef903a4e-cdf0-4601-9b72-4d27838ae999", type: "$>", body: "0@test_resume_subscription_should_resume_from_checkpoint-stream1", link: nil, metadata: %{commit_position: 123004, content_type: "application/octet-stream", created: ~U[2023-03-17 08:28:09.313495Z], custom_metadata: "{\"$v\":\"3:-1:1:4\",\"$c\":120179,\"$p\":120179,\"$o\":\"test_resume_subscription_should_resume_from_checkpoint-stream1\",\"$causedBy\":\"eea2d6d1-c8a4-460d-8aae-1c2eff445dd1\",\"$correlationId\":\"dba65d67-0310-4dd8-aebc-fe5e84042217\"}", prepare_position: 123004, stream_name: "$ce-test_resume_subscription_should_resume_from_checkpoint", stream_revision: 0}}, metadata: %{commit_position: 123004, content_type: "application/json", created: ~U[2023-03-17 08:28:09.298337Z], custom_metadata: "{\"$causationId\":\"1f41c197-b408-4dff-89ca-1ec326186d04\",\"$correlationId\":\"dba65d67-0310-4dd8-aebc-fe5e84042217\",\"user_id\":\"test\"}", prepare_position: 123004, stream_name: "test_resume_subscription_should_resume_from_checkpoint-stream1", stream_revision: 0}}
09:28:09.383 [debug] Spear event store subscription "subscriber" down due to: :shutdown (subscriber)
For some reason the subscription received the event from stream1 twice 🤨
When the `$all` stream is used (possible after #46), the current implementation for `event_number` is not appropriate, because it can result in non-increasing numbers. With `$ce-prefix` this was not a problem, because the stream version of the link event, which is currently used as the event number, is always increasing: each event is a link on the `$ce-prefix` stream. On the `$all` stream the events are returned without any links, so the stream version of the actual event is used, and this is of course not an increasing number across the `$all` stream.

This PR uses the `commit_position` for the `event_number`. However, this makes many tests fail with assertion failures. As we discussed, the reason for these failures is that the commit positions are not gapless, but that should not be a problem. I checked the event handler implementation and I think using the commit position should work, see https://github.com/commanded/commanded/blob/v1.4.1/lib/commanded/event/handler.ex#L872.
Furthermore, I decided to put the link inside the metadata. In this way, the caller does not lose information in case there was a link and we simply decode Spear.Event one to one to a RecordedEvent.
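To illustrate the idea described above, here is a minimal sketch, not the actual adapter code. It uses plain maps shaped like the `%Spear.Event{}` structs in the log output, and the module name `EventNumberSketch`, its functions, and the shape of the returned map are all hypothetical. It assumes the commit position is read from the top-level event's metadata and that consumers only need increasing, not gapless, event numbers.

```elixir
defmodule EventNumberSketch do
  # Hypothetical helper for illustration only; it is not part of the adapter.
  # Input maps mimic the shape of %Spear.Event{} as seen in the logs.

  # Assumption: the commit position from the event's own metadata is used
  # as the event number, instead of the stream revision.
  def event_number(%{metadata: %{commit_position: position}}), do: position

  # Build a simplified recorded-event map. The link (if any) is kept inside
  # the metadata so the caller does not lose that information.
  def to_recorded(event) do
    %{
      event_id: event.id,
      event_number: event_number(event),
      stream_id: event.metadata.stream_name,
      stream_version: event.metadata.stream_revision,
      metadata: %{link: Map.get(event, :link)}
    }
  end

  # True when each number is greater than the previous one. Gaps are allowed.
  def strictly_increasing?(numbers) do
    numbers
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.all?(fn [previous, next] -> previous < next end)
  end
end
```

With this sketch, stream revisions read across several streams, such as `[0, 1, 0]`, fail `strictly_increasing?/1`, while commit positions such as `[120179, 123004]` pass even though they are not consecutive.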