hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

Retry mqtt subscription #548

Closed rodmccutcheon closed 1 year ago

rodmccutcheon commented 1 year ago

We have a small microservice that reads from a Kafka topic and writes to MQTT, using Spring Cloud Stream. It works fine at first, but after some time we get the following warning and no further messages are published to MQTT:

"2022-10-18 16:22:29.861 WARN 1 --- [d | tellus-mqtt] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-mqtt-2, groupId=mqtt] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches r

There doesn't seem to be any warning/error from the mqtt client library.

Is there a way to programmatically resubscribe to mqtt or recover from this timeout?

Could we implement a custom health check for the actuator to include the mqtt subscription, and then the pod would get automatically restarted by k8s? Something like:

management:
  endpoint:
    health:
      group:
        liveness:
          include: livenessstate,mqtt

Where mqtt is the mqtt component.
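
A minimal sketch of the check behind such a health component, in plain Java so it has no external dependencies. In a Spring Boot app this logic would normally live in a bean implementing HealthIndicator (a bean named mqttHealthIndicator is exposed under the id mqtt), and the probe would likely be something like client.getState().isConnected() on the HiveMQ client. The MqttLivenessCheck class, its Status enum, and the BooleanSupplier probe are hypothetical stand-ins, not part of Spring or the HiveMQ library.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a Spring Boot HealthIndicator; not a library class.
class MqttLivenessCheck {

    // Simplified stand-in for Spring's health status values.
    enum Status { UP, DOWN }

    // Assumption: the probe answers "is the MQTT client currently connected?",
    // e.g. () -> client.getState().isConnected() in a real application.
    private final BooleanSupplier connectedProbe;

    MqttLivenessCheck(BooleanSupplier connectedProbe) {
        this.connectedProbe = connectedProbe;
    }

    // Reports DOWN when the probe says "not connected" or throws.
    Status health() {
        try {
            return connectedProbe.getAsBoolean() ? Status.UP : Status.DOWN;
        } catch (RuntimeException e) {
            return Status.DOWN;
        }
    }
}
```

A check like this only reflects the MQTT connection state. It would keep reporting UP if the client stays connected while the Kafka consumer has stalled, so it may not trigger a restart for the warning shown above.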

EDIT: Here is the consumer code (OutputConfig class):

@Configuration
@Log4j2
@Profile("output")
public class OutputConfig {

    private final Mqtt3ReactorClient outboundMqttClient;
    private final Mqtt3ReactorClient outboundRootMqttClient;
    private final MeterUtils meterUtils;

    @Autowired
    public OutputConfig(@Qualifier("outboundMqttClient") Mqtt3ReactorClient outboundMqttClient,
                        @Qualifier("outboundRootMqttClient") Mqtt3ReactorClient outboundRootMqttClient,
                        MeterUtils meterUtils) {
        this.outboundMqttClient = outboundMqttClient;
        this.outboundRootMqttClient = outboundRootMqttClient;
        this.meterUtils = meterUtils;
        log.info("Starting Output Config!");
    }

    @Bean
    public Consumer<Flux<Output.GatewayNotification>> kafka() {
        return new Output(outboundMqttClient, meterUtils);
    }

    @Bean
    public Consumer<Flux<Output.GatewayNotification>> kafkaRoot() {
        return new Output(outboundRootMqttClient, meterUtils);
    }

}

And Output class:

@Log4j2
public class Output implements Consumer<Flux<Output.GatewayNotification>> {

    public static final HexFormat FORMAT = HexFormat.of().withDelimiter(" ").withUpperCase();
    private final Mqtt3ReactorClient outboundMqttClient;
    private final MeterUtils meterUtils;

    public Output(Mqtt3ReactorClient outboundMqttClient, MeterUtils meterUtils) {
        this.outboundMqttClient = outboundMqttClient;
        this.meterUtils = meterUtils;
    }

    @Override
    public void accept(Flux<Output.GatewayNotification> gatewayNotifications) {
        Flux<Mqtt3Publish> messagesToPublish = gatewayNotifications
                .map(gatewayNotification -> Mqtt3Publish.builder()
                        .topic(gatewayNotification.getAddress())
                        .qos(MqttQos.AT_LEAST_ONCE)
                        .payload(Base64.getDecoder().decode(gatewayNotification.getPayload()))
                        .build());

        outboundMqttClient.publish(messagesToPublish)
                .doOnNext(publishResult -> {
                    log.debug(
                            "Publish acknowledged: " + FORMAT.formatHex(publishResult.getPublish().getPayloadAsBytes()));
                    meterUtils.incrementCounter("output");
                })
                // Log with the stack trace; note that an error signal terminates this stream.
                .doOnError(error -> log.error("Publish failed", error))
                .subscribe();
    }

    @Data
    public static class GatewayNotification {
        private String address;
        private String payload;
        private Long buildingId;
    }
}

HiveMqMqttConfig:

@Configuration
@Log4j2
public class HiveMqMqttConfig {

    @Value("${mqtt.endpointUrl}")
    private String endpointUrl;

    @Value("${mqtt.rootEndpointUrl}")
    private String rootEndpointUrl;

    @Value("${mqtt.inboundClientId}")
    private String inboundClientId;

    @Value("${mqtt.outboundClientId}")
    private String outboundClientId;

    @Value("${mqtt.caFilename:#{null}}")
    private String caFilename;

    @Value("${mqtt.inboundPrivateKeyFilename:#{null}}")
    private String inboundPrivateKeyFilename;

    @Value("${mqtt.inboundRootPrivateKeyFilename:#{null}}")
    private String inboundRootPrivateKeyFilename;

    @Value("${mqtt.inboundClientCertFilename:#{null}}")
    private String inboundClientCertFilename;

    @Value("${mqtt.inboundRootClientCertFilename:#{null}}")
    private String inboundRootClientCertFilename;

    @Value("${mqtt.outboundPrivateKeyFilename:#{null}}")
    private String outboundPrivateKeyFilename;

    @Value("${mqtt.outboundRootPrivateKeyFilename:#{null}}")
    private String outboundRootPrivateKeyFilename;

    @Value("${mqtt.outboundClientCertFilename:#{null}}")
    private String outboundClientCertFilename;

    @Value("${mqtt.outboundRootClientCertFilename:#{null}}")
    private String outboundRootClientCertFilename;

    @Bean(name = "inboundMqttClient")
    public Mqtt3ReactorClient inboundMqttClient() {
        var client = Mqtt3ReactorClient.from(buildMqtt3Client(endpointUrl, UUID.randomUUID().toString(), caFilename, inboundPrivateKeyFilename, inboundClientCertFilename));
        connectClient(client);
        return client;
    }

    @Bean(name = "inboundRootMqttClient")
    public Mqtt3ReactorClient inboundRootMqttClient() {
        var client = Mqtt3ReactorClient.from(buildMqtt3Client(rootEndpointUrl, UUID.randomUUID().toString(), caFilename, inboundRootPrivateKeyFilename, inboundRootClientCertFilename));
        connectClient(client);
        return client;
    }

    @Bean(name = "outboundMqttClient")
    public Mqtt3ReactorClient outboundMqttClient() {
        var client = Mqtt3ReactorClient.from(buildMqtt3Client(endpointUrl, UUID.randomUUID().toString(), caFilename, outboundPrivateKeyFilename, outboundClientCertFilename));
        connectClient(client);
        return client;
    }

    @Bean(name = "outboundRootMqttClient")
    public Mqtt3ReactorClient outboundRootMqttClient() {
        var client = Mqtt3ReactorClient.from(buildMqtt3Client(rootEndpointUrl, UUID.randomUUID().toString(), caFilename, outboundRootPrivateKeyFilename, outboundRootClientCertFilename));
        connectClient(client);
        return client;
    }

    private Mqtt3Client buildMqtt3Client(String endpointUrl, String clientId, String caFilename, String privateKeyFilename, String clientCertFilename) {
        log.info("Creating mqtt3 client with client id: {}", clientId);
        // endpoint is in the form 'protocol://host:port'
        java.net.URI endpoint = java.net.URI.create(endpointUrl);
        String host = endpoint.getHost();
        int port = endpoint.getPort();

        Mqtt3ClientBuilder mqtt3ClientBuilder = Mqtt3Client.builder()
                .identifier(clientId)
                .serverHost(host)
                .serverPort(port)
                .automaticReconnectWithDefaultConfig();

        try {
            if (caFilename != null && !caFilename.isEmpty()) {
                boolean isUsingKeyBasedAuthentication = privateKeyFilename != null && !privateKeyFilename.isEmpty() && clientCertFilename != null && !clientCertFilename.isEmpty();

                PemFileSslContext context
                        = isUsingKeyBasedAuthentication
                        ? new PemFileSslContext(getStreamFromClassPathOrLocal(caFilename), getStreamFromClassPathOrLocal(privateKeyFilename), getStreamFromClassPathOrLocal(clientCertFilename))
                        : new PemFileSslContext(new ClassPathResource(caFilename).getInputStream());

                context.getSocketFactory();
                mqtt3ClientBuilder
                        .sslConfig()
                        .keyManagerFactory(context.getKeyManagerFactory())
                        .trustManagerFactory(context.getTrustManagerFactory())
                        .applySslConfig();

            }
        } catch (IOException | NoSuchAlgorithmException | KeyStoreException | CertificateException |
                 InvalidKeySpecException | UnrecoverableKeyException | PemFileSslContext.SocketFactoryCreationFailedException e) {
            throw new RuntimeException(e);
        }

        return mqtt3ClientBuilder.build();
    }

    private InputStream getStreamFromClassPathOrLocal(String uri) throws IOException {
        return new ClassPathResource(uri).getInputStream();
    }

    private void connectClient(Mqtt3ReactorClient mqtt3ReactorClient) {
        Mono<Mqtt3ConnAck> connAckSingle = mqtt3ReactorClient.connect();

        connAckSingle
                .doOnSuccess(connAck -> log.info("Connected, {}", connAck.getReturnCode()))
                .doOnError(throwable -> log.error("Connection failed", throwable))
                .subscribe();
    }
}

config:

management:
  endpoint:
    health:
      group:
        liveness:
          include: livenessstate,kafkaConsumers

spring:
  cloud:
    stream:
      kafka:
        bindings:
          kafka-in-0:
            consumer:
              configuration:
                max.poll.records: 10
          kafkaRoot-in-0:
            consumer:
              configuration:
                max.poll.records: 10
      function:
        definition: kafka;kafkaRoot
      bindings:
        kafka-in-0:
          destination: output
          group: mqtt
          consumer:
            concurrency: 1
        kafkaRoot-in-0:
          destination: output
          group: mqtt-root
          consumer:
            concurrency: 1

... (certs/endpoints omitted)

Should we be implementing a retry in the connectClient method?
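
For reference, a minimal sketch of what a retry with exponential backoff could look like, written in plain Java so it runs without Reactor or the HiveMQ client. RetrySketch and retryWithBackoff are hypothetical names, and the Callable stands in for the connect call. Note that the client above is already built with automaticReconnectWithDefaultConfig(), so a manual retry would mainly matter for the first connect attempt, and whether the automatic reconnect already covers that case is worth checking in the library documentation. With Reactor, the same idea is usually expressed with retryWhen(Retry.backoff(...)) on the Mono instead of a blocking loop.

```java
import java.util.concurrent.Callable;

// Hypothetical helper; not part of the HiveMQ client or Reactor.
class RetrySketch {

    // Calls the action until it succeeds or maxAttempts is reached,
    // doubling the wait between attempts. Rethrows the last failure.
    static <T> T retryWithBackoff(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2; // exponential backoff
                }
            }
        }
        throw last;
    }
}
```

A call might look like RetrySketch.retryWithBackoff(() -> connectBlocking(), 5, 1000L), where connectBlocking is a hypothetical method that performs a blocking connect and throws on failure.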

pglombardo commented 1 year ago

Hi @rodmccutcheon - have you had any progress on this?

This seems like a backpressure issue to me: messages come in from Kafka quickly but then get backed up by message processing and publishing over MQTT.

That Kafka warning is a result of the backpressure. The common, easy fix is to increase the max.poll.interval.ms setting in Kafka.
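
As a rough illustration of how one might size that setting, here is a small sketch in plain Java. It estimates a poll interval from the batch size and a worst-case per-record processing time, and never goes below Kafka's default of 300000 ms. PollIntervalSketch, consumerOverrides, and the safety factor are hypothetical and only for illustration; the two property keys are the standard Kafka consumer settings. In Spring Cloud Stream the resulting values would go under consumer.configuration, next to max.poll.records in the YAML shown earlier.

```java
import java.util.Properties;

// Hypothetical helper for estimating Kafka consumer overrides.
class PollIntervalSketch {

    // Kafka's default for max.poll.interval.ms (5 minutes).
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Budget = records per poll * worst-case time per record * safety factor,
    // but never lower than the Kafka default.
    static Properties consumerOverrides(int maxPollRecords,
                                        long worstCaseMillisPerRecord,
                                        int safetyFactor) {
        long budget = Math.multiplyExact(
                Math.multiplyExact((long) maxPollRecords, worstCaseMillisPerRecord),
                (long) safetyFactor);
        long interval = Math.max(DEFAULT_MAX_POLL_INTERVAL_MS, budget);

        Properties props = new Properties();
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        props.setProperty("max.poll.interval.ms", Long.toString(interval));
        return props;
    }
}
```

For example, consumerOverrides(10, 60_000L, 2) yields a max.poll.interval.ms of 1200000, while a fast consumer such as consumerOverrides(10, 1_000L, 2) stays at the default of 300000. Raising the interval only hides slow processing, so it is best paired with measuring the actual publish latency.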

"Is there a way to programmatically resubscribe to mqtt or recover from this timeout?" "Should we be implementing a retry in the connectClient method?"

Unless you are having connection issues, I don't think resubscribing or retrying the connection will help. I would think focusing on message throughput would have a greater ROI.

Let me know what the latest is on this issue.

pglombardo commented 1 year ago

Since I haven't heard back, I'm going to close out this issue. If anything remains, please feel free to re-open or file another issue. We'd be happy to help out!