eclipse-paho / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho

ClassCastException storing message in persistBufferedMessage #1037

Open albertoalcolea opened 5 months ago

albertoalcolea commented 5 months ago


This bug relates to:

And this fix:

Under some circumstances, a SUBSCRIBE or UNSUBSCRIBE message may be passed to persistBufferedMessage if buffer persistence is enabled and the client has lost its connection to the server.

public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
    final String methodName = "sendNoWait";
    if (isConnected() ||
            (!isConnected() && message instanceof MqttConnect) ||
            (isDisconnecting() && message instanceof MqttDisconnect)) {
        if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){
            //@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding message to buffer. message={0}
            log.fine(CLASS_NAME, methodName, "507", new Object[] {message.getKey()});
            if(disconnectedMessageBuffer.isPersistBuffer()){
                if (message instanceof MqttPublish) {
                    this.clientState.persistBufferedMessage(message);
                }
            }
            disconnectedMessageBuffer.putMessage(message, token);
        } else {
            this.internalSend(message, token);
        }
    } else if(disconnectedMessageBuffer != null) {
        //@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
        log.fine(CLASS_NAME, methodName, "508", new Object[] {message.getKey()});
        if(disconnectedMessageBuffer.isPersistBuffer()){
            this.clientState.persistBufferedMessage(message);
        }
        disconnectedMessageBuffer.putMessage(message, token);
    } else {
        //@TRACE 208=failed: not connected
        log.fine(CLASS_NAME, methodName, "208");
        throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
    }
}

In this scenario, the message enters the second branch of the conditional, else if (disconnectedMessageBuffer != null) { ..., and the code tries to persist it. This ends up causing a ClassCastException, because the ClientState class casts the message to MqttPublish when storing it:

this.persistence.put(key, (MqttPublish)message);
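The failure mode can be reproduced in isolation. The following is a minimal sketch using hypothetical stand-in classes (WireMessage, PublishMessage, SubscribeMessage, CastDemo), not the real Paho types; it only assumes that the persistence path performs an unchecked downcast like the line above:

```java
// Hypothetical stand-ins for the wire-message types; these are NOT the Paho classes.
class WireMessage {}
class PublishMessage extends WireMessage {}
class SubscribeMessage extends WireMessage {}

class CastDemo {
    // Mirrors the shape of the unchecked downcast quoted above.
    static String persist(WireMessage message) {
        PublishMessage publish = (PublishMessage) message; // throws for non-publish messages
        return "persisted " + publish.getClass().getSimpleName();
    }

    // Returns true when persisting the given message fails with ClassCastException.
    static boolean failsToPersist(WireMessage message) {
        try {
            persist(message);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsToPersist(new PublishMessage()));   // false
        System.out.println(failsToPersist(new SubscribeMessage())); // true
    }
}
```

The cast compiles because both types share a parent, so the problem only shows up at runtime when a non-publish message reaches that line.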

My suggestion, which may be insufficient in case of a race condition, is to implement the same safety check that was added for issue 606, verifying that the message is an instance of the MqttPublish class:

    (...)
    } else if(disconnectedMessageBuffer != null) {
        //@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
        log.fine(CLASS_NAME, methodName, "508", new Object[] {message.getKey()});
        if(disconnectedMessageBuffer.isPersistBuffer()){
            if (message instanceof MqttPublish) {
                this.clientState.persistBufferedMessage(message);
            }
        }
        disconnectedMessageBuffer.putMessage(message, token);
    } 
    (...)
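The effect of the suggested guard can be illustrated with a small self-contained sketch. All names here (Msg, PublishMsg, SubscribeMsg, GuardedBuffer, bufferOffline) are hypothetical and do not correspond to Paho classes; the sketch assumes only that every offline message is buffered while only publishes are persisted:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these names do not correspond to Paho classes.
class Msg {}
class PublishMsg extends Msg {}
class SubscribeMsg extends Msg {}

class GuardedBuffer {
    final List<Msg> buffered = new ArrayList<>();
    final List<PublishMsg> persisted = new ArrayList<>();

    // Buffers every message, but persists only publishes, as the suggested guard does.
    void bufferOffline(Msg message) {
        if (message instanceof PublishMsg) {
            persisted.add((PublishMsg) message); // cast is safe after the instanceof check
        }
        buffered.add(message);
    }

    public static void main(String[] args) {
        GuardedBuffer buffer = new GuardedBuffer();
        buffer.bufferOffline(new PublishMsg());
        buffer.bufferOffline(new SubscribeMsg());
        // prints "2 buffered, 1 persisted"
        System.out.println(buffer.buffered.size() + " buffered, " + buffer.persisted.size() + " persisted");
    }
}
```

With this shape, a SUBSCRIBE or UNSUBSCRIBE sent while offline is still queued in memory but would not survive a restart, which matches the behavior of the first branch in sendNoWait.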

zhubinsheng commented 2 months ago

m