apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Metrics reporting system for native parallel batch ingestion #10352

Open jihoonson opened 4 years ago

jihoonson commented 4 years ago

Motivation

Currently, the Parallel task doesn't provide any metrics, so when something goes wrong you have to read through task logs. This is harder for the Parallel task than for other task types: you first need to find the failing subtask in the supervisor task's logs, and then read through that subtask's logs. Even once you find the right logs for a bad subtask, they may not help much, since task logs contain only limited information about what the task has been doing, which you must interpret yourself instead of reading actual metrics. We have already found such metrics useful for streaming ingestion; batch ingestion could benefit similarly.

Proposed changes

To help with easy debugging, the native parallel batch ingestion should provide useful metrics. These metrics will be exposed via both task reporting system and metrics emitter.

Task reports

Both live and complete task reports will be provided. Live reports will be available while the ingestion task is running, and complete task reports will be available once the task is done.

The subtask report will include metrics for bytes in/out, rows in/filtered/unparseable/out, disk spills, fetch time, and errors. The supervisor task report will include the metrics per phase, which are mostly averages of the subtask metrics.
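To make the report structure above concrete, here is a minimal sketch of what a subtask report payload could look like. This is purely illustrative: the field names, phase names, and overall shape are assumptions, not the final design (metric names have not been decided yet in this proposal).

```python
# Hypothetical shape of a subtask report, covering the metric categories
# listed above (bytes in/out, rows, disk spills, fetch time, errors).
# All field and phase names are illustrative assumptions.
def make_subtask_report(task_id, phase):
    """Build an empty report skeleton for one subtask."""
    return {
        "taskId": task_id,
        "phase": phase,            # e.g. "segment_generation" (assumed name)
        "bytesIn": 0,
        "bytesOut": 0,
        "rowsIn": 0,
        "rowsFiltered": 0,
        "rowsUnparseable": 0,
        "rowsOut": 0,
        "diskSpills": 0,
        "fetchTimeMs": 0,
        "errors": [],
    }

report = make_subtask_report("subtask-0", "segment_generation")
```

The supervisor task would receive many such reports, one per subtask, and fold them into per-phase aggregates as described later in this proposal.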

Live reports

The live reports of the supervisor task will include:

The live reports of subtasks will include:

Complete reports

The complete reports of the supervisor task will include:

The complete reports of the subtasks will include:

Metrics

All of the above task metrics will also be emitted via the metrics emitter.

MiddleManager will additionally emit these metrics.
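As a rough illustration of the emitter path, the sketch below uses an in-memory stand-in that collects metric events the way an emitter implementation might forward them to a monitoring system. The class and the metric names are assumptions for illustration, not Druid's actual emitter API or finalized metric names.

```python
# Minimal stand-in for a metrics emitter: it records (metric, value,
# dimensions) events instead of sending them anywhere. Metric names are
# placeholders; the proposal has not chosen names yet.
class InMemoryEmitter:
    def __init__(self):
        self.events = []

    def emit(self, metric, value, dimensions=None):
        self.events.append({
            "metric": metric,
            "value": value,
            "dimensions": dimensions or {},
        })

emitter = InMemoryEmitter()
# A subtask could emit its row counters tagged with its task id:
emitter.emit("ingest/rows/processed", 1000, {"taskId": "subtask-0"})
emitter.emit("ingest/rows/unparseable", 3, {"taskId": "subtask-0"})
```

Dimensions (such as task id or phase) would let operators slice the emitted metrics per subtask or per phase in their monitoring system.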

Live reporting system for Parallel task

(Diagrams: live reporting system)

Complete reporting system for Parallel task

Rationale

Rationale for the list of metrics

Live reports and metrics are mostly useful for debugging. The new metrics should be able to answer these questions.

How is my ingestion going?

Why is my ingestion slow?

What was the last state of my succeeded ingestion?

Why did my ingestion fail?

Why does my ingestion not create segments?

Why is my query slow after ingestion?

More HTTP connections for live reporting system

In the proposed live reporting system, each subtask needs to talk to its supervisor task over HTTP. This will result in more HTTP connections between tasks. However, I would like to go with the current approach for now instead of making connections between middleManagers because
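The subtask-to-supervisor push described above can be sketched as an HTTP POST of the current metrics snapshot. The endpoint path and payload shape below are assumptions for illustration only; the sketch builds the request without sending it.

```python
import json
import urllib.request

def build_live_report_request(supervisor_url, task_id, metrics):
    """Build (but do not send) the HTTP request a subtask would push to
    its supervisor task. The endpoint path is a hypothetical example."""
    body = json.dumps({"taskId": task_id, "metrics": metrics}).encode()
    return urllib.request.Request(
        supervisor_url + "/druid/worker/v1/report",  # assumed path
        data=body,
        headers={"Content-Type": "application/json"},
    )

req = build_live_report_request(
    "http://supervisor-host:8100", "subtask-7", {"rowsIn": 500})
```

Each running subtask would send such a request periodically, which is why the number of open HTTP connections grows with the number of concurrent subtasks.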

Additional memory pressure in the supervisor task

The supervisor task will track metrics per phase, not per subtask (except for error messages from failed subtasks). The metrics for each phase are computed by aggregating subtask metrics whenever the subtasks send reports. As a result, the supervisor task needs to keep only about 20 metrics per phase in memory, which shouldn't be large.
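The bounded-memory aggregation can be sketched as a running sum and count per phase, from which averages are derived on demand. This is a sketch under the assumption that per-phase averages are the desired aggregate; the class and metric names are illustrative.

```python
from collections import defaultdict

class PhaseAggregator:
    """Fold subtask reports into per-phase running aggregates, so memory
    stays bounded regardless of how many subtasks report."""

    def __init__(self):
        self.sums = defaultdict(lambda: defaultdict(float))
        self.counts = defaultdict(int)

    def add_report(self, phase, metrics):
        # Accumulate sums instead of retaining each subtask's report.
        self.counts[phase] += 1
        for name, value in metrics.items():
            self.sums[phase][name] += value

    def averages(self, phase):
        n = self.counts[phase]
        return {name: total / n for name, total in self.sums[phase].items()}

agg = PhaseAggregator()
agg.add_report("segment_generation", {"rowsIn": 100, "fetchTimeMs": 40})
agg.add_report("segment_generation", {"rowsIn": 300, "fetchTimeMs": 60})
# averages("segment_generation") -> {"rowsIn": 200.0, "fetchTimeMs": 50.0}
```

Because only sums and counts are kept, memory use is proportional to the number of phases times the number of metrics, not the number of subtasks.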

Operational impact

As described above, there will be two operational changes: more HTTP connections between peons, and additional memory usage in the supervisor task. However, neither is expected to have a significant operational impact.

Test plan

The live and complete reports should be tested in integration tests. I will perform some testing for metrics on our internal cluster.

jihoonson commented 4 years ago

I haven't come up with metrics names yet. Will add them to this proposal later.

liran-funaro commented 4 years ago

I'm delighted to see this proposal. I think adding these metrics is essential for users who need to analyze issues during ingestion. Currently, to evaluate #10001, parsing logs was the only method we could use to analyze resource consumption in our experiments and in production. I hope this proposal will gain the attention it deserves.

mghosh4 commented 4 years ago

Thanks @jihoonson for working on this. This will be super useful. I had a few things that I wanted to draw your attention to:

  1. We have scenarios in our setup where maxNumConcurrentSubTasks can become as large as 700 or so. That being said, I do believe having 20 numConnections is highly unnecessary considering they will not get any queries (batch ingestion). Do you foresee any issues at this scale?
  2. Another alternative design I can think of is that the tasks continue to report their metrics to the Overlord (heartbeat), and the supervisor task polls them as it already does to check health status. This would mean slightly higher memory requirements on the Overlord side, assuming it has to store this information, but the Overlord can probably store aggregates for most metrics. What other downsides do you see? One pro is that it does not add any new HTTP connection requirements.
loquisgon commented 3 years ago

Putting the communication among tasks/indexer/supervisor in a streaming system (async) is an alternative to sync communication using HTTP.