NFIBrokerage / spear

A sharp EventStoreDB v20+ client backed by Mint :yum:
https://hex.pm/packages/spear
Apache License 2.0

implement the Streams API BatchAppend rpc #53

Closed · the-mikedavis closed this issue 3 years ago

the-mikedavis commented 3 years ago

BatchAppend is a new rpc in the streams API which is supposed to improve append throughput.

from #45:

here's the diff to the streams.proto definitions which adds the new batch-append rpc

diff --git a/src/Protos/Grpc/streams.proto b/src/Protos/Grpc/streams.proto
index d5fc1e50b..dd71e65f5 100644
--- a/src/Protos/Grpc/streams.proto
+++ b/src/Protos/Grpc/streams.proto
@@ -3,12 +3,16 @@ package event_store.client.streams;
 option java_package = "com.eventstore.dbclient.proto.streams";

 import "shared.proto";
+import "status.proto";
+import "google/protobuf/empty.proto";
+import "google/protobuf/timestamp.proto";

 service Streams {
    rpc Read (ReadReq) returns (stream ReadResp);
    rpc Append (stream AppendReq) returns (AppendResp);
    rpc Delete (DeleteReq) returns (DeleteResp);
    rpc Tombstone (TombstoneReq) returns (TombstoneResp);
+   rpc BatchAppend (stream BatchAppendReq) returns (stream BatchAppendResp);
 }

 message ReadReq {
@@ -157,48 +161,101 @@ message AppendResp {
+message BatchAppendReq {
+   event_store.client.UUID correlation_id = 1;
+   Options options = 2;
+   repeated ProposedMessage proposed_messages = 3;
+   bool is_final = 4;
+
+   message Options {
+       event_store.client.StreamIdentifier stream_identifier = 1;
+       oneof expected_stream_position {
+           uint64 stream_position = 2;
+           google.protobuf.Empty no_stream = 3;
+           google.protobuf.Empty any = 4;
+           google.protobuf.Empty stream_exists = 5;
+       }
+       google.protobuf.Timestamp deadline = 6;
+   }
+
+   message ProposedMessage {
+       event_store.client.UUID id = 1;
+       map<string, string> metadata = 2;
+       bytes custom_metadata = 3;
+       bytes data = 4;
+   }
+}
+
+message BatchAppendResp {
+   event_store.client.UUID correlation_id = 1;
+   oneof result {
+       google.rpc.Status error = 2;
+       Success success = 3;
+   }
+
+   event_store.client.StreamIdentifier stream_identifier = 4;
+
+   oneof expected_stream_position {
+       uint64 stream_position = 5;
+       google.protobuf.Empty no_stream = 6;
+       google.protobuf.Empty any = 7;
+       google.protobuf.Empty stream_exists = 8;
+   }
+
+   message Success {
+       oneof current_revision_option {
+           uint64 current_revision = 1;
+           google.protobuf.Empty no_stream = 2;
+       }
+       oneof position_option {
+           event_store.client.AllStreamPosition position = 3;
+           google.protobuf.Empty no_position = 4;
+       }
+   }
+}
+

(note that changes to event_store.client.{shared. => }* have been omitted)

Glancing at the new proto, it looks like you stream individual BatchAppendReq messages where each BatchAppendReq adds a chunk of new messages. I would imagine that this rpc was created for the migrator efforts (the tool that migrates data from one EventStoreDB to another), where the total append bytes would typically exceed the maximum append size. It also looks like there are some cool new controls in there, like the deadline timestamp, which could prove useful.
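to sketch the chunking idea (purely illustrative: the field names come from the proto above, but nothing here is spear's actual API):

defmodule BatchChunks do
  @chunk_size 500

  # cut a (possibly huge) enumerable of proposed messages into
  # BatchAppendReq-shaped maps, flagging the last chunk with is_final
  def to_requests(events, correlation_id, stream_name) do
    chunks = Enum.chunk_every(events, @chunk_size)
    last = length(chunks) - 1

    for {proposed_messages, index} <- Enum.with_index(chunks) do
      %{
        correlation_id: correlation_id,
        options: %{stream_identifier: stream_name, expected_stream_position: :any},
        proposed_messages: proposed_messages,
        is_final: index == last
      }
    end
  end
end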

the-mikedavis commented 3 years ago

ah, actually the (stream BatchAppendResp) return type means that this works a lot like a persistent subscription in reverse: the client emits chunks of events to append and the server acknowledges the chunks as they get committed

and the other interesting thing in the new proto definition is that each BatchAppendReq has a stream_identifier in its options, which means that you can append to multiple different streams within a single invocation of this rpc, removing some per-request overhead
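to make the multi-stream part concrete, here's a hypothetical interleaving (plain maps standing in for the protobuf records): two batches aimed at two different streams can be in flight on a single invocation, with requests and acks tied together by correlation_id

batch_a = "batch-a-uuid"  # stand-ins for real UUID correlation_ids
batch_b = "batch-b-uuid"

requests = [
  %{correlation_id: batch_a, options: %{stream_identifier: "stream_a"}, proposed_messages: [:event_1], is_final: false},
  %{correlation_id: batch_b, options: %{stream_identifier: "stream_b"}, proposed_messages: [:event_2], is_final: true},
  %{correlation_id: batch_a, options: %{stream_identifier: "stream_a"}, proposed_messages: [:event_3], is_final: true}
]

# the server streams BatchAppendResp messages back as batches commit,
# each one correlated to its requests by correlation_id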


this feature is pretty different from any of the other functionality exposed by the grpc interface, so the function(s) in spear will probably have to diverge from the normal request/5 template. I could see implementing this in a similar way to Spear.append/4, so that it takes an enumerable and does the chunking on stream_name itself, but then the feature misses out on the async multi-chunk ability (you couldn't keep an effective in-flight buffer of messages to append)

I imagine this will need to be implemented with multiple new functions so that a consumer of spear has fine-grained control over how the feature gets used (it'll probably need something like a GenStage pipeline to be used most efficiently anyway)
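as a rough sketch of what the consuming end of such a pipeline could look like (hypothetical: GenStage is a real library, but the Spear.append_batch call is the shape proposed in the comments below, not a function spear exports today):

defmodule BatchAppendConsumer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl GenStage
  def init(opts) do
    state = %{conn: Keyword.fetch!(opts, :conn), stream: Keyword.fetch!(opts, :stream)}
    {:consumer, state, subscribe_to: [{Keyword.fetch!(opts, :producer), max_demand: 1_000}]}
  end

  @impl GenStage
  def handle_events(events, _from, state) do
    # each demanded batch of events becomes one in-flight batch-append;
    # the ack arrives asynchronously as a message (see handle_info/2)
    {:ok, _batch_id} = Spear.append_batch(events, state.conn, :new, state.stream, [])
    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info({:batch, :ok, _batch_id}, state), do: {:noreply, [], state}
  def handle_info({:batch, :error, reason, _batch_id}, state), do: {:stop, reason, state}
end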

the-mikedavis commented 3 years ago

I think a function interface in spear like this makes sense:

@spec append_batch(
        events :: Enumerable.t(),
        conn,
        send_ack_to :: pid() | GenServer.name(),
        stream_name :: String.t(),
        opts :: Keyword.t()
      ) :: :ok

And this would be asynchronous: the acknowledgement BatchAppendResp would be forwarded to that send_ack_to pid.

opts would have fields:

the-mikedavis commented 3 years ago

:point_up: that might be problematic because there'd be no way to do multiple separate batch appends at the same time on the same connection, which one may wish to do for better performance (maybe there's a way to maximize http2 performance here by splitting up batch-append requests across http2 streams?)

instead it might make sense to shuffle around the arguments like so:

@spec append_batch(
        events :: Enumerable.t(),
        conn,
        batch_id :: reference() | :new,
        stream_name :: String.t(),
        opts :: Keyword.t()
      ) :: {:ok, batch_id :: reference()}

And then opts would have the :send_ack_to option defaulting to self().

Then acknowledgements would be received like so:

iex> flush()
{:batch, :ok, batch_id :: reference()}
# or in cases of error
{:batch, :error, reason :: term(), batch_id :: reference()}
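putting the revised signature and those ack messages together, usage could look something like this (again hypothetical, nothing here is implemented yet):

{:ok, batch_id} =
  Spear.append_batch(events, conn, :new, "my_stream", send_ack_to: self())

receive do
  {:batch, :ok, ^batch_id} -> :ok
  {:batch, :error, reason, ^batch_id} -> {:error, reason}
after
  5_000 -> {:error, :ack_timeout}
end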

or it may make sense to write a new struct for this:

iex> flush()
%Spear.BatchAppendResult{
  batch_id: reference(),
  result: :ok | {:error, reason :: term()} | Spear.Records.BatchAppendResp.t(),
  correlation_id: String.t() # uuid4 correlation_id
}