apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] HoodieDeltaStreamer failing on bytesToAvro call when attempting to insert record #7973

Open · joeytman opened this issue 1 year ago

joeytman commented 1 year ago

Describe the problem you faced

I'm trying to set up a DeltaStreamer job that consumes an Avro changelog from Kafka and writes to S3. The changelog is produced by Debezium from the Vitess binlog, with schemas written to the Confluent-API-compatible Apicurio registry. I've implemented a VitessDebeziumSource and VitessDebeziumAvroPayload, and from the logs inside VitessDebeziumSource.processDataset I can see that the envelope is deserialized correctly and the flattened dataset looks good. However, writes later fail when deserializing records in HoodieAvroUtils.bytesToAvro (invoked from AbstractDebeziumAvroPayload.getInsertRecord).

To Reproduce

Steps to reproduce the behavior:

  1. Compile hudi-utilities-bundle with the new VitessDebeziumSource.java and VitessDebeziumAvroPayload.java. My implementation of each is:

VitessDebeziumSource:

package org.apache.hudi.utilities.sources.debezium;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Source for incrementally ingesting debezium generated change logs for Vitess DB.
 */
public class VitessDebeziumSource extends DebeziumSource {

    private static final Logger LOG = LogManager.getLogger(VitessDebeziumSource.class);

    public VitessDebeziumSource(TypedProperties props, JavaSparkContext sparkContext,
                                SparkSession sparkSession,
                                SchemaProvider schemaProvider,
                                HoodieDeltaStreamerMetrics metrics) {
        super(props, sparkContext, sparkSession, schemaProvider, metrics);
    }

    /**
     * Debezium Kafka Payload has a nested structure (see https://debezium.io/documentation/reference/2.1/connectors/vitess.html).
     * This function flattens this nested structure for the Vitess data, and also extracts a subset of Debezium metadata fields.
     *
     * @param rowDataset Dataset containing Debezium Payloads
     * @return New dataset with flattened columns
     */
    @Override
    protected Dataset<Row> processDataset(Dataset<Row> rowDataset) {
        Dataset<Row> flattenedDataset = rowDataset;
        if (rowDataset.columns().length > 0) {
            // Only flatten for non-empty schemas
            Dataset<Row> insertedOrUpdatedData = rowDataset
                    .selectExpr(
                            String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
                            String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), // TODO: use source.ts_ms
                            String.format("%s.*", DebeziumConstants.INCOMING_AFTER_FIELD)
                    )
                    .filter(rowDataset.col(DebeziumConstants.INCOMING_OP_FIELD).notEqual(DebeziumConstants.DELETE_OP));

            Dataset<Row> deletedData = rowDataset
                    .selectExpr(
                            String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
                            String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), // TODO: use source.ts_ms
                            String.format("%s.*", DebeziumConstants.INCOMING_BEFORE_FIELD)
                    )
                    .filter(rowDataset.col(DebeziumConstants.INCOMING_OP_FIELD).equalTo(DebeziumConstants.DELETE_OP));

            flattenedDataset = insertedOrUpdatedData.union(deletedData);
        }

        LOG.error("VitessDebeziumSource.processDataset flattenedDataset.schema: " + flattenedDataset.schema());
        LOG.error("VitessDebeziumSource.processDataset flattenedDataset.showString: " + flattenedDataset.showString(10, 0, false));

        return flattenedDataset;
    }

}

VitessDebeziumAvroPayload:


package org.apache.hudi.common.model.debezium;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Objects;

/**
 * Provides support for seamlessly applying changes captured via Debezium for Vitess DB.
 * <p>
 * Debezium change event types are determined from the op field in the payload
 * <p>
 * - For inserts, op=i
 * - For deletes, op=d
 * - For updates, op=u
 * - For snapshot inserts, op=r
 * <p>
 * This payload implementation will issue matching insert, delete, and update operations against the Hudi table
 */
public class VitessDebeziumAvroPayload extends AbstractDebeziumAvroPayload {

    private static final Logger LOG = LogManager.getLogger(VitessDebeziumAvroPayload.class);

    public VitessDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    public VitessDebeziumAvroPayload(Option<GenericRecord> record) {
        super(record);
    }

    private Option<String> extractSeq(IndexedRecord record) {
        Object value = ((GenericRecord) record).get(DebeziumConstants.FLATTENED_TS_COL_NAME);
        return Option.ofNullable(Objects.toString(value, null));
    }

    @Override
    protected boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException {
        String insertSourceSeq = extractSeq(insertRecord)
                .orElseThrow(() ->
                        new HoodieException(String.format("%s cannot be null in insert record: %s",
                                DebeziumConstants.FLATTENED_TS_COL_NAME, insertRecord)));
        Option<String> currentSourceSeqOpt = extractSeq(currentRecord);
        // Pick the current value in storage only if its Seq (source.ts_ms) is latest
        // compared to the Seq (source.ts_ms) of the insert value
        return currentSourceSeqOpt.isPresent() && insertSourceSeq.compareTo(currentSourceSeqOpt.get()) < 0;
    }
}
  2. Launch the DeltaStreamer job via the EMR CLI with:

    spark-submit \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/jthaidigsman/hudi-utilities-bundle_2.12-0.12.1.jar  \
    --target-table db.table \
    --target-base-path s3://redacted/hudi/table/ \
    --table-type COPY_ON_WRITE \
    --source-class org.apache.hudi.utilities.sources.debezium.VitessDebeziumSource \
    --payload-class org.apache.hudi.common.model.debezium.VitessDebeziumAvroPayload \
    --source-ordering-field _event_origin_ts_ms \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --checkpoint redacted.redacted.redacted,0:0 \
    --hoodie-conf hoodie.datasource.write.recordkey.field=col1,col2 \
    --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
    --hoodie-conf hoodie.datasource.write.partitionpath.field="" \
    --hoodie-conf bootstrap.servers=redacted:9092 \
    --hoodie-conf hoodie.deltastreamer.source.kafka.topic=redacted \
    --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://redacted:8080/apis/ccompat/v6/subjects/redacted-value/versions/latest/ \
    --hoodie-conf schema.registry.url=http://redacted:8080/apis/ccompat/v6/ \
    --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer 
  3. Observe the deserialization error (details below).

Expected behavior

In RFC-39, it's stated that:

Since we change the schema of the incoming record in the source class, we have to provide a schema for the target record. We propose to implement DebeziumAvroSource.java as a RowSource and allow spark to infer the schema of the transformed record. An alternative approach is to implement a DebeziumSchemaRegistryProvider.java class that extends the current SchemaRegistryProvider.java, and implements the method getTargetSchema . It constructs the target schema from the original schema by including only the fields nested within the after field of the original record, along with the meta fields that were actually ingested.

From my understanding of this paragraph, at the point in the write path where we're failing, the schema should be inferred from the record that has already been deserialized and flattened by VitessDebeziumSource. However, adding some logs to the bytesToAvro method shows that the writerSchema and readerSchema are set to the schema of the Debezium envelope, rather than the schema of the record we're writing. This goes against my understanding of how the flow should work: it appears to be using the target schema returned by the SchemaRegistryProvider rather than inferring the schema.

I also tried implementing DebeziumSchemaRegistryProvider with a new getTargetSchema as mentioned as an alternative in the RFC, and was able to see that the schemas passed to bytesToAvro were now set to the formatted schema produced by DebeziumSchemaRegistryProvider, confirming that the schema inference flow described by the RFC is not being triggered.

Is there some config that needs to be set so that DeltaStreamer will write with the schema of the record produced by VitessDebeziumSource? Or am I misunderstanding how this is supposed to work?
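
For reference, "allowing Spark to infer the schema of the transformed record" would amount to something like the sketch below, which derives the Avro target schema from the flattened Dataset<Row> itself rather than from the registry. This is purely illustrative and not Hudi's actual code path: SchemaConverters comes from the spark-avro module, and the record name and namespace here are placeholders:

import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.avro.SchemaConverters;

public class InferredTargetSchemaSketch {

    /**
     * Hypothetical helper: derive the Avro target schema from the flattened dataset
     * produced by the source, instead of reusing the registry's Envelope schema.
     */
    public static Schema inferTargetSchema(Dataset<Row> flattenedDataset) {
        // toAvroType is part of the spark-avro module; when calling the Scala API from
        // Java, all four arguments must be supplied explicitly.
        return SchemaConverters.toAvroType(
                flattenedDataset.schema(),     // Spark StructType of the flattened rows
                false,                         // top-level record is not nullable
                "formatted_debezium_payload",  // placeholder record name
                "hoodie.source");              // placeholder namespace
    }
}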

Environment Description

Additional context

Debezium Envelope Schema (from schema registry):

{
  "type": "record",
  "name": "Envelope",
  "namespace": "redacted.redacted.redacted",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "col1",
              "type": "string"
            },
            {
              "name": "col2",
              "type": "string"
            },
            {
              "name": "col3",
              "type": "string"
            },
            {
              "name": "col4",
              "type": "long"
            },
            {
              "name": "col5",
              "type": "long"
            },
            {
              "name": "col6",
              "type": "string"
            },
            {
              "name": "col7",
              "type": "string"
            },
            {
              "name": "col8",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            },
            {
              "name": "col9",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            },
            {
              "name": "col10",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            },
            {
              "name": "col11",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            },
            {
              "name": "col12",
              "type": "string"
            },
            {
              "name": "col13",
              "type": "string"
            },
            {
              "name": "col14",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            },
            {
              "name": "col15",
              "type": "string"
            }
          ],
          "connect.name": "sandbox.byuser.channels_members.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.vitess",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "string",
                "connect.version": 1,
                "connect.parameters": {
                  "allowed": "true,last,false,incremental"
                },
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum"
              },
              "null"
            ],
            "default": "false"
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "sequence",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "keyspace",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "name": "vgtid",
            "type": "string"
          }
        ],
        "connect.name": "io.debezium.connector.vitess.Source"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "transaction",
      "type": [
        "null",
        {
          "type": "record",
          "name": "block",
          "namespace": "event",
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ],
          "connect.version": 1,
          "connect.name": "event.block"
        }
      ],
      "default": null
    }
  ],
  "connect.version": 1,
  "connect.name": "redacted.redacted.redacted.Envelope"
}

From the VitessDebeziumSource logs we can see that the records from Kafka are ingested, deserialized, and flattened properly:

23/02/15 20:40:34 ERROR VitessDebeziumSource: VitessDebeziumSource.processDataset flattenedDataset.showString: +----------------------+-------------------+-------------+-------------+---------------+-----------+------------+---------+-------------+-------+------------+--------------------+----------+--------------+-------------------+----------------------+-------------+
|_change_operation_type|_event_origin_ts_ms|col1      |col2   |col3 |col4 |col5 |col6 |col7 |col8 |col9 |col10 |col11 |col12 |col13 |col14 |col15 |
+----------------------+-------------------+-------------+-------------+---------------+-----------+------------+---------+-------------+-------+------------+--------------------+----------+--------------+-------------------+----------------------+-------------+
|c                     |1676473534195      |1153411137283|1152398039042|1152398038962  |1676473534 |0           |0        |0            |0      |1           |1                   |0         |1             |1152398038962      |0                     |1152398038962|
|c                     |1676473536740      |1153411137283|1152398039074|1152398038962  |1676473536 |0           |0        |0            |0      |1           |1                   |0         |1153411137283 |0                  |0                     |1152398038962|
|u                     |1676473536792      |1153411137283|1152398039074|1152398038962  |1676473536 |0           |0        |0            |1      |1           |1                   |0         |1153411137283 |0                  |0                     |1152398038962|
|c                     |1676473542584      |1152398039026|1153411137507|1152398039010  |1676473542 |0           |0        |0            |0      |1           |1                   |0         |1             |1152398039010      |0                     |1152398039010|
|c                     |1676473544236      |1152398039026|1152398039234|1152398039010  |1676473543 |0           |0        |0            |0      |1           |1                   |0         |1152398039026 |0                  |0                     |1152398039010|
|u                     |1676473544286      |1152398039026|1152398039234|1152398039010  |1676473543 |0           |0        |0            |1      |1           |1                   |0         |1152398039026 |0                  |0                     |1152398039010|
|c                     |1676473544326      |1153411137283|1152398039154|1152398038962  |1676473544 |0           |0        |0            |0      |0           |0                   |0         |0             |0                  |0                     |1152398038962|
|c                     |1676473554065      |1152398039426|1153411137939|1152398039362  |1676473554 |0           |0        |0            |0      |1           |1                   |0         |1             |1152398039362      |0                     |1152398039362|
|c                     |1676473555721      |1152398039426|1152398039666|1152398039362  |1676473555 |0           |0        |0            |0      |1           |1                   |0         |1152398039426 |0                  |0                     |1152398039362|
|u                     |1676473555815      |1152398039426|1152398039666|1152398039362  |1676473555 |0           |0        |0            |1      |1           |1                   |0         |1152398039426 |0                  |0                     |1152398039362|
+----------------------+-------------------+-------------+-------------+---------------+-----------+------------+---------+-------------+-------+------------+--------------------+----------+--------------+-------------------+----------------------+-------------+

But logs within bytesToAvro show that the writerSchema and readerSchema are still set to the Debezium Envelope schema, rather than being inferred from the flattened dataset:

23/02/15 20:29:49 ERROR HoodieAvroUtils: bytesToAvro writerSchema: {"type":"record","name":"Envelope","namespace":"redacted.redacted.redacted","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"long"},{"name":"col5","type":"long"},{"name":"col6","type":"string"},{"name":"col7","type":"string"},{"name":"col8","type":{"type":"int","connect.type":"int16"}},{"name":"col9","type":{"type":"int","connect.type":"int16"}},{"name":"col10","type":{"type":"int","connect.type":"int16"}},{"name":"col11","type":{"type":"int","connect.type":"int16"}},{"name":"col12","type":"string"},{"name":"col13","type":"string"},{"name":"col14","type":{"type":"int","connect.type":"int16"}},{"name":"col15","type":"string"}],"connect.name":"sandbox.byuser.channels_members.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.vitess","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"keyspace","type":"string"},{"name":"table","type":"string"},{"name":"vgtid","type":"string"}],"connect.name":"io.debezium.connector.vitess.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"block","namespace":"event","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}],"connect.version":1,"connect.name":"event.block"}],"default":null}],"connect.version":1,"connect.name":"redacted.redacted.redacted.Envelope"}
23/02/15 20:29:49 ERROR HoodieAvroUtils: bytesToAvro readerSchema: {"type":"record","name":"Envelope","namespace":"redacted.redacted.redacted","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"long"},{"name":"col5","type":"long"},{"name":"col6","type":"string"},{"name":"col7","type":"string"},{"name":"col8","type":{"type":"int","connect.type":"int16"}},{"name":"col9","type":{"type":"int","connect.type":"int16"}},{"name":"col10","type":{"type":"int","connect.type":"int16"}},{"name":"col11","type":{"type":"int","connect.type":"int16"}},{"name":"col12","type":"string"},{"name":"col13","type":"string"},{"name":"col14","type":{"type":"int","connect.type":"int16"}},{"name":"col15","type":"string"}],"connect.name":"sandbox.byuser.channels_members.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.vitess","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"keyspace","type":"string"},{"name":"table","type":"string"},{"name":"vgtid","type":"string"}],"connect.name":"io.debezium.connector.vitess.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"block","namespace":"event","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}],"connect.version":1,"connect.name":"event.block"}],"default":null}],"connect.version":1,"connect.name":"redacted.redacted.redacted.Envelope"}

Driver stacktrace:

Exception in thread "main" org.apache.hudi.exception.HoodieException: Commit 20230215204036257 failed and rolled-back !
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:653)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:336)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:204)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:202)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The executor logs show many stacktraces, with two primary variations of the error.

Negative Length error:

23/02/15 20:29:50 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=col1:1153431876899,col2:1153431871443 partitionPath=}, currentLocation='null', newLocation='null'}
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -13097524020
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:164) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:152) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_362]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]

String too long error:

23/02/15 20:29:50 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=col1:1153415312659,col2:1153415313283 partitionPath=}, currentLocation='null', newLocation='null'}
java.lang.UnsupportedOperationException: Cannot read strings longer than 2147483639 bytes
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:305) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0]
    at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:164) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:152) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) ~[hudi-utilities-bundle_2.12-0.12.1.jar:0.12.1]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_362]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]

Given that we've already deserialized the records once at this point, I don't think it's an issue with the data itself, despite what the errors may imply. I think the errors arise from a mismatch between the schema used to deserialize and the actual bytes, but let me know if you think something else could be going on.
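
To make that suspected mismatch concrete, here is a minimal, self-contained sketch using plain Avro (not Hudi code, with made-up toy schemas): it serializes a record with a small flattened schema and then decodes the same bytes as if they had been written with an Envelope-style schema, which fails with errors of the same shape as the ones above:

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaMismatchSketch {

    public static void main(String[] args) throws Exception {
        // Toy "flattened" schema: what the record bytes are actually serialized with.
        Schema flattened = SchemaBuilder.record("formatted_debezium_payload").fields()
                .requiredString("_change_operation_type")
                .requiredLong("_event_origin_ts_ms")
                .requiredString("col1")
                .endRecord();

        // Toy "Envelope"-style schema: starts with a nullable nested record,
        // like the Debezium Envelope's "before" field.
        Schema value = SchemaBuilder.record("Value").fields().requiredString("col1").endRecord();
        Schema envelope = SchemaBuilder.record("Envelope").fields()
                .name("before").type(Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), value))).withDefault(null)
                .requiredString("op")
                .endRecord();

        // Serialize one record with the flattened schema.
        GenericRecord rec = new GenericData.Record(flattened);
        rec.put("_change_operation_type", "c");
        rec.put("_event_origin_ts_ms", 1676473534195L);
        rec.put("col1", "1153411137283");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(flattened).write(rec, encoder);
        encoder.flush();

        // Decode the same bytes as if they were written with the Envelope schema.
        // The decoder mis-reads the byte stream and throws errors of the same shape
        // as above, e.g. "Malformed data. Length is negative: ..." or
        // "Cannot read strings longer than ... bytes".
        new GenericDatumReader<GenericRecord>(envelope, envelope)
                .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    }
}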

nfarah86 commented 1 year ago

cc @yihua to help with this issue.

yihua commented 1 year ago

Hi @joeytman, thanks for the details. @rmahindra123 could you help answer the question around the Debezium source?

joeytman commented 1 year ago

Hello again, I have an update. I was able to work around the issue and get DeltaStreamer working by defining a DebeziumSchemaRegistryProvider as follows:

package org.apache.hudi.utilities.schema;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * Obtains the latest schema from the Confluent/Kafka schema registry, in the Debezium Envelope format
 * <p>
 * https://github.com/confluentinc/schema-registry
 */
public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider {

    public DebeziumSchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
        super(props, jssc);
    }

    /**
     * The Debezium target schema is a nested structure with many metadata fields. This flattens
     * the schema structure and keeps only the necessary metadata fields.
     *
     * @return the flattened target schema
     */
    @Override
    public Schema getTargetSchema() {
        Schema registrySchema = super.getTargetSchema();

        Schema.Field opField = registrySchema.getField(DebeziumConstants.INCOMING_OP_FIELD);
        Schema.Field dataField = registrySchema.getField(DebeziumConstants.INCOMING_AFTER_FIELD);

        // TODO: use INCOMING_SOURCE_TS_MS_FIELD instead
        // naively using INCOMING_SOURCE_TS_MS_FIELD results in NPE, but we can work around it
        // since INCOMING_TS_MS_FIELD has nullable version of INCOMING_SOURCE_TS_MS_FIELD schema
        Schema.Field tsField = registrySchema.getField(DebeziumConstants.INCOMING_TS_MS_FIELD);

        // Initialize with metadata columns which must match the fields included in VitessDebeziumSource.
        // Note: ALL fields are nullable after being processed by VitessDebeziumSource
        // so if you want to include a field that is NOT nullable in the schema registry,
        // you must call `setSchemaNullable` on the schema. If it's already nullable, just use `type(field.schema())`
        SchemaBuilder.FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
                .record("formatted_debezium_payload")
                .fields()
                .name(DebeziumConstants.FLATTENED_OP_COL_NAME).type(setSchemaNullable(opField.schema())).withDefault(null)
                .name(DebeziumConstants.FLATTENED_TS_COL_NAME).type(tsField.schema()).withDefault(null);

        // Add data columns to schema
        dataField.schema()
                .getTypes()
                // "after" field is a union with data schema and null schema, so we need to extract only the data schema portion
                .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
                .getFields()
                .forEach(field -> {
                    payloadFieldAssembler.name(field.name()).type(setSchemaNullable(field.schema())).withDefault(null);
                });

        return payloadFieldAssembler.endRecord();
    }

    /**
     * @param toBecomeNullable source Schema (non-nullable) that needs to become nullable
     * @return Schema of UNION type, like: [null, toBecomeNullable]
     */
    private Schema setSchemaNullable(Schema toBecomeNullable) {
        return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), toBecomeNullable));
    }
}

I still think the issue should remain open, since the RFC explicitly calls this approach out as something we don't have to do. But I wanted to share this workaround to confirm that everything else is working besides the schema inference that's called out in the RFC (and help anyone else with a similar issue of course 😄).

codope commented 1 year ago

Tracking https://issues.apache.org/jira/browse/HUDI-5877 to follow up on the fix.