mateusjunges / laravel-kafka

Use Kafka Producers and Consumers in your laravel app with ease!
https://junges.dev/documentation/laravel-kafka
MIT License
550 stars 80 forks source link

cant send message #294

Closed RezaZkr closed 1 month ago

RezaZkr commented 3 months ago

hi my laravel version is 11 php version 8.3.8 i can send data to specific topic with this custom class:

<?php
namespace App\Library\Kafka;

use RdKafka\Conf;
use RdKafka\Headers;
use RdKafka\Producer;

class Publish
{
    private $producer;

    public function __construct()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('message.max.bytes',209715200);

        $this->producer = new Producer($conf);
    }

    public function produce(string $topic, array $message, array $headers = [], string $key = null, $partition = null): void
    {
        $topic = $this->producer->newTopic($topic);
        $messageJson = json_encode($message);

        if (is_null($partition)) {
            $partition = RD_KAFKA_PARTITION_UA;
        }

        try {
            if (count($headers)) {
                $topic->producev($partition, 0, $messageJson, $key, $headers);
            } else {
                $topic->produce($partition, 0, $messageJson, $key);
            }

            $this->producer->flush(86400000);//24H
        } catch (RdKafka\Exception $e) {
            report($e);
            throw $e;
        }
    }
}

and when use this code:

try {

            $message = new \Junges\Kafka\Message\Message(
                headers: ['a' => 'b'],
                body: ['c' => 'd'],
                key: '1'
            );

            \Junges\Kafka\Facades\Kafka::publish()
                ->onTopic('test_topic')
                ->withMessage($message)
                ->usingSerializer(new \Junges\Kafka\Message\Serializers\JsonSerializer())
                ->send();

            return 'Success';

        } catch (ConsumerException|\Exception $e) {
            return 'Fail';
        }

receive success message but I can't see any message on the confluent panel. Below is a sample log.

%7|1719055458.966|WAKEUPFD|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1719055458.966|BRKMAIN|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1719055458.967|BROKER|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Added new broker with NodeId -1
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1719055458.967|INIT|rdkafka#producer-20| [thrd:app]: librdkafka v2.0.2 (0x20002ff) rdkafka#producer-20 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, GCC GXX INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT SYSLOG RAPIDJSON SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]: Client configuration:
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   client.software.version = 2.0.2
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   metadata.broker.list = broker:29092
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   log_level = 7
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   opaque = 0x7f4ea4736290
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   ssl_key = [redacted]
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   compression.codec = snappy
%7|1719055458.967|CONF|rdkafka#producer-20| [thrd:app]:   dr_msg_cb = 0x7f4ea6901160
%7|1719055458.967|TOPIC|rdkafka#producer-20| [thrd:app]: New local topic: test_topic
%7|1719055458.967|TOPPARNEW|rdkafka#producer-20| [thrd:app]: NEW test_topic [-1] 0x55558c1a6810 refcnt 0x55558c1a68a0 (at rd_kafka_topic_new0:468)
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
%7|1719055458.967|METADATA|rdkafka#producer-20| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1719055458.967|METADATA|rdkafka#producer-20| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1719055458.967|PURGE|rdkafka#producer-20| [thrd:app]: test_topic [-1]: purging queues (purge_flags 0x3, exclude xmit_msgq)
%7|1719055458.967|BRKMAIN|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Enter main broker thread
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Received CONNECT op
%7|1719055458.967|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1719055458.967|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: broker in state TRY_CONNECT connecting
%7|1719055458.967|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1719055458.967|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Connecting to ipv4#172.18.0.3:29092 (plaintext) with socket 9
%7|1719055458.967|CONNECT|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Connected to ipv4#172.18.0.3:29092
%7|1719055458.967|CONNECTED|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Connected (#1)
%7|1719055458.967|FEATURE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1719055458.967|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1719055458.967|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.967|PURGE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging queues with flags queue,inflight
%7|1719055458.967|PURGEQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purged 0 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%7|1719055458.967|PURGEQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purged 0 message(s) from 0 partition(s)
%7|1719055458.968|PURGEQ|rdkafka#producer-20| [thrd:app]: Purged 0 message(s) from 0 UA-partition(s)
%7|1719055458.968|WAKEUP|rdkafka#producer-20| [thrd:app]: broker:29092/bootstrap: Wake-up: flushing
%7|1719055458.968|WAKEUP|rdkafka#producer-20| [thrd:app]: Wake-up sent to 1 broker thread in state >= UP: flushing
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:app]: Interrupting timers
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:app]: Sending TERMINATE to internal main thread
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:app]: Joining internal main thread
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Handle is terminating in state APIVERSION_QUERY: 3 refcnts (0x55558c13ac38), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 1 request(s) in retry+outbuf
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Internal main thread terminating
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: Destroy internal
%7|1719055458.968|BROADCAST|rdkafka#producer-20| [thrd:main]: Broadcasting state change
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: Removing all topics
%7|1719055458.968|TOPPARREMOVE|rdkafka#producer-20| [thrd:main]: Removing toppar test_topic [-1] 0x55558c1a6810
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: test_topic [-1]: 0x55558c1a6810 DESTROY_FINAL
%7|1719055458.968|DESTROY|rdkafka#producer-20| [thrd:main]: Sending TERMINATE to broker:29092/bootstrap
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Purging reply queue
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Decommissioning internal broker
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:main]: Join 2 broker thread(s)
%7|1719055458.968|TERM|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Received TERMINATE op in state APIVERSION_QUERY: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1719055458.968|TERM|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Client is terminating (after 1ms in state INIT) (_DESTROY)
%7|1719055458.968|STATE|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Client is terminating (after 0ms in state APIVERSION_QUERY) (_DESTROY)
%7|1719055458.968|FEATURE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updated enabled protocol features -ApiVersion to 
%7|1719055458.968|STATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
%7|1719055458.968|BROADCAST|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: Broadcasting state change
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updating 0 buffers on connection reset
%7|1719055458.968|BRKTERM|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x55558c13ac38), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd:broker:29092/bootstrap]: broker:29092/bootstrap: Updating 0 buffers on connection reset
%7|1719055458.968|BROADCAST|rdkafka#producer-20| [thrd::0/internal]: Broadcasting state change
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1719055458.968|BRKTERM|rdkafka#producer-20| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1719055458.968|TERMINATE|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x55558c13a008), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1719055458.968|FAIL|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1719055458.968|BUFQ|rdkafka#producer-20| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1719055458.969|TERMINATE|rdkafka#producer-20| [thrd:main]: Internal main thread termination done
%7|1719055458.969|TERMINATE|rdkafka#producer-20| [thrd:app]: Destroying op queues
%7|1719055458.969|TERMINATE|rdkafka#producer-20| [thrd:app]: Termination done: freeing resources
= "Success"

below image when use my custom class and can send message successfully Screenshot_1

theaungmyatmoe commented 3 months ago

You don't need to configure the native RDKafka ext yourself. You just need to use producer and consumer from the package. You can find it on the doc. This might be mismatch between this package and rdkafka lib ext.

RezaZkr commented 3 months ago

@aungmyatmoethegreat No, it's not like that. I checked the package codes and found out that it uses php native rdkafka lib.

theaungmyatmoe commented 3 months ago

Correct, this package use RDKafka internally because RDKafka is defacto for PHP to communicate with Kafka Servers. But you don't need to use like that. You only need to use abstracted version of this package.

RezaZkr commented 3 months ago

@aungmyatmoethegreat You didn't understand my question. When I send a message with my custom class, everything is fine. But when I use the package to send a message, receive success but not shown on confluent panel.

theaungmyatmoe commented 3 months ago

I see your issue, it might be some issue with this package, I just tested with confluent in last month is it's fine. Check the sasl config.

mateusjunges commented 3 months ago

which version of laravel-kafka are you using?

RezaZkr commented 3 months ago

@mateusjunges V2

mateusjunges commented 1 month ago

Can you please try with the following docker-compose file? I have no issues publishing/consuming messages using v2.

---
version: '2'
services:

  broker:
    image: confluentinc/cp-kafka:7.7.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.7.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.7.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.7.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.7.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.7.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.7.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'