conduktor / kafka-stack-docker-compose

Docker Compose files to create a fully working Kafka stack
Apache License 2.0

KafkaConnectionError: 111 ECONNREFUSED #92

Status: Closed (CoinCoderBuffalo closed this issue 3 years ago)

CoinCoderBuffalo commented 3 years ago

I have an Airflow DAG that calls a kafka-python producer. I get the error below when trying to send a message from the Airflow container to the Kafka container.

I am using Docker Desktop 3.3.3 on a MacBook Pro. Airflow and Kafka run as separate Docker Compose apps, but both are attached to the same Docker network, which is specified in the compose files:

networks:
  default:
    external: true
    name: ti-network

I am able to connect to the Kafka container with the Conduktor client on my Mac, pointing it at 127.0.0.1:9092.

Here is the error message I see in Airflow:

[2021-07-09 17:27:20,322] {conn.py:381} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka1:9092 <connecting> [IPv4 ('172.26.0.9', 9092)]>: connecting to kafka1:9092 [('172.26.0.9', 9092) IPv4]
[2021-07-09 17:27:20,323] {conn.py:1205} INFO - Probing node bootstrap-0 broker version
[2021-07-09 17:27:20,324] {conn.py:410} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka1:9092 <connecting> [IPv4 ('172.26.0.9', 9092)]>: Connection complete.
[2021-07-09 17:27:20,432] {conn.py:1267} INFO - Broker version identified as 2.5.0
[2021-07-09 17:27:20,433] {conn.py:1269} INFO - Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
[2021-07-09 17:27:20,589] {conn.py:381} INFO - <BrokerConnection node_id=1 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
[2021-07-09 17:27:20,590] {conn.py:419} ERROR - Connect attempt to <BrokerConnection node_id=1 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error 111. Disconnecting.
[2021-07-09 17:27:20,591] {conn.py:919} INFO - <BrokerConnection node_id=1 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
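Reading the log above: the bootstrap connection to kafka1:9092 succeeds, but the metadata the broker returns redirects the client to the advertised listener 127.0.0.1:9092, and inside the Airflow container 127.0.0.1 is the Airflow container itself, hence ECONNREFUSED. A small helper (the function name is mine, not part of kafka-python) makes it easy to see which addresses the broker will hand out, given the KAFKA_ADVERTISED_LISTENERS value from the compose file:

```python
def parse_advertised_listeners(value):
    """Split a KAFKA_ADVERTISED_LISTENERS string into {listener_name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners

# The value from the compose file (DOCKER_HOST_IP unset, so 127.0.0.1 is substituted):
advertised = parse_advertised_listeners(
    "LISTENER_DOCKER_INTERNAL://kafka1:19092,"
    "LISTENER_DOCKER_EXTERNAL://127.0.0.1:9092"
)
# A client that bootstraps on port 9092 hits LISTENER_DOCKER_EXTERNAL, so the
# broker tells it to continue at 127.0.0.1:9092 -- exactly what the log shows.
print(advertised)
```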

Here is the Kafka compose file:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_CREATE_TOPICS: "TopicCrypto:1:1"
      # For testing small segments 16MB and retention of 128MB
      KAFKA_LOG_SEGMENT_BYTES: 16777216
      KAFKA_LOG_RETENTION_BYTES: 134217728
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

networks:
  default:
    external: true
    name: ti-network

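Since both apps share ti-network, the Airflow container should be able to reach the broker's internal listener at kafka1:19092 directly. A minimal probe sketch, to be run from inside the Airflow container (the function is my own, and the expected results are assumptions based on the compose file above, not verified output):

```python
import socket

def can_connect(host, port, timeout=3.0):
    """Return True if a plain TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# From inside the Airflow container, presumably:
# can_connect("kafka1", 19092)   -> internal listener, reachable over ti-network
# can_connect("127.0.0.1", 9092) -> the Airflow container itself, likely refused
```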
Here is the kafka-python code:

from kafka import KafkaProducer
import logging
from json import dumps

TOPIC = 'TopicCrypto'
SERVER = 'kafka1:9092'

def generate_message(**kwargs):

    message = kwargs['message']

    logging.info("Message: %s", message)

    producer = KafkaProducer(bootstrap_servers=[SERVER], 
        api_version=(2, 5, 0),
        value_serializer=lambda x: dumps(x).encode('utf-8')
    )

    logging.info("Partitions: %s", producer.partitions_for(TOPIC))

    producer.send(TOPIC, value=message)

    logging.info("Sent message")

    producer.close()
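Given the listener layout in the compose file, one plausible adjustment is to bootstrap against the internal listener (kafka1:19092) rather than the external one, since the producer runs on the same Docker network. A sketch under that assumption, with the producer kwargs pulled into a hypothetical helper so the wiring is visible (producer_config is my name, not a kafka-python API):

```python
from json import dumps

def producer_config(bootstrap="kafka1:19092"):
    """Build KafkaProducer kwargs; kafka1:19092 is the internal listener
    advertised in the compose file, assumed reachable over ti-network."""
    return dict(
        bootstrap_servers=[bootstrap],
        api_version=(2, 5, 0),
        value_serializer=lambda x: dumps(x).encode("utf-8"),
    )

# Usage inside the DAG task (requires a live broker, so not run here):
# producer = KafkaProducer(**producer_config())
# producer.send(TOPIC, value=message)
# producer.flush()   # send() is async; flush before closing to be explicit
# producer.close()
```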