Netflix / eureka

AWS Service registry for resilient mid-tier load balancing and failover.
Apache License 2.0
12.37k stars 3.74k forks source link

NullPointerException in maybeReadTimeOut() method can cause Eureka cluster registration inconsistency #1497

Open laniakea1990 opened 1 year ago

laniakea1990 commented 1 year ago

Description:

An issue has been discovered where an uncommon but occasional exception can occur in the maybeReadTimeOut() method in ReplicationTaskProcessor.java. This can lead to the batchingDispatcher thread pool in the PeerEurekaNode class being exhausted, resulting in registration inconsistencies between nodes in the Eureka cluster.

To reproduce the issue, a 3-node Eureka cluster (node1, node2, node3) was deployed on a Kubernetes container cloud with relevant peer node configurations. In certain exceptional scenarios, the maybeReadTimeOut() method in ReplicationTaskProcessor.java can throw a NullPointerException in its String message = e.getMessage().toLowerCase() line. This exception is caught in the process() method, which passes it to the BatchWorkerRunnable inner class. Each occurrence of the exception causes one less working thread in the batchingDispatcher. The batchingDispatcher's initial working thread count is 20 (read from config.getMaxThreadsForPeerReplication()). When all 20 working threads on Eureka node node1 are exhausted due to the NullPointerException, this causes the Eureka node to fail to execute synchronization registration instance information and other tasks.

some essential code list below:

ReplicationTaskProcessor.java

 public ProcessingResult process(List<ReplicationTask> tasks) {
        ReplicationList list = createReplicationListOf(tasks);
        try {
            EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
            int statusCode = response.getStatusCode();
            if (!isSuccess(statusCode)) {
                if (statusCode == 503) {
                    logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                    return ProcessingResult.Congestion;
                } else {
                    // Unexpected error returned from the server. This should ideally never happen.
                    logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                    return ProcessingResult.PermanentError;
                }
            } else {
                handleBatchResponse(tasks, response.getEntity().getResponseList());
            }
        } catch (Throwable e) {
            if (maybeReadTimeOut(e)) {
                logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
                //read timeout exception is more Congestion then TransientError, return Congestion for longer delay 
                return ProcessingResult.Congestion;
            } else if (isNetworkConnectException(e)) {
                logNetworkErrorSample(null, e);
                return ProcessingResult.TransientError;
            } else {
                logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
                return ProcessingResult.PermanentError;
            }
        }
        return ProcessingResult.Success;
    }

private static boolean maybeReadTimeOut(Throwable e) {
        do {
            if (IOException.class.isInstance(e)) {
               ### !!!!!this line occurs NullPointerException in our prod env.
                String message = e.getMessage().toLowerCase();
                Matcher matcher = READ_TIME_OUT_PATTERN.matcher(message);
                if(matcher.find()) {
                    return true;
                }
            }
            e = e.getCause();
        } while (e != null);
        return false;
}

TaskExecutors.java 
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                       int workerCount,
                                                       final TaskProcessor<T> processor,
                                                       final AcceptorExecutor<ID, T> acceptorExecutor) {
        final AtomicBoolean isShutdown = new AtomicBoolean();
        final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
        registeredMonitors.put(name, metrics);
        return new TaskExecutors<>(idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
}
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        BatchWorkerRunnable(String workerName,
                            AtomicBoolean isShutdown,
                            TaskExecutorMetrics metrics,
                            TaskProcessor<T> processor,
                            AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
               ###  !!! Log 'Discovery WorkerThread error' print some times in our prod env.
                logger.warn("Discovery WorkerThread error", e);
            }
        }
}

Environment:

Kubernetes container cloud Eureka version: 1.10.7 Java version: 1.8.0_202

niu-dali commented 1 year ago

Cannot invoke "String.toLowerCase()" because the return value of "java.lang.Throwable.getMessage()" is null

niu-dali commented 1 year ago

2023-04-21 22:08:00.705 WARN 10165 --- [1.117.233.35-19] c.n.eureka.util.batcher.TaskExecutors : Discovery WorkerThread error

java.lang.NullPointerException: Cannot invoke "String.toLowerCase()" because the return value of "java.lang.Throwable.getMessage()" is null at com.netflix.eureka.cluster.ReplicationTaskProcessor.maybeReadTimeOut(ReplicationTaskProcessor.java:196) at com.netflix.eureka.cluster.ReplicationTaskProcessor.process(ReplicationTaskProcessor.java:95) at com.netflix.eureka.util.batcher.TaskExecutors$BatchWorkerRunnable.run(TaskExecutors.java:190) at java.base/java.lang.Thread.run(Thread.java:833)