apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Reading data from Kinesis data stream and storing into S3 using Hudi for KDA-Flink #6475

Open awsUser123 opened 2 years ago

awsUser123 commented 2 years ago

Hey guys, I am trying to implement reading from Kinesis Data Streams and storing the data into an S3 bucket using Hudi. I was able to get the data into S3 by referring to and running the following code: https://github.com/awsalialem/amazon-kinesis-data-analytics-java-examples/blob/master/S3Sink/src/main/java/com/amazonaws/services/kinesisanalytics/S3StreamingSinkJob.java

I wanted to know how I can store the data into S3 using the Hudi connector while reading from Kinesis Data Streams. Is there any modification I can make to the code?

nsivabalan commented 2 years ago

We don't have support for this in DeltaStreamer yet: https://issues.apache.org/jira/browse/HUDI-1386. If you are interested in writing a source, we would be glad to assist. If not, you may have to go with the DFS-based source [example] to ingest into Hudi with DeltaStreamer, or use the two-stage reliable pipeline for S3.
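
For the DFS-based route, here is a minimal sketch of a DeltaStreamer invocation, assuming a JSON landing area on S3; the bundle version, bucket, paths, and field names are placeholders, not values from this thread:

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.12-0.11.1.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --source-ordering-field ts \
  --target-base-path s3://your-bucket/hudi/target_table \
  --target-table target_table \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=partition \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://your-bucket/landing/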

nsivabalan commented 2 years ago

Not sure I understand your question. Here is what you can do.

You can read from Kinesis using Spark Structured Streaming (e.g. https://spark.apache.org/docs/latest/streaming-kinesis-integration.html) and then write to Hudi via the streaming sink.

E.g.:

streamingInput
  .writeStream
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option("checkpointLocation", basePath + "/checkpoint")
  .outputMode("append") // streaming writes take outputMode, not mode(Append)
  .start()
  .awaitTermination(10000)

Ref link: https://hudi.apache.org/blog/2021/08/23/async-clustering/#spark-structured-streaming
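
For the read side, a rough sketch, assuming the third-party qubole/kinesis-sql connector (which registers a "kinesis" format for Structured Streaming); the stream name, endpoint, and column handling below follow that connector's conventions and are not from this thread:

// Requires the kinesis-sql connector jar on the classpath (assumption).
// The connector exposes each record's payload in a binary 'data' column.
val streamingInput = spark.readStream
  .format("kinesis")
  .option("streamName", "your-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("startingposition", "TRIM_HORIZON")
  .load()
  .selectExpr("CAST(data AS STRING) AS json_value")
// parse json_value with from_json(...) and feed it to the streaming sink shown above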

Here is a simple script that I use locally to test the Spark structured streaming sink to Hudi, using Kafka as the source. You just need to replace the Kafka read with a Kinesis read stream.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.types.{IntegerType, StringType, LongType, StructType}
import java.time.LocalDateTime
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.ProcessingTime

// Define the Kafka source
val dataStreamReader = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "impressions").
  option("startingOffsets", "earliest").
  option("maxOffsetsPerTrigger", 5000).
  option("failOnDataLoss", false)

val schema = new StructType().
  add("impressiontime", LongType).
  add("impressionid", StringType).
  add("userid", StringType).
  add("adid", StringType)

val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "CAST(timestamp AS STRING) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date").
  select(
    col("kafka_topic"), col("kafka_partition_offset"), col("kafka_offset"),
    col("kafka_timestamp"), col("kafka_key"), col("kafka_value"),
    from_json(col("kafka_value"), schema).as("data"), col("partition_date")).
  select(
    "kafka_topic", "kafka_partition_offset", "kafka_offset", "kafka_timestamp",
    "kafka_key", "kafka_value", "data.impressiontime", "data.impressionid",
    "data.userid", "data.adid", "partition_date")

val writer = df.
  writeStream.format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
  option(PRECOMBINE_FIELD.key, "impressiontime").
  option(RECORDKEY_FIELD.key, "adid").
  option(PARTITIONPATH_FIELD.key, "userid").
  option(HIVE_SYNC_ENABLED.key, false).
  option(HIVE_STYLE_PARTITIONING.key, true).
  option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
  option(STREAMING_IGNORE_FAILED_BATCH.key, false).
  option(STREAMING_RETRY_CNT.key, 0).
  option("hoodie.table.name", "copy_on_write_table").
  option("hoodie.cleaner.commits.retained", "6").
  option("hoodie.keep.min.commits", "10").
  option("hoodie.keep.max.commits", "15").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "10485760").
  option("hoodie.clustering.plan.strategy.small.file.limit", "200000000").
  option("hoodie.clustering.plan.strategy.sort.columns", "kafka_partition_offset").
  option("hoodie.clustering.async.max.commits", "2").
  option("hoodie.datasource.clustering.async.enable", "true").
  option("hoodie.clustering.async.enabled", "true").
  option("hoodie.clustering.updates.strategy", "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").
  option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
  outputMode(OutputMode.Append())

writer.trigger(new ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")

This is just a sample script that I use locally, but it should help you write a similar one for a Kinesis stream.

Feel free to close the issue if you get what you were looking for.

nsivabalan commented 2 years ago

@awsUser123: oh, you want to use Flink, is it? My bad. @danny0405 @yuzhaojing: can you folks chime in here, please?

danny0405 commented 1 year ago

There is some documentation about the DataStream API: https://hudi.apache.org/docs/flink-quick-start-guide. I am not sure whether you would write a DataStream job or use Flink SQL; I would suggest Flink SQL if you can.

soumilshah1995 commented 1 year ago

Build a Real-Time Streaming Pipeline with Apache Hudi, Kinesis and Flink


Steps

Step 1: Create a Kinesis data stream
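
If you prefer to script this step, a minimal boto3 sketch; the stream name, shard count, and region are assumptions and must match the 'stream' and 'aws.region' options in the table definition below:

import boto3

client = boto3.client('kinesis', region_name='us-west-2')
client.create_stream(StreamName='input-streams', ShardCount=1)
# Block until the stream is ACTIVE before publishing to it
client.get_waiter('stream_exists').wait(StreamName='input-streams')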

Step 2: Upload the jars provided in the GitHub repo to S3

Download links

https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.13.0/flink-s3-fs-hadoop-1.13.0.jar

https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar

Step 3: Head over to Kinesis Data Analytics and create a notebook, attaching the jar files while creating it

Step 4: Execute SQL commands

%flink.conf
execution.checkpointing.interval 5000

%flink.ssql(type=update)

DROP TABLE if exists stock_table;

CREATE TABLE stock_table (
    uuid VARCHAR,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'kinesis',
    'stream' = 'input-streams',
    'aws.region' = 'us-west-2',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

%flink.ssql(type=update)

DROP TABLE if exists stock_table_hudi;

CREATE TABLE stock_table_hudi(
    uuid VARCHAR,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://XXXXXXXX/tmp/',
    'table.type' = 'MERGE_ON_READ' ,
    'hoodie.embed.timeline.server' = 'false'
);

Step 5: Run Python code to publish data

import datetime
import json
import os
import random
import time
import uuid

import boto3
from dotenv import load_dotenv

load_dotenv(".env")

# Create the client once, outside the loop. The region and stream name
# must match the 'aws.region' and 'stream' options in the source table DDL.
kinesis_client = boto3.client(
    'kinesis',
    region_name='us-west-2',
    aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("DEV_SECRET_KEY"))

def get_stock_record():
    # One random stock tick matching the stock_table schema
    return {
        'uuid': str(uuid.uuid4()),
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}

while True:
    data = json.dumps(get_stock_record())
    res = kinesis_client.put_record(
        StreamName="input-streams",
        Data=data,
        PartitionKey="1")
    print(data, " ", res)
    time.sleep(1)  # throttle the producer a little

Step 6: Insert into Hudi

%flink.ssql(type=update)
INSERT INTO stock_table_hudi SELECT * FROM stock_table;
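
To sanity-check that records are landing in the Hudi table, you can run a simple query from the same notebook (a sketch, assuming the insert job above is running):

%flink.ssql(type=update)
SELECT * FROM stock_table_hudi;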

Enjoy

ad1happy2go commented 1 year ago

@awsUser123 Do you need any other support on this? Were you able to create the pipeline?