apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] HoodieSinkConnector not updating the Hudi Table after initial update. #12048

Open bpp-incom opened 1 month ago

bpp-incom commented 1 month ago

Description

When using the HoodieSinkConnector in Kafka Connect, the connector synchronizes the Kafka topic to the Hudi table only once. After the initial upsert, it does not persist any further messages produced to the Kafka topic.

A restart of the connector fetches the new messages and correctly updates the Hudi Table.

To Reproduce

The HoodieSinkConnector has the following configuration:

{
    "bootstrap.servers": "kafka:29092",
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "errors.deadletterqueue.context.headers.enable": "false",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "hoodie.base.path": "file:///tmp/hoodie/input/",
    "hoodie.bulkinsert.shuffle.parallelism": "2",
    "hoodie.datasource.transactional": "false",
    "hoodie.datasource.write.commit.interval": "30",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.partitionpath.field": "date",
    "hoodie.datasource.write.recordkey.field": "volume",
    "hoodie.hadoop.fs.defaultFS": "file:///",
    "hoodie.hadoop.security.authentication": "simple",
    "hoodie.insert.shuffle.parallelism": "2",
    "hoodie.kafka.commit.interval.secs": "60",
    "hoodie.metadata.enable": "false",
    "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
    "hoodie.streamer.schemaprovider.registry.url": "http://schema-registry:8081/subjects/input-key/versions/latest",
    "hoodie.table.name": "input",
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.upsert.shuffle.parallelism": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "test-hudi-connector-1",
    "topics": "input",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false"
}
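For reference, a config like the one above can be submitted through the Kafka Connect REST API (`PUT /connectors/{name}/config`). This is a minimal sketch, not necessarily how the connector was created here; the worker URL `http://localhost:8083` used in the usage note is an assumption.

```python
import json
import urllib.request


def build_put_config_request(connect_url, config):
    """Build a PUT /connectors/{name}/config request for a Kafka Connect worker.

    `config` is the flat connector config as a dict; its "name" entry
    is used as the connector name in the URL.
    """
    name = config["name"]
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
```

Passing the result to `urllib.request.urlopen(...)`, for example with `build_put_config_request("http://localhost:8083", config)`, creates the connector if it does not exist and updates it otherwise.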

I have created the schema in SchemaRegistry and produced messages to the Kafka topic called "input". The messages adhere to the schema in SchemaRegistry. The data contains the fields "date" and "volume", which are used in the config.

Further steps to reproduce the behavior:

  1. Create a local environment with Kafka, Kafka Connect, SchemaRegistry, and Redpanda (optional; it just provides a convenient interface for interacting with Kafka and Kafka Connect). The Kafka Connect instance has the following dependencies:
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.1-jre</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-kafka-connect</artifactId>
            <version>0.15.0</version>
        </dependency>       
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.2</version>
        </dependency>
  2. Create a HoodieSinkConnector in Kafka Connect with the above config, and add a schema in SchemaRegistry containing the fields "volume" and "date".
  3. Create new records in the Kafka topic "input" with the fields "volume" and "date".
  4. Check that the Hudi table is created at this path in your Kafka Connect container: "/tmp/hoodie/input/".
  5. Create additional new records in the Kafka topic "input" with the fields "volume" and "date".
  6. Verify that the Hudi table in "/tmp/hoodie/input/" is not updated with the new data.
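One way to do the two table checks above is to look at the table's timeline folder. The following is a minimal sketch, assuming the Hudi 0.x on-disk layout for a COPY_ON_WRITE table, where each completed commit appears as a file named `<instant>.commit` directly under `.hoodie`; the helper name `completed_commits` is hypothetical.

```python
from pathlib import Path


def completed_commits(base_path):
    """Return the instant times of completed commits, oldest first.

    Assumption: completed commits are files named <instant>.commit directly
    under <base_path>/.hoodie. Pending instants (*.commit.requested,
    *.inflight) do not match the pattern and are ignored.
    """
    timeline = Path(base_path) / ".hoodie"
    return sorted(p.stem for p in timeline.glob("*.commit"))
```

Calling `completed_commits("/tmp/hoodie/input")` after the first batch of records and again after the second batch (allowing for the configured commit interval) shows whether a new commit was written: if the two lists are identical, no new data was committed.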

Expected behavior

We expect data from the new records from the Kafka topic to be upserted into the Hudi table.

Environment Description

Additional context

A restart of the connector adds new data to the Hudi table.
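One way to trigger such a restart is through the Kafka Connect REST API, which exposes `POST /connectors/{name}/restart` and `POST /connectors/{name}/tasks/{id}/restart`. This is a minimal sketch of building those requests; the worker URL in the usage note is an assumption, and the helper name is hypothetical.

```python
import urllib.request


def build_restart_request(connect_url, connector, task_id=None):
    """Build a POST request that restarts a connector or one of its tasks."""
    path = f"/connectors/{connector}/restart"
    if task_id is not None:
        # Restart a single task instead of the connector instance.
        path = f"/connectors/{connector}/tasks/{task_id}/restart"
    return urllib.request.Request(
        url=connect_url.rstrip("/") + path, data=b"", method="POST"
    )
```

For example, `urllib.request.urlopen(build_restart_request("http://localhost:8083", "test-hudi-connector-1", task_id=0))` would restart task 0, the task that appears as `test-hudi-connector-1-0` in the logs.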

Stacktrace

There are no errors in the logs.

I have enabled DEBUG logging in log4j.properties, and my best guess is that these lines are relevant:

DEBUG org.apache.kafka.connect.runtime.WorkerSinkTask               |ConnectorName: |  WorkerSinkTask{id=test-hudi-connector-1-0} Skipping offset commit, no change since last commit   [org.apache.kafka.connect.runtime.WorkerSinkTask]
DEBUG org.apache.kafka.connect.runtime.WorkerSinkTask               |ConnectorName: |  WorkerSinkTask{id=test-hudi-connector-1-0} Finished offset commit successfully in 0 ms for sequence number 39: null   [org.apache.kafka.connect.runtime.WorkerSinkTask]

In advance, thank you for your help!

danny0405 commented 1 month ago

@yihua The kafka-connector is out of maintenance, right?

rangareddy commented 1 month ago

Hi @bpp-incom / @danny0405,

I will try to reproduce this issue.