Open loquisgon opened 3 years ago
@loquisgon is https://github.com/apache/druid/pull/11247 a draft PR for this proposal?
@loquisgon this proposal sounds great! I like the idea of splitting the appenderator. Can you add more details for how the batch appenderator will work? There are roughly 3 steps in appenderator, i.e., creating in-memory segments, persisting segments on disk, and merging & pushing segments to deep storage. I'm mostly curious about what the batch appenderator does in each step, and what metadata it tracks of and how. Even if these steps can be straightforward in the batch appenderator, it would be nice to describe them here so that everyone is on the same page.
@jihoonson Added more specific steps as requested
@loquisgon thanks for adding details. I have some comments on those.
Sink
tracks how many rows have been added to itself which is the sum of in-memory segment size and all persisted segment sizes. This metadata is used to check whether the segment hits maxRowsPerSegment
and needs to be pushed. In this proposal, the batch appenderator will remove all in-memory metadata of the Sink
once it is persisted which will lose the row count in the sink. How are you thinking of tracking this information?maxRowsPerSegment
, what will the task do in the merge phase? It currently pushes all segments at once including even the small segments that hasn't hit the limit yet. The current behavior is designed for the streaming ingestion to track of the offsets of pushed segments. The batch task doesn't have to do the same. Instead, it can selectively push only the segments that are large enough. This will create "better" segments in terms of the segment size.The below are nit comments.
AppenderatorImpl
currently persists all in-memory segments when it hits the memory limit. This is because, in streaming ingestion, the realtime task should be able to stop and restart when the middleManager where the task is running on is restarted. The persist operation writes a checkpoint on disk that indicates the stream offsets corresponding to the data that is persisted, so that the task can read from the last checkpoint on restart. The batch task doesn't need this checkpoint as it's not restorable, and thus doesn't need to persist all in-memory segments at once either. So, maybe it's worth considering persisting not all segments but some of them until enough memory space is available. This way can also help creating "better" segments by mitigating https://github.com/apache/druid/issues/11252. However, https://github.com/apache/druid/issues/11252 perhaps needs to be solved by redesigning the merge operation that can be shared by both batch and streaming ingestion, so that all ingestion methods can benefit from it.FireHydrants
and Sinks
for the batch appenderator. However, these are designed to serve realtime query processing and thus have data structures and architectures to support concurrent accesses. Imagining what the batch version of FireHydrants
and Sinks
would look like, I think they will likely be very similar to the realtime version. So, even though these will be unnecessary for the batch appenderator, I think it's OK to reuse them.The first recommendation, Sink
keeping track of how many rows have been added is something I must need to address and I will in the implementation. The others are "nice" to have at this point. I think given the upside of this change (i.e. avoiding OOMs for this cause in the future) is so promising that the most important think now is get the basic correct implementation now that "closes" all Sink
, as described in this proposal. Once we do this in the future we can follow up with more improvements.
Hmm so, based on your comment, I assume you are thinking of pushing all current segments at once in the merge & push phase. In your proposal, you provided promising evidences for why we should not keep sinks and hydrants for batch ingestion. However, it does not provide enough details of the batch appenderator or the rationale for why the batch appenderator must behave in that way besides not holding sinks and hydrants. Because what you are proposing is not just not keeping sinks and hydrants for batch ingestion but adding a new batch appenderator, I think my questions for how it will work is still relevant to this proposal.
I think our proposal is a place holder to discuss what's reasonable design and implementation in the long term. I'm not sure how you define the "basic" implementation, but if you want to replicate the current behavior of AppenderatorImpl
in your initial implementation, I think it's OK because it doesn't make anything worse than now. But then everyone should understand that's your execution plan. Please add it in the proposal.
The first recommendation,
Sink
keeping track of how many rows have been added is something I must need to address and I will in the implementation.
Our proposal is not only for overall architecture and design, but also for a certain level of implementation details. See https://github.com/apache/druid/issues/8728 and https://gist.github.com/gianm/39548daef74f0373b3c87056e3db4627 as a reference. To me, it seems a valid discussion to talk about how you will implement this, but if you want, we can move it to your pr when it's ready. Please keep in mind that the point of design review is planning out details upfront as much as possible so that we can avoid pitfalls.
@jihoonson I see your point that you still need clarification of what needs to be done. Yet I am hesitant to do another pass to the document above because it might muddle things further. However, let me tell you precisely, briefly what my concrete plan is. Analysis of the code, tests and preliminary coding strongly suggest that keeping the data structures for Sink
and Firehydrant
in memory can make ingestion run out of memory. Therefore my plan is pretty simple.
Sink
and Firehidrant
and keeping just enough metadata in memory to recover them from disk later as needed (i.e. directory path for the Sink
, metadata about the Sink
like number of rows in memory so far, etc.) Sink
as usual and create new Firehydrant
.push
, first make sure all in-memory Sink
at this point are persisted & closed (as in 1, to control memory utilization), then just recover the Sink
& Firehydrant
from disk, merge Firehydrant
and push the Sink
, for all Sink
one by one.maxRowsPerSegment
is hit in the InputSourceProcessor
when a row was just added then a push (4) will happen as well then continue adding rows as usual (2)All the persist & push actions are now strictly sequential & synchronous to guarantee correctness.
Therefore the scope for this proposal is strictly limited to the above in order to manage risk & complexity but still achieving important value (i.e. drastically reducing the probability of OOM in these cases). The introduction of a new Appenderator
is just common software engineering when we accept that it really should have a different code path from the real time case. I believe that this code physical & conceptual separation will open up new critical opportunities (such not using the Sink
and Firehidrant
data structure & layout for intermediate persists of batch and even maybe introducing a pre-sorting as well) but these future opportunities are out of scope for this proposal. Having said that in this proposal we will also strive to refactor the obvious leftover code from stream processing in such a way that the code looks & feels clean and is left in the best form possible for future work.
@loquisgon thank you for the well written proposal.
I think it makes sense to think about improving batch behavior by leveraging differences in batch and realtime requirements, so I like the big picture idea.
About structuring the code: there isn't really any perfect way to do it, I think. Introducing a flag is best for minimizing code duplication, but if there are a lot of differences between the paths, they become tough to track since they're mixed together. So separating the classes seems like a good idea. I'd avoid a common superclass, since in cases where we have done it (IndexMerger, IncrementalIndex) I find the logic really hard to follow. There isn't a clear direction of control: sometimes the subclass calls into the superclass, and sometimes the superclass calls into the subclass. IMO the best approach is a shared "helper" class instead of a shared superclass, where control only flows in one direction (the main class calls the helper class; not the other way around).
About performance: how big in bytes was your 1M row test file? It looks like it took 60–90 mins to ingest, which seems like a really long time for just 1M rows. I'd expect being able to do it orders of magnitude faster than that. Did it take a long time because each row is really big, or is it related to the fact that there are a lot of segments? (Another way of asking: how long does it take to ingest the same 1M rows if the timestamps are adjusted to all be the same?) For datasets that worked without error prior to your changes, do your changes have a measurable effect on ingestion speed?
About future work: would you expect these changes to help with non-dynamic partitioning modes? For example, would these changes affect the pre-shuffle partial segment generation phase? Would you expect them to help? It would be interesting to hear your thoughts about future work in this area.
@gianm Good points about the code separation. Yeah, I strongly believe that the code paths serve different use cases (streaming vs batch -- mainly in streaming we need to query the data as it is being ingested while in batch we don't). Therefore splitting the classes is a good idea. Great point about the superclass. I understand your point, avoiding the superclass is better (inheritance can be a form of coupling).
I think the reason that the ingestion takes that long is that the data is intentionally somewhat pathological (even though it simulates a real case in production). It is a series of events over 30 years, every day in between having data. However, the data in every day is only in the order of ~100 rows. Thus there will be about ~10000 segments at the end, all pretty small. I see in my tests that disk i/o dominates, almost no cpu utilization. This is because of the intermediary writes and the merges at the end. I am running my test in my laptop which may also make things worse (but I don't think so, though I noticed that the antivirus sometimes interfered since it insisted in looking at all the tiny intermediate files being created, especially in the random ingest case for same file). I am using DAY granularity. When I used MONTH for same file, it creates 360 segments and it takes an order of magnitude (i.e. 10 times less) less time than DAY granularity. The file is a csv, with about 250 columns. Uncompressed it is about 4.3 G in size.
By the way, I agree with you that ingesting a 1M row file in 1.5 hours sounds like too much. This is not because my changes; since the changes are removing work, it can only make things faster. One way to speed it up is to realize that for batch ingestion the intermediate persists don't have to be in the "segment" format. If we did intermediate persists (for batch only) in a different format (maybe a log data structure optimized for appends) and then created the real segment at the final merge/push phase then I believe things would be way faster. I think this will also accelerate unsorted data ingest. (I don't show data here but the unsorted case for the test file also works with the changes in the proposal but it takes 13 hours!).
About future work. I used an experimental approach to this work. I knew that OOMs were an issue. I decided to take a look at dynamic ingestion since it is the most basic form. I found these issues, which I believe are orthogonal to other issues. If this proposal is accepted, I plan to implement it. Then after that I will use the same approach to hash & range partitioning. Thoughts?
I think the reason that the ingestion takes that long is that the data is intentionally somewhat pathological (even though it simulates a real case in production). It is a series of events over 30 years, every day in between having data. However, the data in every day is only in the order of ~100 rows. Thus there will be about ~10000 segments at the end, all pretty small.
Ah, OK, that makes a bit more sense. I hope this case isn't common, though. Even if ingestion completes, people are going to be in for a rude surprise at query time when faced with the overhead of all these tiny segments.
By the way, I agree with you that ingesting a 1M row file in 1.5 hours sounds like too much. This is not because my changes; since the changes are removing work, it can only make things faster. One way to speed it up is to realize that for batch ingestion the intermediate persists don't have to be in the "segment" format. If we did intermediate persists (for batch only) in a different format (maybe a log data structure optimized for appends) and then created the real segment at the final merge/push phase then I believe things would be way faster.
An excellent idea. There's some work that is sort of in this direction: search for indexSpecForIntermediatePersists
. It isn't a different format, but it's removing some complexity from the segment format.
About future work. I used an experimental approach to this work. I knew that OOMs were an issue. I decided to take a look at dynamic ingestion since it is the most basic form. I found these issues, which I believe are orthogonal to other issues. If this proposal is accepted, I plan to implement it. Then after that I will use the same approach to hash & range partitioning. Thoughts?
Well, sure, that's a good methodology, but I was hoping to have a crystal ball that lets us predict what we might find. It's interesting to think about since it might inform how we structure things today.
My crystal ball, hazy as it may be, suggests that the work you're doing should apply to the first phase of hash/range partitioned ingest, because in those phases, we're also sort of maintaining a bunch of different data destinations at once (corresponding to the expected second-phase tasks). I'm not 100% sure how the code is structured or if some additional work is needed to get these improvements to apply. But logically it seems like they would make sense there too.
Motivation
Currently native batch ingestion has limited guardrails with respect to its memory consumption. On the positive side, it has a simple memory model that estimates heap utilization during its segment creation phase and when it senses memory pressure it offloads some data to disk, freeing up heap (it performs persists "on heap" indices -- aka "intermediate" persists). However, native batch internally keeps creating data structures (such as
Sinks
andFirehydrants
) roughly proportionally to the number of segments and intermediate persists corresponding to the input file that a particular peon is processing. Therefore, regardless of the intermediary persists, as the file size grows, the memory consumption can grow unbounded eventually making ingestion exhaust all Java heap available making the ingestion task fail. The main ways to work around this today is to manage the input file (i.e. dividing it into smaller pieces) or by changing the segment granularity (so fewerSinks
andFireHydrants
are created) but these workarounds are not convenient all the time. One big reason that this situation exists is because the code path for real time ingestion and that of native for theAppenderatorImpl
class is shared causing the batch ingestion path to use data structures that it does not necessarily need. We propose to split the appenderator class into two classes:RealTimeAppenderator
andBatchAppenderator
and refactorBatchAppenderator
so that data structures that are not required to be in memory can be garbaged collected thus removing the unbounded memory growth. This proposal would leaveRealTimeAppenderator
basically intact, same as before.Proposed changes
We start by breaking up the current
AppenderatorImpl
into two new classes:RealTimeAppenderator
andBatchAppenderator
both implementing theAppenderator
interface and potentially being derived from a base class that gathers common code. TheRealTimeAppenderator
is basically the same code as it exists today in theAppenderatorImpl
and it will run unchanged for streaming (i.e. real time) tasks.BatchAppenderator
will be modified as follows.The main change for
BatchAppenderator
is actually straightforward to understand. First unnecessary code that is there to support only the real time use case can be removed (such as theVersionedIntervalTimeLine
and other code). More important, at a high level the main change is as follows. Today, there is code that persists hydrants when the code detects (through a simple but effective memory use model) heap pressure is at a certain level. After the hydrants are persisted some heap memory is released back into the system. However, stress tests and code inspection reveal that not all memory associated with the hydrants is completely released. The consequence of this is that memory grows unbounded in a single task as its number of segments/hydrants increase (in the appendix we will show results of our tests that illustrate this). The change proposed is thus to release all memory being consumed bySinks
andFirehydrants
after they are persisted. The code also makes sure thatSinks
andFireHydrants
are released in the merge phase as soon as they are used (Sinks
andFireHydrants
are recovered from their persisted files just in time when they are merged, on aSink
bySink
basis.) We have already written code that demonstrates that this is feasible and the memory savings are dramatic for our tests demonstrating that memory growth is minimal after this change (see Appendix below for preliminary results).Specific
BatchAppenderator
changesThere are four phases: creating in memory segments, persisting segments on disk, merging persisted segments, and then pushing them. The basic insight in the refactoring is that the batch appenderator does not have to keep any segments in memory after they have been persisted and they can be recovered from persistent storage any time we need them (like in merging). As soon as the current segments in memory are persisted, their associated data structures (i.e.
Sinks
&Firehydrants
) can immediately be released thus be eligible for garbage collection. This is because batch ingestion does not have to support "real time" querying. Next we explain the changes in each phase.Creating in memory segments
This part of the code has practically no change (other than removing data structures & code used for querying but not needed in batch). The regular process works as usual. As a record is added
Sinks
are created or fetched from memory and newFirehydrants
with "on heap" indices are created & added to theSinks
as usual.Persisting segments on disk
The conditions for persisting segments are exactly as before: the memory estimation model determines when to do this according to its measurements and the parameters passed in the ingestion spec (i.e.
maxBytesInMemory
etc.). However, rather than a-synchronically callingpersistAll
inside theadd
method now we synchronously call a new methodpersistAllAndClear()
. This new method is really straightforward and it is mostly composed of previous logic. The critical difference is that it enforces thatpersistAll
and a new methodclear(boolean removeOnDiskData
) (passingfalse
inremoveOnDiskData
) are called strictly sequentially. That is, the code waits until all data has been persisted and then it is cleared.persistHydrant
which is called from withinpersistAll
also no longer swaps the "on heap" memory index with aQueryableIndex
since that is no longer required. After allSinks
and theirFirehydrants
are persisted then theclear(false)
function is immediately called. This function will remove allSinks
andFirehydrants
that were persisted but without removing them from disk. AfterpersistAllAndClear()
is called noSinks
and noFirehydrants
will remain in memory immediately after that point.One critical aspect here is that the act of persisting the
FireHydrant
saves all necessary data and metadata to disk and we keep a file reference to that local storage so the code is able to recover them later as needed.After the current batch has been persisted, when
add
gets called again, the cycle will begin again until the next intermidiate persist or the final merge & push.Merging segments
After all data has been read from the input data source (or when a push is forced by the appenderator driver before the whole file is read because some conditions, like
maxRowsPerSegment
have been met) then the segments will be merged and then pushed. When we reach this phase we first callpersistAllAndClear()
to make sure all data is on disk and noSinks
are in memory. Then right before merge we recover eachSink
and theirFirehydrants
from disk. The key reason we can do this is because we know the base directory where they have been stored (local) and we know exactly their metadata and data layout. It is critical here that we recoverSinks
and associatedFirehydrants
one at a time because we want to merge them one by one then delete them again from memory so that we avoid memory pressure if we recover them all at once and then merge them.Pushing
After segments are merged they can be pushed as usual.
Rationale
An alternative that we considered is not splitting the class but rather introduce a flag. However, this would make the code hard to read and also hard to maintain since the changes, though simple to understand conceptually, are dramatic at the code level and have to modify some areas of the concurrency model used in streaming. Therefore we recommend the proposal to split the appenderators.
The downside of splitting is that now we have a split code base for the
Appenderator
functionality. Initially it may make maintenance of the code slightly more difficult for people used to the old code but eventually the advantages of the separation of concerns principle will kick in in addition to the critical benefits of avoiding unbounded memory growth.Operational impact
The code is not modifying any external APIs so no impact is expected in operation (other than the kind of out of memory issues addressed by this proposal will no longer happen). There will not be anything deprecated or removed by this change and hence no migration path that cluster operators need to be aware of. All of the features introduced will be on by default but given the magnitude of the change a feature flag will be provided to rollback the behavior in case of suspected bugs with no code changes.
Test plan
In addition to usual unit test and integration test coverage we have created a test file that demonstrates the unbounded memory growth as well as the elimination of that growth with the proposed changes.
Future Work
Appenderator
interface redesignThe work proposed here is limited to refactoring the current handling of
Sink
andFirehydrant
in order to minimize their memory utilization so we can ingest large files for batch ingestion without running out of memory. With this refactor we expect that the current concurrency exposed byAppenderator
interface (i.e. the use ofFuture
is some of its methods) is no longer required. However, we propose to separate the redesign of theAppenderator
interface (i.e. potentially splitting into concurrent/non-concurrent or completely eliminating it) in order to accelerate the time of delivery and to have more time to think about an appropriate design.Do not persist all
Sink
andFirehydrant
at the same timeThe design in this proposal assumes all
Sink
andFirehydrant
will be persisted at once and all their memory objects removed (i.e. left unreferenced so that GC can free them). There could be a more optimal implementation where we do not persist some of them (maybe they are small and currently being appended). We leave this for future work.Use alternative file format for intermediate persist (for faster ingest)
While testing versions of the prototype code to support this proposal we noticed that intermediate persists and merge may take a long time. For batch ingestions, since we do not query the segments at real time, we do not really need to use the segment format when persisting. There could be a more efficient format (i.e. fast append) and we could delegate the segment file creation just after the "merge & push" phase is complete.
References
PR for implementation of this design
Bound memory utilization for dynamic partitioning (i.e. memory growth is constant)
Original proposal for
Appenderator
designAppenderators, DataSource metadata, KafkaIndexTask
Appendix
Test design
We created a synthetic CSV file designed to reflect a real case scenario where a Druid user ingestion ran out or memory. The file has 1 million rows, each row with roughly 200 columns. The data represents many years of events and the segment granularity is set to
DAY
. There are roughly 90-100 events per day so at the end there are over 10,000 segments created. We used a peon with 2G of heap to do the runs. The partition type isdynamic
and we used defaults in all other areas (such astuningConfig
etc) of the ingestion spec. The file is ordered by timestamp.Test with current code (Apache/druid master)
The following diagram (obtained using gcplot processing the garbage collection log) shows the memory utilization of the ingestion of the test file. You can see that the memory grows really quickly (both before and after gc) until it just flattens and hits the 2G heap ceiling. Actually, this run ended in an out of memory error (OOM) thus ingestion failed. Ingestion crashed in the "segment creation" phase of the
Appenderator
not even making it to the "merge" phase. Other aspects of the garbage collection log (not shown here) shows that about 50% of the time (all the flat curve at the right of the graph) are actually spend just doing garbage collection and almost no actual work (until ingestion finally crashes). Additional data (not shown) indicates that the run ran out of memory at about the half point of segment creation (~5K segments out of 10K had been created) but before making it to the segment merge phase.Test with code incorporating the proposed changes
This test is the same as the one above but with the new code (code is not production ready yet, just enough to demonstrate the value). Now you see a very different picture. The "EMA" line is the "exponential moving average" of the heap utilization and it clearly shows that memory usage is now very stable over both phases of ingestion and also not getting even close to the 2G heap limit (the left side of the graph with the "spikes" is the segment creation phase where spikes are created when a segment gets persisted and ll its memory is released back. The flat part in the right hand side is the merge phase which clearly shows the effect of merging on a
Sink
bySink
basis and then releasing all the memory associated with theSink
andFireHydrants
after they have been merged. Now the new code fully processes the test file with no memory (or other) issues.