eclipse-hono / hono

Eclipse Hono™ Project
https://eclipse.dev/hono
Eclipse Public License 2.0

ExampleApplication with Kafka configuration problems #3617

Closed. ramonunibz closed this issue 3 months ago

ramonunibz commented 3 months ago

I can't figure out how to run the example application with Kafka, found under hono/examples/hono-client-examples/src/main/java/org/eclipse/hono/vertx/example/HonoExampleApplication.java. I know the application's default configuration targets AMQP, so I set USE_KAFKA to true, HONO_CLIENT_USER to hono, and HONO_CLIENT_PASSWORD to hono-secret. I changed two values in HonoExampleConstants: HONO_KAFKA_CONSUMER_PORT to 9094 (I also tried keeping it at 9092), since that is the port I also use with the jar file from the getting started tutorial, and TENANT_ID to 953805d9-4e6e-4709-a2b1-532768f318f6, which is the tenant I sent the telemetry data from.

This is how I send telemetry data:

This is how I start the jar where I also receive the messages:

java -jar hono-cli-2.5.0-exec.jar app --host 127.0.0.1 --port 9094 -u hono -p hono-secret --ca-file truststore.pem --disable-hostname-verification consume --tenant 953805d9-4e6e-4709-a2b1-532768f318f6

my pods:

kubectl get service -n hono
NAME                                            TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
eclipse-hono-adapter-amqp                       LoadBalancer   10.110.31.254    127.0.0.1     5671:32667/TCP               35d
eclipse-hono-adapter-http                       LoadBalancer   10.109.89.167    127.0.0.1     8443:31804/TCP               35d
eclipse-hono-adapter-mqtt                       LoadBalancer   10.107.45.71     127.0.0.1     8883:31383/TCP               35d
eclipse-hono-kafka                              ClusterIP      10.107.123.29    <none>        9092/TCP,9095/TCP            35d
eclipse-hono-kafka-controller-0-external        LoadBalancer   10.111.115.140   127.0.0.1     9094:30555/TCP               35d
eclipse-hono-kafka-controller-headless          ClusterIP      None             <none>        9094/TCP,9092/TCP,9093/TCP   35d
eclipse-hono-service-auth                       ClusterIP      10.102.202.160   <none>        5671/TCP,8088/TCP            35d
eclipse-hono-service-command-router             ClusterIP      10.99.9.107      <none>        5671/TCP                     35d
eclipse-hono-service-device-registry            ClusterIP      10.99.199.48     <none>        5671/TCP,8080/TCP,8443/TCP   35d
eclipse-hono-service-device-registry-ext        LoadBalancer   10.107.122.10    127.0.0.1     28443:31906/TCP              35d
eclipse-hono-service-device-registry-headless   ClusterIP      None             <none>        <none>                       35d

Thank you in advance :)

sophokles73 commented 3 months ago

The example application code seems quite outdated and still defaults to AMQP 1.0 messaging infrastructure. You already figured out that you need to set some environment variables to use Kafka instead.

You will also need to

BTW we would be happy to accept a PR that fixes the example code accordingly :-)

ramonunibz commented 3 months ago

I will do just that. At the moment, I seem to have a problem with the createEventConsumer() and createTelemetryConsumer() methods, which neither throw an error nor succeed. This is probably due to some misconfiguration in the Kafka client.
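
A future that neither fails nor succeeds is easier to diagnose when the wait is bounded. The following is a minimal sketch that uses only the JDK's CompletableFuture as a stand-in; with Vert.x, a future can be converted via toCompletionStage().toCompletableFuture(), as the example application's consumeData() method does. The class name PendingFutureProbe and the method describeOutcome are hypothetical and not part of Hono.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Hono: reports whether a future succeeded,
// failed, or was still pending when the timeout expired.
final class PendingFutureProbe {

    private PendingFutureProbe() {
    }

    static String describeOutcome(final CompletableFuture<?> future, final long timeoutMillis) {
        try {
            // orTimeout() completes the given future exceptionally with a
            // TimeoutException if it has not completed within the timeout
            future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return "succeeded";
        } catch (final CompletionException e) {
            // join() wraps the original failure in a CompletionException
            if (e.getCause() instanceof TimeoutException) {
                return "still pending after " + timeoutMillis + " ms";
            }
            return "failed: " + e.getCause();
        }
    }

    public static void main(final String[] args) {
        // a future that nobody ever completes, like the consumer futures described above
        System.out.println(describeOutcome(new CompletableFuture<Void>(), 200));
        // a future that has already completed successfully
        System.out.println(describeOutcome(CompletableFuture.completedFuture("ok"), 200));
    }
}
```

A "still pending" result would point at a client that never becomes ready, for example because it cannot reach or authenticate with the broker, rather than at an error in the consumer code itself.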

ramonunibz commented 3 months ago

I've finished setting up the example application. The only caveat is that I've adapted it for the Kafka use case only. I'll nevertheless paste the file here for anyone interested!

/*******************************************************************************
 * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information regarding copyright ownership.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/

package org.eclipse.hono.vertx.example.base;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.application.client.kafka.impl.KafkaApplicationClientImpl;
import org.eclipse.hono.client.kafka.CommonKafkaClientConfigProperties;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.config.FileFormat;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.runtime.Quarkus;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;

/**
 * Example base class for consuming telemetry and event data from devices connected to Hono.
 * <p>
 * This variant has been adapted to Kafka based messaging only. It implements the code needed to get
 * Hono's messaging consumer client running. It does not send commands.
 * <p>
 * The code consumes data until the application is terminated, for example with ctrl-c.
 */
@SuppressFBWarnings(
        value = { "HARD_CODE_PASSWORD", "PREDICTABLE_RANDOM" },
        justification = """
                We use the default passwords of the Hono Sandbox installation throughout this class
                for ease of use. The passwords are publicly documented and do not affect any
                private installations of Hono.
                The values returned by the Random are only used as arbitrary values in example message
                payload.
                """)
public class HonoExampleApplicationBase {

    public static final String HONO_CLIENT_USER = System.getProperty("username", "hono");
    public static final String HONO_CLIENT_PASSWORD = System.getProperty("password", "hono-secret");
    public static final Boolean USE_PLAIN_CONNECTION =
            Boolean.valueOf(System.getProperty("plain.connection", "false"));
    public static final Boolean SEND_ONE_WAY_COMMANDS =
            Boolean.valueOf(System.getProperty("sendOneWayCommands", "false"));
    public static final Boolean USE_KAFKA = Boolean.valueOf(System.getProperty("kafka", "true"));
    private static final String MESSAGE_TYPE_TELEMETRY = "telemetry";
    private static final String MESSAGE_TYPE_EVENT = "event";

    ApplicationClient<? extends MessageContext> client;
    private final Set<String> supportedMessageTypes = new HashSet<>();
    private final Vertx vertx = Vertx.vertx();

    /**
     * Builds the JAAS configuration value used for SCRAM authentication.
     */
    private String scramJaasConfig(final String username, final String password) {
        return """
                %s required username="%s" password="%s";
                """.formatted(ScramLoginModule.class.getName(), username, password);
    }

    /**
     * Creates an application client for Kafka based messaging. Unlike with AMQP, the Kafka clients manage their
     * connections to the cluster internally.
     * <p>
     * NB: if you want to integrate this code with your own software, it might be necessary to copy the trust store to
     * your project as well and adapt the file path.
     */
    Future<KafkaApplicationClientImpl> createKafkaClient() {

        final var commonProps = new HashMap<String, String>();
        final String bootstrapServers;
        //change path to match location of truststore.pem.
        final String path = "truststore.pem";
        commonProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, path);

        Optional.ofNullable(FileFormat.detect(path))
            .ifPresent(fileFormat -> commonProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, fileFormat.name()));

        bootstrapServers = "127.0.0.1:9094";
        commonProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        commonProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        commonProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
        commonProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
        commonProps.put(
                SaslConfigs.SASL_JAAS_CONFIG,
                scramJaasConfig(HONO_CLIENT_USER, HONO_CLIENT_PASSWORD));

        final var commonClientConfig = new CommonKafkaClientConfigProperties();
        commonClientConfig.setCommonClientConfig(commonProps);

        final var consumerProps = new MessagingKafkaConsumerConfigProperties();
        consumerProps.setCommonClientConfig(commonClientConfig);
        final var producerProps = new MessagingKafkaProducerConfigProperties();
        producerProps.setCommonClientConfig(commonClientConfig);

        System.err.printf("Connecting to Kafka based messaging infrastructure [%s]%n", bootstrapServers);
        final var kafkaClient = new KafkaApplicationClientImpl(
                vertx,
                consumerProps,
                CachingKafkaProducerFactory.sharedFactory(vertx),
                producerProps);
        return startClientAndWaitForReadiness(kafkaClient)
                .map(ok -> {
                    this.client = kafkaClient;
                    return kafkaClient;
                }).onSuccess(ok -> System.out.println("client started"))
                .onFailure(err -> System.out.println("failed to create client " + err));
    }

    private Future<Void> startClientAndWaitForReadiness(final ApplicationClient<? extends MessageContext> client) {
        final Promise<Void> readyTracker = Promise.promise();
        client.addOnClientReadyHandler(result -> {
            if (result.succeeded()) {
                // client is ready, complete the promise
                readyTracker.complete();
                System.out.println("completed");
            } else {
                // an error occurred, fail the promise
                readyTracker.fail(result.cause());
                System.out.println("failed");
            }
        });
        return client.start()
                .compose(ok -> readyTracker.future());
    }

    /**
     * Gets the application client, creating and starting the Kafka based client on first use.
     *
     * @return A future that succeeds with the client once it is ready. The future fails if the client cannot
     *         be started or if Kafka based messaging has been disabled, because this variant of the example
     *         supports Kafka only.
     */
    public Future<ApplicationClient<? extends MessageContext>> getApplicationClient() {
        final Promise<ApplicationClient<? extends MessageContext>> result = Promise.promise();
        if (client != null) {
            result.complete(client);
        } else if (!USE_KAFKA) {
            // fail instead of leaving the promise pending forever, which would block consumeData()
            result.fail("this example supports Kafka based messaging only");
        } else {
            createKafkaClient()
                .onSuccess(result::complete)
                .onFailure(result::fail);
        }
        return result.future();
    }

    /**
     * Start the application client and set the message handling method to treat data that is received.
     */
    protected void consumeData() {
        supportedMessageTypes.add(MESSAGE_TYPE_EVENT);
        supportedMessageTypes.add(MESSAGE_TYPE_TELEMETRY);

        try {
            getApplicationClient()
                .compose(this::createConsumers)
                .onSuccess(ok -> System.err.println("""
                        Consuming messages for tenant [%s], ctrl-c to exit.
                        """.formatted(HonoExampleConstants.TENANT_ID)))
                .toCompletionStage()
                .toCompletableFuture()
                .join();
            Quarkus.waitForExit();
        } catch (final CompletionException e) {
            // join() wraps the original failure, so report the underlying cause
            System.err.println("failed to create message consumer(s): %s".formatted(e.getCause()));
        }
    }

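    /**
     * Creates the event and telemetry consumers for the configured tenant.
     * <p>
     * If the peer closes a consumer unexpectedly, the consumers are re-created after one second.
     */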
    private Future<Void> createConsumers(final ApplicationClient<? extends MessageContext> client) {

        final Handler<Throwable> closeHandler = cause -> {
            System.err.println("peer has closed message consumer(s) unexpectedly, trying to reopen ...");
            vertx.setTimer(1000L, reconnect -> {
                createConsumers(client);
            });
        };

        final List<Future<MessageConsumer>> consumerFutures = new ArrayList<>();
        if (supportedMessageTypes.contains(MESSAGE_TYPE_EVENT)) {
            consumerFutures.add(
                    client.createEventConsumer(
                            HonoExampleConstants.TENANT_ID,
                            msg -> printMessage(MESSAGE_TYPE_EVENT, msg),
                            closeHandler));
        }

        if (supportedMessageTypes.contains(MESSAGE_TYPE_TELEMETRY)) {
            consumerFutures.add(
                    client.createTelemetryConsumer(
                            HonoExampleConstants.TENANT_ID,
                            msg -> printMessage(MESSAGE_TYPE_TELEMETRY, msg),
                            closeHandler));
        }

        return Future.all(consumerFutures)
                .mapEmpty();
    }

    private void printMessage(final String endpoint, final DownstreamMessage<? extends MessageContext> message) {

        System.out.println("%s %s %s %s %s".formatted(
                endpoint.charAt(0),
                message.getDeviceId(),
                Optional.ofNullable(message.getContentType()).orElse("-"),
                Optional.ofNullable(message.getPayload())
                    .map(Buffer::toString)
                    .orElse("-"),
                message.getProperties().getPropertiesMap()));
    }
}
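
The bootstrap address and the trust store path are hard-coded in createKafkaClient(), whereas the class reads the user name and password constants from system properties. Below is a minimal sketch of how the connection settings could be parameterized in the same way. It uses the plain Kafka configuration key strings instead of the constants from kafka-clients so that it compiles with the JDK alone. The class name KafkaClientProps and the system property names kafka.bootstrap and kafka.truststore are hypothetical, and deriving the trust store type from the ".pem" suffix is a simplification of what FileFormat.detect() does in the posted code.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Hono: assembles the common Kafka client
// properties that createKafkaClient() puts into its commonProps map.
final class KafkaClientProps {

    private KafkaClientProps() {
    }

    static Map<String, String> build(
            final String bootstrapServers,
            final String trustStorePath,
            final String username,
            final String password) {

        final Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
                username, password));
        props.put("ssl.truststore.location", trustStorePath);
        // assumption: only PEM trust stores are recognized by their file suffix here
        if (trustStorePath.toLowerCase(Locale.ROOT).endsWith(".pem")) {
            props.put("ssl.truststore.type", "PEM");
        }
        // an empty value disables host name verification, as in the posted code
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(final String[] args) {
        final Map<String, String> props = build(
                System.getProperty("kafka.bootstrap", "127.0.0.1:9094"),
                System.getProperty("kafka.truststore", "truststore.pem"),
                System.getProperty("username", "hono"),
                System.getProperty("password", "hono-secret"));
        // print everything except the JAAS entry, which contains the password
        props.forEach((key, value) -> {
            if (!"sasl.jaas.config".equals(key)) {
                System.out.println(key + "=" + value);
            }
        });
    }
}
```

Under these assumptions, the resulting map could be passed to CommonKafkaClientConfigProperties.setCommonClientConfig() in place of the hand-built commonProps, and the port could then be switched with -Dkafka.bootstrap=... without recompiling.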