moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0
2.32k stars, 818 forks

High CPU usage when run Ctrie.insert function #841

Open JasontieZhu opened 5 months ago

JasontieZhu commented 5 months ago

Expected behavior

After the MQTT broker had been running for about a month, its CPU usage rose from 9% to 95% (the server has 24 CPU cores). While CPU usage is high, the CTrie is very large and contains a huge number of tomb nodes (TNode). In that state, most of the CPU time is spent in CNode.anyChildrenMatch.

1. Why do so many TNodes exist in the system without being cleared?

2. I would like TNodes to be cleared automatically, or a background thread to clear TNodes that are no longer in use.

3. In which scenarios does the system not clear TNodes automatically? For example, this branch of remove returns without cleaning up:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    return Action.OK;
}

Actual behavior

When CPU usage is 5%, there are 151 TNodes; when CPU usage reaches 95%, there are 7,065,591. The TNodes are not cleared, and their number keeps increasing over time.

Steps to reproduce

Minimal yet complete reproducer code (or URL to code) or complete log file

Moquette MQTT version

0.16 https://github.com/moquette-io/moquette/archive/refs/tags/v0.16.tar.gz

JVM version (e.g. java -version)

1.8

OS version (e.g. uname -a)

JasontieZhu commented 5 months ago

We want to modify the CTrie remove function as follows:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    return cleanTomb(inode, iParent); // cleanTomb already returns Action.OK or Action.REPEAT
}

In addition, a thread could be used to clear TNodes automatically.

JasontieZhu commented 5 months ago

One possibility:

1. inode.compareAndSet succeeds, but cleanTomb fails while removing the node, so the action becomes REPEAT.
2. remove runs again and finds that the cnode is now a TNode, so it returns Action.OK and the node is never cleared.

JasontieZhu commented 5 months ago

I found that the TNode's token is null, because:

1. In the old version, Ctrie.remove creates the tomb with new TNode(); in the new version it uses new TNode(cnode.getToken()).
2. In the new version, createNodeAndInsertSubscription can reuse a TNode; in the old version it cannot.

So if removing a TNode fails, TNodes keep accumulating in the old version, but not in the new version.

The code is as follows.

New version:

if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
    // last client to leave this node, AND there are no downstream children, remove via TNode tomb
    if (inode == this.root) {
        return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
    }
    TNode tnode = new TNode(cnode.getToken());
    return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
}

Old version:

if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
    // last client to leave this node, AND there are no downstream children, remove via TNode tomb
    if (inode == this.root) {
        return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
    }
    TNode tnode = new TNode();
    return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
}

private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode cnode, SubscriptionRequest request) {
    final INode newInode = createPathRec(topic, request);
    final CNode updatedCnode;
    // In the new version a tomb is replaced by a fresh CNode with the same token
    if (cnode instanceof TNode) {
        updatedCnode = new CNode(cnode.getToken());
    } else {
        updatedCnode = cnode.copy();
    }
    updatedCnode.add(newInode);

    return inode.compareAndSet(cnode, updatedCnode) ? Action.OK_NEW : Action.REPEAT;
}

JasontieZhu commented 5 months ago

I found that this may already be solved by hylkevds's commit https://github.com/moquette-io/moquette/commit/62cb2b3b48f6f0665e8387e6feda664e686ecf57

JasontieZhu commented 5 months ago

In which scenarios does the system not clear TNodes automatically?

When setting the TNode succeeds but removing the TNode fails.

I can't figure out why that happens.

JasontieZhu commented 5 months ago
private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

public void remove(INode node) {
    this.children.remove(node) ;
}

Why not simply change it to:

private Action cleanTomb(INode inode, INode iParent) {
    return iParent.mainNode().remove(inode) ? Action.OK : Action.REPEAT;
}

public boolean remove(INode node) {
    return this.children.remove(node);
}
hylkevds commented 5 months ago

The compareAndSet is there because many parallel threads may try to edit the same node at the same time, and only one can "win" and have its changes accepted. The other threads have to retry.
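
As a rough illustration of the "one wins, the others retry" pattern described here, the following sketch uses a plain AtomicReference holding a list of child names. The class CasRetryDemo and its methods are hypothetical stand-ins, not Moquette code; only java.util and java.util.concurrent are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model: the AtomicReference plays the role of an INode's mainNode.
class CasRetryDemo {
    static final AtomicInteger retries = new AtomicInteger();

    // Copy-on-write removal with a retry loop.
    static void removeChild(AtomicReference<List<String>> ref, String name) {
        while (true) {
            List<String> orig = ref.get();                // snapshot we expect to replace
            List<String> updated = new ArrayList<>(orig); // private copy, not visible to other threads
            updated.remove(name);
            if (ref.compareAndSet(orig, updated)) {
                return;                                   // this thread "won"
            }
            retries.incrementAndGet();                    // another thread won; re-read and try again
        }
    }

    // Starts one thread per child; every thread removes its own child.
    static List<String> runConcurrentRemovals(int threadCount) {
        List<String> initial = new ArrayList<>();
        for (int i = 1; i <= threadCount; i++) {
            initial.add("users" + i);
        }
        AtomicReference<List<String>> ref = new AtomicReference<>(initial);
        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= threadCount; i++) {
            final String name = "users" + i;
            Thread t = new Thread(() -> removeChild(ref, name));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ref.get();
    }

    public static void main(String[] args) {
        System.out.println("remaining children: " + runConcurrentRemovals(8));
        System.out.println("retries needed: " + retries.get());
    }
}
```

Because every thread compares against the exact snapshot it copied, no removal is lost: the remaining list is empty regardless of scheduling, while the number of retries varies from run to run.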

JasontieZhu commented 5 months ago

1. However, a simulation shows that when two threads enter cleanTomb, the later thread overwrites the result of the earlier thread, and both calls still report success.

2. I only added a sleep to increase the probability of the problem, and used two threads to call cleanTomb at the same time.

3. The following is the test code:

private static String cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        // widen the race window between the copy and the compareAndSet
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    String result = iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

4. Assume that the parent node originally has four children: users1, users2, users3 and users4. Two threads (Thread-0 and Thread-1) enter cleanTomb at the same time. Thread-0 removes users1 and Thread-1 removes users2. The expected result is that the parent node has two children, users3 and users4. In the actual run, Thread-0 executes last and the parent node has three children: users2, users3 and users4.

5. The output is:

Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb doing updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb doing updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-1 cleanTomb result:OK
Thead: Thread-0 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-0 cleanTomb result:OK

6. Both threads return OK, and neither of them retries.

JasontieZhu commented 5 months ago

1. The cause of the problem is that copy() returns an independent copy. Each thread that enters cleanTomb gets its own copy of the parent's children. If two threads enter cleanTomb at the same time, both sets succeed, but the thread that sets last overwrites the result of the earlier thread, so the earlier thread's TNode is not deleted.

new ArrayList<>(children) creates a new list. It is a shallow copy (the INode elements are shared), but the list itself is independent, so removing from the copy does not change the original list.

private CNode(Token token, List<INode> children, Set<Subscription> subscriptions) {
    this.token = token; // keep reference, root comparison in directory logic relies on it for now.
    this.subscriptions = new HashSet<>(subscriptions);
    this.children = new ArrayList<>(children);
}

2. For example:

List<String> aList = Arrays.asList("a", "b", "c");
List<String> bList = new ArrayList<>(aList);
System.out.println("aList:" + aList);
System.out.println("bList:" + bList);
bList.remove("b");
System.out.println("aList remove result:" + aList);
System.out.println("bList remove result:" + bList);
bList.set(0, "vvvv");
System.out.println("aList set result:" + aList);
System.out.println("bList set result:" + bList);

3. The output is as follows; aList is not modified:

aList:[a, b, c]
bList:[a, b, c]
aList remove result:[a, b, c]
bList remove result:[a, c]
aList set result:[a, b, c]
bList set result:[vvvv, c]
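
To make the copy semantics more precise, here is a small sketch showing that new ArrayList<>(list) yields an independent list whose elements are still the same objects. The Child class is a hypothetical stand-in for INode, used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CopySemanticsDemo {
    // Hypothetical stand-in for INode.
    static class Child {
        final String label;
        Child(String label) { this.label = label; }
    }

    // Returns {listsIndependent, elementsShared}.
    static boolean[] check() {
        List<Child> original = new ArrayList<>();
        original.add(new Child("users1"));
        original.add(new Child("users2"));

        List<Child> copy = new ArrayList<>(original); // new list, same element references
        copy.remove(0);                               // structural change to the copy only

        boolean listsIndependent = original.size() == 2 && copy.size() == 1;
        boolean elementsShared = original.get(1) == copy.get(0); // same object, not a clone
        return new boolean[] {listsIndependent, elementsShared};
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println("lists independent: " + r[0]);
        System.out.println("elements shared:   " + r[1]);
    }
}
```

The independence of the list is what allows two threads to build different "updated" versions from the same starting point, so something else (the compareAndSet) has to arbitrate between them.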

JasontieZhu commented 5 months ago

Even if REPEAT is returned, the retry reaches the following branch, which performs no operation and returns Action.OK, so the TNode is not deleted. Over time, more and more TNodes are left behind and the tree accumulates more and more child nodes, eventually over 7 million. Executing inode.mainNode().anyChildrenMatch over that many children is what causes the high CPU usage in version 0.16.

        final CNode cnode = inode.mainNode();
        if (cnode instanceof TNode) {
            // this inode is a tomb, has no clients and should be cleaned up
            // Because we implemented cleanTomb below, this should be rare, but possible
            // Consider calling cleanTomb here too
            return Action.OK;
        }
JasontieZhu commented 5 months ago

In theory, this problem is solved in version 0.17. The token of a TNode is no longer null, so even if a TNode is not deleted, it is reused the next time a node with the same token is added: the TNode is converted back into a CNode. In that case the number of TNodes should not grow to 7 million, unless the tokens are never reused.

hylkevds https://github.com/moquette-io/moquette/commit/62cb2b3b48f6f0665e8387e6feda664e686ecf57

private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode cnode, Subscription newSubscription) {
    final INode newInode = createPathRec(topic, newSubscription);
    final CNode updatedCnode;
    if (cnode instanceof TNode) {
        updatedCnode = new CNode(cnode.getToken());
    } else {
        updatedCnode = cnode.copy();
    }
    updatedCnode.add(newInode);
    return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
}

JasontieZhu commented 5 months ago

Therefore, to avoid this situation, I recommend removing the node directly, after checking whether the node exists in the child set:

private Action cleanTomb(INode inode, INode iParent) {
    return iParent.mainNode().remove(inode) ? Action.OK : Action.REPEAT;
}

public boolean remove(INode node) {
    // already gone: treat as success
    if (!this.children.contains(node)) {
        return true;
    }
    return this.children.remove(node);
}

hylkevds commented 5 months ago

Are you testing this on the development version, or on a released version?

private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

The problem is that iParent.mainNode() is called twice. That is wrong. The first argument of the compareAndSet must be the original node that we expect to replace. But because iParent.mainNode() is called twice, the node we are replacing may have been replaced by another thread in the meantime, and our copy is no longer valid. But we don't notice, because we're not comparing to the node we copied. Instead we're comparing to whatever is the current mainNode.

So the code should be:

private Action cleanTomb(INode inode, INode iParent) {
    CNode origCnode = iParent.mainNode();
    CNode updatedCnode = origCnode.copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(origCnode, updatedCnode) ? Action.OK : Action.REPEAT;
}

Can you please test that?
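
The difference between the two versions can be reproduced without threads by writing out the problematic interleaving by hand. The sketch below is a simplified model, not Moquette code: an AtomicReference<List<String>> stands in for iParent, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class LostUpdateDemo {
    // Returns a copy of source without the given name (models copy() + remove()).
    static List<String> without(List<String> source, String name) {
        List<String> copy = new ArrayList<>(source);
        copy.remove(name);
        return copy;
    }

    // Interleaving: A and B both take a snapshot, B publishes first, then A publishes.
    // buggy == true models compareAndSet(iParent.mainNode(), updated): the expected
    // value is re-read at publish time instead of being the snapshot that was copied.
    static String simulate(boolean buggy) {
        AtomicReference<List<String>> parent =
                new AtomicReference<>(Arrays.asList("users1", "users2", "users3", "users4"));

        List<String> origA = parent.get();
        List<String> updatedA = without(origA, "users1");
        List<String> origB = parent.get();
        List<String> updatedB = without(origB, "users2");

        parent.compareAndSet(origB, updatedB); // B wins the race

        List<String> expectedA = buggy ? parent.get() : origA;
        boolean okA = parent.compareAndSet(expectedA, updatedA);
        return (okA ? "OK" : "REPEAT") + " " + parent.get();
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + simulate(true));  // buggy: OK [users2, users3, users4]
        System.out.println("fixed: " + simulate(false)); // fixed: REPEAT [users1, users3, users4]
    }
}
```

In the buggy variant, A reports OK and silently restores users2, which B had already removed. In the fixed variant, A gets REPEAT, so the caller knows it has to start over from the current state.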

JasontieZhu commented 5 months ago

1. I am testing version 0.16: https://github.com/moquette-io/moquette/blob/0.16/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

2. The code is at line 204:

private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

3. You can use the following test code:

import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    private static INode root = null;
    private static INode app = null;
    private static INode users = null;

private static AtomicInteger count = new AtomicInteger();
private static AtomicInteger removeCount = new AtomicInteger();

public static void init() {
    CNode croot = new CNode();
    croot.setToken(new Token("root"));
    root = new INode(croot);
    CNode capp = new CNode();
    capp.setToken(new Token("APP"));
    app = new INode(capp);
    CNode cusers = new CNode();
    cusers.setToken(new Token("users"));
    users = new INode(cusers);
    croot.add(app);
    capp.add(users);
    add();
}

private static String cleanTomb(INode inode, INode iParent) {
    CNode origCnode = iParent.mainNode();
    CNode updatedCnode = origCnode.copy();
    updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    String result = iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

private static INode createLeafNodes(Token token) {
    CNode newLeafCnode = new CNode();
    newLeafCnode.setToken(token);
    return new INode(newLeafCnode);
}

public static void add() {
    int i = 3;
    while (i >= 0) {
        count.incrementAndGet();
        Integer id = count.get();
        users.mainNode().add(createLeafNodes(new Token("users" + id)));
        i--;
    }
    System.out.println("add result list:" + users.mainNode());

}

public static void main(String[] args) {
    init();
    Runnable removeRunnable = () -> {
        removeCount.incrementAndGet();
        Integer id = removeCount.get();
        System.out.println("Thead: " + Thread.currentThread().getName() + " remove: users" + id);
        System.out.println("Thead: " + Thread.currentThread().getName() + " users list: " + users.mainNode());
        INode child = users.mainNode().childOf(new Token("users" + id));
        cleanTomb(child, users);
    };

    Thread t2 = new Thread(removeRunnable);
    Thread t3 = new Thread(removeRunnable);
    t2.start();
    t3.start();
    try {
        t2.join();
        t3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

4. The result is that only one node is deleted, not two:

add result list:users1 users2 users3 users4
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb doing updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb result iParent.mainNode:users1 users3 users4
Thead: Thread-1 cleanTomb result:OK
Thead: Thread-0 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-0 cleanTomb result:OK

JasontieZhu commented 5 months ago

In a multi-threaded scenario this problem is likely to occur. When several threads run cleanTomb at the same time, they start from the same snapshot, and the thread that sets later overwrites the result of the thread that set earlier. The more clients are connected and the more of them unsubscribe at the same time, the higher the concurrency in cleanTomb. That is how our environment ended up with 5 million TNodes that were never cleaned up.

hylkevds commented 5 months ago

You should really use at least 0.17 for testing. A lot has changed since 0.16...

With the fix I just posted, the result of the compareAndSet should no longer be OK when another thread has replaced the original value. It is important to use compareAndSet, precisely to make sure two threads don't overwrite each other's changes. The two threads may have the same copy when starting the function, but when the first one replaces the content, the compare will fail for the second thread and it will retry.

JasontieZhu commented 5 months ago

1. iParent.compareAndSet(iParent.mainNode(), updatedCnode) delegates to compareAndSet(iParent.mainNode(), newNode) on the INode's mainNode AtomicReference. The actual value is iParent.mainNode() and the expected value is also iParent.mainNode(), so the actual value equals the expected value and iParent.compareAndSet(iParent.mainNode(), updatedCnode) returns true, not false.

private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

class INode {
    private AtomicReference<CNode> mainNode = new AtomicReference<>();

INode(CNode mainNode) {
    this.mainNode.set(mainNode);
    if (mainNode instanceof TNode) { // this should never happen
        throw new IllegalStateException("TNode should not be set on mainNnode");
    }
}

boolean compareAndSet(CNode old, CNode newNode) {
    return mainNode.compareAndSet(old, newNode);
}

boolean compareAndSet(CNode old, TNode newNode) {
    return mainNode.compareAndSet(old, newNode);
}

CNode mainNode() {
    return this.mainNode.get();
}

boolean isTombed() {
    return this.mainNode() instanceof TNode;
}

}

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
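
The Javadoc above says the comparison uses ==, that is, reference identity. A minimal sketch of what that means in practice, assuming nothing beyond the JDK (the class name CasIdentityDemo is made up for this example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class CasIdentityDemo {
    // Returns {equalsHolds, casWithEqualCopy, casWithSameInstance}.
    static boolean[] check() {
        List<String> current = new ArrayList<>(Arrays.asList("users1", "users2"));
        List<String> equalButDistinct = new ArrayList<>(current); // equals() is true, == is false
        AtomicReference<List<String>> ref = new AtomicReference<>(current);

        boolean equalsHolds = current.equals(equalButDistinct);

        // Fails: the expected value is a different object, even though its content is equal.
        boolean casWithEqualCopy = ref.compareAndSet(equalButDistinct, new ArrayList<String>());

        // Succeeds: the expected value is the very instance currently stored.
        boolean casWithSameInstance = ref.compareAndSet(current, new ArrayList<String>());

        return new boolean[] {equalsHolds, casWithEqualCopy, casWithSameInstance};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(check())); // [true, false, true]
    }
}
```

This is why the expected argument has to be the instance that was read before copying: passing whatever the reference holds at call time is always "the same instance", so the comparison cannot detect an intervening change.
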
hylkevds commented 5 months ago

> 1. iParent.compareAndSet(iParent.mainNode(), updatedCnode) is equivalent to iParent.mainNode().compareAndSet(iParent.mainNode(), newNode); the actual value is iParent.mainNode() and the expected value is iParent.mainNode(), so the actual value is equal to the expected value and iParent.compareAndSet(iParent.mainNode(), updatedCnode) is true, not false

Yes, that's why I explained in my previous post why iParent.compareAndSet(iParent.mainNode(), updatedCnode) is wrong. The problem is the second call to iParent.mainNode(). Instead of re-requesting the mainNode, it should pass the instance that was copied, since the mainNode in the iParent may have been changed by another thread in the time it took to make the copy, but if we pass the original value that was copied that conflict will be detected.

JasontieZhu commented 5 months ago

1. The latest version does the same: https://github.com/moquette-io/moquette/blob/main/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

/**
 *
 * Cleans Disposes of TNode in separate Atomic CAS operation per
 * http://bravenewgeek.com/breaking-and-entering-lose-the-lock-while-embracing-concurrency/
 * We roughly follow this theory above, but we allow CNode with no Subscriptions to linger (for now).
 *
 * @param inode inode that handle to the tomb node.
 * @param iParent inode parent.
 * @return REPEAT if this method wasn't successful or OK.
 */
private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

2. I recommend removing the node directly, after checking whether the node exists in the child set; you could then also stop using compareAndSet here:

private Action cleanTomb(INode inode, INode iParent) {
    return iParent.mainNode().remove(inode) ? Action.OK : Action.REPEAT;
}

public boolean remove(INode node) {
    if (!this.children.contains(node)) {
        return true;
    }
    return this.children.remove(node);
}

3. The test code is:

import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    private static INode root = null;
    private static INode app = null;
    private static INode users = null;

private static AtomicInteger count = new AtomicInteger();
private static AtomicInteger removeCount = new AtomicInteger();

public static void init() {
    CNode croot = new CNode();
    croot.setToken(new Token("root"));
    root = new INode(croot);
    CNode capp = new CNode();
    capp.setToken(new Token("APP"));
    app = new INode(capp);
    CNode cusers = new CNode();
    cusers.setToken(new Token("users"));
    users = new INode(cusers);
    croot.add(app);
    capp.add(users);
    add();
}

private static String cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();

    // CNode updatedCnode = origCnode.copy();
    // updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    // System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    // String result = iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? "OK" : "REPEAT";
    String result = iParent.mainNode().remove(inode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

private static INode createLeafNodes(Token token) {
    CNode newLeafCnode = new CNode();
    newLeafCnode.setToken(token);
    return new INode(newLeafCnode);
}

public static void add() {
    int i = 3;
    while (i >= 0) {
        count.incrementAndGet();
        Integer id = count.get();
        users.mainNode().add(createLeafNodes(new Token("users" + id)));
        i--;
    }
    System.out.println("add result list:" + users.mainNode());

}

public static void main(String[] args) {
    init();
    Runnable removeRunnable = () -> {
        removeCount.incrementAndGet();
        Integer id = removeCount.get();
        System.out.println("Thead: " + Thread.currentThread().getName() + " remove: users" + id);
        System.out.println("Thead: " + Thread.currentThread().getName() + " users list: " + users.mainNode());
        INode child = users.mainNode().childOf(new Token("users" + id));
        cleanTomb(child, users);
    };

    Thread t2 = new Thread(removeRunnable);
    Thread t3 = new Thread(removeRunnable);
    t2.start();
    t3.start();
    try {
        t2.join();
        t3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

4. The modified method in CNode.java:

public boolean remove(INode node) {
    // this.children.remove(node);
    if (!this.children.contains(node)) {
        return true;
    }
    return this.children.remove(node);
}

5. The test result is that both nodes have been deleted:

add result list:users1 users2 users3 users4
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb result iParent.mainNode:users3 users4
Thead: Thread-0 cleanTomb result:OK
Thead: Thread-1 cleanTomb result iParent.mainNode:users3 users4
Thead: Thread-1 cleanTomb result:OK

hylkevds commented 5 months ago

> 1. The latest version does the same. The link is https://github.com/moquette-io/moquette/blob/main/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

Of course, I just posted the update for you to test here in the thread, I didn't make a PR yet, let alone that Andsel had time to review that PR and merge it...

JasontieZhu commented 5 months ago

OK, thank you very much. I'll test it when you're done.

hylkevds commented 5 months ago

You can test it right now by changing the cleanTomb function to the version I put in https://github.com/moquette-io/moquette/issues/841#issuecomment-2162191580

JasontieZhu commented 5 months ago

1. Yes, with that change one of the threads fails. The result is:

add result list:users1 users2 users3 users4
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb doing updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-0 cleanTomb result:OK
Thead: Thread-1 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-1 cleanTomb result:REPEAT

2. The test code is:

import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    private static INode root = null;
    private static INode app = null;
    private static INode users = null;

private static AtomicInteger count = new AtomicInteger();
private static AtomicInteger removeCount = new AtomicInteger();

public static void init() {
    CNode croot = new CNode();
    croot.setToken(new Token("root"));
    root = new INode(croot);
    CNode capp = new CNode();
    capp.setToken(new Token("APP"));
    app = new INode(capp);
    CNode cusers = new CNode();
    cusers.setToken(new Token("users"));
    users = new INode(cusers);
    croot.add(app);
    capp.add(users);
    add();
}

private static String cleanTomb(INode inode, INode iParent) {
    CNode origCnode  = iParent.mainNode();
    CNode updatedCnode = origCnode.copy();
    updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    String result = iParent.compareAndSet(origCnode, updatedCnode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

private static INode createLeafNodes(Token token) {
    CNode newLeafCnode = new CNode();
    newLeafCnode.setToken(token);
    return new INode(newLeafCnode);
}

public static void add() {
    int i = 3;
    while (i >= 0) {
        count.incrementAndGet();
        Integer id = count.get();
        users.mainNode().add(createLeafNodes(new Token("users" + id)));
        i--;
    }
    System.out.println("add result list:" + users.mainNode());

}

public static void main(String[] args) {
    init();
    Runnable removeRunnable = () -> {
        removeCount.incrementAndGet();
        Integer id = removeCount.get();
        System.out.println("Thead: " + Thread.currentThread().getName() + " remove: users" + id);
        System.out.println("Thead: " + Thread.currentThread().getName() + " users list: " + users.mainNode());
        INode child = users.mainNode().childOf(new Token("users" + id));
        cleanTomb(child, users);
    };

    Thread t2 = new Thread(removeRunnable);
    Thread t3 = new Thread(removeRunnable);
    t2.start();
    t3.start();
    try {
        t2.join();
        t3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

3. However, when the thread that failed retries, the TNode still cannot be deleted: inode.compareAndSet(cnode, tnode) has already changed the cnode into a TNode, so on the retry the if (cnode instanceof TNode) branch returns Action.OK directly.

public void removeFromTree(Topic topic, String clientID) {
    Action res;
    do {
        res = remove(clientID, topic, this.root, NO_PARENT);
    } while (res == Action.REPEAT);
}

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
    Token token = topic.headToken();
    final CNode cnode = inode.mainNode();
    if (!topic.isEmpty()) {
        Optional<INode> nextInode = cnode.childOf(token);
        if (nextInode.isPresent()) {
            Topic remainingTopic = topic.exceptHeadToken();
            return remove(clientId, remainingTopic, nextInode.get(), inode);
        }
    }
    if (cnode instanceof TNode) {
        // this inode is a tomb, has no clients and should be cleaned up
        // Because we implemented cleanTomb below, this should be rare, but possible
        // Consider calling cleanTomb here too
        return Action.OK; // <-- a retrying thread exits here and the TNode stays in the tree
    }
    if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
        // last client to leave this node, AND there are no downstream children, remove via TNode tomb
        if (inode == this.root) {
            return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
        }
        TNode tnode = new TNode(cnode.getToken());
        return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
    } else if (cnode.contains(clientId) && topic.isEmpty()) {
        CNode updatedCnode = cnode.copy();
        updatedCnode.removeSubscriptionsFor(clientId);
        return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
    } else {
        //someone else already removed
        return Action.OK;
    }
}

JasontieZhu commented 5 months ago

So I recommend changing it in this way, to ensure that the TNode is deleted: https://github.com/moquette-io/moquette/issues/841#issuecomment-2161901202

hylkevds commented 5 months ago

So I recommend changing it in this way, to ensure that the TNode is deleted: #841 (comment)

That won't work, since ArrayList is not thread-safe and thread-safe List implementations are not lock-free.

When fixing a bug leads us to another bug, we'll just have to fix that one too.

JasontieZhu commented 5 months ago

OK, then the if (cnode instanceof TNode) branch should call cleanTomb again, until the node is deleted successfully.

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
    Token token = topic.headToken();
    if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
        Topic remainingTopic = topic.exceptHeadToken();
        INode nextInode = inode.mainNode().childOf(token);
        return remove(clientId, remainingTopic, nextInode, inode);
    } else {
        final CNode cnode = inode.mainNode();
        if (cnode instanceof TNode) {
            // this inode is a tomb, has no clients and should be cleaned up
            // Because we implemented cleanTomb below, this should be rare, but possible
            // Consider calling cleanTomb here too
            return cleanTomb(inode, iParent);
        }
        if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
            // last client to leave this node, AND there are no downstream children, remove via TNode tomb
            if (inode == this.root) {
                return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
            }
            TNode tnode = new TNode();
            return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
        } else if (cnode.contains(clientId) && topic.isEmpty()) {
            CNode updatedCnode = cnode.copy();
            updatedCnode.removeSubscriptionsFor(clientId);
            return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
        } else {
            //someone else already removed
            return Action.OK;
        }
    }
}

hylkevds commented 5 months ago

Something else to check: if we remove the last INode from a level, is the parent also cleaned up? We should not be leaving empty nodes hanging around...

JasontieZhu commented 5 months ago

Do all child nodes become TNodes? Only TNodes can be removed. If a node is not a TNode, it will not be removed, so the root node is not automatically removed.

JasontieZhu commented 5 months ago

After thinking about it: only one thread's compareAndSet succeeds and the compareAndSet calls of the other threads fail. If cleanTomb runs with high concurrency and only one thread can succeed each round, then calling cleanTomb again during the recursion may increase the contention further, and the recursion may never end. Therefore, if cleanTomb fails and the node is found to be a tombstone node, I suggest attempting the cleanup at most three times. If the cleanup still fails, break out of the recursion. It would then be better to have a separate thread that regularly handles the deletion of TNodes.

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
    Token token = topic.headToken();
    final CNode cnode = inode.mainNode();
    if (!topic.isEmpty()) {
        Optional<INode> nextInode = cnode.childOf(token);
        if (nextInode.isPresent()) {
            Topic remainingTopic = topic.exceptHeadToken();
            return remove(clientId, remainingTopic, nextInode.get(), inode);
        }
    }
    if (cnode instanceof TNode) {
        // this inode is a tomb, has no clients and should be cleaned up
        // Because we implemented cleanTomb below, this should be rare, but possible
        // Consider calling cleanTomb here too
        int i = 3;
        while (i > 0) {
            if (cleanTomb(inode, iParent) == Action.OK) {
                break;
            }
            i--;
        }
        return Action.OK;
    }
    if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
        // last client to leave this node, AND there are no downstream children, remove via TNode tomb
        if (inode == this.root) {
            return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
        }
        TNode tnode = new TNode(cnode.getToken());
        return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
    } else if (cnode.contains(clientId) && topic.isEmpty()) {
        CNode updatedCnode = cnode.copy();
        updatedCnode.removeSubscriptionsFor(clientId);
        return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
    } else {
        //someone else already removed
        return Action.OK;
    }
}

hylkevds commented 5 months ago

There is a maximum of 1 thread per CPU-core doing subscriptions, so there is no way for any "recursion" to increase.

JasontieZhu commented 5 months ago

The machine has 24 physical CPUs (24U); each CPU has four cores and hyper-threading is 2-way, that is, 24 x 4 x 2 = 192 threads.

JasontieZhu commented 5 months ago

Multiple threads process the same parent node at the same time during cleanTomb; once a thread fails to remove its child from this parent node, the recursion continues. My tree is root->app->users->(user1, user2, user3, user4, ...), so the users node has many children. When all threads are working on the parent node users, each one removes a different child: thread 0 removes user1, thread 1 removes user2, and so on. At any moment only one thread removes its node successfully; the other threads fail and keep recursing until they succeed.
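The contention described above can be reproduced outside the broker. The following is a minimal sketch, assuming a simplified model in which the parent keeps its children in an immutable list behind an AtomicReference; FlatParentDemo and its members are hypothetical names for illustration and are not Moquette's INode/CNode classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified stand-in for a flat parent node; NOT Moquette's CNode/INode. */
class FlatParentDemo {
    // The child list is immutable; every change swaps in a modified copy.
    final AtomicReference<List<String>> children = new AtomicReference<>();
    // Counts compareAndSet calls that lost the race against another thread.
    final AtomicLong failedCas = new AtomicLong();

    FlatParentDemo(int childCount) {
        List<String> initial = new ArrayList<>();
        for (int i = 0; i < childCount; i++) {
            initial.add("user" + i);
        }
        children.set(Collections.unmodifiableList(initial));
    }

    /** Copy-on-write removal: repeat until our own compareAndSet wins. */
    void removeChild(String name) {
        while (true) {
            List<String> current = children.get();
            List<String> copy = new ArrayList<>(current);
            copy.remove(name);
            if (children.compareAndSet(current, Collections.unmodifiableList(copy))) {
                return;
            }
            failedCas.incrementAndGet(); // another thread changed the parent first
        }
    }

    /** Removes every child, spreading the removals over threadCount threads. */
    static FlatParentDemo run(int childCount, int threadCount) {
        FlatParentDemo parent = new FlatParentDemo(childCount);
        Thread[] workers = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            final int offset = t;
            workers[t] = new Thread(() -> {
                for (int i = offset; i < childCount; i += threadCount) {
                    parent.removeChild("user" + i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return parent;
    }

    public static void main(String[] args) {
        FlatParentDemo parent = run(2000, 8);
        System.out.println("children left: " + parent.children.get().size()
                + ", failed compareAndSet calls: " + parent.failedCas.get());
    }
}
```

Every removal eventually succeeds, so the list always ends up empty; the number of failed compareAndSet calls varies from run to run and depends on the thread count and on how long each copy takes.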

hylkevds commented 5 months ago

That sounds like a great setup for gathering some statistics! Counting the required tries and storing them in a little List of AtomicLongs. If it takes two tries, increase the Long at position 1 (don't want to track 0-tries). Then have a separate thread that outputs the statistics once every minute or so... That would give us some insight into how big the problem really is.
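A minimal sketch of such a counter, assuming an AtomicLongArray in place of a List of AtomicLongs. RetryStats and its methods are hypothetical and not part of Moquette; the last bucket collects everything at or above the tracked maximum.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical helper, NOT part of Moquette: histogram of tries needed per operation. */
class RetryStats {
    // Position 0 counts operations that needed 1 try, position 1 those that needed 2, etc.
    private final AtomicLongArray buckets;

    RetryStats(int maxTrackedTries) {
        if (maxTrackedTries < 1) {
            throw new IllegalArgumentException("maxTrackedTries must be >= 1");
        }
        this.buckets = new AtomicLongArray(maxTrackedTries);
    }

    private int indexFor(int tries) {
        if (tries < 1) {
            throw new IllegalArgumentException("tries must be >= 1");
        }
        // Anything beyond the tracked maximum lands in the last bucket.
        return Math.min(tries - 1, buckets.length() - 1);
    }

    /** Records one operation that needed the given number of tries (1 = no conflict). */
    void record(int tries) {
        buckets.incrementAndGet(indexFor(tries));
    }

    long count(int tries) {
        return buckets.get(indexFor(tries));
    }

    String report() {
        StringBuilder sb = new StringBuilder("tries -> operations:");
        for (int i = 0; i < buckets.length(); i++) {
            String label = (i == buckets.length() - 1) ? (i + 1) + "+" : String.valueOf(i + 1);
            sb.append(' ').append(label).append('=').append(buckets.get(i));
        }
        return sb.toString();
    }

    /** Starts a daemon thread that prints the histogram at a fixed interval. */
    ScheduledExecutorService startReporter(long period, TimeUnit unit) {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "retry-stats-reporter");
            t.setDaemon(true);
            return t;
        });
        reporter.scheduleAtFixedRate(() -> System.out.println(report()), period, period, unit);
        return reporter;
    }
}
```

One way to use it would be to count the iterations of the do/while loop in removeFromTree and call record(tries) once the loop exits, with startReporter(1, TimeUnit.MINUTES) running in the background.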

In 0.17 this problem should be much less than in 0.16, since I sped up the copy procedure a lot and the copy procedure is the slow factor in the parallel process. Faster copies means less chance of a conflict.

Note that they do not recurse, they only repeat.

I guess the worst case in your situation happens when many users "log off" and many userX nodes are removed in quick succession. The root of the issue is the very flat tree you have. We may have to add a method to artificially deepen the tree in your case.

JasontieZhu commented 5 months ago

1、If cleanTomb fails while the remove function is running, REPEAT is returned, which means the recursion continues. During the retry, the CNode has already been changed to a TNode. Once the CNode is found to be a TNode instance, the following branch is executed:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    return Action.OK;
}

2、If we do not modify the logic here, OK is returned and the recursion ends. But if we call cleanTomb again here and it fails, the recursion continues until it succeeds:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    int i = 3;
    while (i > 0) {
        if (cleanTomb(inode, iParent) == Action.OK) {
            break;
        }
        i--;
    }
    return Action.OK;
}

3、The tree is flat: all devices are mounted under the users node. When the connection channel between the MQTT server and a device is inactive, the server automatically unsubscribes the topic; by default, a connection is closed if it has been inactive for 10 seconds. This was added to keep the number of nodes from growing without ever being deleted. However, once many devices become inactive at the same time, the cleanTomb function runs concurrently. At that point only one thread's compareAndSet succeeds on the shared parent node. If the compareAndSet of the other threads fails, the recursion continues until the modification succeeds.

void handleConnectionLost() {
    String clientID = NettyUtils.clientID(channel);
    if (clientID == null || clientID.isEmpty()) {
        return;
    }
    LOG.info("Notifying connection lost event");
    if (bindedSession.hasWill()) {
        postOffice.fireWill(bindedSession.getWill());
    }
    if (bindedSession.isClean()) {
        LOG.debug("Remove session for client");
        sessionRegistry.unsubscribe(bindedSession);
        sessionRegistry.remove(bindedSession);
    } else {
        bindedSession.disconnect();
    }
    connected = false;
    //dispatch connection lost to intercept.
    String userName = NettyUtils.userName(channel);
    postOffice.dispatchConnectionLost(clientID,userName);
    LOG.trace("dispatch disconnection: userName={}", userName);
}

4、There is a low probability that deletion fails. However, once a node fails to be deleted, the number of TNodes increases as time goes by. This is why this problem occurs after the system runs for about two months. More than 5 million TNodes are generated, resulting in high CPU usage.

JasontieZhu commented 5 months ago

There is no way to avoid the flatness of the tree that is built, so it is better to have a separate thread delete the TNodes. In addition, some statistics logging could be added to count the compareAndSet failures, which would help with fault locating.
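To make the separate-thread idea concrete, here is a minimal sketch on a simplified model, assuming a parent whose children carry a tombstone flag. TombSweeperDemo, Child and sweep are hypothetical names; this is not how Moquette's CTrie is structured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model, NOT Moquette's CTrie: a parent whose children may be tombstones. */
class TombSweeperDemo {
    static final class Child {
        final String token;
        final boolean tomb; // true once the child has been replaced by a tombstone

        Child(String token, boolean tomb) {
            this.token = token;
            this.tomb = tomb;
        }
    }

    final AtomicReference<List<Child>> children = new AtomicReference<>();

    TombSweeperDemo(List<Child> initial) {
        List<Child> copy = new ArrayList<>(initial);
        children.set(Collections.unmodifiableList(copy));
    }

    /** Drops all tombstones in one pass; returns how many were removed. */
    int sweep() {
        while (true) {
            List<Child> current = children.get();
            List<Child> live = new ArrayList<>();
            for (Child c : current) {
                if (!c.tomb) {
                    live.add(c);
                }
            }
            int removed = current.size() - live.size();
            if (removed == 0) {
                return 0;
            }
            if (children.compareAndSet(current, Collections.unmodifiableList(live))) {
                return removed;
            }
            // Lost the race against another writer: re-read and try again.
        }
    }

    /** Runs sweep() periodically; the caller is responsible for shutting the executor down. */
    ScheduledExecutorService startSweeper(long period, TimeUnit unit) {
        ScheduledExecutorService sweeper = Executors.newSingleThreadScheduledExecutor();
        sweeper.scheduleWithFixedDelay(this::sweep, period, period, unit);
        return sweeper;
    }
}
```

In this sketch the sweeper removes all tombstones of a parent with a single compareAndSet, but it still has to win that compareAndSet against every other thread modifying the same parent.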

hylkevds commented 5 months ago

There is no way to avoid the flatness of the tree that is built, so it is better to have a separate thread delete the TNodes. In addition, some statistics logging could be added to count the compareAndSet failures, which would help with fault locating.

It is always possible to internally make a tree deeper, for instance by separating all nodes based on the first character, then by the second, etc. Having a separate thread won't make things more efficient, since that thread still has to contend with the other threads creating new nodes.
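As an illustration of the deepening idea, a minimal sketch that cuts one topic token into sub-tokens of a bounded length, so that each sub-token becomes its own tree level. TokenDeepener and maxTokenLength are hypothetical names; the actual implementation in Moquette may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper, NOT Moquette code: splits one topic token into shorter sub-tokens. */
class TokenDeepener {
    /**
     * Cuts the token into consecutive pieces of at most maxTokenLength characters.
     * Each piece would become one level in the tree, so a flat level with many
     * siblings turns into several levels with fewer siblings each.
     */
    static List<String> split(String token, int maxTokenLength) {
        if (maxTokenLength < 1) {
            throw new IllegalArgumentException("maxTokenLength must be >= 1");
        }
        if (token.isEmpty()) {
            // Keep empty tokens as a single level instead of dropping them.
            return Collections.singletonList("");
        }
        List<String> parts = new ArrayList<>();
        for (int start = 0; start < token.length(); start += maxTokenLength) {
            int end = Math.min(token.length(), start + maxTokenLength);
            parts.add(token.substring(start, end));
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(split("user42", 1)); // one level per character
        System.out.println(split("user42", 4)); // "user" and "42"
    }
}
```

With keys such as user1, user2, ... the shared prefix produces levels with a single child, so the gain comes only from the levels where the keys actually differ.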

JasontieZhu commented 5 months ago

If that is the case, and only one thread can delete TNodes while the other threads fail, how should the remaining TNodes be handled? Are they simply never deleted? Could you send your latest modification? The following changes may not ensure that all TNodes are deleted: https://github.com/moquette-io/moquette/issues/841#issuecomment-2162191580

hylkevds commented 5 months ago

My working version with the fixes I made above can be found at: https://github.com/FraunhoferIOSB/moquette/tree/fix_841_cleanTomb_reference_loss

JasontieZhu commented 5 months ago

OK, thank you. Will this fix be merged into the main branch?

hylkevds commented 5 months ago

It will probably get merged once Andsel has had time to look at it. In the meantime it would help if you could test it to see if the problem is actually fixed.

JasontieZhu commented 5 months ago

OK, thank you. However, the test period is long. This problem occurs only occasionally, so I need to observe it for a long time: capture a profile to check the memory, obtain the number of TNodes, and check whether that number increases.

JasontieZhu commented 5 months ago

I found that the fix has some errors and hasn't been merged: Java CI with Maven / Test JDK 11, ubuntu-20.04 (pull_request) Failing after 11m

Error: Errors:
Error: ConnectTest.receiveInflightPublishesAfterAReconnect:101 » Runtime Cannot recei...
Error: MessageExpirationTest.givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanTheExipiryIsNotPublished:156 » Runtime

hylkevds commented 5 months ago

Those build failures are caused by a timeout that is set too strict for the (quite slow) GitHub build systems. That's why it fails only sometimes and not always.

#843 should fix it.

JasontieZhu commented 4 months ago

https://github.com/moquette-io/moquette/pull/842 has not been merged yet, and it has code conflicts.

hylkevds commented 4 months ago

I've done a bit of testing with artificial-tree-deepening, with some interesting results. Described in #849

hylkevds commented 4 months ago

Since, as we say in Dutch, Meten is weten (measuring is knowing) I also added threading to the tests:

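500000 topics, in the form of: XXXXXX-mainTopic
1-16 Threads
1-5 max token length
numbers are durations

| Max token length \ Threads | 1 | 2 | 4 | 6 | 8 | 12 | 16 |
|---|---|---|---|---|---|---|---|
| 1 | 3850 | 2326 | 1854 | 1767 | 1782 | 2190 | 1874 |
| 2 | 1586 | 1114 | 966 | 861 | 972 | 862 | 1026 |
| 3 | 1458 | 1074 | 973 | 823 | 853 | 933 | 910 |
| 4 | 826 | 565 | 560 | 637 | 485 | 424 | 632 |
| 5 | 6489 | 7242 | 8159 | 11910 | 19079 | 21325 | 24514 |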

With a quick graph the significance of the problem becomes really clear: [graph: CTreeTokenLength]

The first two points (1 and 2 Threads) for token length 6 are: 204621, 311713.
For this test, token length 6 is essentially the current default behaviour.

hylkevds commented 3 months ago

#842 only fixes a part of the problem, reopening :)

JasontieZhu commented 2 weeks ago

https://github.com/moquette-io/moquette/issues/841#issuecomment-2162191580 With this modification, the CPU usage dropped from 50% to less than 20% after running in the production environment for more than one month.