Closed grantwest closed 6 years ago
I would say this is not so much about GenStage as about receiving data from external sources in general. When you receive data from Kafka or a DB, you may get large messages over TCP, which are then processed into smaller parts that are sent around. During this step, it is recommended to break the large binary into smaller ones by copying the relevant bits, which allows you to "let go" of the large binary. This is what libraries like Postgrex do and what libs such as nimble_csv recommend.
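To make the "copy the relevant bits" advice concrete, here is a minimal sketch using `:binary.copy/1`. The module name `BinaryCopySketch` and the fixed-size chunking are only assumptions for illustration; a real pipeline would split on whatever boundaries its message format defines.

```elixir
defmodule BinaryCopySketch do
  # Hypothetical helper: splits a payload into chunks of at most
  # `chunk_size` bytes and copies each chunk, so that no chunk is a
  # sub-binary that still references the original large binary.
  def split_and_copy(payload, chunk_size)
      when is_binary(payload) and is_integer(chunk_size) and chunk_size > 0 do
    do_split(payload, chunk_size, [])
  end

  defp do_split(<<>>, _size, acc), do: Enum.reverse(acc)

  defp do_split(payload, size, acc) when byte_size(payload) <= size do
    Enum.reverse([:binary.copy(payload) | acc])
  end

  defp do_split(payload, size, acc) do
    <<chunk::binary-size(size), rest::binary>> = payload
    # Without :binary.copy/1, `chunk` would keep the whole payload alive.
    do_split(rest, size, [:binary.copy(chunk) | acc])
  end
end
```

You can check whether a part still pins the large binary with `:binary.referenced_byte_size/1`: for a copied chunk it should equal `byte_size/1` of the chunk, while for a plain sub-binary it reports the size of the original.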
Even when using GenStage, you are not supposed to use multiple serial stages anyway. Rather, you would have a Kafka producer with multiple consumers that are there for parallelism.
Also note we generally don't use the issues tracker for questions, so I suggest having a more nuanced discussion in another medium, such as the elixir forum. :)
This is a real problem with GenStage. If your stages run forever and process data that accumulates like this, it will never go away.
I don't know how to fix it, but I might recommend looking into the max_heap_size related emulator flags and more rigorous supervision trees.
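For reference, a rough sketch of what a per-process heap limit looks like with `Process.flag(:max_heap_size, ...)`. The module name `HeapLimitSketch` and the sizes are made up for illustration. The size is given in words, not bytes, and as far as I know large binaries are stored outside the process heap, so by default this limit may not catch the binary growth discussed here; treat it as a safety net rather than a fix.

```elixir
defmodule HeapLimitSketch do
  # Hypothetical helper: runs `fun` in a monitored process with a heap
  # limit of `max_words` words and returns the process exit reason.
  def run_limited(fun, max_words) when is_function(fun, 0) do
    {pid, ref} =
      spawn_monitor(fn ->
        # kill: true terminates the process once the limit is exceeded;
        # error_logger: false keeps this sketch quiet.
        Process.flag(:max_heap_size, %{size: max_words, kill: true, error_logger: false})
        fun.()
      end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    end
  end
end
```

The same limit can be set system-wide with the `+hmax` emulator flag. Under a supervisor, a killed worker is restarted with a fresh heap, which is where the supervision tree comes in.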
I found the solution was not to keep data in a GenServer beyond the processing of a single message. If you never add the data to the state of the GenServer, it gets garbage collected very quickly. If you add it to your state, it takes a long time to garbage collect, even if you store it there for only a short time.
This means you should use mailboxes for queuing, don't queue up data in your state.
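A small sketch of what I mean, assuming a hypothetical `StatelessWorker` that takes a handler function: the payload only lives for the duration of one callback, and the state holds nothing but a counter.

```elixir
defmodule StatelessWorker do
  use GenServer

  # Hypothetical API for illustration; `handler` is any 1-arity function.
  def start_link(handler) when is_function(handler, 1) do
    GenServer.start_link(__MODULE__, handler)
  end

  def process(pid, payload), do: GenServer.call(pid, {:process, payload})

  @impl true
  def init(handler), do: {:ok, %{handler: handler, processed: 0}}

  @impl true
  def handle_call({:process, payload}, _from, state) do
    # The payload is used here and never stored in the state, so nothing
    # in this process refers to it once the callback returns.
    result = state.handler.(payload)
    {:reply, result, %{state | processed: state.processed + 1}}
  end
end
```

Pending work then waits in the process mailbox instead of in a queue inside the state. Note the handler should return something small or copied, otherwise the reply itself may still reference the payload.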
I have a series of GenStages that start with reading from Kafka and end with publishing back into Kafka (using the KafkaEx library). I am reading binary blobs from Kafka (all larger than 64 bytes) and passing them through my GenStages. Even though I have my max demand set pretty low, the total binary memory usage gets way out of hand. Does anyone have suggestions on how to deal with this? Is there anything special that I should consider because this is in the context of GenStage? Or do I need to just approach this as a normal Erlang memory issue?
I just finished reading the memory leaks chapter of Erlang in Anger, but the primary suggestions in the book seem to be at odds with having chosen to use GenStage.