weibocom / motan

A cross-language remote procedure call(RPC) framework for rapid development of high performance distributed services.
Other
5.89k stars 1.78k forks source link

NettyClient 超时判断 #54

Closed ghost closed 8 years ago

ghost commented 8 years ago

刚开始研究motan第二天,提得比较鸡肋的地方请多包涵,能力不够没好意思提pull request。

NettyClient中使用了一个定时器线程扫描所有的异步请求是否超时,可否改用时间轮算法注册回调函数的办法判断超时?修改如下:

- timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
-               new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
-               MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, -MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
-               TimeUnit.MILLISECONDS);
+ timeWheel = new TimeWheel();
+ timeWheel.begin();

//关闭client
public synchronized void close(int timeout) {
  ...
- timeMonitorFuture.cancel(true);
+ timeWheel.stop();
  ...
}

public void registerCallback(final long requestId, final NettyResponseFuture nettyResponseFuture) {
...

    this.callbackMap.put(requestId, nettyResponseFuture);
//这里添加超时回调函数
        timeWheel.add(nettyResponseFuture.getTimeout(), new TimeWheel.ExpiringHandler() {
            @Override
            public void onExpire() {
                callbackMap.remove(requestId);
                nettyResponseFuture.cancel();
            }
        });
}
//Hash时间轮:
public class TimeWheel implements Runnable {

    private int timePointer;

    private int maxTTL;

    private int tickDuration;

    private AtomicBoolean stop = new AtomicBoolean(false);

    private ConcurrentHashMap<Integer, Set<ExpiringHandler>> wheel = new ConcurrentHashMap<Integer, Set<ExpiringHandler>>();

    public TimeWheel(int tickDuration, int maxTTL) {
        this.tickDuration = tickDuration;
        this.maxTTL = maxTTL;
    }

    public TimeWheel() {
        this(100, 60 * 60 * 1000);
    }

    public void begin() {
        Executors.newSingleThreadExecutor().execute(this);
    }

    public void add(int ttl, ExpiringHandler handler) {
        if (ttl >= maxTTL)
            throw new MotanFrameworkException("Timeout cannot be more than maxTTL.");
        if(ttl / tickDuration <= 0)
            throw new MotanFrameworkException("Too small ttl.");
        int position = (timePointer + ttl / tickDuration * tickDuration) % maxTTL;
        wheel.putIfAbsent(position, new ConcurrentHashSet<ExpiringHandler>());
        wheel.get(position).add(handler);
    }

    public void stop() {
        stop.getAndSet(true);
    }

    @Override
    public void run() {
        while (!stop.get()) {
            long c0 = System.currentTimeMillis();
            Set<ExpiringHandler> handlers = wheel.remove(timePointer);
            if (handlers != null) {
                for (ExpiringHandler handler : handlers)
                    handler.onExpire();
            }
            if (timePointer + tickDuration > maxTTL)
                timePointer = timePointer + tickDuration - maxTTL;
            else
                timePointer += tickDuration;
            long offset = System.currentTimeMillis() - c0;
            if (offset > tickDuration)
                continue;
            try {
                Thread.sleep(tickDuration - offset);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public interface ExpiringHandler {

        void onExpire();
    }

}
qdaxb commented 8 years ago

这里的超时机制确实有优化的空间,从目前motan的使用情况来说超时时间并不是相当精确,平均下来每次判定超时大概会有10ms - 20ms左右的延迟,并且每次轮询都是o(n)复杂度,也会有少许额外的cpu负担。

不过issue里的代码似乎有并发问题,wheel.get(position).add(handler);Set<ExpiringHandler> handlers = wheel.remove(timePointer);如果恰好以相反的顺序执行的话会出现空指针,并且使用ConcurrentHashset(底层应该是ConcurrentHashMap)的迭代器迭代的同时进行更新操作,我记得应该有可能出现元素被漏掉的情况。

我记得netty自带了HashedWheelTimer,一些开源的项目里应该也有自己的HWT实现,但是无论使用哪种HWT实现,都会加重对内存的消耗,并且在异常的情况下甚至有可能造成内存溢出,对于这个修改我们会更慎重一些,需要完全确认这个修改不会影响到服务稳定性。

总的来说,我觉得@syndory 建议的方向不错,是否可以参考使用netty的HWT重新实现这个功能并提一个PR?

ping @rayzhang0603 @half-dead

ghost commented 8 years ago

@qdaxb 你好,

不过issue里的代码似乎有并发问题,wheel.get(position).add(handler);和Set handlers = wheel.remove(timePointer);如果恰好以相反的顺序执行的话会出现空指针

上面这一段我看懂了,我毕竟还是too young。

并且使用ConcurrentHashset(底层应该是ConcurrentHashMap)的迭代器迭代的同时进行更新操作,我记得应该有可能出现元素被漏掉的情况。

我想问一下这段的问题在哪里,我没有太理解,如果当没有发生上面说的情况下,当我remove一个map中的set时,就没有其它线程能够访问这个set了(add方法会重新添加一个set),只有在run方法中迭代更新。 如果有错误的地方,望指正。

wheel.putIfAbsent(position, new ConcurrentHashSet<ExpiringHandler>());
wheel.get(position).add(handler);
...
Set<ExpiringHandler> handlers = wheel.remove(timePointer);
if (handlers != null) {
    for (ExpiringHandler handler : handlers)
        handler.onExpire();
}
qdaxb commented 8 years ago

@syndory 我说的是这种情况:

时间轴是这样的: 线程A:

wheel.putIfAbsent(position, new ConcurrentHashSet<ExpiringHandler>());
wheel.get(position)   //还没有add

线程B:

Set<ExpiringHandler> handlers = wheel.remove(timePointer);
if (handlers != null) {
**    for (ExpiringHandler handler : handlers)
        handler.onExpire();
}

假设有10个元素,正循环到第5个元素的时候,线程A:

.add(handler);

此时线程A插入了一个handler,但是B正在循环handlers,插入本身不会报错,但是我记得iterater不能保证在迭代的同时插入新数据能取出插入后的数据,相当于新插入的这个handler不会被执行。

half-dead commented 8 years ago

add方法里的position的计算逻辑是不是有问题,比如ttl传201,这一项永远不会被删除

ghost commented 8 years ago

@half-dead 噢,对的,谢谢指正。需要把ttl转化成tickDuration的整倍数才行。

int position = (timePointer + ttl / tickDuration * tickDuration) % maxTTL;

ghost commented 8 years ago

多谢 @qdaxb @half-dead 的指正和帮助进步。请求把issue再保留两天,看看有没有大牛给一个比较合适的TimeWheel实现。

qdaxb commented 8 years ago

@syndory netty自带了HashedWheelTimer

ghost commented 8 years ago

好的,如果合适会提个PR。