apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Parallel processing of hydrants in real time path (peon) #16077

Open kaisun2000 opened 8 months ago

kaisun2000 commented 8 months ago

Motivation

We have recently been working on the real-time query performance of Druid. We found that the query/segment/time metric is frequently 20 seconds or more on the Peons in Druid release 25.0.0.

It turned out that each segment is processed by a single processing thread. A segment may have multiple hydrants, but they are processed sequentially. More specifically, SinkQuerySegmentWalker uses DirectQueryProcessingPool.INSTANCE to process the hydrants, so the work is serialized:

          return new SpecificSegmentQueryRunner<>(
              withPerSinkMetrics(
                  new BySegmentQueryRunner<>(
                      sinkSegmentId,
                      descriptor.getInterval().getStart(),
                      factory.mergeRunners(
                          DirectQueryProcessingPool.INSTANCE,
                          perHydrantRunners
                      )
                  ),
                  toolChest,
                  sinkSegmentId,
                  cpuTimeAccumulator
              ),
              new SpecificSegmentSpec(descriptor)
          );
        }
    );

In our case, there are 20 or so hydrants per segment due to the large volume of ingestion traffic. Each hydrant can take from several hundred milliseconds to about 1 second to process. Thus, the total time is around 20 seconds or more.
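To illustrate why a direct pool serializes the hydrants, here is a minimal sketch in plain Java. It is not Druid code: the `Runnable::run` executor is only an assumed stand-in for how DirectQueryProcessingPool behaves according to the description above, and `HydrantThreadingSketch` and `distinctThreads` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HydrantThreadingSketch {
    // Runs `hydrants` placeholder tasks on `executor` and returns how many
    // distinct threads ended up doing the work.
    static int distinctThreads(Executor executor, int hydrants) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(hydrants);
        for (int i = 0; i < hydrants; i++) {
            executor.execute(() -> {
                // A real task would scan one hydrant here.
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // Assumption: a same-thread executor models the direct pool.
        Executor direct = Runnable::run;
        System.out.println("direct executor threads: " + distinctThreads(direct, 20));

        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            System.out.println("thread pool threads: " + distinctThreads(pool, 20));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the same-thread executor every task runs on the caller, so the per-hydrant times add up. With a real pool the tasks can overlap, so the wall time approaches that of the slowest hydrant.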

Proposed changes

The goal is to parallelize the processing of hydrants. There are many ways to do this. One consideration is to let most queries make progress just as they do today. For example, suppose there are 10 threads in the processing pool and two queries, each querying 5 segments. Currently, each of the two queries has 5 segments progressing on 5 processing threads. Each thread works sequentially through all the hydrants of one specific segment for one specific query. The point is that 10 segments are making progress at the same time.

Thus, we propose introducing a hydrant-level processing pool for each thread in the current processing pool. In the example above, there are 10 threads in the processing pool, so there would be 10 hydrant-level processing pools. Each segment would use one hydrant-level thread pool to process its hydrants in parallel.
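One way to picture the proposal is the sketch below, which is an illustration under assumptions and not the actual Druid change. It gives each processing thread its own hydrant-level pool through a `ThreadLocal`. The class `PerThreadHydrantPools`, its methods, and the pool sizes are all hypothetical, and each hydrant task simply returns a number in place of real query results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PerThreadHydrantPools {
    // Every hydrant-level pool created so far, kept so they can be shut down.
    private final Queue<ExecutorService> created = new ConcurrentLinkedQueue<>();
    // Lazily creates one hydrant-level pool per processing thread.
    private final ThreadLocal<ExecutorService> local;

    PerThreadHydrantPools(int hydrantThreads) {
        this.local = ThreadLocal.withInitial(() -> {
            ExecutorService pool = Executors.newFixedThreadPool(hydrantThreads);
            created.add(pool);
            return pool;
        });
    }

    // Called on a processing thread: fans the hydrant tasks of one segment
    // out to that thread's own pool and sums their results.
    long processSegment(List<Callable<Long>> hydrantTasks) throws Exception {
        long total = 0;
        for (Future<Long> result : local.get().invokeAll(hydrantTasks)) {
            total += result.get();
        }
        return total;
    }

    int poolCount() {
        return created.size();
    }

    void shutdown() {
        created.forEach(ExecutorService::shutdown);
    }

    // Two segments on a 2-thread processing pool, 3 hydrants per segment.
    // Returns {sum of all hydrant results, number of hydrant-level pools}.
    static long[] demo() {
        ExecutorService processing = Executors.newFixedThreadPool(2);
        PerThreadHydrantPools pools = new PerThreadHydrantPools(3);
        try {
            List<Future<Long>> segments = new ArrayList<>();
            for (int s = 0; s < 2; s++) {
                List<Callable<Long>> hydrants = new ArrayList<>();
                for (int h = 1; h <= 3; h++) {
                    final long rows = h; // placeholder per-hydrant result
                    hydrants.add(() -> rows);
                }
                Callable<Long> segmentTask = () -> pools.processSegment(hydrants);
                segments.add(processing.submit(segmentTask));
            }
            long sum = 0;
            for (Future<Long> segment : segments) {
                sum += segment.get();
            }
            return new long[] {sum, pools.poolCount()};
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            processing.shutdown();
            pools.shutdown();
        }
    }
}
```

In this sketch the number of segments making progress is still bounded by the processing pool size, while the total thread count grows to roughly the processing threads multiplied by the hydrant threads per pool.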

Rationale

There are many other possible solutions, for example: 1/ use the same processing pool to parallelize hydrant-level processing; 2/ use another shared processing pool to parallelize hydrant-level processing.

Note that if we maintain two-level processing:

We need to parallelize hydrant-level processing to reduce latency. Hydrant-level processing is file I/O bound, so we cannot rely on a Unix "select" call to build an async pool the way Netty does for socket processing. Put another way, we have to use more threads to gain I/O throughput, that is, use a thread pool to speed it up.

The main reason for the above proposal is that it maintains the invariant that the same number of segments make progress before and after the change, for the sake of "fairness".

The potential downside of this approach is that we may have too many threads and thus potentially too much thread scheduling/context-switch overhead. This may not be a big issue for two reasons

Operational impact

Not much operational impact; the change is backward compatible.

Test plan (optional)

An optional discussion of how the proposed changes will be tested. This section should focus on higher level system test strategy and not unit tests (as UTs will be implementation dependent).

Future work (optional)

An optional discussion of things that you believe are out of scope for the particular proposal but would be nice follow-ups. It helps show where a particular change could be leading us. There isn't any commitment that the proposal author will actually work on the items discussed in this section.

kaisun2000 commented 7 months ago

In some initial small-scale testing, the parallel processing approach reduced query latency to between 1/2 and 1/5 of that of the current approach, depending on the phase of ingestion. The closer a segment is to handoff, the greater the latency reduction.