
bug: trouble connecting Redpanda (Kafka) to RisingWave as a source or sink #10183


lostmygithubaccount commented 1 hour ago

What happened?

I'm in the Ibis repo. I can spin up RisingWave in a container with:

just up risingwave
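
As a quick check that RisingWave is reachable (using the same connection parameters as the Python snippet further down), the following should connect and list tables without error:

import ibis

con = ibis.risingwave.connect(user="root", database="dev", host="localhost", port=4566)
print(con.list_tables())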

I save the following as redpanda-compose.yml:

name: redpanda-quickstart-one-broker
networks:
  redpanda_network:
    driver: bridge
volumes:
  redpanda-0: null
services:
  redpanda-0:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with each other internally.
      - --rpc-addr redpanda-0:33145
      - --advertise-rpc-addr redpanda-0:33145
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:v24.2.4
    container_name: redpanda-0
    volumes:
      - redpanda-0:/var/lib/redpanda/data
    networks:
      - redpanda_network
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  console:
    container_name: redpanda-console
    image: docker.redpanda.com/redpandadata/console:v2.7.2
    networks:
      - redpanda_network
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda-0:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda-0:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda-0:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda-0

and start that with:

docker compose -f ./redpanda-compose.yml up
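
To sanity-check the broker from the host (assuming the externally advertised address localhost:19092 from the compose file above), confluent-kafka's admin client can fetch cluster metadata:

from confluent_kafka.admin import AdminClient

# connects to the externally advertised Kafka listener
admin = AdminClient({"bootstrap.servers": "localhost:19092"})
metadata = admin.list_topics(timeout=5)
print(metadata.brokers)  # should show broker 0 at localhost:19092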

Now, in Python, I want to create a source on the RisingWave backend pointing to Redpanda:

import ibis
import ibis.expr.datatypes as dt

ibis.options.interactive = True
ibis.options.repr.interactive.max_rows = 22
ibis.options.repr.interactive.max_length = 22
ibis.options.repr.interactive.max_columns = None

user = "root"
database = "dev"
host = "localhost"
port = 4566

con = ibis.risingwave.connect(user=user, database=database, host=host, port=port)

source_schema = ibis.schema(
    {
        "timestamp": dt.timestamp,
        "value": str,
    }
)

from confluent_kafka import Producer

# Configuration for the Kafka producer
conf = {
    "bootstrap.servers": "localhost:19092"  # Redpanda runs on port 9092
}

# Create a Producer instance
producer = Producer(conf)

# Delivery callback function to report success or failure of a message
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# Produce some messages to a topic
topic = "test_topic"

for i in range(10):
    message = f"Message {i}"
    producer.produce(topic, message.encode("utf-8"), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery reports to be received
producer.flush()

print("All messages have been produced.")

source = con.create_source(
    name="test_source",
    schema=source_schema,
    connector_properties={
        "connector": "kafka",
        "topic": "test_topic",
        "properties.bootstrap.server": "localhost:18082",
        "schema.registry": "http://0.0.0.0:18081",
        # "data_format": "PLAIN",
        # "data_encode": "JSON",
    },
    data_format="PLAIN",
    encode_format="JSON",
)
source

From a previous error, it seemed like it couldn't find the schema registry, but I can't figure out how to specify that. This results in the error noted below; I've tried localhost:18081 and various other values.
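
For reference, based on the create_source signature visible in the traceback below, I'd expect something closer to the following to work (untested; it assumes the externally advertised Kafka port 19092 from the compose file, and that schema registry settings belong in encode_properties rather than in the WITH clause):

source = con.create_source(
    name="test_source",
    schema=source_schema,
    connector_properties={
        "connector": "kafka",
        "topic": "test_topic",
        # the externally advertised Kafka address, not the HTTP proxy port 18082
        "properties.bootstrap.server": "localhost:19092",
    },
    data_format="PLAIN",
    encode_format="JSON",
    # hypothetical: if a registry were needed (it may not be for PLAIN/JSON),
    # it would presumably go here rather than in connector_properties:
    # encode_properties={"schema.registry": "http://localhost:18081"},
)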

What version of ibis are you using?

main

What backend(s) are you using, if any?

RisingWave

Relevant log output

---------------------------------------------------------------------------
InternalError_                            Traceback (most recent call last)
Cell In[9], line 1
----> 1 source = con.create_source(
      2     name="test_source",
      3     schema=source_schema,
      4     connector_properties={
      5         "connector": "kafka",
      6         "topic": "test_topic",
      7         "properties.bootstrap.server": "localhost:18082",
      8         "schema.registry": "http://0.0.0.0:18081",
      9         # "data_format": "PLAIN",
     10         # "data_encode": "JSON",
     11     },
     12     data_format="PLAIN",
     13     encode_format="JSON",
     14 )
     15 source

File ~/repos/ibis/ibis/backends/risingwave/__init__.py:449, in Backend.create_source(self, name, schema, database, connector_properties, data_format, encode_format, encode_properties)
    437 create_stmt = sge.Create(
    438     kind="SOURCE",
    439     this=target,
   (...)
    442     ),
    443 )
    445 create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(
    446     data_format, encode_format, encode_properties
    447 )
--> 449 with self._safe_raw_sql(create_stmt):
    450     pass
    452 return self.table(name, database=database)

File ~/.local/share/uv/python/cpython-3.12.5-macos-aarch64-none/lib/python3.12/contextlib.py:137, in _GeneratorContextManager.__enter__(self)
    135 del self.args, self.kwds, self.func
    136 try:
--> 137     return next(self.gen)
    138 except StopIteration:
    139     raise RuntimeError("generator didn't yield") from None

File ~/repos/ibis/ibis/backends/postgres/__init__.py:731, in Backend._safe_raw_sql(self, *args, **kwargs)
    729 @contextlib.contextmanager
    730 def _safe_raw_sql(self, *args, **kwargs):
--> 731     with contextlib.closing(self.raw_sql(*args, **kwargs)) as result:
    732         yield result

File ~/repos/ibis/ibis/backends/postgres/__init__.py:757, in Backend.raw_sql(self, query, **kwargs)
    754     raise
    756 try:
--> 757     cursor.execute(query, **kwargs)
    758 except Exception:
    759     con.rollback()

InternalError_: Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service failed: Internal error
  2: failed to create source worker
  3: Unknown fields in the WITH clause: {"schema.registry": "http://0.0.0.0:18081"}


lostmygithubaccount commented 1 hour ago

cc: @KeXiangWang