apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] HoodieFlinkWriteClient null pointer for HudiRecord locationdata #11633

Closed harsha27-ops closed 2 weeks ago

harsha27-ops commented 1 month ago

package com.amazon.cosmos.transactionaldatalake.kda.common.sink;

import lombok.extern.slf4j.Slf4j;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieFlinkTable;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class HudiSinkFunction extends RichSinkFunction<HoodieRecord> {

public static String TRIP_EXAMPLE_SCHEMA = "{\n" +
        "  \"type\": \"record\",\n" +
        "  \"name\": \"triprec\",\n" +
        "  \"fields\": [\n" +
        "    {\"name\": \"ts\", \"type\": \"long\"},\n" +
        "    {\"name\": \"uuid\", \"type\": \"string\"},\n" +
        "    {\"name\": \"path\", \"type\": \"string\"},\n" +
        "    {\"name\": \"driver\", \"type\": \"string\"},\n" +
        "    {\"name\": \"begin_lat\", \"type\": \"double\"},\n" +
        "    {\"name\": \"begin_lon\", \"type\": \"double\"},\n" +
        "    {\"name\": \"end_lat\", \"type\": \"double\"},\n" +
        "    {\"name\": \"end_lon\", \"type\": \"double\"},\n" +
        "    {\"name\": \"fare\", \"type\": \"double\"}\n" +
        "  ]\n" +
        "}";

private transient HoodieFlinkWriteClient<HoodieAvroPayload> writeClient;
private final String tablePath = "s3a://testkaharshd4/test";
private final String tableName = "kaharshd_test";

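// Sets up the Hadoop/S3A configuration, initializes the Hudi table if it does not exist,
// builds the write config, and creates the Flink write client.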
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    super.open(parameters);

    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    hadoopConf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
    hadoopConf.set("fs.defaultFS", "s3a://testkaharshd4");
    hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com");

    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(hadoopConf);

    // Initialize the table if not already done
    Path path = new Path(this.tablePath);
    FileSystem fs = HadoopFSUtils.getFs(this.tablePath, storageConf);
    if (!fs.exists(path)) {
        LOG.info("sdjbchdsbchbdhb");
        HoodieTableMetaClient.withPropertyBuilder().setTableType(HoodieTableType.MERGE_ON_READ.name()).setTableName(this.tableName)
                .setPayloadClassName(HoodieAvroPayload.class.getName()).initTable(storageConf, this.tablePath);
    }

    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath(this.tablePath)
            .withSchema(TRIP_EXAMPLE_SCHEMA)
            .withParallelism(2, 2)
            .withDeleteParallelism(2)
            .forTable(this.tableName)
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                                     .withIndexType(IndexType.SIMPLE)
                                     .withBucketIndexEngineType(BucketIndexEngineType.SIMPLE)
                                     .build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
                                        .archiveCommitsWith(20, 30)
                                        .build())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                                          .withInlineCompaction(true)
                                          .build())
            .withStorageConfig(HoodieStorageConfig.newBuilder()
                                       .parquetMaxFileSize(120 * 1024 * 1024)
                                       .parquetBlockSize(120 * 1024 * 1024)
                                       .parquetPageSize(1 * 1024 * 1024)
                                       .build())
            .build();
    System.out.println(cfg.getRecordMerger().getRecordType());

    LOG.info("s,ncjds" + cfg.getRecordMerger().getRecordType());

    this.writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(hadoopConf), cfg);
}

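// Each incoming record is written in its own commit: start a commit, tag index locations, then insert.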
@Override
public void invoke(HoodieRecord<HoodieAvroPayload> value, Context context) throws Exception {
    LOG.info("cjjddncj" + this.writeClient.getConfig().getProps());
    LOG.info("cjjddncj" + FSUtils.getFileExtension(value.getPartitionPath()));
    String newCommitTime = this.writeClient.startCommit();
    List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();
    records.add(value);
    List<HoodieRecord<HoodieAvroPayload>> writeRecords =
            records.stream().map(r -> new HoodieAvroRecord<>(r.getKey(), r.getData())).collect(Collectors.toList());

    HoodieFlinkTable table = this.writeClient.getHoodieTable();
    this.writeClient.getIndex().tagLocation(HoodieListData.eager(writeRecords), this.writeClient.getEngineContext(), table);
    this.writeClient.insert(writeRecords, newCommitTime);
}
@Override
public void close() throws Exception {
    if (this.writeClient != null) {
        this.writeClient.close();
    }
    super.close();
}

}

I am trying to run this code but it is failing with the stack trace below.

with failure cause: java.lang.NullPointerException
    at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:107)
    at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:459)
    at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:76)
    at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:515)
    at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:507)
    at org.apache.hudi.client.HoodieFlinkWriteClient.insert(HoodieFlinkWriteClient.java:182)
    at com.amazon.cosmos.transactionaldatalake.kda.common.sink.HudiSinkFunction.invoke(HudiSinkFunction.java:101)
    at com.amazon.cosmos.transactionaldatalake.kda.common.sink.HudiSinkFunction.invoke(HudiSinkFunction.java:31)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:118)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concu

danny0405 commented 1 month ago

The HoodieFlinkWriteClient does not play a coordination role the way the SparkRDDWriteClient does; it only works locally, per write task. The suggestion is to use the HoodiePipeline utility class instead to build the sink pipeline.
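For reference, here is a minimal sketch of a HoodiePipeline-based sink, assuming a Flink job with checkpointing enabled and an upstream DataStream<RowData> whose fields match the declared columns. The column list, source helper, and job name below are illustrative only, not taken from the original job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class HudiPipelineSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hudi commits on the Flink writer are driven by checkpoints.
        env.enableCheckpointing(60_000);

        // Upstream stream of RowData matching the declared columns (hypothetical source).
        DataStream<RowData> dataStream = createSourceStream(env);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), "s3a://testkaharshd4/test");
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

        HoodiePipeline.Builder builder = HoodiePipeline.builder("kaharshd_test")
                .column("uuid VARCHAR(40)")
                .column("driver VARCHAR(40)")
                .column("fare DOUBLE")
                .column("ts BIGINT")
                .column("`path` VARCHAR(100)")
                .pk("uuid")
                .partition("path")
                .options(options);

        // The boolean flag indicates whether the input stream is bounded.
        builder.sink(dataStream, false);

        env.execute("hudi-pipeline-sink");
    }

    // Hypothetical helper; replace with the real Kinesis source and RowData mapping.
    private static DataStream<RowData> createSourceStream(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("wire up your source here");
    }
}

With this approach the write coordination (instant generation, commits, compaction scheduling) is handled by the operators that HoodiePipeline wires into the job graph and is triggered by Flink checkpoints, rather than by calling HoodieFlinkWriteClient directly inside a RichSinkFunction.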

ad1happy2go commented 1 month ago

@harsha27-ops Do you need any more help here? Were you able to make it work using HoodiePipeline?

ad1happy2go commented 2 weeks ago

@harsha27-ops Closing this. Please reopen if you still see this issue.