launchdarkly / ruby-eventsource

Server-Sent Events client for Ruby
Other
40 stars 16 forks source link

Eventsource cannot handle large payloads #20

Closed baweaver closed 2 years ago

baweaver commented 3 years ago

Brief Summary

Ruby Server SDK calls Eventsource with large payloads of JSON data, causing Rails Puma workers to timeout on the default 90s timeout. We have identified likely causes which will be highlighted below.

Buffer Indexing

The code for handling streaming responses chunks on record breaks (/\r\n/ approx), but uses a mechanism which can lead to exponential growth on large data rows:

def read_line
  loop do
    @lock.synchronize do
      i = @buffer.index(/[\r\n]/)
      if !i.nil? && !(i == @buffer.length - 1 && @buffer[i] == "\r")
        i += 1 if (@buffer[i] == "\r" && @buffer[i + 1] == "\n")
        return @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8)
      end
    end
    return nil if !read_chunk_into_buffer
  end
end

Specifically this line is worrying:

i = @buffer.index(/[\r\n]/)

When LD is initialized I believe that it tries to send the entirety of the payload in one line of data, but server responses are chunked streams. If the full response is 150Mb (not theoretical) and the chunk size is say 1Mb (guessing for example) that means it won't hit a record-break until 150Mb.

In other words, it does this (- is one chunk):

buffer = ''
buffer.index(/[\r\n]/)

buffer = '-'
buffer.index(/[\r\n]/)

buffer = '--'
buffer.index(/[\r\n]/)

buffer = '---'
buffer.index(/[\r\n]/)

buffer = '----'
buffer.index(/[\r\n]/)

buffer = '-----'
buffer.index(/[\r\n]/)

# ...

buffer = '-' * 150
buffer.index(/[\r\n]/)

Repeat until you get to 150Mb from what I think might be happening and you've done a substantial number of reads over the same data.

The problem is that eventsource is reading the entirety of the buffer and storing the entirety of the response, rather than checking for record breaks in each chunk.

Stream Init

Why does that become an issue? I believe this chunk of code is relevant:

message = JSON.parse(message.data, symbolize_names: true)
all_data = Impl::Model.make_all_store_data(message[:data])
@feature_store.init(all_data)
@initialized.make_true

...and that won't be triggered by conn.on_event { |event| process_message(event) } (src) until that entire JSON payload is processed from chunks.

Summary

We believe that the Ruby Server SDK is trying to retrieve the entirety of the flag data as a singular response from Ruby Eventsource, and for large clients (150Mb+) this will cause crashes.

Current experimentation I've been working on has been to prevent full reads of a partial buffer, but I believe this reflects more on the entirety of flag data being sent in a single message. If it truly is all being sent in a single response this will be a severe issue for larger clients.

baweaver commented 3 years ago

https://github.com/launchdarkly/ruby-eventsource/compare/master...baweaver:baweaver/bugs/buffer_cursor_offset?expand=1

This is my current idea for a minimal fix, but need to test it against a few prod systems over the next few days to see if that's correct. Gist of the idea is to introduce a cursor to prevent re-reading chunks. String#index takes an offset to start scanning from, which is very useful for this.

eli-darkly commented 3 years ago

This was brought to our attention recently via a support request. I believe your description is accurate, and I understand it now better than I did the original report. I'm not sure I understand this part, though:

Current experimentation I've been working on has been to prevent full reads of a partial buffer, but I believe this reflects more on the entirety of flag data being sent in a single message. If it truly is all being sent in a single response this will be a severe issue for larger clients.

By "response" do you mean a single SSE event? The stream connection normally remains open, so all downloaded data from LaunchDarkly makes up a single HTTP response. But it is true that the first SSE event sent on the stream (regardless of whether it's the Ruby SDK or another SDK) contains the full flag data. You also mentioned "large clients" which was a bit confusing, but I'm guessing that what you meant there was "large LaunchDarkly data sets".

baweaver commented 3 years ago

Yeah, that was poorly worded, let me try that one again.

From observation I believe that the initial flag data provided to init on data stores is given in a single SSE PUT message in the form of a JSON blob, so: data: { entire_flag_payload_here }. Because of our admittedly overkill usage of tokens our flags are approximately 150Mb of data going over the wire.

If that was a single message, non-streamed/chunked, it'd be a bit expensive to deserialize that much JSON. When the response is streamed and chunked there won't be a record break (/[\r\n]/) until the end of the 150Mb of data. That means that the @buffer.index(/[\r\n]/) check will be called for every chunk that's received until the end of that payload, starting from the beginning.

The experiment I'm working on right now introduces a cursor to not traverse already read chunks as a quicker fix.

In the initial message I'd used this example, where every - represents a 1Mb theoretical chunk of 150Mb of data:

buffer = ''
buffer.index(/[\r\n]/)

buffer = '-'
buffer.index(/[\r\n]/)

buffer = '--'
buffer.index(/[\r\n]/)

# ...

buffer = '-' * 150
buffer.index(/[\r\n]/)

The fix tries to do this instead:

buffer = ''
buffer.index(/[\r\n]/)

previous_buffer_size = buffer.size
buffer = '-'
buffer.index(/[\r\n]/, previous_buffer_size)

previous_buffer_size = buffer.size
buffer = '--'
buffer.index(/[\r\n]/, previous_buffer_size)

# ...

previous_buffer_size = buffer.size
buffer = '-' * 150
buffer.index(/[\r\n]/, previous_buffer_size)

...so it'll start scanning where the last @buffer.index left off instead of trying to scan the entire thing again.

To be fair this had me scratching my head for a good week or so trying to figure what was up, and it was hard to think of a good way to approach or fix it, so it took a bit to write this ticket. Sorry about the delay there.

baweaver commented 2 years ago

I'd run some isolated tests to see if I could see just how much impact that index scan has versus using a positional cursor in conjunction with it:

chunk = '-' * 1000
chunks = [*(([chunk] * 999) * 150), chunk + "\n"]

chunks.join.bytesize.fdiv 10**6 # 1 mil
# => 149.851001 (Mb)

# Full scan
Benchmark.measure do
  start_time = Time.now
  previous_time = start_time

  buffer = ''
  chunks.each_with_index do |c, i|
    if i % 1000 == 0
      current_time = Time.now - start_time.to_f
      puts "i: #{i}, delta_time: #{(current_time - previous_time).round(4)}, time: #{current_time.to_f.round(4)}"
      previous_time = current_time
    end

    buffer << c
    break true if buffer.index("\n")
  end
end

# i: 1000, delta_time: 0.0144, time: 0.0144
# i: 2000, delta_time: 0.0388, time: 0.0533
# i: 3000, delta_time: 0.051, time: 0.1043
# ...
# i: 43000, delta_time: 2.4448, time: 46.5457
# i: 44000, delta_time: 2.4821, time: 49.0279
# i: 45000, delta_time: 2.624, time: 51.6519
# ...
# i: 147000, delta_time: 9.4535, time: 648.7023
# i: 148000, delta_time: 9.5025, time: 658.2048
# i: 149000, delta_time: 9.0586, time: 667.2634

# Positional scan
Benchmark.measure do
  start_time = Time.now
  previous_time = start_time

  buffer = ''
  chunks.each_with_index do |c, i|
    if i % 1000 == 0
      current_time = Time.now - start_time.to_f
      puts "i: #{i}, delta_time: #{(current_time - previous_time).round(4)}, time: #{current_time.to_f.round(4)}"
      previous_time = current_time
    end

    pos = buffer.size
    buffer << c
    break true if buffer.index("\n", pos)
  end
end

# i: 142000, delta_time: 0.0007, time: 0.1223
# i: 143000, delta_time: 0.0007, time: 0.123
# i: 144000, delta_time: 0.0008, time: 0.1238
# i: 145000, delta_time: 0.0007, time: 0.1245
# i: 146000, delta_time: 0.0007, time: 0.1252
# i: 147000, delta_time: 0.0007, time: 0.1259
# i: 148000, delta_time: 0.0007, time: 0.1266
# i: 149000, delta_time: 0.0007, time: 0.1274

667.2634 / 0.1274
# => 5218.621638954628 (times faster)

This looks to be a viable fix to try, and I'll test it with our production servers on Monday. Any thoughts?

eli-darkly commented 2 years ago

It looks plausible at first glance, we'll just need to review it in more detail.

eli-darkly commented 2 years ago

Looking at the branch with your proposed fix, there's one thing I'm not sure is right. It's here, where you are moving @cursor_position to the current end-of-buffer immediately before appending each new chunk of data to the buffer.

It seems to me that a more logical place to do this would be within read_line, immediately after you have scanned to the end of the buffer and determined that there isn't a line break in it. Currently I think the result is the same, since read_chunk_into_buffer is only ever called from that same place, but logically it is more related to what read_line is doing.

Either way, I think there's an edge case where it could end up being wrong— but it's an edge case that I think the existing code is already vulnerable to. SSE allows the line break to be \r, \n, or \r\n. So, if we read some data and it ends in \r, we could consider that to be the end of a line but we do not want a subsequent \n at the beginning of the next chunk to be treated as another line break; it should be treated as just an optional suffix to the previous \r. Handling that properly requires keeping track of a little extra state. So I'm going to try to address that in the process of integrating this fix.

(Edge cases like that are a common pitfall in writing an SSE implementation; I'm pretty sure we ran into this one before on another platform, but missed it in Ruby. For that reason, we're looking into implementing a standardized set of acceptance/contract tests to be run against all of our SSE implementations, separate from the higher-level SDK testing.)

eli-darkly commented 2 years ago

I'm also not sure that our use of @lock was actually necessary; if there is a way that read_line or read_chunk_into_buffer would ever get called from more than one thread, I'm not seeing it.

baweaver commented 2 years ago

Yes, and to be clear these are mostly theoretical changes that look decent on paper. I would have to vet out edge cases as well and you know far more than me on how SSEs work. Still intending on giving this a test, but other projects have been demanding attention lately.

I believe the larger change, which has a much larger footprint, would be around sending the entire payload as one SSE versus potential sharding patterns for it, or filtering on the server side before they're even sent down. My fear here is that we won't be the only clients with that large of a payload in the future, so it may be a discussion worth having now.

eli-darkly commented 2 years ago

Sending the payload as multiple events would be a much, much larger change, incompatible with all our existing SDKs and affecting multiple layers of the service infrastructure. It's unlikely that that will happen in the foreseeable future. Adding some kind of filtering mechanism is more feasible and it's been requested as a feature before.

baweaver commented 2 years ago

Agreed that sharding payloads is a major breaking change. Server-side filtering would be far more likely and immediately useful, and if a change like this works it'll buy some more time to address it as an option.

I know we've written filtering at a client-side level for data stores, but ideally the filtering is done on the server side before payloads are sent.

zellyn commented 2 years ago

We've actually added filtering (at the flag data store interface level) in our Go and Java implementations, so that apps suffering from memory ballooning due to LD can explicitly specify the list of flags (or flag name prefixes) they care about. Everything is still downloaded initially, and streamed, but only the specified flags are kept.

Our ideal client SDK would probably look something like this:

Segments would presumable be handled the same way as flags, with the possible optimization of including any segments mentioned in a wanted flag.

The big question would be whether keeping the flag list per connection in Relay would be expensive. If it's a problem, some sort of copy-on-write/coalesce-on-collision system could allow all 50 instances of the same app to (eventually) share the same list of flags.

escardin commented 2 years ago

If you're going to filter watched flags, I would love if it could be filtered by tag. I feel like prefixes is more of a workaround for what tags can do more effectively.

baweaver commented 2 years ago

We might spin those discussions off into a separate ticket, don't want to overload this too much. Also I added a Ruby implementation for local filtering, so that exists too @zellyn.

eli-darkly commented 2 years ago

Filtering would be a general feature request in the entire product, not specific to the Ruby SDK. And it's a request we've received before, so there's no need to file it again; I'll just make a note that there's more interest in it. Let's table that here, so that this GitHub issue on ruby-eventsource can remain about the ruby-eventsource problem.

(Although - @zellyn, I do want to address one thing you mentioned above, as something that would probably not be in any future filtering implementation: the idea that if the client needed a flag it had not previously received, it would fetch the flag from LD or from Relay at that time. We've deliberately never implemented anything in the SDKs that would involve making HTTP requests to the service endpoints on demand during a flag evaluation— that is just too much latency and unpredictability. The SDKs do all of their HTTP dialogue in the background, updating their internal state accordingly, so that flag evaluations can simply reference that internal state. The only exception is that in server-side SDKs which have database integrations, some of that state might need to be requeried from a database if it's not currently cached.)

eli-darkly commented 2 years ago

@baweaver We're reviewing a PR now that has a number of improvements to stream reading, including the one you suggested. We should be able to release the fix this week.

zellyn commented 2 years ago

We've deliberately never implemented anything in the SDKs that would involve making HTTP requests to the service endpoints on demand during a flag evaluation— that is just too much latency and unpredictability. The SDKs do all of their HTTP dialogue in the background, updating their internal state accordingly, so that flag evaluations can simply reference that internal state.

Absolutely. Our internal home-built feature flag system works exactly the same way: all evaluations are local, based on asynchronously updated rules that can be evaluated. I don't think one synchronous evaluation for a flag would hurt, although it would also be acceptable to return the default, log a warning (so that clients can add the flag to the list of initially-requested flags), and fetch the value asynchronously.

Filtering would be a general feature request in the entire product, not specific to the Ruby SDK. And it's a request we've received before, so there's no need to file it again; I'll just make a note that there's more interest in it.

Indeed, it is seeming more and more likely that it is going to become a requirement for Square to be able to successfully use LaunchDarkly.

eli-darkly commented 2 years ago

@baweaver The changes in that PR have been released and the improved SSE client is now in the 6.2.5 release of the Ruby SDK. Please let us know if you see improvements in performance after updating to this version.

eli-darkly commented 2 years ago

Closing this because implementation changes in the last several releases have made it obsolete.