tabular-io / iceberg-kafka-connect

Running with docker #100

Closed: Wuerike closed this issue 10 months ago

Wuerike commented 11 months ago

Can you help me run it locally? I don't have experience with either Iceberg or Kafka, so I'm probably doing something wrong.

Starting from the tabular-io Docker Compose setup for Iceberg + Spark, I've added the Kafka services, but I couldn't manage to make it work end-to-end.

I created a Dockerfile that extends the Kafka Connect image and installs this plugin.

Dockerfile:

FROM confluentinc/cp-kafka-connect:latest
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.4.11

docker-compose.yaml:

version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    networks:
      iceberg_net:
    depends_on:
      - rest
      - minio
    volumes:
      - ./notebooks:/home/iceberg/notebooks/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001

  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    networks:
      iceberg_net:
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000

  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    # volumes:
    #   - ./data:/data
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    networks:
      iceberg_net:
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      iceberg_net:

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9997:9997
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_HOSTNAME: kafka
    networks:
      iceberg_net:

  schemaregistry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schemaregistry
    container_name: schemaregistry
    ports:
      - 8085:8085
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8085
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
    networks:
      iceberg_net:

  kafka-connect:
    build:
      context: .
      dockerfile: Dockerfile.KafkaConnect
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - schemaregistry
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: _connect_status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8085
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8085
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
    networks:
      iceberg_net:

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 18080:8080
    depends_on:
      - kafka
      - schemaregistry
      - kafka-connect
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry:8085
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: kafka-connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
      DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      iceberg_net:

networks:
  iceberg_net:

Config:

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "1",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://localhost:8181",
    "iceberg.catalog.warehouse": "s3://warehouse/"
}

Error:

org.apache.iceberg.exceptions.RESTException: Error occurred while processing GET request at
org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:304) at
org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:219) at
org.apache.iceberg.rest.HTTPClient.get(HTTPClient.java:320) at
org.apache.iceberg.rest.RESTSessionCatalog.fetchConfig(RESTSessionCatalog.java:823) at
org.apache.iceberg.rest.RESTSessionCatalog.initialize(RESTSessionCatalog.java:167) at
org.apache.iceberg.rest.RESTCatalog.initialize(RESTCatalog.java:68) at
org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:239) at
org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:284) at
io.tabular.iceberg.connect.data.Utilities.loadCatalog(Utilities.java:57) at
io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:61) at
org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:644) at
org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:73) at
org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:741) at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:324) at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:473) at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:559) at
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1288) at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1247) at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) at
org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:479) at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:331) at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at
java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.hc.client5.http.HttpHostConnectException: Connect to http://localhost:8181 [localhost/127.0.0.1] failed: Connection refused (Connection refused) at
java.base/java.net.PlainSocketImpl.socketConnect(Native Method) at
java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412) at
java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255) at
java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237) at
java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at
java.base/java.net.Socket.connect(Socket.java:615) at
org.apache.hc.client5.http.socket.PlainConnectionSocketFactory.lambda$connectSocket$0(PlainConnectionSocketFactory.java:85) at
java.base/java.security.AccessController.doPrivileged(Native Method) at
org.apache.hc.client5.http.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:84) at
org.apache.hc.client5.http.socket.ConnectionSocketFactory.connectSocket(ConnectionSocketFactory.java:113) at
org.apache.hc.client5.http.impl.io.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:181) at
org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:447) at
org.apache.hc.client5.http.impl.classic.InternalExecRuntime.connectEndpoint(InternalExecRuntime.java:162) at
org.apache.hc.client5.http.impl.classic.InternalExecRuntime.connectEndpoint(InternalExecRuntime.java:172) at
org.apache.hc.client5.http.impl.classic.ConnectExec.execute(ConnectExec.java:142) at
org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51) at
org.apache.hc.client5.http.impl.classic.ProtocolExec.execute(ProtocolExec.java:192) at
org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51) at
org.apache.hc.client5.http.impl.classic.HttpRequestRetryExec.execute(HttpRequestRetryExec.java:96) at
org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51) at
org.apache.hc.client5.http.impl.classic.ContentCompressionExec.execute(ContentCompressionExec.java:152) at
org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51) at
org.apache.hc.client5.http.impl.classic.RedirectExec.execute(RedirectExec.java:115) at
org.apache.hc.client5.http.impl.classic.ExecChainElement.execute(ExecChainElement.java:51) at
org.apache.hc.client5.http.impl.classic.InternalHttpClient.doExecute(InternalHttpClient.java:170) at
org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:123) at
org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:267) ... 32 more
bryanck commented 11 months ago

You will need to update iceberg.catalog.uri, localhost won't work in this case AFAIK.
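
For example, pointing the connector at the compose service name instead of localhost (a sketch, assuming the REST catalog is reachable as rest on the iceberg_net network):

    "iceberg.catalog.uri": "http://rest:8181"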

Wuerike commented 11 months ago

I've also tried http://rest:8181 before, but then I got a timeout error in the connector

bryanck commented 11 months ago

Try giving the REST catalog a hostname
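
For example (a sketch of the rest service in the compose file, matching the hostname style already used for the other services):

  rest:
    image: tabulario/iceberg-rest
    container_name: iceberg-rest
    hostname: rest

Then the connector's iceberg.catalog.uri can point at http://rest:8181.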

Wuerike commented 11 months ago

The error changed; it seems it's failing at a later step now.

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618) at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336) at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at
java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.iceberg.exceptions.ServiceFailureException: Server error: NotFoundException: Location does not exist: s3://warehouse/default/events/metadata/00000-922efb19-660e-4b67-a7ba-af58697a1762.metadata.json at
org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:162) at
org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:109) at
org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:93) at
org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:176) at
org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:285) at
org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:219) at
org.apache.iceberg.rest.HTTPClient.get(HTTPClient.java:320) at
org.apache.iceberg.rest.RESTClient.get(RESTClient.java:96) at
org.apache.iceberg.rest.RESTSessionCatalog.loadInternal(RESTSessionCatalog.java:294) at
org.apache.iceberg.rest.RESTSessionCatalog.loadTable(RESTSessionCatalog.java:310) at
org.apache.iceberg.catalog.BaseSessionCatalog$AsCatalog.loadTable(BaseSessionCatalog.java:99) at
org.apache.iceberg.rest.RESTCatalog.loadTable(RESTCatalog.java:92) at
io.tabular.iceberg.connect.data.IcebergWriter.<init>(IcebergWriter.java:46) at
io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:35) at
io.tabular.iceberg.connect.channel.Worker.lambda$getWriterForTable$7(Worker.java:249) at
java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134) at
io.tabular.iceberg.connect.channel.Worker.getWriterForTable(Worker.java:249) at
io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$4(Worker.java:203) at
java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390) at
io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:201) at
io.tabular.iceberg.connect.channel.Worker.save(Worker.java:190) at
java.base/java.util.ArrayList.forEach(ArrayList.java:1541) at
io.tabular.iceberg.connect.channel.Worker.save(Worker.java:177) at
io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:119) at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587) ... 11 more
bryanck commented 11 months ago

It looks like you may need to create the MinIO bucket, the sink won’t do that
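
One way to do that (a sketch reusing the mc container from the compose file above, where the minio alias is already configured) is a one-off command like:

    docker compose exec mc /usr/bin/mc mb minio/warehouse

The bucket name has to match the iceberg.catalog.warehouse path in the connector config.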

bryanck commented 11 months ago

You'll also want to set CATALOG_S3_PATH__STYLE__ACCESS to true for the REST catalog container
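
That is, adding this to the rest service's environment block in the compose file:

      - CATALOG_S3_PATH__STYLE__ACCESS=true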

bryanck commented 11 months ago

I'll try your setup a little later today. BTW this would be a nice thing to contribute once it is working, if you're interested.

Wuerike commented 11 months ago

Yeah, sure, I was thinking the same; once it works I want to share it.

Wuerike commented 11 months ago

I configured the Schema Registry better and now I get:

Caused by: java.lang.UnsupportedOperationException: Cannot convert type: java.lang.String at
io.tabular.iceberg.connect.data.RecordConverter.convert(RecordConverter.java:91) at
io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:56)

Maybe I'm still messing up with the converters

Wuerike commented 11 months ago

Now (I think) I'm sure it's conversion related:

  1. I got NoSuchTableException: Table does not exist
  2. I've created the table
  3. I got a conversion error at io.tabular.iceberg.connect.data.RecordConverter.convert

For this test, this was my kafka-connect docker-compose service:

  kafka-connect:
    build:
      context: .
      dockerfile: Dockerfile.KafkaConnect
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - schemaregistry
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: _connect_status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8085
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8085
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
    networks:
      iceberg_net:

Full error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230) at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156) at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:522) at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:499) at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335) at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at
java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: at
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326) at
org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88) at
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:522) at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180) at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214) ... 14 more Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x17a2269 (above 0x0010ffff) at
char #1, byte #7) at
org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66) at
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324) ... 18 more Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x17a2269 (above 0x0010ffff) at
char #1, byte #7) at
com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:193) at
com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:159) at
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:276) at
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2442) at
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:698) at
com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703) at
com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090) at
org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64) ... 19 more
bryanck commented 11 months ago

Are the messages you are producing JSON strings with a schema?
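
For reference, with the JsonConverter and schemas enabled (the default), each message value is expected to carry an inline schema envelope, roughly like this (a hypothetical example; the field names are made up):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "id", "type": "string", "optional": false},
      {"field": "ts", "type": "int64", "optional": true}
    ],
    "optional": false,
    "name": "event"
  },
  "payload": {
    "id": "abc-123",
    "ts": 1696588481000
  }
}

Plain JSON messages without that envelope need "value.converter.schemas.enable": "false" instead.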

bryanck commented 11 months ago

I can try running it; I think the only missing piece for me is your Dockerfile for Kafka Connect.

hendoxc commented 11 months ago

(Quoting Wuerike's comment above, with the same kafka-connect docker-compose service and the same "Tolerance exceeded in error handler" / "Invalid UTF-32 character" stack trace.)

I got this error too.

Wuerike commented 11 months ago

To share my setup easily, I've created a repo with this information; I did my best to demonstrate the config I've been using.

https://github.com/Wuerike/docker-iceberg-kafka-connect

If you manage to make it work, please let me know; so far I haven't succeeded.

jbouricius commented 11 months ago

Following along here as well. @Wuerike, could you also include your producer in your repro example?

Wuerike commented 11 months ago

I was producing directly in kafka-ui; since the connector fails at the first message, I haven't built a producer yet.

hendoxc commented 11 months ago

I think you need to specify key.converter=org.apache.kafka.connect.storage.StringConverter; this was causing the issue for me and adding it fixed it. The task now runs, but nothing is being written; I just get the logs below.

my connector task:

{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "iceberg.catalog.s3.secret-access-key": "password",
  "iceberg.catalog.s3.endpoint": "http://minio:9000",
  "topics": "dummy.data",
  "tasks.max": "1",
  "iceberg.control.commit.interval-ms": "20000",
  "iceberg.catalog.client.region": "eu-west-1",
  "iceberg.catalog.uri": "http://rest:8181",
  "iceberg.tables": "demo_schema.dummy_data",
  "value.converter.schemas.enable": "false",
  "iceberg.catalog.warehouse": "s3://demo-iceberg/",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "iceberg.catalog.type": "rest",
  "schemas.enable": "false",
  "iceberg.catalog.s3.access-key-id": "admin"
}
[2023-10-06 10:37:41,484] INFO Sending event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel)
[2023-10-06 10:38:11,633] INFO Commit timeout reached (io.tabular.iceberg.connect.channel.CommitState)
[2023-10-06 10:38:11,686] INFO Sending event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel)
[2023-10-06 10:38:11,697] INFO Commit b99c810d-a950-422c-ad8f-74d959a31751 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator)

I'm sending this kind of message:

{"block_hash":"f266944e-c8b5-42f4-a557-a1c04f1c325e","block_number":1993,"block_timestamp":1696588481}

Sometimes it does give me this (and writes to the table):

[2023-10-06 10:43:12,079] INFO Sending event of type: COMMIT_TABLE (io.tabular.iceberg.connect.channel.Channel)
[2023-10-06 10:43:12,088] INFO Commit complete to table atlas.dummy_data, snapshot 8328104263542356348, commit ID 1faeac5d-5484-44de-9bb4-10cefb6b7c42, vtts null (io.tabular.iceberg.connect.channel.Coordinator)
[2023-10-06 10:43:12,097] INFO Sending event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel)
[2023-10-06 10:43:12,127] INFO Commit 1faeac5d-5484-44de-9bb4-10cefb6b7c42 complete, committed to 1 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator)

I guess I don't quite understand how the commit process to Iceberg works?

Wuerike commented 11 months ago

My connector config is pretty similar to yours, but it never works.

Wuerike commented 11 months ago

@hendoxc I got the same behaviour as you: I produce messages and get no error, but nothing is committed to the table.

Did you solve it somehow?

hendoxc commented 11 months ago

@hendoxc I got the same behaviour as you: I produce messages and get no error, but nothing is committed to the table.

Did you solve it somehow?

Yes, looking at the files in MinIO, it seems the files are committed to the table at 5 minute intervals, so I guess I'm misunderstanding how "iceberg.control.commit.interval-ms": "20000" works.

So everything seems to be working for me. I also switched to using Avro and Schema Registry, and things worked a lot better.
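
For anyone following along, the converter part of that Avro setup would look roughly like this in the connector config (a sketch, assuming the Confluent Avro converter is available on the worker and Schema Registry is at http://schemaregistry:8085 as in the compose file above):

{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schemaregistry:8085"
}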

bryanck commented 11 months ago

@hendoxc are you saying you're seeing commits every 5 min even though you set the commit interval to 20 sec?

bryanck commented 11 months ago

I think the problem might be that the config property iceberg.control.commit.interval-ms is for sink versions 0.5.0 and later. If using 0.4.x the property is iceberg.control.commitIntervalMs, so it might have been using the default.
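
So with the 0.4.11 plugin installed in the Dockerfile above, the connector config would need something like:

    "iceberg.control.commitIntervalMs": "20000"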

jbouricius commented 11 months ago

I have the same behavior running 0.5.0. I see COMMIT_REQUEST and COMMIT_COMPLETE messages in the logs, but no data is written out to my catalog until I close kafka-connect. I'll experiment with it today and see if it writes after 5 minutes.

Wuerike commented 10 months ago

Using iceberg.control.commitIntervalMs finally made it work.

Now I'll update my repo to share a demo

Thank you all.

bryanck commented 10 months ago

Awesome, nice work!

Wuerike commented 10 months ago

I've updated my repo with an example of how to run everything locally: https://github.com/wuerike/kafka-iceberg-streaming

@bryanck how can we use this work to contribute here? I'm not sure whether to copy it into a folder here or just mention it in the README.

bryanck commented 10 months ago

A link in the README would be great; perhaps at the bottom we can have a section for resources. This repo will be archived at some point once the code is moved to Iceberg.