apache / dubbo

The java implementation of Apache Dubbo. An RPC and microservice framework.
Apache License 2.0
40.42k stars 26.41k forks source link

HashedWheelTimer 内存占用过高bug (dubbo 2.7.8) #6820

Closed zx844174097 closed 3 years ago

zx844174097 commented 3 years ago

bug 所在文件 https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java

出现问题的方法: private boolean isWindows() { return System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win"); } 问题产生原因: 1.waitForNextTick() 中无限调用 isWindows();

2.toLowerCase(Locale.US) 将急速产生 char[] 对象。导致内存占用过高。

解决方案: ` private static boolean isWindows = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");

private boolean isWindows() {
    return isWindows;

临时解决方案: 覆盖编写HashedWheelTimer 类 `/*

package org.apache.dubbo.common.timer;

import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.ClassUtils;

import java.util.Collections; import java.util.HashSet; import java.util.Locale; import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong;

/* 覆盖阿里该类,用来修复内存占用过高的bug / public class HashedWheelTimer implements Timer {

 * may be in spi?
public static final String NAME = "hased";

private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);

private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater
        .newUpdater(HashedWheelTimer.class, "workerState");

private final Worker worker = new Worker();
private final Thread workerThread;

private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;

 * 0 - init, 1 - started, 2 - shut down
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState;

private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;

private volatile long startTime;

 * Creates a new timer with the default thread factory
 * ({@link Executors#defaultThreadFactory()}), default tick duration, and
 * default number of ticks per wheel.
public HashedWheelTimer() {

 * Creates a new timer with the default thread factory
 * ({@link Executors#defaultThreadFactory()}) and default number of ticks per
 * wheel.
 * @param tickDuration the duration between tick
 * @param unit         the time unit of the {@code tickDuration}
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);

 * Creates a new timer with the default thread factory
 * ({@link Executors#defaultThreadFactory()}).
 * @param tickDuration  the duration between tick
 * @param unit          the time unit of the {@code tickDuration}
 * @param ticksPerWheel the size of the wheel
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if either of {@code tickDuration} and
 *                                  {@code ticksPerWheel} is &lt;= 0
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);

 * Creates a new timer with the default tick duration and default number of
 * ticks per wheel.
 * @param threadFactory a {@link ThreadFactory} that creates a background
 *                      {@link Thread} which is dedicated to {@link TimerTask}
 *                      execution.
 * @throws NullPointerException if {@code threadFactory} is {@code null}
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);

 * Creates a new timer with the default number of ticks per wheel.
 * @param threadFactory a {@link ThreadFactory} that creates a background
 *                      {@link Thread} which is dedicated to {@link TimerTask}
 *                      execution.
 * @param tickDuration  the duration between tick
 * @param unit          the time unit of the {@code tickDuration}
 * @throws NullPointerException     if either of {@code threadFactory} and
 *                                  {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);

 * Creates a new timer.
 * @param threadFactory a {@link ThreadFactory} that creates a background
 *                      {@link Thread} which is dedicated to {@link TimerTask}
 *                      execution.
 * @param tickDuration  the duration between tick
 * @param unit          the time unit of the {@code tickDuration}
 * @param ticksPerWheel the size of the wheel
 * @throws NullPointerException     if either of {@code threadFactory} and
 *                                  {@code unit} is {@code null}
 * @throws IllegalArgumentException if either of {@code tickDuration} and
 *                                  {@code ticksPerWheel} is &lt;= 0
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, -1);

 * Creates a new timer.
 * @param threadFactory      a {@link ThreadFactory} that creates a background
 *                           {@link Thread} which is dedicated to
 *                           {@link TimerTask} execution.
 * @param tickDuration       the duration between tick
 * @param unit               the time unit of the {@code tickDuration}
 * @param ticksPerWheel      the size of the wheel
 * @param maxPendingTimeouts The maximum number of pending timeouts after which
 *                           call to {@code newTimeout} will result in
 *                           {@link java.util.concurrent.RejectedExecutionException}
 *                           being thrown. No maximum pending timeouts limit is
 *                           assumed if this value is 0 or negative.
 * @throws NullPointerException     if either of {@code threadFactory} and
 *                                  {@code unit} is {@code null}
 * @throws IllegalArgumentException if either of {@code tickDuration} and
 *                                  {@code ticksPerWheel} is &lt;= 0
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    if (unit == null) {
        throw new NullPointerException("unit");
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(
                String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration,
                        Long.MAX_VALUE / wheel.length));
    workerThread = threadFactory.newThread(worker);

    this.maxPendingTimeouts = maxPendingTimeouts;

            && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {

protected void finalize() throws Throwable {
    try {
    } finally {
        // This object is going to be GCed and it is assumed the ship has sailed to do a
        // proper shutdown. If
        // we have not yet shutdown then we want to make sure we decrement the active
        // instance count.

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    if (ticksPerWheel > 1073741824) {
        throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);

    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();
    return wheel;

private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = ticksPerWheel - 1;
    normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
    normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
    normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
    normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
    normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
    return normalizedTicksPerWheel + 1;

 * Starts the background thread explicitly. The background thread will start
 * automatically on demand even if you did not call this method.
 * @throws IllegalStateException if this timer has been {@linkplain #stop()
 *                               stopped} already
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        throw new IllegalStateException("cannot be started once stopped");
        throw new Error("Invalid WorkerState");

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.

public Set<Timeout> stop() {
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
                + TimerTask.class.getSimpleName());

        // workerState can be 0 or 2 at this moment - let it always be 2.

        return Collections.emptySet();

    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            try {
            } catch (InterruptedException ignored) {
                interrupted = true;

        if (interrupted) {
    } finally {
    return worker.unprocessedTimeouts();

public boolean isStop() {

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    if (unit == null) {
        throw new NullPointerException("unit");

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts
                + ")");


    // Add the timeout to the timeout queue which will be processed on the next
    // tick.
    // During processing all the queued HashedWheelTimeouts will be added to the
    // correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    return timeout;

 * Returns the number of pending timeouts of this {@link Timer}.
public long pendingTimeouts() {
    return pendingTimeouts.get();

private static void reportTooManyInstances() {
    String resourceType = ClassUtils.simpleClassName(HashedWheelTimer.class);
    logger.error("You are creating too many " + resourceType + " instances. " + resourceType
            + " is a shared resource that must be reused across the JVM,"
            + "so that only a few instances are created.");

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's
            // not 0 when initialized.
            startTime = 1;

        // Notify the other threads waiting for the initialization at start().

        do {
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                int idx = (int) (tick & mask);
                HashedWheelBucket bucket = wheel[idx];
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket : wheel) {
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
            if (!timeout.isCancelled()) {

    private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the
        // workerThread when it just
        // adds new timeouts in a loop.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.

            long calculated = timeout.deadline / tickDuration;
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            // Ensure we don't schedule for past.
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);

            HashedWheelBucket bucket = wheel[stopIndex];

    private void processCancelledTasks() {
        for (;;) {
            HashedWheelTimeout timeout = cancelledTimeouts.poll();
            if (timeout == null) {
                // all processed
            try {
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);

     * calculate goal nanoTime from startTime and current tick number, then wait
     * until that goal has been reached.
     * @return Long.MIN_VALUE if received a shutdown request, current time otherwise
     *         (with Long.MIN_VALUE changed by +1)
    private long waitForNextTick() {
        long deadline = tickDuration * (tick + 1);

        for (;;) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;

            try {
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;

    Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);

private static final class HashedWheelTimeout implements Timeout {

    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(HashedWheelTimeout.class, "state");

    private final HashedWheelTimer timer;
    private final TimerTask task;
    private final long deadline;

    @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int state = ST_INIT;

     * RemainingRounds will be calculated and set by
     * Worker.transferTimeoutsToBuckets() before the HashedWheelTimeout will be
     * added to the correct HashedWheelBucket.
    long remainingRounds;

     * This will be used to chain timeouts in HashedWheelTimerBucket via a
     * double-linked-list. As only the workerThread will act on it there is no need
     * for synchronization / volatile.
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

     * The bucket to which the timeout was added
    HashedWheelBucket bucket;

    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;

    public Timer timer() {
        return timer;

    public TimerTask task() {
        return task;

    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        // If a task should be canceled we put this to another queue which will be
        // processed on each tick.
        // So this means that we will have a GC latency of max. 1 tick duration which is
        // good enough. This way
        // we can make again use of our MpscLinkedQueue and so minimize the locking /
        // overhead as much as possible.
        return true;

    void remove() {
        HashedWheelBucket bucket = this.bucket;
        if (bucket != null) {
        } else {

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);

    public int state() {
        return state;

    public boolean isCancelled() {
        return state() == ST_CANCELLED;

    public boolean isExpired() {
        return state() == ST_EXPIRED;

    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {

        try {
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);

    public String toString() {
        final long currentTime = System.nanoTime();
        long remaining = deadline - currentTime + timer.startTime;
        String simpleClassName = ClassUtils.simpleClassName(this.getClass());

        StringBuilder buf = new StringBuilder(192).append(simpleClassName).append('(').append("deadline: ");
        if (remaining > 0) {
            buf.append(remaining).append(" ns later");
        } else if (remaining < 0) {
            buf.append(-remaining).append(" ns ago");
        } else {

        if (isCancelled()) {
            buf.append(", cancelled");

        return buf.append(", task: ").append(task()).append(')').toString();

 * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list
 * like datastructure to allow easy removal of HashedWheelTimeouts in the
 * middle. Also the HashedWheelTimeout act as nodes themself and so no extra
 * object creation is needed.
private static final class HashedWheelBucket {

     * Used for the linked-list datastructure
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

     * Add {@link HashedWheelTimeout} to this bucket.
    void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;

     * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
    void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        // process all timeouts
        while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
            if (timeout.remainingRounds <= 0) {
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(
                            String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            } else if (timeout.isCancelled()) {
                next = remove(timeout);
            } else {
            timeout = next;

    public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        // remove timeout that was either processed or cancelled by updating the
        // linked-list
        if (timeout.prev != null) {
            timeout.prev.next = next;
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;

        if (timeout == head) {
            // if timeout is also the tail we need to adjust the entry too
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
        } else if (timeout == tail) {
            // if the timeout is the tail modify the tail to be the prev node.
            tail = timeout.prev;
        // null out prev, next and bucket to allow for GC.
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
        return next;

     * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
    void clearTimeouts(Set<Timeout> set) {
        for (;;) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
            if (timeout.isExpired() || timeout.isCancelled()) {

    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head = null;
        } else {
            this.head = next;
            next.prev = null;

        // null out prev and next to allow for GC.
        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;

private static boolean isWindows = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");

private boolean isWindows() {
    return isWindows;

} `

zx844174097 commented 3 years ago

dubbo-future-timeout-thread 内存占用过高。频繁GC

raydl007 commented 3 years ago


raydl007 commented 3 years ago

@zx844174097 用你的方法测试后,不再频繁gc了,麻烦提个Pull Request吧,自己覆盖代码感觉还是不优雅