
Develop a new Indexer process for running ingestion tasks #7900

Open jon-wei opened 5 years ago

jon-wei commented 5 years ago

Motivation

The MiddleManager currently runs each task in a separate JVM, with identical configuration across all tasks spawned by a given MM. This model has some drawbacks:

Proposed changes

A new process type, Indexer (tentative name), will be added. This new process type is an alternative to the MiddleManager that runs all tasks in the single Indexer JVM instead of forking new processes.

Query processing

Task API resources

The Indexer will have a single ChatHandlerResource at /druid/worker/v1/chat/{id}, serving requests for all tasks managed by the Indexer.
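As a rough illustration (not the actual implementation), the shared resource can route each request to the handler registered by the corresponding task. The class and registry names below are hypothetical:

```java
// Sketch of a single JAX-RS resource serving chat requests for all tasks in the
// Indexer by looking each task's handler up in a shared, process-wide registry.
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

@Path("/druid/worker/v1/chat")
public class SharedChatResource
{
  // One registry for the whole Indexer process: task id -> that task's chat handler.
  private final ConcurrentHashMap<String, Object> handlers = new ConcurrentHashMap<>();

  public void register(String taskId, Object handler)
  {
    handlers.put(taskId, handler);
  }

  public void unregister(String taskId)
  {
    handlers.remove(taskId);
  }

  @GET
  @Path("/{id}/status")
  public Response status(@PathParam("id") String taskId)
  {
    // Route to the handler registered by the task, or 404 if the Indexer is not
    // currently running a task with that id.
    return Optional.ofNullable(handlers.get(taskId))
                   .map(handler -> Response.ok(handler.toString()).build())
                   .orElseGet(() -> Response.status(Response.Status.NOT_FOUND).build());
  }
}
```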

Task Resource Management

The Indexer will have a configurable global heap limit, globalIngestionHeapLimitBytes, applied across all tasks; it covers ingestion workloads (what maxBytesInMemory currently estimates) and merging workloads (related to ).

globalIngestionHeapLimitBytes should be lower than the total JVM heap size, to leave space for query workloads and to account for the fuzziness of the memory estimates being used.

When the sum of ingestion memory usage and merging memory usage across all tasks reaches globalIngestionHeapLimitBytes, the Indexer will trigger a global persist, causing each managed task to persist its in-heap segment data.

Per-task ingestion heap memory usage will continue to be tracked using the same mechanisms that support maxBytesInMemory.
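A simplified sketch of this bookkeeping, with hypothetical names (the eventual implementation may differ): per-task ingestion and merging estimates are summed, and reaching the global limit triggers a persist request for every managed task.

```java
// Sketch: sum ingestion + merging heap estimates across tasks and trigger a
// global persist once the configured limit is reached.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GlobalHeapAccountant
{
  private final long globalIngestionHeapLimitBytes;
  private final Map<String, Long> ingestionBytesPerTask = new ConcurrentHashMap<>();
  private final Map<String, Long> mergingBytesPerTask = new ConcurrentHashMap<>();

  public GlobalHeapAccountant(long globalIngestionHeapLimitBytes)
  {
    // Expected to be configured below the full JVM heap (e.g. some fraction of
    // Runtime.getRuntime().maxMemory()) to leave room for query processing.
    this.globalIngestionHeapLimitBytes = globalIngestionHeapLimitBytes;
  }

  // Called as tasks update their maxBytesInMemory-style ingestion estimates.
  public void updateIngestionEstimate(String taskId, long bytes)
  {
    ingestionBytesPerTask.put(taskId, bytes);
    maybeTriggerGlobalPersist();
  }

  // Called when a merging-memory reservation is recorded for a task.
  public void updateMergingEstimate(String taskId, long bytes)
  {
    mergingBytesPerTask.put(taskId, bytes);
    maybeTriggerGlobalPersist();
  }

  private void maybeTriggerGlobalPersist()
  {
    long total = ingestionBytesPerTask.values().stream().mapToLong(Long::longValue).sum()
                 + mergingBytesPerTask.values().stream().mapToLong(Long::longValue).sum();
    if (total >= globalIngestionHeapLimitBytes) {
      // Ask every managed task to persist its in-heap segment data to disk.
      ingestionBytesPerTask.keySet().forEach(this::requestPersist);
    }
  }

  private void requestPersist(String taskId)
  {
    // In the real process this would signal the task's appenderator; stubbed here.
    System.out.println("persist requested for task " + taskId);
  }
}
```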

To track per-task merging memory usage, the task flow will change slightly:

The Indexer will impose a resource management model that only considers byte-based memory estimates and limits. Row-based limits in any task specs will be ignored (rewritten to whatever represents "unlimited" for a given property).

By default, each task will not have a fixed individual memory limit; only the global limit applies. The Indexer will also have a mode where the global limit is divided evenly across the number of worker slots.

The Indexer will allow an optional per-task maxBytesInMemory configuration. If an individual maxBytesInMemory limit is hit, that task will persist individually. This is to help address potential memory starvation issues when a subset of tasks has significantly higher data generation rates than the others. The per-task limit would be used to constrain tasks that are known ahead of time to have disparately high data generation rates.
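A minimal sketch of these two per-task rules (the row-based limit rewrite described above and the optional per-task byte limit), using invented helper names:

```java
// Sketch: row limits are neutralized so only byte-based limits apply, and a task
// that has opted into its own byte limit persists on its own, independent of the
// global limit.
public class PerTaskLimits
{
  // Rewrite a task's row-based limit to the value that means "unlimited" for that
  // property; the configured value is ignored in the Indexer.
  public static int effectiveMaxRowsInMemory(Integer configuredMaxRowsInMemory)
  {
    return Integer.MAX_VALUE;
  }

  // True when a task with an individual byte limit should persist by itself.
  public static boolean shouldPersistIndividually(long taskBytesInMemory, long taskMaxBytesInMemory)
  {
    boolean hasIndividualLimit = taskMaxBytesInMemory > 0;
    return hasIndividualLimit && taskBytesInMemory >= taskMaxBytesInMemory;
  }
}
```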

Task Assignments

In the Indexer's default mode, where tasks do not have individual maxBytesInMemory limits, different tasks can consume different amounts of resources. This is a significant change from the existing MM model, where all tasks receive identical resource allocations.

While this can result in better resource utilization when a combination of small and big tasks run together, it opens the potential for skew in assigned workloads (e.g., a majority of high-resource-consumption tasks happen to be assigned to a single Indexer at the same time).

To address this initially, task assignments to Indexer processes can be made mostly randomly (with some consideration for how many tasks have already been assigned to an Indexer), combined with deployment guidance in the docs that instructs users to limit task durations when using the Indexer in this mode.

If the Indexer is running in the mode where each task has an equal share of the global heap limit, then the traditional MM task assignment algorithm can be used.
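The two assignment modes could look roughly like the following sketch (names are illustrative, and the equal-share branch stands in for whatever traditional slot-based worker-select strategy is configured):

```java
// Sketch of Indexer selection: a mostly-random pick weighted by free slots for the
// default mode, and a simple most-free-slots pick standing in for the traditional
// MM-style assignment in the equal-share mode.
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class IndexerSelector
{
  public static class Worker
  {
    final String host;
    final int assignedTasks;
    final int capacity;

    public Worker(String host, int assignedTasks, int capacity)
    {
      this.host = host;
      this.assignedTasks = assignedTasks;
      this.capacity = capacity;
    }

    int freeSlots()
    {
      return Math.max(0, capacity - assignedTasks);
    }
  }

  // Default mode: mostly random, but weighted by free slots so heavily loaded
  // Indexers are picked less often.
  public static Worker selectMostlyRandom(List<Worker> workers)
  {
    int totalFree = workers.stream().mapToInt(Worker::freeSlots).sum();
    if (totalFree <= 0) {
      return null; // no free slots anywhere; the caller would queue the task
    }
    int pick = ThreadLocalRandom.current().nextInt(totalFree);
    for (Worker w : workers) {
      if (pick < w.freeSlots()) {
        return w;
      }
      pick -= w.freeSlots();
    }
    return null; // unreachable if the slot counts are consistent
  }

  // Equal-share mode: traditional slot-based selection, shown here as "most free slots".
  public static Worker selectByFreeSlots(List<Worker> workers)
  {
    return workers.stream()
                  .max(Comparator.comparingInt(Worker::freeSlots))
                  .orElse(null);
  }
}
```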

Rationale

To address the motivating concerns, other approaches are possible:

The proposed approach was chosen because it addresses all three concerns.

The primary drawback of the proposed approach is the loss of fault isolation from having separate task processes. For this reason, this proposal suggests adding a new process type instead of replacing the MM.

The loss of fault isolation means that the Indexer will need supporting patches to function well, described in the future work section.

Operational impact

Test plan (optional)

The Indexer will be deployed in large clusters that are currently using MiddleManagers, completely replacing the MiddleManagers, and the correctness and stability of ingestion will be verified over time.

Future work

For the proposed approach to work well, we will need to adjust the segment merging sequence such that its memory usage is more bounded: currently, ingestion tasks can be prone to OOM during index merging, and the impact of such failures would be amplified in the single-process Indexer.

We will also need to address the memory estimation issues with growable aggregators, described in issue https://github.com/apache/incubator-druid/issues/6743, although this is a less crucial supporting patch compared to memory bounded index merging.

The initial implementation of the Indexer will not be exposed to users in documentation: until bounded-memory segment merging is supported, Indexers will not be stable enough to be a practical choice.

Other potential future work:

Related Patches

Initial process and task runner PR: https://github.com/apache/incubator-druid/pull/8107

dclim commented 5 years ago

@jon-wei thanks for writing this up, some questions:

When the sum of ingestion memory usage and merging memory usage across all tasks reaches globalIngestionHeapLimitBytes, the BigIndexer will trigger a global persist, causing each managed task to persist its in-heap segment data.

The BigIndexer will also have a mode where the global limit is divided evenly across the number of worker slots.

I think having a global heap limit configuration instead of just using the per-task maxBytesInMemory is great and would allow us to optimize our usage of the available memory. I don't really understand why we need to have a mode that divides the heap into fixed portions; what is the benefit of this? To me this feels like a throwback to the current model where every worker is allocated the same memory, whether it needs it or not, and large tasks suffer when run together with a bunch of small tasks. The main advantage I can see for this is that the global spill will not trigger in this case, which prevents unnecessary spills when you have a bunch of small tasks running with some big tasks that would otherwise keep triggering the global spill. As another option to consider, what do you think about having a configuration parameter that controls the minimum bytes in memory that need to be reached before a sink is eligible to be spilled during a global spill? The default value could be something like globalIngestionHeapLimitBytes / druid.worker.capacity + some accounting for merging memory requirements. This would give the flexibility of heavy tasks being able to utilize more than their 'equal share' of memory, while preventing unnecessary fragmentation for lightweight tasks.

fjy commented 5 years ago

I think we should just call it an indexer process.

jon-wei commented 5 years ago

@dclim

Do we get query prioritization support? Since the query-serving resources are a shared pool among all the tasks, we'd want to make sure that important queries can take precedence over heavier ones.

Yes, this should be supported.

What does the query endpoint look like? Will it be shared among all tasks similar to the chat handler (rather than using a different port for each task)? Something like POST http://{bigIndexerHost}:{bigIndexerPort}/druid/v2/{taskId}

I'm thinking it should be shared across all tasks, but the endpoint would just be http://{bigIndexerHost}:{bigIndexerPort}/druid/v2 without a taskId

How do shared lookups work? Are all lookups loaded into memory when the BigIndexer starts, regardless of whether it is running tasks and if those tasks are query-serving? Will a BigIndexer release lookups that haven't been used in a while?

For simplicity in the initial implementation, I'm thinking all lookups should be loaded into memory when the indexer starts. (Re: query-serving, I believe lookups can also be used during ingestion, via transform specs->lookup expressions)

Can the connection/thread pool for chat handlers be separate from the one used for serving queries? For supervised tasks, the supervisor tends to flip out if the task stops responding to control/status requests, and if the pool is shared, I could see some annoying issues where heavy query load to one set of tasks causes the supervisor to kill completely unrelated tasks for being unresponsive.

That sounds like a good idea, I'll look into that.

To me this feels like a throwback to the current model where every worker is allocated the same memory, whether it needs it or not, and large tasks suffer when run together with a bunch of small tasks. The main advantage I can see for this is that the global spill will not trigger in this case, which prevents unnecessary spills when you have a bunch of small tasks running with some big tasks that would otherwise keep triggering the global spill.

Hm, it was intended as a throwback mode; I'll reconsider whether that's useful to include.

As another option to consider, what do you think about having a configuration parameter that controls the minimum bytes in memory that needs to be reached before a sink is eligible to be spilled during a global spill? The default value could be something like globalIngestionHeapLimitBytes / druid.worker.capacity + some accounting for merging memory requirements. This would give the flexibility of heavy tasks being able to utilize more than its 'equal share' of memory, while preventing unnecessary fragmentation for lightweight tasks.

This sounds useful, I will look into this as well.

@fjy

I think we should just call it an indexer process.

That sounds good.

dclim commented 5 years ago

@jon-wei thanks! Two other things that I was thinking about that might be interesting, or might be out of scope for this proposal:

Otherwise, I'm +1 on this proposal, assuming that the new indexer will have enable/disable routes that toggle whether it will receive new task assignments, similar to the MM, to support rolling updates.

jon-wei commented 5 years ago

@dclim Great ideas. I think they make sense as follow-on features; I've added them to the Future Work section.

jihoonson commented 5 years ago

@jon-wei nice proposal! Thank you for writing it up. The proposal looks good to me overall.

I just have a couple of questions.

  • When a merge notice is received, the Indexer will attempt to record two memory allocations in its memory usage tracking state:
    • a fixed amount of heap. This can scale based on some proportion of globalIngestionHeapLimitBytes.
    • a fixed amount of direct memory, needed for decompression buffers and dictionary conversions. This can scale based on the size in bytes of the fuzzy +1 factor in the familiar (druid.processing.numThreads + druid.processing.numMergeBuffers + 1) * druid.processing.buffer.sizeBytes formula.

This sounds like each task needs to reserve some memory up front. How does the indexer schedule the memory reservation request?
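For a rough sense of the quoted direct-memory reservation, here is a small worked example; the settings are assumed values for illustration, not defaults from any release:

```java
// Worked example of the direct-memory sizing formula referenced above, with
// assumed values for the processing settings.
public class MergeMemoryReservationExample
{
  public static void main(String[] args)
  {
    int numThreads = 2;                        // druid.processing.numThreads (assumed)
    int numMergeBuffers = 2;                   // druid.processing.numMergeBuffers (assumed)
    long bufferSizeBytes = 256L * 1024 * 1024; // druid.processing.buffer.sizeBytes (assumed 256 MiB)

    // The familiar process-level sizing formula:
    long totalDirect = (numThreads + numMergeBuffers + 1) * bufferSizeBytes; // 5 * 256 MiB = 1280 MiB

    // The "+1" term corresponds to one buffer.sizeBytes, so a per-merge reservation
    // scaled from that factor would be on the order of 256 MiB with these settings.
    long perMergeReservation = bufferSizeBytes;

    System.out.println("total direct memory hint = " + totalDirect);
    System.out.println("per-merge reservation    = " + perMergeReservation);
  }
}
```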

himanshug commented 5 years ago

Points raised in "motivation" are valid, so +1 for the proposal.

however...

A new process type, BigIndexer (tentative name), will be added. This new process type is an alternative to the MiddleManager that runs all tasks in the single BigIndexer JVM instead of forking new processes.

It looks like a MiddleManager that runs tasks in its own JVM process instead of spawning new processes. When I look at it from that perspective, it seems more logical to add a configuration parameter to the existing MiddleManager node type that says whether or not to run task processes in the same JVM process.

jon-wei commented 5 years ago

When I look at it from that perspective, it seems more logical to add a configuration parameter to the existing MiddleManager node type that says whether or not to run task processes in the same JVM process.

@himanshug I found that it was cleaner to define a new node type, since there are aspects of the new process that diverge from the existing MM's behavior.

For example, with lookups, the MM process doesn't need to load them but the new indexer does. In DruidNodeDiscoveryProvider, I would otherwise have to associate the MiddleManager node type with LookupNodeService, and doing that kind of control across the board with a shared node type seems complicated.

himanshug commented 5 years ago

The specific case of announcing the LookupNodeService from the MM is possible based on the configuration. But it seems you found it complicated for other reasons as well, so it's fine.

It's just that, in the past, I have heard complaints about Druid having a perception of being difficult to deploy because of too many different node types.

liran-funaro commented 4 years ago

I realize that I'm a little late for the party since CliIndexer is already merged, but I just want to raise a possible issue with this design.

Once many concurrent incremental indexes are processed on the same JVM heap, the number of long-lived objects will be larger than in any of the individual Peons. Unfortunately, the JVM does not handle workloads with a huge number of long-lived objects well. This evidently causes long pause times for each GC cycle, which can add up to as much as 50% of the process runtime. However, the value of using the CliIndexer, IMO, is great.

To solve this, I suggest storing all incremental index data (keys and values) off-heap, which will reduce the number of heap objects dramatically. Please check out my issue (#9967) and PR (#10001), which solve exactly this problem.

This solution improves the CPU and RAM utilization of batch ingestion by over 50% in both serial and parallel ingestion modes, and it might greatly improve the resource utilization and performance of ingestion using the CliIndexer.

yage99 commented 2 years ago

@jon-wei Hello Jonathan, we think the indexer is a good feature and it really simplifies the configuration process. Our Druid cluster now has 90%+ of tasks running on the indexer. However, we noticed that the memory limit is evenly distributed across tasks. Our question is: why not enable memory sharing across different task slots for better memory usage? Is there any risk? The document says the per-task memory limit will be removed; which version will you choose to deliver this feature? Thanks :)