microsoft / FluidFramework

Library for building distributed, real-time collaborative web applications
https://fluidframework.com

1M Kafka message limit #7599

Closed vladsud closed 1 year ago

vladsud commented 2 years ago

We have talked about it before, but not sure if we ever opened an issue to track it:

There is a hard 1MB Kafka limit for messages (Kafka is used by the ordering service implementations), which translates to a 1MB payload limit for a batch of ops submitted from DocumentDeltaConnection. I do not believe we have a client-side check to validate that messages do not exceed that size. If we implement one, the number should come from the ordering service (similar to IConnected.serviceConfiguration.maxMessageSize). I'm also not sure what happens today if we exceed it (i.e. does the service have a reasonable workflow here, like nacking the client?). And if a nack happens, is it a non-retryable nack (such that we do not get into an infinite loop of reconnects / resubmits / nacks)?

I'd also suggest adding a telemetry error event that is emitted at 50% or 25% of that limit, such that developers learn about it sooner (before an end user hits it).
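
As a rough illustration, a client-side check along these lines could emit the soft telemetry warning well before the hard limit and error out only when the limit is actually exceeded. The names below (the logger interface, event names, thresholds) are hypothetical, not existing Fluid APIs:

// Hypothetical sketch of a soft/hard payload-size check on the client.
interface ITelemetryLogger {
    sendErrorEvent(event: { eventName: string; payloadSize: number; limit: number }): void;
}

function checkBatchPayloadSize(
    serializedBatch: string,   // serialized batch of ops about to be submitted
    maxMessageSize: number,    // e.g. from IConnected.serviceConfiguration.maxMessageSize
    logger: ITelemetryLogger,
): void {
    const payloadSize = serializedBatch.length;
    // Soft warning at 25% / 50% of the limit so developers learn about it early.
    if (payloadSize > maxMessageSize * 0.25) {
        logger.sendErrorEvent({
            eventName: payloadSize > maxMessageSize * 0.5
                ? "BatchPayloadOverHalfOfLimit"
                : "BatchPayloadOverQuarterOfLimit",
            payloadSize,
            limit: maxMessageSize,
        });
    }
    // Hard failure: better to error out locally than be dropped or nacked by the server.
    if (payloadSize > maxMessageSize) {
        throw new Error(`Batch payload of ${payloadSize} bytes exceeds the ${maxMessageSize} byte limit`);
    }
}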

Unrelated - why do we have IConnected.maxMessageSize & IConnected.serviceConfiguration.maxMessageSize? Can we eliminate the former?

vladsud commented 2 years ago

Per https://github.com/microsoft/FluidFramework/issues/7545, it sounds like nacks are not handled correctly (it needs to be a non-retryable nack). We might want a properly working nack workflow to be the only thing that handles these issues, but I'd still suggest having telemetry that gets hit at smaller limits (soft failure).

andre4i commented 2 years ago

I'll close https://github.com/microsoft/FluidFramework/issues/7246. This one has more insights :)

vladsud commented 2 years ago

Per discussion with Tanvir, there is actually no server-imposed limit here, as long as the total batch of ops is below 1MB (including some overhead on the server side). That's the reason we impose the 16KB size limit: it allows the server to keep a batch under 512KB (32 ops per box), and it can keep many boxes in the same batch (way over 1MB) while maintaining batch semantics.

Where it breaks is that the 16KB limit applies to the initial message, before chunking and before stringifying 3 times. So actual messages are much bigger and we simply violate the server-imposed policy here. The client should not do it, but ideally the server also would not allow such messages!
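
As an illustration of the blow-up (not the actual Fluid serialization path, just the general effect of re-stringifying already-stringified JSON):

// Sketch: each additional JSON.stringify of already-serialized content escapes the
// quotes and backslashes from the previous pass, so the payload keeps growing.
const ops = Array.from({ length: 200 }, (_, i) => ({ key: `key${i}`, value: `value${i}` }));

const once = JSON.stringify(ops);      // first serialization (what the 16KB check sees)
const twice = JSON.stringify(once);    // wrapped and serialized again: quotes become \"
const thrice = JSON.stringify(twice);  // one more wrapper: backslashes are escaped too

console.log(once.length, twice.length, thrice.length);
// Content that measures under the 16KB limit before chunking can end up noticeably
// larger by the time the final message is sent to the server.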

andre4i commented 2 years ago

Results of my investigation (this also applies to https://github.com/microsoft/FluidFramework/issues/7545 @DLehenbauer ).

I have an experimental PR which does some stress testing with respect to the payload size. It tracks the size of the payload we're sending via a high-level end-to-end test. I ran this test against both ODSP and tinylicious and isolated the failure in both.
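
The experimental PR itself is not shown here; as a rough sketch of the idea (hypothetical helpers, not the actual test code), the test submits progressively larger values through a DDS and records the size at which the connection breaks:

// Rough sketch of the payload-size stress test: keep submitting larger values until
// the connection is dropped, and record the last size that went through.
async function stressPayloadSize(
    map: { set(key: string, value: string): void },   // any map-like DDS
    waitForAck: () => Promise<void>,                   // resolves when the op round-trips
): Promise<void> {
    for (let sizeKb = 256; sizeKb <= 20 * 1024; sizeKb *= 2) {
        map.set(`payload-${sizeKb}`, "x".repeat(sizeKb * 1024));
        await waitForAck();   // if the socket is dropped instead, this is the failing size
        console.log(`payload of ${sizeKb}KB was transmitted successfully`);
    }
}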

My findings are as follows (cc @vladsud):

The 1MB limit which causes the infinite connect loop is due to how socket.io is currently configured and how the client deals with reconnects

Unfortunately, the failure from socket.io is not as clear as we'd need in the context of the current Fluid resilience mechanisms. Socket.io will fail the connection entirely, with transport closed. There is no error handler (that I could find) from socket.io which we can wire into to detect this situation. We're not receiving any 'payload too large' error or anything of the sort. The connection just gets dropped by the server.

This is caught by our disconnect handler here, which in turn will trigger a reconnect here. As the payload has not been sent in the first place, it is considered a series of pending ops. Pending ops are sent again at reconnect. Repeat forever.
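
One way for the client to break this cycle (a sketch only - the queue shape and names here are hypothetical, not actual Fluid internals) is to check the pending payload against the known limit before resubmitting, and surface a terminal error instead of reconnecting forever:

// Hypothetical sketch: decide on reconnect whether resubmitting pending ops can ever
// succeed. If the pending payload already exceeds the service limit, fail fast with a
// non-retryable error instead of looping through disconnect -> reconnect -> resubmit.
interface IPendingOp { content: string; }

function canResubmitPendingOps(pendingOps: IPendingOp[], maxMessageSize: number): boolean {
    const totalSize = pendingOps.reduce((size, op) => size + op.content.length, 0);
    return totalSize <= maxMessageSize;
}

// Usage sketch on reconnect:
// if (!canResubmitPendingOps(pendingOps, serviceConfiguration.maxMessageSize)) {
//     closeContainerWithError("payloadTooLarge");   // hypothetical terminal-error path
// }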

The 1MB limit in socket.io can be increased

I configured tinylicious as follows:

import { Server } from "socket.io";

const io = new Server({
    // enable compatibility with socket.io v2 clients
    // https://socket.io/docs/v4/client-installation/
    allowEIO3: true,
    maxHttpBufferSize: 20e6, // <---
});

The default is 1e6 (1MB) (see the socket.io server documentation for maxHttpBufferSize).

With this configuration I was able to successfully transmit payloads of 20MB (as an example).

Going forward

What are the side effects of increasing this limit?

cc @tanviraumi @GaryWilber

The 1MB limit coincides with the Kafka message limit. Considering we are not currently able to send 1MB of messages, I'm wondering if the payload partition logic is actually getting exercised on the server side. In other words, if we send two payloads of 3MB each, each representing a batch, is there a guarantee that the batches will not interleave? If not, then we (the client) need to come up with a good solution for re-virtualizing batching or splitting storage vs ops when it comes to large content.
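
To make the interleaving question concrete, the property we would want to verify looks roughly like the sketch below. The batch markers and field names are illustrative, not the exact wire format:

// Hypothetical sketch: verify that sequenced ops belonging to one client's batch are
// never interleaved with ops from another client.
interface ISequencedOp {
    clientId: string;
    sequenceNumber: number;
    batchStart?: boolean;  // marker on the first op of a batch
    batchEnd?: boolean;    // marker on the last op of a batch
}

function assertBatchesDoNotInterleave(ops: ISequencedOp[]): void {
    let openBatchClientId: string | undefined;
    for (const op of ops) {
        if (openBatchClientId !== undefined && op.clientId !== openBatchClientId) {
            throw new Error(`Op ${op.sequenceNumber} from ${op.clientId} interleaved into ${openBatchClientId}'s batch`);
        }
        if (op.batchStart) { openBatchClientId = op.clientId; }
        if (op.batchEnd) { openBatchClientId = undefined; }
    }
}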

Configuring this limit

Acting on this limit (assuming increasing the socket.io limit does not break batching)

Enforce == error out.

vladsud commented 2 years ago

@tanviraumi, @GaryWilber, please take a look and provide your input (based on the earlier discussion today).

vladsud commented 2 years ago

Plus @anthony-murphy, of course

vladsud commented 2 years ago

I think we need to start enumerating possible solutions, both short term and long term. First, some capabilities available to us, and also requirements:

  1. We can upload any amount of content through the blob API, if needed.
  2. We used to have content-only upload (i.e. no sequence number assignment, no broadcast) on the socket as well, with the ability to reference that content in a future op.
  3. Clients today assume each op has a unique sequence number, i.e. we can't have multiple ops sharing the same sequence number (though it's worth pointing out that merge tree has an internal capability like that in the form of the group op).

So, a couple of possible solutions:

  1. We can virtualize sequence numbers, i.e. one op at the driver / loader level has a 1:N mapping to ops in the runtime, so they have different sequencing. This allows the runtime to have only one driver/loader sequence number for a batch of ops.
    • Large content (a batch) can be uploaded via various mechanisms (many ops, like chunking does today, or through the blob API) and then the client can "assign" sequence numbers to this content later.
  2. We can build some mechanism into the protocol that allows assigning (reserving) many sequence numbers to a single op.
    • This leaks out into many places (like storage, storage op requests, etc.).
  3. Something similar to the above (probably a staging step on the way to 2): we can get a range of seq# by uploading "empty" ops in a batch, with the actual content being uploaded before that. This substantially raises the limits of what we can do, and we can precisely estimate when we hit the 1MB limit, but it is not a full solution - it likely just allows us to kick the can down the road for a year or so.
    • That said, the minimum payload is something like {sequenceNumber, clientSequenceNumber, type, referenceSequenceNumber, content}. Maybe we could make some of these optional to reduce the size even further? Let's say we somehow shrink it to 100 bytes. That means roughly 10K ops max per batch (see the quick estimate after this list).
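
A quick back-of-the-envelope estimate for option 3, under the assumptions above (1MB Kafka limit, ~100 bytes per "empty" op envelope):

// Back-of-the-envelope estimate for option 3: how many sequence numbers can a single
// ~1MB Kafka message reserve if each "empty" op costs roughly 100 bytes?
const kafkaLimitBytes = 1024 * 1024;  // ~1MB, ignoring server-side overhead
const envelopeBytes = 100;            // assumed minimal size of {sequenceNumber, clientSequenceNumber, ...}

const maxOpsPerBatch = Math.floor(kafkaLimitBytes / envelopeBytes);
console.log(maxOpsPerBatch);          // ~10,000 ops per batch under these assumptions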

IMHO, the ideal solution is one that does not put new requirements on the service. So 1 & 3 would work, but 3 assumes a continuation in the form of 2, which does put new requirements on the service.

GaryWilber commented 2 years ago

@andre4i Great investigation!

I can describe what the server is doing today when sending messages / batches to Kafka.

For the purpose of this discussion, let's assume we increase the socket.io message limit to something high so it's not an issue - maybe 100MB.

Here is the logic the server uses to send messages to Kafka. The method takes in a single batch of messages.

A few key points about that logic

Assuming Kafka has a 1MB message limit, here are a few example scenarios (a rough sketch of this size accounting follows the list):

  1. If a client sends a batch of messages whose total size is above 1MB, the server will try to send that batch to Kafka and fail.
  2. If 3 clients send a batch of messages each at the exact same time and the total size of each batch is 0.5MB, the server will try to send those batches as 1 Kafka message (1.5MB) and fail.
  3. If X clients send X batches, each containing 1 message under 16KB, the server will always succeed in sending them to Kafka.
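
For illustration, size-aware boxcarring that would avoid scenario 2 could look roughly like the sketch below. This is not the actual routerlicious producer code - as described above, the current server does not perform this check:

// Rough sketch (not the actual server code): boxcar pending batches into a single
// Kafka message only while the combined size stays under the Kafka message limit.
interface IBatch { messages: string[]; }

const kafkaMessageLimit = 1024 * 1024; // ~1MB

function boxcarBatches(pendingBatches: IBatch[]): IBatch[][] {
    const boxcars: IBatch[][] = [];
    let current: IBatch[] = [];
    let currentSize = 0;
    for (const batch of pendingBatches) {
        const batchSize = batch.messages.reduce((size, msg) => size + msg.length, 0);
        // A single batch larger than the limit cannot be split without risking
        // interleaving, so in this sketch it simply fails (scenario 1 above).
        if (batchSize > kafkaMessageLimit) {
            throw new Error(`Batch of ${batchSize} bytes exceeds the Kafka message limit`);
        }
        if (currentSize + batchSize > kafkaMessageLimit && current.length > 0) {
            boxcars.push(current);   // flush the current boxcar (avoids scenario 2)
            current = [];
            currentSize = 0;
        }
        current.push(batch);
        currentSize += batchSize;
    }
    if (current.length > 0) { boxcars.push(current); }
    return boxcars;
}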

All that said, I don't think we can rely on the server being able to split individual batches, because doing so would allow the possibility of messages being interleaved. I took a look at the Kafka library we use and I do not see a way for us to guarantee the ordering of multiple individual send calls to Kafka. The Kafka library has some internal buffering logic, so it might queue multiple Kafka messages one after another, but it's possible for one message to end up being sent in a future buffer window.

tanviraumi commented 2 years ago

Thanks @andre4i and @GaryWilber for the detailed investigation.

To add to what Gary has said, the messages the server sends to Kafka are slightly bigger than what the client sends. We add some metadata to the ops and then create a buffer from the stringified content (https://github.com/microsoft/FluidFramework/blob/0f0a0d58f6e651db90196494652161b0278f5ae2/server/routerlicious/packages/services-ordering-rdkafka/src/rdkafkaProducer.ts#L239).

vladsud commented 2 years ago

Actually, there might be a rather simple solution to these problems that does not add any complexity to the protocol and instead removes it.

Clients already know how to deal with chunked ops (in terms of accumulating them while waiting for the end, the summarizer putting them into summaries, etc.). What if we allowed interleaving of sequence numbers for batches (i.e. no requirement on the server to receive and sequence a batch in one go), but instead had the client not process batched ops until the full batch has arrived (similar to chunked ops)? If communication breaks, the client already knows how to resubmit, and other clients know to discard the partial result. And we would get the right number of sequence numbers for ops (as the ordering service would assign the right numbers).

The only thing that is missing is the right order of sequence numbers. I.e. consider this case:

  1. client A starts sending batched ops - ops 1, 2, 3 are sequenced
  2. client B sends an op - 4
  3. client A sends 2 more ops and closes the batch - 5, 6.

All we need is for every client to realize that ops 1-3 are "delayed". We need to reorder (only on the client! - the server should not care) steps #1 & #2 above. I.e. the client needs to virtualize sequence numbers and instead present the following to all layers below the container runtime:

  1. An op from client B with seq# = 1.
  2. Ops from client A with seq# 2, 3, 4, 5, 6.

However, if the connection drops after step #2 (i.e. client A leaves), then the client should skip sequence numbers 2-4 (item #1 above has already been rebased to seq 1) and proceed as if nothing happened, with the next op having seq = 5.

All of that logic can be easily incorporated into a stand-alone class (a remapper) that hides all of that machinery from the rest of the system. DeltaManager will operate with real seq numbers, while layers in the container runtime below this remapper would operate with virtualized seq numbers.
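
A minimal sketch of such a remapper, under the assumptions that ops carry batch-start/end markers and that only one batch is open at a time (tracking multiple simultaneous batches is one of the open questions below). The types and names are hypothetical, not an actual Fluid class:

// Hypothetical sketch of the remapper: it buffers ops of an incomplete batch, lets ops
// from other clients flow through immediately, and hands virtualized sequence numbers
// to the layers below while DeltaManager keeps operating on the real ones.
interface IOp {
    clientId: string;
    sequenceNumber: number;   // real sequence number assigned by the ordering service
    batchStart?: boolean;     // marker on the first op of a batch
    batchEnd?: boolean;       // marker on the last op of a batch
}

class SequenceNumberRemapper {
    // Ops of the currently open (incomplete) batch, buffered until the whole batch arrives.
    private openBatch: { clientId: string; ops: IOp[] } | undefined;

    constructor(private readonly deliver: (op: IOp, virtualSequenceNumber: number) => void) {}

    public process(op: IOp): void {
        if (this.openBatch !== undefined && op.clientId === this.openBatch.clientId) {
            this.openBatch.ops.push(op);
            if (op.batchEnd) {
                // Full batch arrived: release it with contiguous virtual numbers ending at
                // the real sequence number of the last op (ops 1,2,3,5,6 become 2,3,4,5,6).
                const ops = this.openBatch.ops;
                const last = ops[ops.length - 1].sequenceNumber;
                ops.forEach((batchedOp, i) => this.deliver(batchedOp, last - ops.length + 1 + i));
                this.openBatch = undefined;
            }
            return;
        }
        if (op.batchStart === true && op.batchEnd !== true) {
            // Start buffering a new incomplete batch (assumes only one open batch at a time).
            this.openBatch = { clientId: op.clientId, ops: [op] };
            return;
        }
        // Ops outside the open batch are presented ahead of the buffered ops: their virtual
        // number is the real number minus the count of currently buffered ops (op 4 becomes 1).
        const buffered = this.openBatch?.ops.length ?? 0;
        this.deliver(op, op.sequenceNumber - buffered);
    }

    public clientLeft(clientId: string): void {
        // The open batch will never complete: discard it. Later ops keep their real sequence
        // numbers, so the virtual numbers of the discarded ops are simply skipped.
        if (this.openBatch?.clientId === clientId) {
            this.openBatch = undefined;
        }
    }
}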

And the protocol can be substantially simplified! Basically it becomes an interpretation problem :)

The only complexity I see is any place where we need to communicate runtime state back to layers that are not virtualized. The only area I can think of is the summarizer, which has to present a reference sequence number to the storage & ordering service. So there has to be some back-pipe through the remapper to be able to do so.

Parts that I have not thought through yet (these need more thinking):

  1. How that system composes with chunked ops.
  2. Tracking of multiple batches simultaneously (i.e. multiple clients are sending batches at the same time).

I believe these problems are not hard to solve (we just need to clearly articulate how to deal with the complexity and how to isolate it such that it does not leak into the rest of the system and can be easily tested).

tanviraumi commented 2 years ago

I like this approach. In theory, it should work and preserve the existing behavior. One follow-up question regarding this:

  • An op from client B with seq# = 1.
  • Ops from client A with seq# 2, 3, 4, 5, 6

Should we reorder ops from B after A, given that 'A' started with a smaller sequence number? I don't think it matters since these changes are truly concurrent, but I wanted to make sure reordering would not cause any additional issues. I think tracking multiple batches will work in a similar way. Following your example, we will use the sequence number of the last op in a batch for ordering.

vladsud commented 2 years ago

I prefer not to, as that would result in ops from B being "stuck" waiting for the end of A's batch. Given that a batch can be large and the client can disconnect while sending it (and the server might take a while to realize that), the whole doc would be "paused" for that duration. I'd prefer only the ops from the one client (the one who sent the incomplete batch) to be waiting for the end, vs. all ops in the file being stuck.

vladsud commented 2 years ago

I'll move it out of December. I'd love to see other work completed before we allocate resources to this item.

ghost commented 1 year ago

This issue has been automatically marked as stale because it has had no activity for 180 days. It will be closed if no further activity occurs within 8 days of this comment. Thank you for your contributions to Fluid Framework!