deephaven / deephaven-core

Deephaven Community Core
Other
254 stars 80 forks source link

Ensure high-rate kafka replay does not cause out of memory errors #3625

Open devinrsmith opened 1 year ago

devinrsmith commented 1 year ago

A user reported that kafka consume as a stream table with ALL_PARTITIONS_SEEK_TO_BEGINNING was causing out of memory errors with a 48GiB heap.

There are two current theories

devinrsmith commented 1 year ago

The second theory, while theoretical in the context of this issue at the moment, is a real concern more generally. For example, it would not be unreasonable to think that this same kafka stream should work with a 4GiB heap as well (or other memory constrained situations that need to work through a large replay backlog).

It may be reasonable to think that there should be some maximum memory that we'll allocate for a stream table, and once that limit is hit, we'll immediately start the next update cycle. This would allow the replay to happen "as fast as possible within the given memory limits".

jcferretti commented 1 year ago

We should improve this logging to include the number of bytes processed. We can do that by tweaking StreamConsumer to have an accept method that retuns the number of bytes processed, which can be obtained from each ConsumerRecord using serializedKeySize() and serializedValueSize() (beware that they return -1 if null).

[Update] Done here: https://github.com/deephaven/deephaven-core/pull/3628

devinrsmith commented 1 year ago

I suspect we will be able to reproduce this issue with a small heap size and relatively small kafka broker that. Ie, a 1GiB heap with 10GB of kafka data to replay.

jcferretti commented 1 year ago

In the community chat discussions (here) a couple of potentially relevant Kafka parameters and behaviors were mentioned, listing below for completeness:

devinrsmith commented 1 year ago

It seems like ultimately this is a stream table + barrage issue when there is a client subscribed to the stream table (and not necessarily an issue with kafka itself; although, we should still investigate memory limits when UGP cycle time is set very high, and lots of data is coming in).

For now, a quick and dirty workaround would be to ensure that the web UI doesn't open up a table:

class MyObject(object):
    pass

o = MyObject()
o.events = ... <stream table consumption code> ...

# or other stuff
downstream_agg = o.events.count_by("Count")