smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0
241 stars 179 forks source link

MQTT consumer silently stops consuming data #2764

Open marc-batto opened 1 month ago

marc-batto commented 1 month ago

Hi,

We've been using the mqtt functionality for the last couple of months and are really happy with it so far. Yesterday however the integration stopped working all of a sudden and I am not able to figure out what is going wrong. Our usecase is to simply send whatever comes in from the MQTT stream into an eventhub on Azure to do some processing over there. The code itself looks like this:

import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MqttRepublisher {

    @Incoming("mqtt-topic")
    @Outgoing("eventhub-egress")
    public Uni<byte[]> republish(byte[] byteList) {
        return Uni.createFrom().item(byteList);
    }
}

The application.properties looks like this:

%prod.mp.messaging.incoming.mqtt-topic.ssl=true
%prod.mp.messaging.incoming.mqtt-topic.ssl.keystore.location=${CERT}
%prod.mp.messaging.incoming.mqtt-topic.ssl.keystore.password=${KEY}
mp.messaging.incoming.mqtt-topic.topic=${MQTT_TOPIC}
%prod.mp.messaging.incoming.mqtt-topic.ssl.keystore.type=PKCS12
%prod.mp.messaging.incoming.mqtt-topic.ssl.truststore.type=PKCS12
mp.messaging.incoming.mqtt-topic.type=smallrye-mqtt
mp.messaging.incoming.mqtt-topic.host=${MQTT_HOSTNAME}
mp.messaging.incoming.mqtt-topic.port=${MQTT_PORT}
mp.messaging.incoming.mqtt-topic.max-message-size=600000
%prod.mp.messaging.incoming.mqtt-topic.client-id=client-id
%prod.mp.messaging.incoming.mqtt-topic.username=client-id
mp.messaging.incoming.mqtt-topic.qos=1

mp.messaging.outgoing.eventhub-egress.topic=${MQTT_EVENTHUB_TOPIC}
mp.messaging.outgoing.eventhub-egress.connector=smallrye-kafka
mp.messaging.outgoing.eventhub-egress.key.serializer=org.apache.kafka.common.serialization.StringSerializer
%prod.mp.messaging.outgoing.eventhub-egress.bootstrap.servers=${EVENT_HUB_READINGS_BOOTSTRAP_SERVERS}
%prod.mp.messaging.outgoing.eventhub-egress.security.protocol=SASL_SSL
%prod.mp.messaging.outgoing.eventhub-egress.sasl.mechanism=PLAIN
%prod.mp.messaging.outgoing.eventhub-egress.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="${AZURE_EVENT_HUB_CONNECTION_STRING}";

In the logging I do not see anything happening at all, the MQTT broker sees there is an active connection. There are messages coming in into the broker on the topic.

When I restart the process it all works again as before, but it did cut off our data influx without any signs of going wrong. I would really like to know how this happened and how I could fix it.

Please let me know if I can supply you with any additional information that you need.

Java 21.0.3 maven 3.9.7 Running on a kubernetes cluster in Azure v 1.29.2 build container from -> ubi9/openjdk-21:1.20 Quarkus 3.11.3

Thanks!