apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

[Proposal] Background compaction #4479

Closed jihoonson closed 1 year ago

jihoonson commented 7 years ago

Motivation

In Druid, segment size is important because it affects query performance. If segments are too small, Druid spawns too many threads, each of which reads only a few bytes. If they are too large, query execution might not be fully parallelized.

Since the size of each segment can vary according to data distribution, it's difficult to optimize created segment size at ingestion time, especially for realtime indexing.

To handle this problem, operators usually have to set up compaction tasks manually, which still has problems such as manual segment handling and no support for running indexing tasks and compaction tasks concurrently.

Goals

This proposal is to introduce a new feature of automatic background segment compaction.

Challenge & solution

Druid uses a versioning system for atomic segment updates. Whenever new segments are written for the same interval, they have higher versions than the old segments. This versioning system works well for normal indexing tasks, but it makes it difficult to run compaction tasks at the same time as index tasks that append data to existing data sources (hereafter, appending tasks).

For example, it would be very common to run compaction tasks while appending tasks (e.g., realtime tasks) are running for the same data source. In this case, the results of the appending tasks must be visible, which means they must have the same version as the existing segments of the destination data source. If the compaction task increases the segment version, the segments produced by the appending tasks will be overshadowed because they have a lower version.

A simple solution might be to increase the version of the appending tasks' result segments whenever a compaction task finishes. However, this would block appending tasks as well as query execution while segment versions are being updated.

As a result, we need a new mechanism that overshadows only the pre-compaction segments. The proposed solution is fine-grained prioritized locking. Each task acquires a prioritized lock only when it is really needed. More details are presented in the sections below.

Current locking system

Currently, the locking policy differs by task type. A batch index task acquires a lock for the entire interval of its input data at the very beginning of the task. Realtime index tasks and KafkaIndexTasks acquire a lock when they allocate a segment. A realtime index task additionally acquires a lock when it announces segments. Also, every lock has the same type and the same priority.

Proposed Solution

In this proposal, I propose a new locking system. Compaction tasks and index tasks acquire locks of different priorities. That is, index tasks have higher priority and always win the lock when contending with compaction tasks. To reduce lock contention, lock types are also introduced.

Lock types

There are two lock types, i.e., read lock and write lock.

Prioritized locking

A lock has a priority represented by an integer. A task can acquire locks of different priorities if needed. The table below shows the interactions between different lock types and priorities. It assumes that Lock2 is requested while Lock1 is already acquired and not yet released. p(lock) is a function that returns the priority of the given lock.

| Lock1 | Lock2 | Action |
|---|---|---|
| read | read | Both read locks can be acquired. |
| read | write | If p(read) >= p(write), then write lock waits for read lock to be released. Otherwise, write lock preempts read lock. |
| write | read | If p(write) >= p(read), then read lock waits for write lock to be released. Otherwise, read lock preempts write lock if it is preemptable. If it's non-preemptable, read lock waits for write lock. |
| write1 | write2 | If p(write1) >= p(write2), then write2 waits for write1 to be released. Otherwise, write2 preempts write1 if it is preemptable. If it's non-preemptable, write2 waits for write1. |
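
The interaction rules above can be written as a single decision function. The following is only a minimal sketch: `LockType`, `LockDecision`, `LockRequest`, and `LockRules` are hypothetical names made up for illustration and are not Druid classes.

```java
// Hypothetical types for illustration only; these are not Druid's lock classes.
enum LockType { READ, WRITE }

enum LockDecision { GRANT, WAIT, PREEMPT }

final class LockRequest {
    final LockType type;
    final int priority;
    final boolean preemptable;

    LockRequest(LockType type, int priority, boolean preemptable) {
        this.type = type;
        this.priority = priority;
        this.preemptable = preemptable;
    }
}

final class LockRules {
    private LockRules() {}

    /** Decides what happens to `requested` (Lock2) while `held` (Lock1) is still acquired. */
    static LockDecision decide(LockRequest held, LockRequest requested) {
        // Row 1: two read locks never conflict.
        if (held.type == LockType.READ && requested.type == LockType.READ) {
            return LockDecision.GRANT;
        }
        // All other rows: a holder of equal or higher priority makes the requester wait.
        if (held.priority >= requested.priority) {
            return LockDecision.WAIT;
        }
        // Row 2 lets a higher-priority write preempt a held read unconditionally.
        // Rows 3 and 4 preempt a held write lock only if it is preemptable.
        if (held.type == LockType.READ || held.preemptable) {
            return LockDecision.PREEMPT;
        }
        return LockDecision.WAIT;
    }
}
```

For instance, a preemptable compaction write lock with priority 0 that is already held would yield `PREEMPT` when a realtime write lock with priority 10 is requested, and `WAIT` if the held lock were non-preemptable.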

Lock preemption

Lock preemption is implemented by optimistic locking. Read locks can be acquired immediately if only read locks exist. Write locks can be acquired if there is no lock of higher priority. When a lock of higher priority is requested, it is acquired only when all existing locks for the same interval and the same dataSource have lower priority and are preemptable. When it is acquired, all existing locks for the same interval and dataSource are revoked. Once a lock is revoked, unlock() or upgrade() on the revoked lock fails, and the corresponding TaskAction (lockReleaseAction or lockUpgradeAction) must notify the task of this. Also, a new isValid() method is needed for read locks, to be called before releasing them.
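
To make the revocation flow concrete, here is a small in-memory sketch. It is an illustration under several assumptions, not the proposed implementation: `HeldLock` and `PreemptingLockbox` are hypothetical names, only exclusive (write-style) locks are modeled, and intervals are compared by exact string match rather than by overlap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical lock handle; revocation is recorded on the handle itself.
final class HeldLock {
    final String key;          // dataSource + "/" + interval
    final String taskId;
    final int priority;
    final boolean preemptable;
    private boolean revoked;

    HeldLock(String key, String taskId, int priority, boolean preemptable) {
        this.key = key;
        this.taskId = taskId;
        this.priority = priority;
        this.preemptable = preemptable;
    }

    synchronized boolean isRevoked() { return revoked; }
    synchronized void revoke() { revoked = true; }
}

// Hypothetical lockbox holding exclusive locks per (dataSource, interval).
final class PreemptingLockbox {
    private final Map<String, List<HeldLock>> locksByKey = new HashMap<>();

    /** Returns the new lock, or null if an existing lock cannot be preempted. */
    synchronized HeldLock tryLock(String dataSource, String interval,
                                  String taskId, int priority, boolean preemptable) {
        String key = dataSource + "/" + interval;
        List<HeldLock> existing = locksByKey.computeIfAbsent(key, k -> new ArrayList<>());
        // Acquire only if every existing lock has lower priority and is preemptable.
        for (HeldLock lock : existing) {
            if (lock.priority >= priority || !lock.preemptable) {
                return null;
            }
        }
        // Revoke everything that was preempted, then record the new lock.
        for (HeldLock lock : existing) {
            lock.revoke();
        }
        existing.clear();
        HeldLock acquired = new HeldLock(key, taskId, priority, preemptable);
        existing.add(acquired);
        return acquired;
    }

    /** Fails (returns false) for a revoked lock so the caller can notify its task. */
    synchronized boolean unlock(HeldLock lock) {
        if (lock.isRevoked()) {
            return false;
        }
        List<HeldLock> existing = locksByKey.get(lock.key);
        return existing != null && existing.remove(lock);
    }
}
```

In this sketch a task whose lock was preempted learns about it when `unlock()` returns `false`, which corresponds to the failure that the release action is expected to report back to the task.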

Fine-grained locking

Each task acquires a lock only when it really needs one.

Examples

Compaction task

The coordinator runs compaction tasks periodically. A compaction task iterates over each interval of the input data source and runs subtasks for segment compaction, which are similar to the existing merge task. Each subtask simply merges its input segments into a single segment. Note that a merge can span segments of different intervals.

  1. Requests a write lock for a given interval. This lock has a lower priority than the locks of other types of index tasks.
  2. Once the lock is acquired, gets the list of segments for the interval and adds all of them to the input segments.
  3. Repeats 1-2 until the total size of input segments exceeds a configurable threshold.
  4. Merges the input segments and publishes the result. Releases all write locks for input segments.
  5. Repeats 1-4 until all intervals are processed.
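
The loop in steps 1-5 can be sketched as a batching routine. This is a simplified illustration with assumptions: `SegmentInfo` and `CompactionPlanner` are hypothetical names, the lock request is modeled as a caller-supplied predicate, intervals whose lock cannot be acquired are simply skipped, and the actual merge and publish of step 4 are left out (each returned batch stands for one merge).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical segment descriptor: an id plus its size in bytes.
final class SegmentInfo {
    final String id;
    final long sizeBytes;

    SegmentInfo(String id, long sizeBytes) {
        this.id = id;
        this.sizeBytes = sizeBytes;
    }
}

final class CompactionPlanner {
    private CompactionPlanner() {}

    /**
     * Walks the intervals in the map's iteration order and groups their segments
     * into batches. A batch is closed as soon as its total size exceeds
     * thresholdBytes, so one batch may span several intervals.
     */
    static List<List<SegmentInfo>> plan(Map<String, List<SegmentInfo>> segmentsByInterval,
                                        Predicate<String> tryWriteLock,
                                        long thresholdBytes) {
        List<List<SegmentInfo>> batches = new ArrayList<>();
        List<SegmentInfo> current = new ArrayList<>();
        long currentBytes = 0;
        for (Map.Entry<String, List<SegmentInfo>> entry : segmentsByInterval.entrySet()) {
            // Step 1: request a low-priority write lock; skip the interval if it is refused.
            if (!tryWriteLock.test(entry.getKey())) {
                continue;
            }
            // Step 2: add every segment of the locked interval to the input segments.
            for (SegmentInfo segment : entry.getValue()) {
                current.add(segment);
                currentBytes += segment.sizeBytes;
            }
            // Steps 3-4: once the threshold is exceeded, this batch is ready to be merged.
            if (currentBytes > thresholdBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
        }
        // Step 5 ends here; whatever is left forms a final, smaller batch.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap` keyed by interval) keeps adjacent intervals in the same batch.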

Update tasks VS compaction task

If an index task requests a lock of higher priority while a compaction task is loading segments, the compaction task's lock is canceled because the lock priority of compaction tasks is lower than that of other tasks. In this case, the compaction task cancels the compaction for the canceled interval and continues its job for the other intervals.

When any other task using ActionBasedSegmentAllocator looks up the previousSegmentId for new segment allocation, it must first acquire a write lock for the interval where the new segment will be added and then get the list of segments to find the last segment ID.

This method guarantees that at most a single task writes data into an interval at any time, thus the problem of overshadowed segments can be avoided.

Implementation plan for compaction task

As a first step, I'm going to add a compaction task that runs in a single process. As a second step, I'll parallelize it. I'll raise a separate issue for the task parallelization soon.

Required locks for different types of tasks

| Task | Locks |
|---|---|
| Realtime (highest priority) | write lock when publishing segments |
| Batch (high priority) | read lock if they read segments -> write lock when publishing segments |
| Append/Merge/Convert/Compaction (low priority) | read lock to read segments -> write lock to publish segments |
| Kill/Move/Archive/Restore (low priority) | write lock |

gianm commented 7 years ago

@jihoonson this looks good to me in general. I have one question though. Are read locks really needed? Is there any kind of task that wants a read lock but not a write lock?

jihoonson commented 7 years ago

@gianm it relates to the current implementation of some tasks. For example, Hadoop index tasks get the list of input segments when they start. Without lock types, such large locks are likely to prevent other tasks from running at the same time if they read a common part of the same data source.

If we can change all task types to get fine-grained locks instead of getting a large lock at the beginning, maybe lock types aren't needed. But I'm not sure how this change would affect the performance of index tasks.

gianm commented 7 years ago

Do you mean hadoop tasks that have dataSource input specs? I thought that would only be for the specific intervals that the indexing job is for, meaning it would need to get a write lock for those intervals anyway. So a read lock by itself doesn't seem that useful.

Am I missing something there though?

jihoonson commented 7 years ago

> Do you mean hadoop tasks that have dataSource input specs?

Ah, yes. I mean hadoop tasks which read a dataSource, but the output dataSource can be different from the input dataSource. In this case, they will get a read lock for the input dataSource and a write lock for the output dataSource.

gianm commented 7 years ago

Ah, okay, I wasn't thinking about those. In that case it sounds good to me.

jihoonson commented 7 years ago

Ok. Thank you for the review!

drcrallen commented 7 years ago

For reference https://github.com/druid-io/druid/pull/1679

gianm commented 7 years ago

Good point @drcrallen, it seems this proposal has ended up more similar to #1679 than the original proposal from #4434. I think we should try to merge them.

@pjain1 do you plan to keep working on #1679? Or would you like to fold that together into #4434 and @jihoonson can pick it up?

@jihoonson, the comment history of #1679 probably has some content relevant to this proposal.

jihoonson commented 7 years ago

Interesting! Thanks for letting me know @drcrallen. @gianm, I'll check the comments and the patch itself of #1679.

@pjain1 please let me know if you still want to work on #1679.

jihoonson commented 7 years ago

@pjain1 #1679 is nice work, and many parts are similar to this proposal. As a result, I think most of the code in #1679 can be reused. If you don't mind, I'll make a new patch based on your work after resolving conflicts and adding some features needed in this proposal.

jihoonson commented 7 years ago

@gianm @drcrallen @pjain1 I updated the proposal to include #1679.

leventov commented 7 years ago

Differentiating locks by three dimensions (read - write, preemptive - non-preemptive, priority) feels like it could be simplified. It should be possible to merge at least two of them, so leaving just two dimensions, or preferably one dimension.

Also, I suggest exploring a "read - restructure (compaction, merge, append) - new data (realtime index, batch index)" dimension; it might be helpful.

Also, something here might be isomorphic to the tri-level read-write-update system.

jihoonson commented 7 years ago

@leventov thank you for the good comment.

> Differentiating locks by three dimensions (read - write, preemptive - non-preemptive, priority) feels like it could be simplified. It should be possible to merge at least two of them, so leaving just two dimensions, or preferably one dimension.

I guess you mean merging read/write with preemptive/non-preemptive. The motivation here is that compaction tasks should not block tasks submitted by users. For example, suppose there are two tasks: a realtime indexing task and a compaction task that optimizes the segments generated by the realtime task. The realtime task may try to append new data for an interval while the compaction task is running for the same interval, because some events might arrive late. Even in such a case, the compaction task should not block the realtime task. Please note that a lock is acquired for an interval, not for an individual segment. I think a preemptable write lock is a simple solution to this problem. Please let me know if you have a better idea.

> Also, I suggest exploring a "read - restructure (compaction, merge, append) - new data (realtime index, batch index)" dimension; it might be helpful. Also, something here might be isomorphic to the tri-level read-write-update system.

I think it's a nice idea and closely related to fine-grained locking. Actually, fine-grained locking is quite a large issue to address here. I'll raise a new issue for fine-grained locking if it's worthwhile.

leventov commented 7 years ago

@jihoonson could you please describe types of locks for all current types of tasks + the future compaction tasks, in the proposed dimensions?

jihoonson commented 7 years ago

@leventov sure, I added a table for it to the proposal. All types of tasks basically acquire read locks if they need to read segments. And then, they upgrade them to write locks while publishing segments.

leventov commented 6 years ago

@jihoonson does #4550 contain this new three-dimensional system?

jihoonson commented 6 years ago

Ah, that system changed a little. Instead of the complicated preemptive/non-preemptive lock states, I added a new method doInCriticalSection() to TaskLockbox, which guarantees that a given action is performed without lock preemption. I'll update this proposal shortly.
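
One way to picture such a guarantee is the hypothetical sketch below. It is not the actual TaskLockbox code: `CriticalSectionLockbox`, its string lock ids, and the two-callback signature are assumptions made for illustration. The idea shown is that the action runs while the lockbox monitor is held, so a revocation cannot interleave with it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical, simplified lockbox; lock ids are plain strings here.
final class CriticalSectionLockbox {
    private final Set<String> validLockIds = new HashSet<>();

    synchronized void grant(String lockId) {
        validLockIds.add(lockId);
    }

    /** Called when a higher-priority task preempts the lock. */
    synchronized void revoke(String lockId) {
        validLockIds.remove(lockId);
    }

    /**
     * Runs onValidLock if the lock is still valid, otherwise onInvalidLock.
     * Because this method and revoke() synchronize on the same monitor,
     * the lock cannot be revoked while the chosen action is running.
     */
    synchronized <T> T doInCriticalSection(String lockId,
                                           Supplier<T> onValidLock,
                                           Supplier<T> onInvalidLock) {
        if (validLockIds.contains(lockId)) {
            return onValidLock.get();
        }
        return onInvalidLock.get();
    }
}
```

A compaction task could, under these assumptions, wrap its publish step in `doInCriticalSection(...)` and fall back to aborting the interval when its lock has already been revoked.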

pjain1 commented 6 years ago

@jihoonson missed your comments, feel free to use any of the code.

stale[bot] commented 5 years ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

jihoonson commented 5 years ago

Still relevant.

stale[bot] commented 5 years ago

This issue is no longer marked as stale.

github-actions[bot] commented 1 year ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

github-actions[bot] commented 1 year ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.