redpanda-data / redpanda

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
https://redpanda.com
9.38k stars 577 forks source link

kafka.errors.NodeNotReadyError: NodeNotReadyError #12628

Open xuarehere opened 1 year ago

xuarehere commented 1 year ago

Version & Environment

version: "3.7"
name: redpanda-quickstart
networks:
  redpanda_network:
    driver: bridge
volumes:
  redpanda-0: null
  redpanda-1: null
  redpanda-2: null

What went wrong?

I use this Redpanda Quickstart | Redpanda Docs to build RedPanda which can be used on a machine.However, when I want to send data from Machine 1 to Machine 2, I always encounter errors. What should I do?

kafka.errors.NodeNotReadyError: NodeNotReadyError

I am a newer to use the Redpanda. When I want to send data from Machine 1 to Machine 2, I always encounter errors, maybe I have config a wrrong file?

    controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
  File "/home/hdd2T/C_cache/py/env/lib/python3.9/site-packages/kafka/client_async.py", line 909, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/home/hdd2T/C_cache/py/env/lib/python3.9/site-packages/kafka/conn.py", line 1240, in check_version
    raise Errors.NodeNotReadyError()
kafka.errors.NodeNotReadyError: NodeNotReadyError

What should have happened instead?

Case1: It works well Case2: Something wrong, Maybe I didn't read the document carefully. I am a newer to use the Redpanda, and I have read the document several times, but I still haven't found a suitable tutorial to let it work normally.

image

How to reproduce the issue?

I follow this tutorial: Redpanda Quickstart | Redpanda Docs

docker

docker-compose.yml


---
version: "3.7"
name: redpanda-quickstart
networks:
  redpanda_network:
    driver: bridge
volumes:
  redpanda-0: null
  redpanda-1: null
  redpanda-2: null
services:
  redpanda-0:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers'
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers'
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with eachother internally.
      - --rpc-addr redpanda-0:33145
      - --advertise-rpc-addr redpanda-0:33145
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      # The amount of memory to make available to Redpanda.
      - --memory 1G
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # enable logs for debugging.
      - --default-log-level=debug
    image: docker.redpanda.com/redpandadata/redpanda:v23.2.3
    container_name: redpanda-0
    volumes:
      - redpanda-0:/var/lib/redpanda/data
    networks:
      - redpanda_network
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  redpanda-1:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:29092
      - --advertise-kafka-addr internal://redpanda-1:9092,external://localhost:29092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:28082
      - --advertise-pandaproxy-addr internal://redpanda-1:8082,external://localhost:28082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:28081
      - --rpc-addr redpanda-1:33145
      - --advertise-rpc-addr redpanda-1:33145
      - --smp 1
      - --memory 1G
      - --mode dev-container
      - --default-log-level=debug
      - --seeds redpanda-0:33145
    image: docker.redpanda.com/redpandadata/redpanda:v23.2.3
    container_name: redpanda-1
    volumes:
      - redpanda-1:/var/lib/redpanda/data
    networks:
      - redpanda_network
    ports:
      - 28081:28081
      - 28082:28082
      - 29092:29092
      - 29644:9644
    depends_on:
      - redpanda-0
  redpanda-2:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:39092
      - --advertise-kafka-addr internal://redpanda-2:9092,external://localhost:39092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:38082
      - --advertise-pandaproxy-addr internal://redpanda-2:8082,external://localhost:38082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:38081
      - --rpc-addr redpanda-2:33145
      - --advertise-rpc-addr redpanda-2:33145
      - --smp 1
      - --memory 1G
      - --mode dev-container
      - --default-log-level=debug
      - --seeds redpanda-0:33145
    image: docker.redpanda.com/redpandadata/redpanda:v23.2.3
    container_name: redpanda-2
    volumes:
      - redpanda-2:/var/lib/redpanda/data
    networks:
      - redpanda_network
    ports:
      - 38081:38081
      - 38082:38082
      - 39092:39092
      - 39644:9644
    depends_on:
      - redpanda-0
  console:
    container_name: redpanda-console
    image: docker.redpanda.com/redpandadata/console:v2.3.0
    networks:
      - redpanda_network
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda-0:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda-0:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda-0:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda-0

code

Case1:

admin.py

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.admin import KafkaAdminClient, NewTopic
class ChatAdmin:
  def __init__(self, brokers):
    self.admin = KafkaAdminClient(bootstrap_servers=brokers)
  def topic_exists(self, topic_name):
    topics_metadata = self.admin.list_topics()
    return topic_name in topics_metadata
  def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
    if not self.topic_exists(topic_name):
      new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
      self.admin.create_topics([new_topic])
      print(f"Topic {topic_name} created.")
    else:
      print(f"Topic {topic_name} already exists.")
  def close(self):
    self.admin.close()

app.py

import threading
from producer import ChatProducer
from consumer import ChatConsumer
from admin import ChatAdmin
brokers = ["localhost:19092"]
topic = "chat-room"
def consumer_thread(consumer):
  consumer.print_messages()
if __name__ == "__main__":
  admin = ChatAdmin(brokers)
  if not admin.topic_exists(topic):
    print(f"Creating topic: {topic}")
    admin.create_topic(topic)
  username = input("Enter your username: ")
  producer = ChatProducer(brokers, topic)
  consumer = ChatConsumer(brokers, topic)
  consumer_t = threading.Thread(target=consumer_thread, args=(consumer,))
  consumer_t.daemon = True
  consumer_t.start()
  print("Connected. Press Ctrl+C to exit")
  try:
    while True:
      message = input()
      producer.send_message(username, message)
  except KeyboardInterrupt:
    pass
  finally:
    print("\nClosing chat...")
    producer.close()
    consumer.close()
    admin.close()
    consumer_t.join(1)

consumer.py

from kafka import KafkaConsumer
import json
import uuid
class ChatConsumer:
  def __init__(self, brokers, topic, group_id=None):
    if group_id is None:
      group_id = str(uuid.uuid4())
    self.consumer = KafkaConsumer(
      topic,
      bootstrap_servers=brokers,
      group_id=group_id,
      value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
  def print_messages(self):
    for msg in self.consumer:
      print(f"{msg.value['user']}: {msg.value['message']}")
  def close(self):
    self.consumer.close()

producer.py

from kafka import KafkaProducer
import json
class ChatProducer:
  def __init__(self, brokers, topic):
    self.topic = topic
    self.producer = KafkaProducer(
      bootstrap_servers=brokers,
      value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
  def send_message(self, user, message):
    self.producer.send(self.topic, {"user": user, "message": message})
    self.producer.flush()
  def close(self):
    self.producer.close()
Case2:

admin.py

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.admin import KafkaAdminClient, NewTopic
class ChatAdmin:
  def __init__(self, brokers):
    self.admin = KafkaAdminClient(bootstrap_servers=brokers)
  def topic_exists(self, topic_name):
    topics_metadata = self.admin.list_topics()
    return topic_name in topics_metadata
  def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
    if not self.topic_exists(topic_name):
      new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
      self.admin.create_topics([new_topic])
      print(f"Topic {topic_name} created.")
    else:
      print(f"Topic {topic_name} already exists.")
  def close(self):
    self.admin.close()

app.py

import threading
from producer import ChatProducer
from consumer import ChatConsumer
from admin import ChatAdmin
brokers = ["10.3.3.101:19092"] 
topic = "chat-room"
def consumer_thread(consumer):
  consumer.print_messages()
if __name__ == "__main__":
  admin = ChatAdmin(brokers)
  if not admin.topic_exists(topic):
    print(f"Creating topic: {topic}")
    admin.create_topic(topic)
  username = input("Enter your username: ")
  producer = ChatProducer(brokers, topic)
  consumer = ChatConsumer(brokers, topic)
  consumer_t = threading.Thread(target=consumer_thread, args=(consumer,))
  consumer_t.daemon = True
  consumer_t.start()
  print("Connected. Press Ctrl+C to exit")
  try:
    while True:
      message = input()
      producer.send_message(username, message)
  except KeyboardInterrupt:
    pass
  finally:
    print("\nClosing chat...")
    producer.close()
    consumer.close()
    admin.close()
    consumer_t.join(1)

consumer.py

from kafka import KafkaConsumer
import json
import uuid
class ChatConsumer:
  def __init__(self, brokers, topic, group_id=None):
    if group_id is None:
      group_id = str(uuid.uuid4())
    self.consumer = KafkaConsumer(
      topic,
      bootstrap_servers=brokers,
      group_id=group_id,
      value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
  def print_messages(self):
    for msg in self.consumer:
      print(f"{msg.value['user']}: {msg.value['message']}")
  def close(self):
    self.consumer.close()

producer.py

from kafka import KafkaProducer
import json
class ChatProducer:
  def __init__(self, brokers, topic):
    self.topic = topic
    self.producer = KafkaProducer(
      bootstrap_servers=brokers,
      value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
  def send_message(self, user, message):
    self.producer.send(self.topic, {"user": user, "message": message})
    self.producer.flush()
  def close(self):
    self.producer.close()

JIRA Link: CORE-1388

github-actions[bot] commented 7 months ago

This issue hasn't seen activity in 3 months. If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in two weeks.

rockwotj commented 7 months ago

@xuarehere I would assume that either 10.3.3.101 isn't accessible from the other host OR that docker is exposing the port on that interface and only the local loopback.