apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

[Feature Proposal] Improve error reporting for task and query failures #11165

Open kfaraz opened 3 years ago

kfaraz commented 3 years ago

Motivation

Task and query failures in Druid are often difficult to analyze due to missing, incomplete or vague error messages.

A unified error reporting mechanism would improve the experience of a Druid user through:

Scope

The approach discussed here:

Summary of Changes

Flow

Advantages

An Error Code based approach has the following advantages

Examples

Some typical examples are shown below:

| Module Name | Error Type | Module Name + Error Code | Message Format | Message Args | Fully formed Error Message |
|---|---|---|---|---|---|
| kafka-emitter | Invalid topic name | kafka-emitter.topic.invalid | The given topic name [%s] is invalid. Please provide a valid topic name. | "test-topic" | The given topic name [test-topic] is invalid. Please provide a valid topic name. |
| kafka-indexer | Offset out of range | kafka-indexer.offset.outofrange | The offset [%s] for topic [%s] is out of range. Please check your topic offsets. If the issue persists, consider a hard reset of the supervisor but this might cause loss or duplication of data. | "13927608","daily_transactions" | The offset [13927608] for topic [daily_transactions] is out of range. Please check your topic offsets. If the issue persists, consider a hard reset of the supervisor but this might cause loss or duplication of data. |
| druid-compaction | Invalid Segment Spec | druid-compaction.segmentspec.invalid | Compaction of the datasource [%s] for interval [%s] failed because the segments specified in the compaction spec are not the same as the segments currently in use. Some new segments have been published or some segments have been removed. Please consider increasing your "skipOffsetFromLatest". | "daily_transactions","2021-01-01T00:00:00Z/2021-02-01T00:00:00Z" | Compaction of the datasource [daily_transactions] for interval [2021-01-01T00:00:00Z/2021-02-01T00:00:00Z] failed because the segments specified in the compaction spec are not the same as the segments currently in use. Some new segments have been published or some segments have been removed. Please consider increasing your "skipOffsetFromLatest". |
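
To make the relationship between the last three columns concrete, here is a minimal sketch that builds the fully formed message from a message format and its args using plain String.format. The class name MessageFormatDemo is hypothetical and the message format is shortened for brevity; neither is part of the proposal.

```java
// Hypothetical demo class; not part of the proposal.
final class MessageFormatDemo {

    // Shortened message format of the kafka-indexer.offset.outofrange example.
    static final String OFFSET_OUT_OF_RANGE =
        "The offset [%s] for topic [%s] is out of range. Please check your topic offsets.";

    // Substitutes the message args into the message format, in order.
    static String format(String messageFormat, Object... messageArgs) {
        return String.format(messageFormat, messageArgs);
    }

    public static void main(String[] args) {
        System.out.println(format(OFFSET_OUT_OF_RANGE, "13927608", "daily_transactions"));
    }
}
```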

New Classes

Code Snippets

Throwing an Exception

For example, in an extension such as kafka-emitter, an exception could be thrown as follows:

final String topicName = ...;
try {
    // ...
    // Publish to kafka topic here
    // ...
} catch (InvalidTopicException topicEx) {
    throw new DruidTypedException(
        ErrorTypeParams.of(
            KafkaEmitterErrorTypes.MODULE_NAME, // "kafka-emitter"
            KafkaEmitterErrorTypes.INVALID_TOPIC, // integer error code
            // message arguments
            topicName),
        topicEx
    );
}
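
The snippet above relies on ErrorTypeParams and DruidTypedException, whose definitions are not shown in this thread. A minimal sketch of what they might look like follows. The field layout, the use of String message args, and the choice of an unchecked exception are all assumptions, not the proposed implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Assumed shape of ErrorTypeParams: identifies one specific error event.
final class ErrorTypeParams {
    private final String moduleName;
    private final int code;
    private final List<String> messageArgs;

    private ErrorTypeParams(String moduleName, int code, List<String> messageArgs) {
        this.moduleName = moduleName;
        this.code = code;
        this.messageArgs = Collections.unmodifiableList(messageArgs);
    }

    static ErrorTypeParams of(String moduleName, int code, String... messageArgs) {
        return new ErrorTypeParams(moduleName, code, Arrays.asList(messageArgs));
    }

    String getModuleName() { return moduleName; }
    int getCode() { return code; }
    List<String> getMessageArgs() { return messageArgs; }
}

// Assumed shape of DruidTypedException: an unchecked exception carrying the params.
class DruidTypedException extends RuntimeException {
    private final ErrorTypeParams errorTypeParams;

    DruidTypedException(ErrorTypeParams errorTypeParams, Throwable cause) {
        // The exception message holds only module name and code;
        // the user-facing text is built later from the registered message format.
        super(errorTypeParams.getModuleName() + "." + errorTypeParams.getCode(), cause);
        this.errorTypeParams = errorTypeParams;
    }

    ErrorTypeParams getErrorTypeParams() { return errorTypeParams; }
}
```

Keeping the exception unchecked means existing method signatures would not need a new throws clause, at the cost of the compiler not enforcing that callers handle it.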

TaskStatus class (modified)

public class TaskStatus {

    // Existing field. Getting deprecated.
    private final @Nullable String errorMsg;

    // New field. Contains moduleName, errorCode and messageArgs
    // A TaskStatus with this field as null would fall back
    // to the existing field errorMsg
    private final @Nullable ErrorTypeParams errorTypeParams;

    //...
}

Creating a TaskStatus for a failed task

    TaskStatus status = TaskStatus.failure(
        taskId,
        ErrorTypeParams.of(moduleName, errorCode, messageArgs)
    );
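
The fallback described in the TaskStatus comments (use errorTypeParams when present, otherwise the existing errorMsg) could be sketched as below. TaskStatusSketch, displayMessage and the reduced ErrorTypeParams are hypothetical stand-ins for illustration only; the formatter is passed in as a function so that the sketch stays independent of how messages are actually built.

```java
import java.util.function.Function;

// Hypothetical stand-in for ErrorTypeParams, reduced to what this sketch needs.
final class ErrorTypeParams {
    final String moduleName;
    final int code;

    ErrorTypeParams(String moduleName, int code) {
        this.moduleName = moduleName;
        this.code = code;
    }
}

// Hypothetical, simplified stand-in for the modified TaskStatus.
final class TaskStatusSketch {
    private final String errorMsg;                 // existing field, may be null
    private final ErrorTypeParams errorTypeParams; // new field, may be null

    TaskStatusSketch(String errorMsg, ErrorTypeParams errorTypeParams) {
        this.errorMsg = errorMsg;
        this.errorTypeParams = errorTypeParams;
    }

    // Prefers the structured params; falls back to the legacy message when they are absent.
    String displayMessage(Function<ErrorTypeParams, String> formatter) {
        if (errorTypeParams != null) {
            return formatter.apply(errorTypeParams);
        }
        return errorMsg;
    }
}
```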

Handling exceptions in future callbacks for task execution

public class TaskQueue {

    // ...
    private ListenableFuture<TaskStatus> attachCallbacks(final Task task) {
        // ...
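        // statusFuture (name assumed) is the task's ListenableFuture<TaskStatus>, obtained in the elided code above
        Futures.addCallback(statusFuture, new FutureCallback<TaskStatus>() {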

            @Override
            public void onSuccess(final TaskStatus successStatus) {
                // persist the successStatus
            }

            @Override
            public void onFailure(final Throwable t) {
                TaskStatus failureStatus;
                if (t instanceof DruidTypedException) {
                    failureStatus = TaskStatus.failure(task.getId(), ((DruidTypedException) t).getErrorTypeParams());
                } else {
                    // build a generic error message here
                    failureStatus = TaskStatus.failure(task.getId(), ...);
                }
                // persist the failureStatus
            }
        });
    }

    // ...

}

Registering Error Types

Some of the snippets below use an extension kafka-emitter as an example.

Binding the ErrorTypeProvider

@Override
public void configure(Binder binder) {
    Multibinder.newSetBinder(binder, ErrorTypeProvider.class)
        .addBinding().to(KafkaEmitterErrorTypeProvider.class);
}

Listing the error types

public class KafkaEmitterErrorTypeProvider implements ErrorTypeProvider {

    @Override
    public String getModuleName() {
        return KafkaEmitterErrorTypes.MODULE_NAME; // "kafka-emitter";
    }

    @Override
    public List<ErrorType> getErrorTypes() {
        // return the list of all error types for this extension
        return Arrays.asList(
            ...
            // example error type for invalid topic
            ErrorType.of(
                KafkaEmitterErrorTypes.INVALID_TOPIC, 
                "The given topic name [%s] is invalid. Please provide a valid topic name.")
            ...
        );
    }

}
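
For reference, the ErrorType and ErrorTypeProvider types used above might look roughly like the sketch below. Only the method names (of, getCode, getMessageFormat, getModuleName, getErrorTypes) come from the snippets in this proposal; the rest, including SampleErrorTypeProvider and the code value 1, is assumed for illustration.

```java
import java.util.Collections;
import java.util.List;

// Assumed shape of ErrorType: an error code plus its message format.
final class ErrorType {
    private final int code;
    private final String messageFormat;

    private ErrorType(int code, String messageFormat) {
        this.code = code;
        this.messageFormat = messageFormat;
    }

    static ErrorType of(int code, String messageFormat) {
        return new ErrorType(code, messageFormat);
    }

    int getCode() { return code; }
    String getMessageFormat() { return messageFormat; }
}

// Assumed shape of the provider interface that each module or extension implements.
interface ErrorTypeProvider {
    String getModuleName();
    List<ErrorType> getErrorTypes();
}

// Hypothetical provider with a single error type; the code value 1 is made up.
final class SampleErrorTypeProvider implements ErrorTypeProvider {
    static final int INVALID_TOPIC = 1;

    @Override
    public String getModuleName() {
        return "kafka-emitter";
    }

    @Override
    public List<ErrorType> getErrorTypes() {
        return Collections.singletonList(
            ErrorType.of(
                INVALID_TOPIC,
                "The given topic name [%s] is invalid. Please provide a valid topic name."));
    }
}
```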

Mapping error codes to types (provided by core Druid):

public class ErrorMessageFormatter {

    private final Map<String, Map<Integer, ErrorType>> moduleToErrorTypes = ...;

    @Inject
    public ErrorMessageFormatter(
        Set<ErrorTypeProvider> errorTypeProviders) {

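        for (ErrorTypeProvider provider : errorTypeProviders) {
            // Ensure that there are no module name clashes
            final String moduleName = provider.getModuleName();
            if (moduleToErrorTypes.containsKey(moduleName)) {
                throw new IllegalStateException("Duplicate module name: " + moduleName);
            }

            // Add all error types of this module to the map
            Map<Integer, ErrorType> errorTypeMap = new ConcurrentHashMap<>();
            for (ErrorType errorType : provider.getErrorTypes()) {
                errorTypeMap.put(errorType.getCode(), errorType);
            }
            moduleToErrorTypes.put(moduleName, errorTypeMap);
        }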
    }
}

Building the full error message (to serve UI requests)

public class ErrorMessageFormatter {

    ...

    public String getErrorMessage(ErrorTypeParams errorParams) {
        ErrorType errorType = moduleToErrorTypes
            .get(errorParams.getModuleName())
            .get(errorParams.getCode());

        return String.format(
            errorType.getMessageFormat(),
            errorParams.getMessageArgs().toArray());
    }

    ...

}
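
As written, getErrorMessage would fail with a NullPointerException if the module name or the error code has not been registered. A defensive variant could fall back to a generic message that carries the code, as in the sketch below. SafeErrorMessageFormatter, register and the fallback wording are assumptions, and the map stores the message format directly instead of an ErrorType to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical defensive variant of ErrorMessageFormatter.
final class SafeErrorMessageFormatter {

    // moduleName -> (errorCode -> message format)
    private final Map<String, Map<Integer, String>> moduleToFormats = new HashMap<>();

    void register(String moduleName, int code, String messageFormat) {
        moduleToFormats
            .computeIfAbsent(moduleName, k -> new HashMap<>())
            .put(code, messageFormat);
    }

    String getErrorMessage(String moduleName, int code, List<String> messageArgs) {
        Map<Integer, String> formats = moduleToFormats.get(moduleName);
        String format = (formats == null) ? null : formats.get(code);
        if (format == null) {
            // Unknown module or code: return a generic message (wording is an assumption)
            return "Unknown error. Error Code: " + moduleName + "." + code;
        }
        return String.format(format, messageArgs.toArray());
    }
}
```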

API Changes

Operational Impact

The changes would be backward compatible as none of the existing fields would be removed.

Case: Historical is upgraded to a new (error code aware) version but Broker is still on an older version.

In such cases, the Broker would not be aware of error codes. Thus, the Historical should send a non-null errorMsg along with a non-null errorInfo. This ensures that a Broker on an old version would be able to use the errorMsg, whereas a Broker on a newer version would be able to use the errorInfo.

Design Concerns

Task Failures

Ingestion and compaction tasks are managed by the Overlord. Thus, the Overlord needs to be aware of the error types to be able to serve task statuses over REST APIs.

Query Failures

Queries (SQL and native) are submitted over HTTP connections and the response can contain the detailed error message in case of failures. Thus the Broker need not be aware of the list of error types as there is no persistence of query status (and hence no requirement of persisting error codes and formatting the error messages when requested).

Extensions that are not loaded on Overlord

There are several extensions in Druid which are not loaded on the Overlord and run only on the Middle Managers/Peons. As these are not loaded on the Overlord, it is not aware of the error types that these extensions can throw.

The approach here can be similar to that in Query Failures above. While communicating with the Overlord, the Middle Manager can send back both the ErrorType object (which denotes the category of the error) and the ErrorTypeParams (which denotes a specific error event). The Overlord can then persist the received ErrorTypeParams in its task status while also adding an entry to its error type mappings.
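
A rough sketch of the Overlord side of this exchange is given below. TaskFailureReport and OverlordErrorTypes are hypothetical names, and the payload layout is an assumption; the point is only that the Overlord adds the reported error type to its mappings if it does not already know it, and can then format the message.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical payload sent by a Middle Manager when a task fails.
final class TaskFailureReport {
    final String moduleName;
    final int code;
    final String messageFormat; // the ErrorType part (category of the error)
    final String[] messageArgs; // the ErrorTypeParams part (specific error event)

    TaskFailureReport(String moduleName, int code, String messageFormat, String... messageArgs) {
        this.moduleName = moduleName;
        this.code = code;
        this.messageFormat = messageFormat;
        this.messageArgs = messageArgs;
    }
}

// Hypothetical Overlord-side registry of error types.
final class OverlordErrorTypes {

    // moduleName -> (errorCode -> message format)
    private final Map<String, Map<Integer, String>> moduleToFormats = new ConcurrentHashMap<>();

    // Adds the reported error type unless it is already known, then formats the message.
    String onTaskFailure(TaskFailureReport report) {
        String format = moduleToFormats
            .computeIfAbsent(report.moduleName, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(report.code, k -> report.messageFormat);
        return String.format(format, (Object[]) report.messageArgs);
    }

    boolean isKnown(String moduleName, int code) {
        Map<Integer, String> formats = moduleToFormats.get(moduleName);
        return formats != null && formats.containsKey(code);
    }
}
```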

Storing the mappings from Error Code to Error Type

In the design discussed above, the error types are maintained in memory (in the Overlord). If extensions register a large number of error codes for rare scenarios, the Overlord would spend memory on mappings that are seldom used.

An alternative approach could be to persist the error types in the metadata store accessed via a small in-memory cache.
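
One way to picture this alternative is a small least-recently-used cache in front of the metadata store, as sketched below. ErrorTypeCache is a hypothetical name, the metadata store is represented by a plain lookup function, and the key format (for example "kafka-emitter.1") is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical LRU cache of message formats, backed by a metadata store lookup.
final class ErrorTypeCache {
    private final int maxEntries;
    private final Function<String, String> metadataStoreLookup;
    private final Map<String, String> cache;

    // Number of times the metadata store was consulted (for illustration only).
    int storeLookups = 0;

    ErrorTypeCache(int maxEntries, Function<String, String> metadataStoreLookup) {
        this.maxEntries = maxEntries;
        this.metadataStoreLookup = metadataStoreLookup;
        // accessOrder = true makes iteration order least-recently-used first
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > ErrorTypeCache.this.maxEntries;
            }
        };
    }

    // Returns the message format for the given key, loading it from the store on a cache miss.
    synchronized String getMessageFormat(String moduleAndCode) {
        String cached = cache.get(moduleAndCode);
        if (cached != null) {
            return cached;
        }
        storeLookups++;
        String loaded = metadataStoreLookup.apply(moduleAndCode);
        if (loaded != null) {
            cache.put(moduleAndCode, loaded);
        }
        return loaded;
    }
}
```

With this shape, frequently seen error types stay in memory, while rare ones cost one metadata store read each time they have been evicted.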

Pros:

Cons:

jihoonson commented 3 years ago

Hmm, it was a bit hard to understand what problem you want to solve and how your proposal can address it. If I understand correctly, the ultimate goal is Richer error messages detailing what went wrong and possible actions for mitigation for Easier debugging and RCA (without looking at server logs). And what you are proposing actually seems to be 2 things: 1) error propagation using the error code and 2) a unified error handling and reporting system based on 1). The ultimate goal sounds reasonable if we are not doing it already, but what's unclear to me is how the unified system you are proposing achieves the goal. To be more precise, the error propagation using the error code seems enough to me to achieve the goal. So, here is my question around point 2).

Based on this, I would suggest focusing on the ingestion side in this proposal. What do you think?

Besides the comment on introducing a unified system, here are my comments on other parts.

  • Specifying the severity of errors and other potential side effects
  • Hiding implementation and other sensitive details from the end user
  1. Can you elaborate more on these? What is the problem of the current status you think? And how does the proposal address the problem?

  2. How would the proposed error-code-based system integrate with the current ingestion system? Looking at the code snippets, it seems like we have to modify all the code that throws exceptions to convert them to DruidTypedException? If this is what you are proposing, it seems pretty error-prone because it's easy to forget to catch and convert exceptions, especially when they are unchecked exceptions. Also, please add a code snippet of TaskStatus after this change. It will help to understand the proposal.

Reference containing rich documentation for each error code, thus greatly simplifying debugging

  1. This is one of the advantages you listed. However, I'm not quite sure how it can simplify debugging because the error code will not be exposed to users? Can you add some more details on this?

  2. Thinking about what the "error code" actually is, is it simply a key to choose the proper error formatter when the error is exposed to users?

  3. In DruidTypedException, what does "typed" mean?

kfaraz commented 3 years ago

Thanks a lot for the feedback, @jihoonson! Sorry if the description was somewhat vague.

If I understand correctly, the ultimate goal is Richer error messages detailing what went wrong and possible actions for mitigation for Easier debugging and RCA (without looking at server logs)

Yes, your understanding is correct.

By a unified error handling and reporting system, I simply mean using the error codes and the corresponding exception wherever applicable.

From the user perspective, richer error message for easier debugging is useful when the error is something that the user can work around by adjusting some user settings.

Agreed.

The Druid query system already has an error-code based error reporting system (see Query exception class). In the query system, there are a few user-facing errors and the current error reporting system has been working well. I think it already provides richer error message with a potential workaround.

The proposed error code system closely resembles the existing system we have for query failures. We already have sufficient error codes and error messages for such failures. So, as you suggest, we can focus only on ingestion task failures in this proposal.

Thinking about what the "error code" actually is, is it simply a key to choose the proper error formatter when the error is exposed to users?

Yes, "Error Code" is primarily a key to choose the correct error formatter while showing errors to the users. It is meant to serve two more (minor) purposes:

  • Specifying the severity of errors and other potential side effects
  • Hiding implementation and other sensitive details from the end user

In some places, we currently expose the names of the Exception class in the error message itself. Through this proposal, we can prevent leaks of such information as there would be curated messages corresponding to each known code, and any non-coded error could be represented as a miscellaneous error.
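
As a sketch of this idea, a coded error could surface its curated message while every other exception is mapped to a miscellaneous message that omits the exception class name and raw message. CodedException, UserFacingErrors and the wording of the miscellaneous message are hypothetical.

```java
// Hypothetical exception that carries a curated, user-facing message.
class CodedException extends RuntimeException {
    CodedException(String curatedMessage, Throwable cause) {
        super(curatedMessage, cause);
    }
}

// Hypothetical helper that decides what text is safe to show to the end user.
final class UserFacingErrors {

    // Assumed wording for the catch-all category of non-coded errors.
    static final String MISCELLANEOUS = "An unexpected error occurred while running the task.";

    static String toUserMessage(Throwable t) {
        if (t instanceof CodedException) {
            return t.getMessage();
        }
        // Deliberately omit t.getClass().getName() and t.getMessage():
        // they may reveal implementation or other sensitive details.
        return MISCELLANEOUS;
    }
}
```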

In DruidTypedException, what does "typed" mean?

The Type refers to the fact that this exception is raised for a specific ErrorType (contains code, moduleName and message format). Sorry, I couldn't think of a more appropriate name for this.

Please let me know what you think.

kfaraz commented 3 years ago

How would the proposed error-code-based system integrate with the current ingestion system? Looking at the code snippets, it seems like we have to modify all the code that throws exceptions to convert them to DruidTypedException? If this is what you are proposing, it seems pretty error-prone because it's easy to forget to catch and convert exceptions, especially when they are unchecked exceptions. Also, please add a code snippet of TaskStatus after this change. It will help to understand the proposal.

In case of ingestion, the catch-all error handling is already being done in TaskQueue. We would need to modify it to handle the new type of exception as below. This would also require converting the exceptions that we throw from the known failure points to the new exception i.e. the DruidTypedException discussed above.

public class TaskQueue {

    // ...
    private ListenableFuture<TaskStatus> attachCallbacks(final Task task) {
        // ...
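        // statusFuture (name assumed) is the task's ListenableFuture<TaskStatus>, obtained in the elided code above
        Futures.addCallback(statusFuture, new FutureCallback<TaskStatus>() {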

            @Override
            public void onSuccess(final TaskStatus successStatus) {
                // persist the successStatus
            }

            @Override
            public void onFailure(final Throwable t) {
                TaskStatus failureStatus;
                if (t instanceof DruidTypedException) {
                    failureStatus = TaskStatus.failure(task.getId(), ((DruidTypedException) t).getErrorTypeParams());
                } else {
                    // build a generic error message here
                    failureStatus = TaskStatus.failure(task.getId(), ...);
                }
                // persist the failureStatus
            }
        });
    }

    // ...

}

Along with this, any other place where we return a TaskStatus.failure() would be modified as below:

    TaskStatus status = TaskStatus.failure(
        taskId,
        ErrorTypeParams.of(moduleName, errorCode, messageArgs)
    );

The primary change to the TaskStatus class is as follows (also added to the proposal above):

public class TaskStatus {

    // Existing field
    private final @Nullable String errorMsg;

    // New field. Contains moduleName, errorCode and messageArgs.
    // A TaskStatus with this field as null would fall back
    // to the existing field errorMsg
    private final @Nullable ErrorTypeParams errorTypeParams;

    //...
}

kfaraz commented 3 years ago

This is one of the advantages you listed. However, I'm not quite sure how it can simplify debugging because the error code will not be exposed to users? Can you add some more details on this?

This is more of an afterthought.

You are correct that in most cases, error codes would not be exposed to users. But in the corner case where a certain error code is not recognized by the Overlord (the case discussed above under "Extensions that are not loaded on Overlord"), we might display a generic error message with the error code, such as "Error occurred on middle manager while running task. Error Code: kafka-indexer.offset.outofrange".

The error codes can also be present in logs rather than the verbose messages themselves. To be able to translate these without having to call an API of the Overlord, we would benefit from having a documented set of known error codes.

jihoonson commented 3 years ago

@kfaraz thanks for adding more details. Please find my new comments below.

In DruidTypedException, what does "typed" mean?

The Type refers to the fact that this exception is raised for a specific ErrorType (contains code, moduleName and message format). Sorry, I couldn't think of a more appropriate name for this.

  1. No worries. I think it should be something else though as now we are going to focus on the ingestion side. Maybe IngestionException?

  2. FYI, TaskQueue.attachCallbacks() is used only in the local mode of overlord (see druid.indexer.runner.type in https://druid.apache.org/docs/latest/configuration/index.html#overlord-operations). The local mode is not used in production, but we should handle it as you suggested.

  3. In peons and indexers, tasks can currently throw any exception anywhere. Peons and indexers catch those exceptions and report task failures to the overlord. I think you will want to change this so that tasks are not allowed to throw arbitrary exceptions, but only the new type of exception (DruidTypedException or IngestionException), because you should be able to convert them to TaskStatus.failure(). This means you will have to modify all places where the task throws any exception to use the new exception type. I'm not sure if there is a less invasive way than this.

  4. What do you think about exposing error codes to users? We cannot avoid it in some cases anyway, such as where the extension is unloaded and the overlord doesn't understand the error code of existing task statuses. It seems more consistent to me and does no harm. Maybe TaskStatus can only show the error code and TaskReport can show the full details of the error.

  5. Semi-related, have you thought about adding the new CANCELED state for ingestion tasks? I think you may want to do it because the error code doesn't seem reasonable for canceled tasks.

kfaraz commented 3 years ago

Thanks a lot, @jihoonson! All your suggestions sound great. I agree with all of them. I will spend some more time on peons and indexers and see if there could be a less invasive method of using the new type of Exception.