apache / dubbo

The java implementation of Apache Dubbo. An RPC and microservice framework.
https://dubbo.apache.org/
Apache License 2.0
40.42k stars 26.41k forks source link

dubbo远程调用,卡调用主线程了, 提供方已经触发超时了但是消费方 调用主线程还是卡住了挂起了 #13088

Closed flybh521 closed 12 months ago

flybh521 commented 1 year ago

Environment

Dubbo version: 3.2.0 Operating System version: centerOS7 Java version: 1.8

"Thread-52-EventThread" #165 daemon prio=5 os_prio=0 tid=0x00007fc7403c6800 nid=0x478c waiting on condition [0x00007fc5b13aa000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)

icodening commented 1 year ago

可以尝试使用3.2.6版本,该版本在waitAndDrain方法上增加了超时时间

kimmking commented 1 year ago

蛮奇怪的,提供方如果已经超时,那么应该返回异常信息给调用方了,按理调用方也不会一直阻塞。必定在什么地方存在了其他问题?

EarthChen commented 1 year ago

蛮奇怪的,提供方如果已经超时,那么应该返回异常信息给调用方了,按理调用方也不会一直阻塞。必定在什么地方存在了其他问题?

可以用最新版本再次尝试,应该已经修复了

QingHongYu commented 1 year ago

个人认为,这个和dubbo的消费端线程模型及dubbo consumer请求超时检查机制有关。
dubbo consumer使用HashedWheelTimer来检查请求是否超时。 dubbo的消费端线程模型如下: 屏幕截图 Biz Thread在第二步被唤醒有两种场景:
1.HashedWheelTimer检测到请求超时。
2.provider返回了请求结果并被consumer正确处理。
我们先看第一种场景。
在第一步发送请求给provider之前,dubbo会创建一个TimeoutCheckTask来检查请求超时情况。然后将请求发送给provider。代码如下:

 @Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)这一步中创建了一个TimeoutCheckTask,并将其提交给HashedWheelTimer,代码如下:

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

其中newTimeout方法体如下:

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

假设在 start(); 和 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;这两句代码中间发生了时钟回拨,导致deadline<0 ,那么TimeoutCheckTask将永远无法执行本次请求超时检查任务。 换句话说,Biz Thread被唤醒的场景中第一种场景无法达成了。

接下来看看第二种场景:
让我们重新看下消费端线程模型: 屏幕截图

当Biz Thread在第二步等待时,假设provider已经收到请求并开始进行业务逻辑处理,但是在进行业务处理时宕机了,那么消费方将无法收到来着provider的处理结果。Biz Thread被唤醒的第二种场景也无法达成了。
Biz Thread无人唤醒了,卡住了。

flybh521 commented 1 year ago

可以使用3.2.6版本,该版本在waitAndDrain方法上尝试增加了超时时间

EarthChen commented 1 year ago

可以使用3.2.6版本,该版本在waitAndDrain方法上尝试增加了超时时间

请问新版本还是没有解决吗?

flybh521 commented 1 year ago

可以使用3.2.6版本,该版本在waitAndDrain方法上尝试增加了超时时间

请问新版本还是没有解决吗?

目前已经升级版本,后续待观察,目前未出现卡主的情况。

AlbumenJ commented 1 year ago

个人认为,这个和dubbo的消费端线程模型及dubbo consumer请求超时检查机制有关。 dubbo consumer使用HashedWheelTimer来检查请求是否超时。 dubbo的消费端线程模型如下: 屏幕截图 Biz Thread在第二步被唤醒有两种场景: 1.HashedWheelTimer检测到请求超时。 2.provider返回了请求结果并被consumer正确处理。 我们先看第一种场景。 在第一步发送请求给provider之前,dubbo会创建一个TimeoutCheckTask来检查请求超时情况。然后将请求发送给provider。代码如下:

 @Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)这一步中创建了一个TimeoutCheckTask,并将其提交给HashedWheelTimer,代码如下:

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

其中newTimeout方法体如下:

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

假设在 start(); 和 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;这两句代码中间发生了时钟回拨,导致deadline<0 ,那么TimeoutCheckTask将永远无法执行本次请求超时检查任务。 换句话说,Biz Thread被唤醒的场景中第一种场景无法达成了。

接下来看看第二种场景: 让我们重新看下消费端线程模型: 屏幕截图

当Biz Thread在第二步等待时,假设provider已经收到请求并开始进行业务逻辑处理,但是在进行业务处理时宕机了,那么消费方将无法收到来着provider的处理结果。Biz Thread被唤醒的第二种场景也无法达成了。 Biz Thread无人唤醒了,卡住了。

这个的根本原因其实是 3.2 早期的版本 ThreadlessExecutor 可能存在无限期的等待导致的。 按正常 AsyncToSync 的等待过程中通过 future 的超时是会自动结束的,但是 ThreadlessExecutor 由于一次优化导致可能 executor 不会结束,最终超时不生效了。

QingHongYu commented 1 year ago

确实,当ThreadlessExecutor 不再无限期等待时,这个问题也就消失了

个人认为,这个和dubbo的消费端线程模型及dubbo consumer请求超时检查机制有关。 dubbo consumer使用HashedWheelTimer来检查请求是否超时。 dubbo的消费端线程模型如下: 屏幕截图 Biz Thread在第二步被唤醒有两种场景: 1.HashedWheelTimer检测到请求超时。 2.provider返回了请求结果并被consumer正确处理。 我们先看第一种场景。 在第一步发送请求给provider之前,dubbo会创建一个TimeoutCheckTask来检查请求超时情况。然后将请求发送给provider。代码如下:

 @Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)这一步中创建了一个TimeoutCheckTask,并将其提交给HashedWheelTimer,代码如下:

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

其中newTimeout方法体如下:

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

假设在 start(); 和 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;这两句代码中间发生了时钟回拨,导致deadline<0 ,那么TimeoutCheckTask将永远无法执行本次请求超时检查任务。 换句话说,Biz Thread被唤醒的场景中第一种场景无法达成了。 接下来看看第二种场景: 让我们重新看下消费端线程模型: 屏幕截图 当Biz Thread在第二步等待时,假设provider已经收到请求并开始进行业务逻辑处理,但是在进行业务处理时宕机了,那么消费方将无法收到来着provider的处理结果。Biz Thread被唤醒的第二种场景也无法达成了。 Biz Thread无人唤醒了,卡住了。

这个的根本原因其实是 3.2 早期的版本 ThreadlessExecutor 可能存在无限期的等待导致的。 按正常 AsyncToSync 的等待过程中通过 future 的超时是会自动结束的,但是 ThreadlessExecutor 由于一次优化导致可能 executor 不会结束,最终超时不生效了。

flybh521 commented 12 months ago

现在版本是 3.2.0 我可以在一个注册中心下 一部分 升级到3.2.6版本吗 有没有隐患 还是必须所有的同时升级到3.2.6版本

flybh521 commented 12 months ago

个人认为,这个和dubbo的消费端线程模型及dubbo consumer请求超时检查机制有关。 dubbo consumer使用HashedWheelTimer来检查请求是否超时。 dubbo的消费端线程模型如下: 屏幕截图 Biz Thread在第二步被唤醒有两种场景: 1.HashedWheelTimer检测到请求超时。 2.provider返回了请求结果并被consumer正确处理。 我们先看第一种场景。 在第一步发送请求给provider之前,dubbo会创建一个TimeoutCheckTask来检查请求超时情况。然后将请求发送给provider。代码如下:

 @Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)这一步中创建了一个TimeoutCheckTask,并将其提交给HashedWheelTimer,代码如下:

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

其中newTimeout方法体如下:

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

假设在 start(); 和 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;这两句代码中间发生了时钟回拨,导致deadline<0 ,那么TimeoutCheckTask将永远无法执行本次请求超时检查任务。 换句话说,Biz Thread被唤醒的场景中第一种场景无法达成了。 接下来看看第二种场景: 让我们重新看下消费端线程模型: 屏幕截图 当Biz Thread在第二步等待时,假设provider已经收到请求并开始进行业务逻辑处理,但是在进行业务处理时宕机了,那么消费方将无法收到来着provider的处理结果。Biz Thread被唤醒的第二种场景也无法达成了。 Biz Thread无人唤醒了,卡住了。

这个的根本原因其实是 3.2 早期的版本 ThreadlessExecutor 可能存在无限期的等待导致的。 按正常 AsyncToSync 的等待过程中通过 future 的超时是会自动结束的,但是 ThreadlessExecutor 由于一次优化导致可能 executor 不会结束,最终超时不生效了。

现在版本是 3.2.0 我可以在一个注册中心下 一部分 升级到3.2.6版本吗 有没有隐患 还是必须所有的同时升级到3.2.6版本

AlbumenJ commented 12 months ago

个人认为,这个和dubbo的消费端线程模型及dubbo consumer请求超时检查机制有关。 dubbo consumer使用HashedWheelTimer来检查请求是否超时。 dubbo的消费端线程模型如下: 屏幕截图 Biz Thread在第二步被唤醒有两种场景: 1.HashedWheelTimer检测到请求超时。 2.provider返回了请求结果并被consumer正确处理。 我们先看第一种场景。 在第一步发送请求给provider之前,dubbo会创建一个TimeoutCheckTask来检查请求超时情况。然后将请求发送给provider。代码如下:

 @Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)这一步中创建了一个TimeoutCheckTask,并将其提交给HashedWheelTimer,代码如下:

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

其中newTimeout方法体如下:

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

假设在 start(); 和 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;这两句代码中间发生了时钟回拨,导致deadline<0 ,那么TimeoutCheckTask将永远无法执行本次请求超时检查任务。 换句话说,Biz Thread被唤醒的场景中第一种场景无法达成了。 接下来看看第二种场景: 让我们重新看下消费端线程模型: 屏幕截图 当Biz Thread在第二步等待时,假设provider已经收到请求并开始进行业务逻辑处理,但是在进行业务处理时宕机了,那么消费方将无法收到来着provider的处理结果。Biz Thread被唤醒的第二种场景也无法达成了。 Biz Thread无人唤醒了,卡住了。

这个的根本原因其实是 3.2 早期的版本 ThreadlessExecutor 可能存在无限期的等待导致的。 按正常 AsyncToSync 的等待过程中通过 future 的超时是会自动结束的,但是 ThreadlessExecutor 由于一次优化导致可能 executor 不会结束,最终超时不生效了。

现在版本是 3.2.0 我可以在一个注册中心下 一部分 升级到3.2.6版本吗 有没有隐患 还是必须所有的同时升级到3.2.6版本

分批升级即可

flybh521 commented 12 months ago

个人认为,这个和dubbo的消费端线程模型及dubbo consumer请求超时检查机制有关。 dubbo consumer使用HashedWheelTimer来检查请求是否超时。 dubbo的消费端线程模型如下: 屏幕截图 Biz Thread在第二步被唤醒有两种场景: 1.HashedWheelTimer检测到请求超时。 2.provider返回了请求结果并被consumer正确处理。 我们先看第一种场景。 在第一步发送请求给provider之前,dubbo会创建一个TimeoutCheckTask来检查请求超时情况。然后将请求发送给provider。代码如下:

 @Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req;
        if (request instanceof Request) {
            req = (Request) request;
        } else {
            // create request.
            req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
        }
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)这一步中创建了一个TimeoutCheckTask,并将其提交给HashedWheelTimer,代码如下:

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // timeout check
        timeoutCheck(future);
        return future;
    }
    /**
     * check time out of the future
     */
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

其中newTimeout方法体如下:

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

假设在 start(); 和 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;这两句代码中间发生了时钟回拨,导致deadline<0 ,那么TimeoutCheckTask将永远无法执行本次请求超时检查任务。 换句话说,Biz Thread被唤醒的场景中第一种场景无法达成了。 接下来看看第二种场景: 让我们重新看下消费端线程模型: 屏幕截图 当Biz Thread在第二步等待时,假设provider已经收到请求并开始进行业务逻辑处理,但是在进行业务处理时宕机了,那么消费方将无法收到来着provider的处理结果。Biz Thread被唤醒的第二种场景也无法达成了。 Biz Thread无人唤醒了,卡住了。

这个的根本原因其实是 3.2 早期的版本 ThreadlessExecutor 可能存在无限期的等待导致的。 按正常 AsyncToSync 的等待过程中通过 future 的超时是会自动结束的,但是 ThreadlessExecutor 由于一次优化导致可能 executor 不会结束,最终超时不生效了。

现在版本是 3.2.0 我可以在一个注册中心下 一部分 升级到3.2.6版本吗 有没有隐患 还是必须所有的同时升级到3.2.6版本

分批升级即可

非常感谢

yimaotou commented 8 months ago

请问一下,这个问题的复现步骤是怎样的?