quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Kafka - Redpanda - add host IP for advertised listener #42465

Open dcdh opened 1 month ago

dcdh commented 1 month ago

Description

Current behavior

The Redpanda dev service uses code like this to define the Kafka advertised listener:

        addresses.add(String.format("OUTSIDE://%s:%d", getHost(), getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT)));

getHost() returns localhost, which is a problem when I run Debezium as a container. Debezium tries to connect using localhost plus the mapped port (as seen from the host), but inside the Debezium container localhost refers to the container's own loopback address (127.0.0.1), not to the host. From inside the container, the host is not 127.0.0.1 and has to be reached through the host IP.

Multiple listeners can be defined. Would it be possible to add one that uses the Docker host IP?

It might be nice to do the same for the other Kafka containers (Strimzi, ...).

Implementation ideas

For RedPandaKafkaContainer, it could be implemented this way:

        addresses.add(String.format("OUTSIDE_HOST://%s:%d", InetAddress.getLocalHost().getHostAddress(), getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT)));

So we would end up with something like this:

    private String getKafkaAdvertisedAddresses() {
        List<String> addresses = new ArrayList<>();
        if (useSharedNetwork) {
            addresses.add(String.format("PLAINTEXT://%s:29092", hostName));
        }
        // See https://github.com/quarkusio/quarkus/issues/21819
        // Kafka is always exposed to the Docker host network
        addresses.add(String.format("OUTSIDE://%s:%d", getHost(), getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT)));
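        // Note: InetAddress.getLocalHost() throws the checked UnknownHostException, so it must be caught or declared
        addresses.add(String.format("OUTSIDE_HOST://%s:%d", InetAddress.getLocalHost().getHostAddress(), getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT)));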
        return String.join(",", addresses);
    }

warning: I am not sure about the naming.
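
To make the idea easier to try outside Quarkus, here is a minimal standalone sketch of the same listener list, using only the JDK. The class name `AdvertisedListeners` and both helper methods are hypothetical and not part of Quarkus or Testcontainers. It also assumes that `InetAddress.getLocalHost()` may resolve to a loopback address on some machines (for example when /etc/hosts maps the hostname to 127.0.1.1), so it looks for a non-loopback IPv4 address instead. Which interface is the right one depends on the Docker setup, so treat the selection as a placeholder.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Optional;

class AdvertisedListeners {

    // Hypothetical helper: first non-loopback IPv4 address found on an interface that is up.
    static Optional<String> firstNonLoopbackIpv4() {
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return Optional.empty();
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                if (!nic.isUp() || nic.isLoopback()) {
                    continue;
                }
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                        return Optional.of(addr.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            // Interfaces could not be inspected: skip the extra listener.
        }
        return Optional.empty();
    }

    // Mirrors the proposed getKafkaAdvertisedAddresses(), with the container state passed in as arguments.
    static String advertisedAddresses(boolean useSharedNetwork, String hostName, String host,
            int mappedPort, Optional<String> hostIp) {
        List<String> addresses = new ArrayList<>();
        if (useSharedNetwork) {
            addresses.add(String.format("PLAINTEXT://%s:29092", hostName));
        }
        addresses.add(String.format("OUTSIDE://%s:%d", host, mappedPort));
        // The OUTSIDE_HOST name is the one proposed above; it is added only when a host IP was found.
        hostIp.ifPresent(ip -> addresses.add(String.format("OUTSIDE_HOST://%s:%d", ip, mappedPort)));
        return String.join(",", addresses);
    }

    public static void main(String[] args) {
        // "redpanda" and 32780 are made-up example values.
        System.out.println(advertisedAddresses(true, "redpanda", "localhost", 32780, firstNonLoopbackIpv4()));
    }
}
```

Passing the host IP as an `Optional` keeps the existing OUTSIDE listener untouched when no suitable address is found, so the current behavior would not regress.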

Thanks. Damien


quarkus-bot[bot] commented 1 month ago

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

dcdh commented 1 month ago

As a quick win, I tried to override the localhost entry in /etc/hosts so that it points to the host IP, like this:

    public DebeziumConnectContainer(final DockerImageName dockerImageName, final DebeziumConnectorDevService.KafkaNetworkResolution kafkaNetworkResolution) {
        super(dockerImageName);
        Validate.validState(dockerImageName.isCompatibleWith(DockerImageName.parse("debezium/connect")));
        this.kafkaNetworkResolution = Objects.requireNonNull(kafkaNetworkResolution);
        this.withExposedPorts(DEBEZIUM_REST_API_PORT);
        this.withNetwork(Network.SHARED);
        this.withAccessToHost(true); // OK, I can access the services exposed on the host!
        this.withCreateContainerCmdModifier(cmd -> {
            cmd.withEntrypoint("sh");
        });
        this.withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
        this.withLogConsumer(frameConsumer());
        this.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1));
    }

    @Override
    protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
        super.containerIsStarting(containerInfo, reused);

        // localhost inside docker must refer to the docker host ip
        // https://github.com/quarkusio/quarkus/issues/21819
        // The advertised address is set to localhost plus the external (mapped) Kafka port
        //language=sh
        final String fixBatch = """
                #!/bin/sh
                export BOOTSTRAP_SERVERS=%s
                export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/connect-log4j.properties
                export KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
                export VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
                export GROUP_ID=1
                export CONFIG_STORAGE_TOPIC=my_connect_configs
                export OFFSET_STORAGE_TOPIC=my_connect_offsets
                export STATUS_STORAGE_TOPIC=my_connect_statuses
                # hack here - localhost will refer to host ip
                echo "%s localhost" > /etc/hosts
                /docker-entrypoint.sh
                """.formatted(kafkaNetworkResolution.toBootstrapServers(), kafkaNetworkResolution.hostIpAddress());
        //noinspection OctalInteger
        copyFileToContainer(
                Transferable.of(fixBatch.getBytes(StandardCharsets.UTF_8), 0777),
                STARTER_SCRIPT);
    }

But this does not work: updating /etc/hosts requires root, and I get "permission denied".

I've tried with a busybox container and telnet works.
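
For reference, the telnet check can be reproduced in plain Java with a TCP connect and a timeout. This is only a sketch: the class name `PortCheck` and the default host and port are made up for illustration. Like telnet, it only shows that the port is reachable at the TCP level. A Kafka client will still switch to the addresses advertised by the broker after the bootstrap step, so a successful connect does not prove that the advertised listener is usable from the container.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, unknown host, or timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Example defaults only; pass the host IP and the mapped Kafka port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```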