moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

Periodic failure #844

Closed Yunustt closed 1 week ago

Yunustt commented 5 months ago

Expected behavior

Actual behavior

Steps to reproduce

Minimal yet complete reproducer code (or URL to code) or complete log file

Moquette MQTT version

MQTT_3_1_1

JVM version (e.g. java -version)

1.8

OS version (e.g. uname -a)

CentOS 7

More than 10,000 clients are connected. Roughly once a week the broker stops accepting new connections and some messages are no longer delivered to clients. Some calls also fail with a NullPointerException after internalPublish, with no stack trace. After a restart everything returns to normal. How can I troubleshoot this?

andsel commented 5 months ago

@Tong-07 your description is not enough to reproduce or investigate the issue.

Please provide a reproducer or some useful logs from when the issue manifests. In the meantime, I would check the memory status of the JVM process, in particular the direct memory used by the network stack (the Netty MQTT codecs).
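
One way to watch the memory figures mentioned above from inside the embedding application is the JDK's own management beans. The following is a minimal sketch, not part of Moquette: the class name MemoryReport and the method snapshot are hypothetical, and it assumes the standard "direct" buffer pool is exposed by the JVM. Depending on the Netty version and configuration, some direct allocations may bypass these counters, so treat the numbers as a lower bound.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.List;

// Hypothetical helper, not part of Moquette: summarises heap and NIO buffer pool usage.
public class MemoryReport {

    /** Builds a one-line summary of heap usage and each NIO buffer pool. */
    public static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        StringBuilder sb = new StringBuilder();
        // getMax() returns -1 when the maximum is undefined.
        sb.append("heap used=").append(heap.getUsed() >> 20).append("MiB")
          .append(" max=").append(heap.getMax() >> 20).append("MiB");

        // On HotSpot the pools are typically named "direct" and "mapped".
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            sb.append(" | ").append(pool.getName())
              .append(" used=").append(pool.getMemoryUsed() >> 20).append("MiB")
              .append(" buffers=").append(pool.getCount());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

Logging this summary on a fixed schedule (for example once a minute) should show whether heap or direct memory grows steadily during the week before the failure.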

Yunustt commented 2 months ago

/**
 * Intended usage is only for embedded versions of the broker, where the hosting application
 * want to use the broker to send a publish message. Like normal external publish message but
 * with some changes to avoid security check, and the handshake phases for Qos1 and Qos2. It
 * also doesn't notifyTopicPublished because using internally the owner should already know
 * where it's publishing.
 *
 * @param msg
 *            the message to publish
 */
public void internalPublish(MqttPublishMessage msg) {
    final MqttQoS qos = msg.fixedHeader().qosLevel();
    final Topic topic = new Topic(msg.variableHeader().topicName());
    final ByteBuf payload = msg.payload();
    LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos);

    publish2Subscribers(payload, topic, qos);
    LOG.info("publish2Subscribers internal PUBLISH message Topic={}, qos={}", topic, qos);
    if (!msg.fixedHeader().isRetain()) {
        return;
    }
    if (qos == AT_MOST_ONCE || msg.payload().readableBytes() == 0) {
        // QoS == 0 && retain => clean old retained
        retainedRepository.cleanRetained(topic);
        return;
    }
    // ... rest of internalPublish not shown in the post
}

private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publishingQos) {
    Set<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
    for (final Subscription sub : topicMatchingSubscriptions) {
        MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos);
        Session targetSession = this.sessionRegistry.retrieve(sub.getClientId());

        boolean isSessionPresent = targetSession != null;
        if (isSessionPresent) {
            LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}, set : {}",
                      sub.getClientId(), sub.getTopicFilter(), qos, topicMatchingSubscriptions.toArray());
            // we need to retain because duplicate only copy r/w indexes and don't retain() causing refCnt = 0
            ByteBuf payload = origPayload.retainedDuplicate();
            targetSession.sendPublishOnSessionAtQos(topic, qos, payload, sub.getClientId());
        } else {
            // If we are, the subscriber disconnected after the subscriptions tree selected that session as a
            // destination.
            LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}, set : {}", sub.getClientId(),
                      sub.getTopicFilter(), qos, topicMatchingSubscriptions.toArray());
        }
    }
}

An error is reported in the circled method: when the exception occurs, the log statement after the call is never printed. The caught error carries no stack trace; the log shows only "NPE {}", and the captured information is java.lang.NullPointerException: null. Once this happens, clients can no longer connect, and data sent on existing connections triggers the NPE at the location shown in the figure. The following log also appears:

WARN 12938 --- [lettuce-eventExecutorLoop-1-4] i.n.u.c.SingleThreadEventExecutor: Unexpected exception from an event executor:

java.lang.OutOfMemoryError: GC overhead limit exceeded
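
A NullPointerException that arrives without any stack frames is often the result of a HotSpot optimization: for implicit exceptions thrown repeatedly from the same compiled code, the JVM may reuse a preallocated exception that has no stack trace. Starting the JVM with -XX:-OmitStackTraceInFastThrow disables this. The sketch below, using only the JDK, shows how to detect that case and how to render the full trace once it is available; the class name StackTraceCheck and its methods are hypothetical and are not part of Moquette.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper, not part of Moquette.
public class StackTraceCheck {

    /** Renders a throwable together with its stack trace as a single string. */
    public static String render(Throwable t) {
        StringWriter out = new StringWriter();
        t.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    /** Returns true when the exception carries no stack frames at all. */
    public static boolean hasNoFrames(Throwable t) {
        return t.getStackTrace().length == 0;
    }

    public static void main(String[] args) {
        try {
            String s = null;
            s.length(); // deliberately triggers a NullPointerException
        } catch (NullPointerException e) {
            if (hasNoFrames(e)) {
                System.err.println(
                        "No frames available; restart with -XX:-OmitStackTraceInFastThrow");
            }
            System.err.println(render(e));
        }
    }
}
```

Wrapping the call to internalPublish in a similar try/catch in the embedding application would show whether the missing trace comes from the JVM or from how the exception is logged.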

hylkevds commented 2 months ago

Which version of moquette are you using? From the fact that you use public void internalPublish(MqttPublishMessage msg) it must be a pretty old version.

All versions prior to 0.17 have known memory leaks, so OutOfMemoryErrors are to be expected. Try upgrading to the latest version.

Yunustt commented 2 months ago

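Which version of moquette are you using? From the fact that you use public void internalPublish(MqttPublishMessage msg) it must be a pretty old version.

All versions prior to 0.17 have known memory leaks, so OutOfMemoryErrors are to be expected. Try upgrading to the latest version.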

0.13

Yunustt commented 2 months ago

Does version 0.17 fix the memory leak problem?

hylkevds commented 2 months ago

Yes, as far as we know.