apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video #8400

Closed. soumilshah1995 closed this issue 11 months ago

soumilshah1995 commented 1 year ago

Subject: Need help with offline compaction for MOR tables

Good afternoon, and I hope you are doing well. I would like some assistance with the next piece of content I am creating on Hudi offline compaction for MOR tables. After searching and reading, I am looking for guidance on how to submit an offline compaction job and on whether I am missing anything. Attaching sample code.

Glue Job

"""
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true

"""
try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(10000)
        ]

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://delta-streamer-demo-hudi/hudi/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name

    , "hoodie.clean.automatic": "false"
    , "hoodie.clean.async": "false"
    , "hoodie.clustering.async.enabled": "false"
    , "hoodie.metadata.enable": "false"
    , "hoodie.metadata.index.async": "false"
    , "hoodie.metadata.index.column.stats.enable": "false"
    , "hoodie.compact.inline": "false"
    , 'hoodie.compact.schedule.inline': 'false'

    , "hoodie.metadata.index.check.timeout.seconds": "60"
    , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
    , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"

}

for i in range(0, 5):
    data = DataGenerator.get_data()
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
    spark_df = spark.createDataFrame(data=data, schema=columns)
    spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = "us-east-1"

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)

def lambda_handler_test_emr(event, context):
    # ------------------Hudi settings ---------------------------------------------
    glue_db = "hudi_db"
    table_name = "invoice"
    path = "s3://delta-streamer-demo-hudi/hudi/"

    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'delta_streamer_{}'.format(table_name)

    # --------------------------------------------------------------------------------
    spark_submit_parameters = ' --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'

    arguments = [
        '--spark-memory', '1g',
        '--parallelism', '2',
        "--mode", "schedule",
        "--base-path", path,
        "--table-name", table_name
    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': "command-runner.jar",
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)

lambda_handler_test_emr(context=None, event=None)

image

Hudi tables

image

Error

image

Looking forward to your guidance.

References: https://hudi.apache.org/docs/compaction/, https://github.com/apache/hudi/issues/6903

soumilshah1995 commented 1 year ago

Additionally, I have a question that keeps coming up in the group: how would you disable compaction? I have a sample Glue script configuration there; is it correct? Your advice will help me grasp the matter better.

ad1happy2go commented 1 year ago

@soumilshah1995 Did you try just using hoodie.table.services.enabled to disable all table services (archive, clean, compact, cluster)? You want to disable three of them in your configs anyway.

Also, I am working on running the compaction job on EMR Serverless. I will update you on that soon.
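
For reference, a minimal sketch of that single-switch approach, assuming the same write options as the Glue script above (untested, values are placeholders):

# Hedged sketch: hoodie.table.services.enabled is meant to switch off all table
# services (archive, clean, compact, cluster) in one place, so the individual
# clean/compact/cluster flags above would no longer be needed.
hudi_write_config = {
    'hoodie.table.name': 'employees',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.precombine.field': 'ts',

    # one switch for all table services instead of several individual flags
    'hoodie.table.services.enabled': 'false',
}

# spark_df.write.format("hudi").options(**hudi_write_config).mode("append").save(path)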

soumilshah1995 commented 1 year ago

Thanks a lot, thank you very much!

ad1happy2go commented 1 year ago

@soumilshah1995 Here the error specifically says "command-runner" not found. Can you try giving the jar path directly in 'sparkSubmit': {'entryPoint': ...}?

Or can you try creating the serverless job directly first, instead of going through Lambda, same as https://github.com/apache/hudi/issues/8412?
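
Roughly, the entryPoint change could look like this (hedged sketch only; application ID, role ARN, bucket and jar path are placeholders, and the compactor arguments mirror the ones already used in this thread):

# Hedged sketch: point 'entryPoint' at the Hudi utilities bundle jar on S3
# instead of command-runner.jar. All IDs/paths below are placeholders.
import uuid
import boto3

client = boto3.client("emr-serverless", region_name="us-east-1")

response = client.start_job_run(
    applicationId="<application-id>",
    clientToken=str(uuid.uuid4()),
    executionRoleArn="<execution-role-arn>",
    jobDriver={
        "sparkSubmit": {
            # the bundle jar itself is the entry point
            "entryPoint": "s3://<bucket>/jars/hudi-utilities-bundle_2.12-0.13.0.jar",
            "entryPointArguments": [
                "--spark-memory", "1g",
                "--parallelism", "2",
                "--mode", "scheduleAndExecute",
                "--base-path", "s3://<bucket>/hudi/",
                "--table-name", "employees",
            ],
            "sparkSubmitParameters": "--class org.apache.hudi.utilities.HoodieCompactor",
        },
    },
    executionTimeoutMinutes=600,
    name="hudi_offline_compaction_employees",
)
print(response)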

soumilshah1995 commented 1 year ago

I'm not sure why, but every time I try it I get the error. Let's connect at 11:30; perhaps if I show you my setup, you can tell me if I'm doing something incorrectly.

soumilshah1995 commented 1 year ago

I shall test this again over the weekend with the new JAR files.

soumilshah1995 commented 1 year ago

Hello, I tested it and it looks like I get a different error. Here are the steps.

Glue Job (tested OK)

"""
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true

"""
try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(10000)
        ]

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://delta-streamer-demo-hudi/hudi/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name

    , "hoodie.clean.automatic": "false"
    , "hoodie.clean.async": "false"
    , "hoodie.clustering.async.enabled": "false"
    , "hoodie.metadata.enable": "false"
    , "hoodie.metadata.index.async": "false"
    , "hoodie.metadata.index.column.stats.enable": "false"
    , "hoodie.compact.inline": "false"
    , 'hoodie.compact.schedule.inline': 'true'

    , "hoodie.metadata.index.check.timeout.seconds": "60"
    , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
    , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"

}

for i in range(0, 5):
    data = DataGenerator.get_data()
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
    spark_df = spark.createDataFrame(data=data, schema=columns)
    spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

EMR Serverless Job

try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv("../.env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = "us-east-1"

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)

def lambda_handler_test_emr(event, context):
    # ============================== Settings =======================================
    db_name = "hudidb"
    table_name = "employees"
    recordkey = 'emp_id'
    precombine = "ts"
    PARTITION_FIELD = 'state'
    path = "s3://delta-streamer-demo-hudi/hudi/"
    method = 'upsert'
    table_type = "MERGE_ON_READ"
    # ====================================================================================
    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'delta_streamer_compaction_{}'.format(table_name)

    # --------------------------------------------------------------------------------
    spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
    jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
    # schedule | execute | scheduleAndExecute

    arguments = [
        '--spark-memory', '5g',
        '--parallelism', '2',
        "--mode", "scheduleAndExecute",
        "--base-path", path,
        "--table-name", table_name

    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': jar_path,
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)

lambda_handler_test_emr(context=None, event=None)

Output logs

23/04/20 12:55:33 INFO SparkContext: Running Spark version 3.3.1-amzn-0
23/04/20 12:55:33 INFO ResourceUtils: ==============================================================
23/04/20 12:55:33 INFO ResourceUtils: No custom resources configured for spark.driver.
23/04/20 12:55:33 INFO ResourceUtils: ==============================================================
23/04/20 12:55:33 INFO SparkContext: Submitted application: compactor-employees
23/04/20 12:55:33 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 5120, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/04/20 12:55:33 INFO ResourceProfile: Limiting resource is cpus at 4 tasks per executor
23/04/20 12:55:33 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/04/20 12:55:33 INFO SecurityManager: Changing view acls to: hadoop
23/04/20 12:55:33 INFO SecurityManager: Changing modify acls to: hadoop
23/04/20 12:55:33 INFO SecurityManager: Changing view acls groups to: 
23/04/20 12:55:33 INFO SecurityManager: Changing modify acls groups to: 
23/04/20 12:55:33 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
23/04/20 12:55:33 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
23/04/20 12:55:33 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
23/04/20 12:55:33 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
23/04/20 12:55:33 INFO Utils: Successfully started service 'sparkDriver' on port 33507.
23/04/20 12:55:33 INFO SparkEnv: Registering MapOutputTracker
23/04/20 12:55:33 INFO SparkEnv: Registering BlockManagerMaster
23/04/20 12:55:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/04/20 12:55:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/04/20 12:55:33 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/20 12:55:33 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-78f669b6-62b6-4ec2-9e2b-b2314a1273fd
23/04/20 12:55:33 INFO MemoryStore: MemoryStore started with capacity 7.3 GiB
23/04/20 12:55:33 INFO SparkEnv: Registering OutputCommitCoordinator
23/04/20 12:55:33 INFO SubResultCacheManager: Sub-result caches are disabled.
23/04/20 12:55:33 INFO Utils: Successfully started service 'SparkUI' on port 8090.
23/04/20 12:55:33 INFO SparkContext: Added JAR s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar at s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1681995333013
23/04/20 12:55:34 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
23/04/20 12:55:34 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
23/04/20 12:55:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43873.
23/04/20 12:55:34 INFO NettyBlockTransferService: Server created on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873
23/04/20 12:55:34 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/04/20 12:55:34 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
23/04/20 12:55:34 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 with 7.3 GiB RAM, BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
23/04/20 12:55:34 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
23/04/20 12:55:34 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
23/04/20 12:55:34 INFO ExecutorContainerAllocator: Going to request 3 executors for ResourceProfile Id: 0, target: 3 already provisioned: 0.
23/04/20 12:55:34 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(1, 2, 3)
23/04/20 12:55:34 INFO SingleEventLogFileWriter: Logging events to file:/var/log/spark/apps/00f9gpl7uiklu609.inprogress
23/04/20 12:55:34 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
23/04/20 12:55:34 INFO ExecutorAllocationManager: Dynamic allocation is enabled without a shuffle service.
23/04/20 12:55:34 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
23/04/20 12:55:34 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(2 -> 9ac3cf5d-cd55-40b5-9c7d-2491d16c9635, 1 -> f6c3cf5d-cd69-2a09-dace-aefa04b3afe6, 3 -> f6c3cf5d-cd5d-e6cb-fc14-9227dfc49d91)
23/04/20 12:55:38 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:7aa7:3005:2901:ec5e:c073:b610:43210) with ID 3,  ResourceProfileId 0
23/04/20 12:55:38 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:7aa7:3005:cd0:80eb:6605:593:49078) with ID 2,  ResourceProfileId 0
23/04/20 12:55:38 INFO ExecutorMonitor: New executor 3 has registered (new total is 1)
23/04/20 12:55:38 INFO ExecutorMonitor: New executor 2 has registered (new total is 2)
23/04/20 12:55:39 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:2901:ec5e:c073:b610]:38209 with 2.7 GiB RAM, BlockManagerId(3, [2600:1f18:7aa7:3005:2901:ec5e:c073:b610], 38209, None)
23/04/20 12:55:39 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:cd0:80eb:6605:593]:40479 with 2.7 GiB RAM, BlockManagerId(2, [2600:1f18:7aa7:3005:cd0:80eb:6605:593], 40479, None)
23/04/20 12:55:39 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:7aa7:3005:463c:a178:348b:c839:38048) with ID 1,  ResourceProfileId 0
23/04/20 12:55:39 INFO ExecutorMonitor: New executor 1 has registered (new total is 3)
23/04/20 12:55:39 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:463c:a178:348b:c839]:45689 with 2.7 GiB RAM, BlockManagerId(1, [2600:1f18:7aa7:3005:463c:a178:348b:c839], 45689, None)
23/04/20 12:55:39 INFO EmrServerlessClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
23/04/20 12:55:39 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/04/20 12:55:39 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
23/04/20 12:55:39 INFO MetricsSystemImpl: s3a-file-system metrics system started
23/04/20 12:55:39 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/04/20 12:55:39 WARN HoodieCompactor: No instant time is provided for scheduling compaction.
23/04/20 12:55:40 WARN HoodieBackedTableMetadata: Metadata table was not found at path s3a://delta-streamer-demo-hudi/hudi/.hoodie/metadata
23/04/20 12:55:40 WARN HoodieBackedTableMetadata: Metadata table was not found at path s3a://delta-streamer-demo-hudi/hudi/.hoodie/metadata
23/04/20 12:55:40 INFO SparkContext: Starting job: collect at HoodieSparkEngineContext.java:137
23/04/20 12:55:40 INFO DAGScheduler: Got job 0 (collect at HoodieSparkEngineContext.java:137) with 1 output partitions
23/04/20 12:55:40 INFO DAGScheduler: Final stage: ResultStage 0 (collect at HoodieSparkEngineContext.java:137)
23/04/20 12:55:40 INFO DAGScheduler: Parents of final stage: List()
23/04/20 12:55:40 INFO DAGScheduler: Missing parents: List()
23/04/20 12:55:40 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at flatMap at HoodieSparkEngineContext.java:137), which has no missing parents
23/04/20 12:55:40 INFO ExecutorContainerAllocator: Set total expected execs to {0=1}
23/04/20 12:55:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.6 KiB, free 7.3 GiB)
23/04/20 12:55:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 39.5 KiB, free 7.3 GiB)
23/04/20 12:55:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 (size: 39.5 KiB, free: 7.3 GiB)
23/04/20 12:55:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1570
23/04/20 12:55:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at flatMap at HoodieSparkEngineContext.java:137) (first 15 tasks are for partitions Vector(0))
23/04/20 12:55:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
23/04/20 12:55:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) ([2600:1f18:7aa7:3005:2901:ec5e:c073:b610], executor 3, partition 0, PROCESS_LOCAL, 4393 bytes) taskResourceAssignments Map()
23/04/20 12:55:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610]:38209 (size: 39.5 KiB, free: 2.7 GiB)
23/04/20 12:55:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1889 ms on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610] (executor 3) (1/1)
23/04/20 12:55:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
23/04/20 12:55:43 INFO DAGScheduler: ResultStage 0 (collect at HoodieSparkEngineContext.java:137) finished in 3.089 s
23/04/20 12:55:43 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/20 12:55:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/04/20 12:55:43 INFO DAGScheduler: Job 0 finished: collect at HoodieSparkEngineContext.java:137, took 3.150595 s
23/04/20 12:55:43 INFO ExecutorContainerAllocator: Set total expected execs to {0=0}
23/04/20 12:55:43 INFO SparkContext: Starting job: collect at HoodieSparkEngineContext.java:103
23/04/20 12:55:43 INFO DAGScheduler: Got job 1 (collect at HoodieSparkEngineContext.java:103) with 7 output partitions
23/04/20 12:55:43 INFO DAGScheduler: Final stage: ResultStage 1 (collect at HoodieSparkEngineContext.java:103)
23/04/20 12:55:43 INFO DAGScheduler: Parents of final stage: List()
23/04/20 12:55:43 INFO DAGScheduler: Missing parents: List()
23/04/20 12:55:43 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at map at HoodieSparkEngineContext.java:103), which has no missing parents
23/04/20 12:55:43 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 107.3 KiB, free 7.3 GiB)
23/04/20 12:55:43 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 39.5 KiB, free 7.3 GiB)
23/04/20 12:55:43 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 (size: 39.5 KiB, free: 7.3 GiB)
23/04/20 12:55:43 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1570
23/04/20 12:55:43 INFO DAGScheduler: Submitting 7 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at HoodieSparkEngineContext.java:103) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6))
23/04/20 12:55:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 7 tasks resource profile 0
23/04/20 12:55:43 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) ([2600:1f18:7aa7:3005:463c:a178:348b:c839], executor 1, partition 0, PROCESS_LOCAL, 4701 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2) ([2600:1f18:7aa7:3005:2901:ec5e:c073:b610], executor 3, partition 1, PROCESS_LOCAL, 4702 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3) ([2600:1f18:7aa7:3005:cd0:80eb:6605:593], executor 2, partition 2, PROCESS_LOCAL, 4702 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4) ([2600:1f18:7aa7:3005:463c:a178:348b:c839], executor 1, partition 3, PROCESS_LOCAL, 4703 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 5) ([2600:1f18:7aa7:3005:2901:ec5e:c073:b610], executor 3, partition 4, PROCESS_LOCAL, 4652 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 6) ([2600:1f18:7aa7:3005:cd0:80eb:6605:593], executor 2, partition 5, PROCESS_LOCAL, 4701 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 7) ([2600:1f18:7aa7:3005:463c:a178:348b:c839], executor 1, partition 6, PROCESS_LOCAL, 4592 bytes) taskResourceAssignments Map()
23/04/20 12:55:43 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610]:38209 (size: 39.5 KiB, free: 2.7 GiB)
23/04/20 12:55:43 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 107 ms on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610] (executor 3) (1/7)
23/04/20 12:55:43 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 109 ms on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610] (executor 3) (2/7)
23/04/20 12:55:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:cd0:80eb:6605:593]:40479 (size: 39.5 KiB, free: 2.7 GiB)
23/04/20 12:55:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:463c:a178:348b:c839]:45689 (size: 39.5 KiB, free: 2.7 GiB)
23/04/20 12:55:45 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 1688 ms on [2600:1f18:7aa7:3005:cd0:80eb:6605:593] (executor 2) (3/7)
23/04/20 12:55:45 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 1692 ms on [2600:1f18:7aa7:3005:cd0:80eb:6605:593] (executor 2) (4/7)
23/04/20 12:55:45 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2111 ms on [2600:1f18:7aa7:3005:463c:a178:348b:c839] (executor 1) (5/7)
23/04/20 12:55:45 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 2110 ms on [2600:1f18:7aa7:3005:463c:a178:348b:c839] (executor 1) (6/7)
23/04/20 12:55:46 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2486 ms on [2600:1f18:7aa7:3005:463c:a178:348b:c839] (executor 1) (7/7)
23/04/20 12:55:46 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
23/04/20 12:55:46 INFO DAGScheduler: ResultStage 1 (collect at HoodieSparkEngineContext.java:103) finished in 2.514 s
23/04/20 12:55:46 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/20 12:55:46 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/04/20 12:55:46 INFO DAGScheduler: Job 1 finished: collect at HoodieSparkEngineContext.java:103, took 2.524419 s
23/04/20 12:55:46 INFO SparkContext: Starting job: collect at HoodieSparkEngineContext.java:137
23/04/20 12:55:46 INFO DAGScheduler: Got job 2 (collect at HoodieSparkEngineContext.java:137) with 1 output partitions
23/04/20 12:55:46 INFO DAGScheduler: Final stage: ResultStage 2 (collect at HoodieSparkEngineContext.java:137)
23/04/20 12:55:46 INFO DAGScheduler: Parents of final stage: List()
23/04/20 12:55:46 INFO DAGScheduler: Missing parents: List()
23/04/20 12:55:46 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[5] at flatMap at HoodieSparkEngineContext.java:137), which has no missing parents
23/04/20 12:55:46 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 369.6 KiB, free 7.3 GiB)
23/04/20 12:55:46 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 135.1 KiB, free 7.3 GiB)
23/04/20 12:55:46 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 (size: 135.1 KiB, free: 7.3 GiB)
23/04/20 12:55:46 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1570
23/04/20 12:55:46 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[5] at flatMap at HoodieSparkEngineContext.java:137) (first 15 tasks are for partitions Vector(0))
23/04/20 12:55:46 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
23/04/20 12:55:46 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 8) ([2600:1f18:7aa7:3005:cd0:80eb:6605:593], executor 2, partition 0, PROCESS_LOCAL, 4332 bytes) taskResourceAssignments Map()
23/04/20 12:55:46 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on [2600:1f18:7aa7:3005:cd0:80eb:6605:593]:40479 (size: 135.1 KiB, free: 2.7 GiB)
23/04/20 12:55:47 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 8) in 650 ms on [2600:1f18:7aa7:3005:cd0:80eb:6605:593] (executor 2) (1/1)
23/04/20 12:55:47 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
23/04/20 12:55:47 INFO DAGScheduler: ResultStage 2 (collect at HoodieSparkEngineContext.java:137) finished in 0.685 s
23/04/20 12:55:47 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/20 12:55:47 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
23/04/20 12:55:47 INFO DAGScheduler: Job 2 finished: collect at HoodieSparkEngineContext.java:137, took 0.689851 s
23/04/20 12:55:47 WARN BaseHoodieCompactionPlanGenerator: No operations are retrieved for s3a://delta-streamer-demo-hudi/hudi
23/04/20 12:55:47 WARN HoodieCompactor: Couldn't do schedule
23/04/20 12:55:47 INFO SparkUI: Stopped Spark web UI at http://[2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:8090
23/04/20 12:55:47 INFO EmrServerlessClusterSchedulerBackend: Shutting down all executors
23/04/20 12:55:47 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Asking each executor to shut down
23/04/20 12:55:47 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/04/20 12:55:47 INFO MemoryStore: MemoryStore cleared
23/04/20 12:55:47 INFO BlockManager: BlockManager stopped
23/04/20 12:55:47 INFO BlockManagerMaster: BlockManagerMaster stopped
23/04/20 12:55:47 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/04/20 12:55:47 INFO SparkContext: Successfully stopped SparkContext
23/04/20 12:55:47 INFO ShutdownHookManager: Shutdown hook called
23/04/20 12:55:47 INFO ShutdownHookManager: Deleting directory /tmp/spark-f0bb3006-aabf-4fc4-b3d0-5d9223ed8f68
23/04/20 12:55:47 INFO ShutdownHookManager: Deleting directory /tmp/spark-19f096f4-4b39-4f2d-a97e-71b7e7d43e1d
23/04/20 12:55:47 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
23/04/20 12:55:47 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
23/04/20 12:55:47 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.

image

soumilshah1995 commented 1 year ago

Looks like I am missing a setting. Can you point out what I am missing?

soumilshah1995 commented 1 year ago

@ad1happy2go

Can you help here? :D

ad1happy2go commented 1 year ago

@soumilshah1995 I see the job ran fine in the logs. Can you check the stderr logs to see if there is any error?
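
If S3 logging is not already enabled on the application, one hedged way to get at stderr is to pass a monitoring configuration on the job run (bucket/prefix below are placeholders):

# Hedged sketch: ask EMR Serverless to ship driver/executor logs to S3 so the
# stderr of the compactor run can be inspected afterwards.
configuration_overrides = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {
            "logUri": "s3://<bucket>/emr-serverless-logs/"
        }
    }
}

# client.start_job_run(..., configurationOverrides=configuration_overrides, ...)
# Driver stderr then typically lands under:
#   <logUri>applications/<application-id>/jobs/<job-run-id>/SPARK_DRIVER/stderr.gz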

soumilshah1995 commented 1 year ago

I don't see much. I am free to connect with you now if you want.

soumilshah1995 commented 1 year ago

As discussed on the call, let me do a few more tests and I will get back to you soon :D

soumilshah1995 commented 1 year ago

Glue Job

try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(5)
        ]

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://delta-streamer-demo-hudi/hudi/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name

    , "hoodie.clean.automatic": "false"
    , "hoodie.clean.async": "false"
    , "hoodie.clustering.async.enabled": "false"
    , "hoodie.metadata.enable": "false"
    , "hoodie.metadata.index.async": "false"
    , "hoodie.metadata.index.column.stats.enable": "false"
    , "hoodie.compact.inline": "false"
    , 'hoodie.compact.schedule.inline': 'true'

    , "hoodie.metadata.index.check.timeout.seconds": "60"
    , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
    , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"

}

# ====================================================
"""Create Spark Data Frame """
# ====================================================
data = DataGenerator.get_data()

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
df = spark.createDataFrame(data=data, schema=columns)
df.write.format("hudi").options(**hudi_part_write_config).mode("overwrite").save(path)

# ====================================================
"""APPEND """
# ====================================================

impleDataUpd = [
    (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

# ====================================================
"""UPDATE """
# ====================================================
impleDataUpd = [
    (3, "this is update 1 on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
]
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

impleDataUpd = [
    (3, "this is update 2 on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
]
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

Hudi Data Lake

image

EMR Job

try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv("../.env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = "us-east-1"

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)

def lambda_handler_test_emr(event, context):
    # ============================== Settings =======================================
    db_name = "hudidb"
    table_name = "employees"
    recordkey = 'emp_id'
    precombine = "ts"
    PARTITION_FIELD = 'state'
    path = "s3://delta-streamer-demo-hudi/hudi/"
    method = 'upsert'
    table_type = "MERGE_ON_READ"
    # ====================================================================================
    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'delta_streamer_compaction_{}'.format(table_name)

    # --------------------------------------------------------------------------------
    spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
    jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
    # schedule | execute | scheduleAndExecute

    arguments = [
        '--spark-memory', '5g',
        '--parallelism', '2',
        "--mode", "scheduleAndExecute",
        "--base-path", path,
        "--table-name", table_name,
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.metadata.index.async=false",
        "--hoodie-conf", "hoodie.metadata.enable=false"

    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': jar_path,
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)

lambda_handler_test_emr(context=None, event=None)

Error

image

Stdout

23/04/24 17:32:45 INFO SparkContext: Running Spark version 3.3.1-amzn-0
23/04/24 17:32:45 INFO ResourceUtils: ==============================================================
23/04/24 17:32:45 INFO ResourceUtils: No custom resources configured for spark.driver.
23/04/24 17:32:45 INFO ResourceUtils: ==============================================================
23/04/24 17:32:45 INFO SparkContext: Submitted application: compactor-employees
23/04/24 17:32:45 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 5120, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/04/24 17:32:45 INFO ResourceProfile: Limiting resource is cpus at 4 tasks per executor
23/04/24 17:32:46 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/04/24 17:32:46 INFO SecurityManager: Changing view acls to: hadoop
23/04/24 17:32:46 INFO SecurityManager: Changing modify acls to: hadoop
23/04/24 17:32:46 INFO SecurityManager: Changing view acls groups to: 
23/04/24 17:32:46 INFO SecurityManager: Changing modify acls groups to: 
23/04/24 17:32:46 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
23/04/24 17:32:46 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
23/04/24 17:32:46 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
23/04/24 17:32:46 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
23/04/24 17:32:46 INFO Utils: Successfully started service 'sparkDriver' on port 38683.
23/04/24 17:32:46 INFO SparkEnv: Registering MapOutputTracker
23/04/24 17:32:46 INFO SparkEnv: Registering BlockManagerMaster
23/04/24 17:32:46 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/04/24 17:32:46 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/04/24 17:32:46 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/24 17:32:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8f446246-8819-4282-bb21-cd9202f28988
23/04/24 17:32:46 INFO MemoryStore: MemoryStore started with capacity 7.3 GiB
23/04/24 17:32:46 INFO SparkEnv: Registering OutputCommitCoordinator
23/04/24 17:32:46 INFO SubResultCacheManager: Sub-result caches are disabled.
23/04/24 17:32:46 INFO Utils: Successfully started service 'SparkUI' on port 8090.
23/04/24 17:32:46 INFO SparkContext: Added JAR s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar at s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1682357565906
23/04/24 17:32:47 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
23/04/24 17:32:47 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
23/04/24 17:32:47 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35927.
23/04/24 17:32:47 INFO NettyBlockTransferService: Server created on [2600:1f18:5856:4301:8471:a289:676c:1ff0]:35927
23/04/24 17:32:47 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/04/24 17:32:47 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
23/04/24 17:32:47 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:8471:a289:676c:1ff0]:35927 with 7.3 GiB RAM, BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
23/04/24 17:32:47 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
23/04/24 17:32:47 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
23/04/24 17:32:47 INFO ExecutorContainerAllocator: Going to request 3 executors for ResourceProfile Id: 0, target: 3 already provisioned: 0.
23/04/24 17:32:47 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(1, 2, 3)
23/04/24 17:32:47 INFO SingleEventLogFileWriter: Logging events to file:/var/log/spark/apps/00f9k5j6uatf6b09.inprogress
23/04/24 17:32:47 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
23/04/24 17:32:47 INFO ExecutorAllocationManager: Dynamic allocation is enabled without a shuffle service.
23/04/24 17:32:47 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
23/04/24 17:32:47 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(2 -> b4c3da29-8d54-5c47-92b5-cb12498c32a8, 1 -> bcc3da29-8d24-1e3f-c6b7-b941a5b6c570, 3 -> fec3da29-8d30-0e57-f3b6-35b5fe897c79)
23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:5856:4301:6434:e690:3a6b:a55e:42664) with ID 1,  ResourceProfileId 0
23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:5856:4301:cddc:9113:28e1:eb26:38970) with ID 3,  ResourceProfileId 0
23/04/24 17:32:51 INFO ExecutorMonitor: New executor 1 has registered (new total is 1)
23/04/24 17:32:51 INFO ExecutorMonitor: New executor 3 has registered (new total is 2)
23/04/24 17:32:51 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:cddc:9113:28e1:eb26]:43825 with 2.7 GiB RAM, BlockManagerId(3, [2600:1f18:5856:4301:cddc:9113:28e1:eb26], 43825, None)
23/04/24 17:32:51 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:6434:e690:3a6b:a55e]:38015 with 2.7 GiB RAM, BlockManagerId(1, [2600:1f18:5856:4301:6434:e690:3a6b:a55e], 38015, None)
23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:5856:4301:54a3:8e48:1700:41c7:43444) with ID 2,  ResourceProfileId 0
23/04/24 17:32:51 INFO ExecutorMonitor: New executor 2 has registered (new total is 3)
23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
23/04/24 17:32:51 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:54a3:8e48:1700:41c7]:34311 with 2.7 GiB RAM, BlockManagerId(2, [2600:1f18:5856:4301:54a3:8e48:1700:41c7], 34311, None)
23/04/24 17:32:51 INFO S3NativeFileSystem: Opening 's3://delta-streamer-demo-hudi/hudi/.hoodie/hoodie.properties' for reading
23/04/24 17:32:51 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/04/24 17:32:52 WARN HoodieCompactor: No instant time is provided for scheduling compaction.
23/04/24 17:32:52 INFO S3NativeFileSystem: Opening 's3://delta-streamer-demo-hudi/hudi/.hoodie/hoodie.properties' for reading
23/04/24 17:32:52 WARN HoodieCompactor: Couldn't do schedule
23/04/24 17:32:52 INFO SparkUI: Stopped Spark web UI at http://[2600:1f18:5856:4301:8471:a289:676c:1ff0]:8090
23/04/24 17:32:52 INFO EmrServerlessClusterSchedulerBackend: Shutting down all executors
23/04/24 17:32:52 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Asking each executor to shut down
23/04/24 17:32:52 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/04/24 17:32:52 INFO MemoryStore: MemoryStore cleared
23/04/24 17:32:52 INFO BlockManager: BlockManager stopped
23/04/24 17:32:52 INFO BlockManagerMaster: BlockManagerMaster stopped
23/04/24 17:32:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/04/24 17:32:52 INFO SparkContext: Successfully stopped SparkContext
23/04/24 17:32:52 INFO ShutdownHookManager: Shutdown hook called
23/04/24 17:32:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-7d6b3915-7b6d-4bfd-b534-a898b6dd6653
23/04/24 17:32:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-03ad476f-549d-4bc5-b1f4-16b1a343a93c
ad1happy2go commented 1 year ago

@soumilshah1995 Did the compaction succeed on the data? Can you paste a screenshot of the table directory and the .hoodie dir after the job finished?
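
If it is easier than a screenshot, a quick listing like this would also show whether any compaction instants (*.compaction.requested / *.commit) are in the timeline (hedged sketch; bucket and prefix are placeholders for your table base path):

# Hedged helper: list the .hoodie timeline of the table instead of screenshotting it.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="<bucket>", Prefix="hudi/.hoodie/")
for obj in resp.get("Contents", []):
    # look for <instant>.compaction.requested and <instant>.commit entries
    print(obj["Key"])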

soumilshah1995 commented 1 year ago

No, same error that we discussed on the call yesterday. I will paste the screenshot.

image

ad1happy2go commented 1 year ago

I will try to reproduce this on my end then.

soumilshah1995 commented 1 year ago

Thanks

soumilshah1995 commented 1 year ago

Any updates, @ad1happy2go?

amareshb commented 1 year ago

@soumilshah1995

hoodie.compact.inline.max.delta.commits
Number of delta commits after the last compaction, before scheduling of a new compaction is attempted.
Default Value: 5 (Optional)
Config Param: INLINE_COMPACT_NUM_DELTA_COMMITS

Try setting this param to 1, or add more commits; then you should see a compaction.requested file in the .hoodie dir, as long as 'hoodie.compact.schedule.inline': 'true' is set.
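
For reference, a hedged sketch of how that could be wired into the Glue write config used in this thread (setting the threshold to 1 only makes sense for testing):

# Hedged sketch: schedule a compaction plan after every delta commit so a
# .compaction.requested file shows up quickly while testing.
extra_compaction_config = {
    'hoodie.compact.inline': 'false',               # keep compaction execution out of the writer
    'hoodie.compact.schedule.inline': 'true',       # let the writer schedule the plan
    'hoodie.compact.inline.max.delta.commits': '1'  # default is 5; 1 is for testing only
}

# merge into the existing write options before calling df.write:
# hudi_part_write_config.update(extra_compaction_config)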

soumilshah1995 commented 1 year ago

Let me give it a try over the weekend.

soumilshah1995 commented 1 year ago

Glue job

try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(5)
        ]

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://soumilshah-hudi-demos/hudi/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name

    , "hoodie.compact.inline": "false"
    , 'hoodie.compact.schedule.inline': 'true'
    , "hoodie.metadata.index.check.timeout.seconds": "60"
    , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
    , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"

}

# ====================================================
"""Create Spark Data Frame """
# ====================================================
data = DataGenerator.get_data()

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
df = spark.createDataFrame(data=data, schema=columns)
df.write.format("hudi").options(**hudi_part_write_config).mode("overwrite").save(path)

# ====================================================
"""APPEND """
# ====================================================

impleDataUpd = [
    (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

# ====================================================
"""UPDATE """
# ====================================================
impleDataUpd = [
    (3, "this is update 1** on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
]
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

Compaction job (EMR Serverless submission)

try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv("../.env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = "us-east-1"

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)

def lambda_handler_test_emr(event, context):
    # ============================== Settings =======================================
    table_name = "employees"
    recordkey = 'emp_id'
    precombine = "ts"
    path = "s3://soumilshah-hudi-demos/hudi/"

    # ====================================================================================
    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'delta_streamer_compaction_{}'.format(table_name)

    # --------------------------------------------------------------------------------
    spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
    jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
    # schedule | execute | scheduleAndExecute
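    # A quick note on the three HoodieCompactor modes referenced above (as I understand them):
    #   schedule           -> only create a compaction plan (a *.compaction.requested instant in .hoodie)
    #   execute            -> run compaction plans that were scheduled earlier
    #   scheduleAndExecute -> schedule a plan and run it in the same job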

    arguments = [
        '--spark-memory', '5g',
        '--parallelism', '2',
        "--mode", "schedule",
        "--base-path", path,
        "--table-name", table_name,
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.compact.schedule.inline=true",
        "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"

    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': "command-runner.jar",
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)

lambda_handler_test_emr(context=None, event=None)

Error:

[screenshot]

soumilshah1995 commented 1 year ago

Now trying with a custom JAR: https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.0

Let's see if this works.

soumilshah1995 commented 1 year ago

After using this JAR, I am now getting the error below.

[screenshot]

I have given full permissions as well. Can you guys help?

amareshb commented 1 year ago

Can you try with EMR 6.11, which has Hudi 0.13? That worked for me. EMR 6.10 didn't work for me either.
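
If it helps, spinning up a 6.11 EMR Serverless application with boto3 looks roughly like this (the application name is just a placeholder):

import uuid
import boto3

emr = boto3.client("emr-serverless", region_name="us-east-1")

# Rough sketch: create an EMR Serverless application pinned to the 6.11 release.
resp = emr.create_application(
    name="hudi-compaction-demo",   # placeholder name
    releaseLabel="emr-6.11.0",
    type="SPARK",
    clientToken=str(uuid.uuid4()),
)
print(resp["applicationId"])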

soumilshah1995 commented 1 year ago

@AmareshB

Sure

Step 1: Create EMR 6.11 cluster

[screenshot]

Step 2: Create MOR table

try:
    import sys
    import os
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3
    from functools import reduce
    from pyspark.sql import Row

    import uuid
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        # Return 8 fields per record so the tuples line up with the 8-column schema
        # used below: emp_id, employee_name, department, state, salary, age, bonus, ts.
        return [
            (
                x,                                                                    # emp_id
                faker.name(),                                                         # employee_name
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),    # department
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),  # state
                str(faker.random_int(min=10000, max=150000)),                         # salary
                str(faker.random_int(min=18, max=60)),                                # age
                str(faker.random_int(min=0, max=100000)),                             # bonus
                str(faker.unix_time()),                                               # ts
            ) for x in range(5)
        ]

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://soumilshah-hudi-demos/hudi/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name

    , "hoodie.compact.inline": "false"
    , 'hoodie.compact.schedule.inline': 'true'
    , "hoodie.metadata.index.check.timeout.seconds": "60"
    , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
    , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"

}

# ====================================================
"""Create Spark Data Frame """
# ====================================================
data = DataGenerator.get_data()

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
df = spark.createDataFrame(data=data, schema=columns)
df.write.format("hudi").options(**hudi_part_write_config).mode("overwrite").save(path)

# ====================================================
"""APPEND """
# ====================================================

impleDataUpd = [
    (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

# ====================================================
"""UPDATE """
# ====================================================
impleDataUpd = [
    (3, "this is update 1** on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
]
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

Step 3: Fire the compaction job

"""
https://github.com/apache/hudi/issues/8400
"""
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv("../.env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = "us-east-1"

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)

def lambda_handler_test_emr(event, context):
    # ============================== Settings =======================================
    table_name = "employees"
    recordkey = 'emp_id'
    precombine = "ts"
    path = "s3://soumilshah-hudi-demos/hudi/"

    # ====================================================================================
    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'hudi_compaction_{}'.format(table_name)
    jar_path = "s3://soumilshah-hudi-demos/jar/hudi-spark3.3-bundle_2.12-0.13.0.jar"

    # --------------------------------------------------------------------------------
    spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
    # schedule | execute | scheduleAndExecute

    arguments = [
        '--spark-memory', '5g',
        '--parallelism', '2',
        "--mode", "schedule",
        "--base-path", path,
        "--table-name", table_name,
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.compact.schedule.inline=true",
        "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"

    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': "command-runner.jar",
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)

lambda_handler_test_emr(context=None, event=None)

[screenshot]

Now trying again with the custom JAR (this time passing the JAR as the entryPoint instead of command-runner.jar):

"""
https://github.com/apache/hudi/issues/8400
"""
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv("../.env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = "us-east-1"

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)

def lambda_handler_test_emr(event, context):
    # ============================== Settings =======================================
    table_name = "employees"
    recordkey = 'emp_id'
    precombine = "ts"
    path = "s3://soumilshah-hudi-demos/hudi/"

    # ====================================================================================
    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'hudi_compaction_{}'.format(table_name)
    jar_path = "s3://soumilshah-hudi-demos/jar/hudi-spark3.3-bundle_2.12-0.13.0.jar"

    # --------------------------------------------------------------------------------
    spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
    # schedule | execute | scheduleAndExecute

    arguments = [
        '--spark-memory', '5g',
        '--parallelism', '2',
        "--mode", "schedule",
        "--base-path", path,
        "--table-name", table_name,
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.compact.schedule.inline=true",
        "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"

    ]

    # arguments = [
    #     '--spark-memory', '1g',
    #     '--parallelism', '2',
    #     "--mode", "schedule",
    #     "--base-path", path,
    #     "--table-name", table_name
    # ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': jar_path,
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)

lambda_handler_test_emr(context=None, event=None)

soumilshah1995 commented 1 year ago

Any help would be great :D

dacort commented 1 year ago

Any help would be great :D

@soumilshah1995 Did you ever figure this out? Just came across this and happy to help. I work on the EMR team.

soumilshah1995 commented 4 months ago

Posting this for anyone who comes across this thread in the future: I was able to make it work. https://github.com/soumilshah1995/Apache-Hudi-Table-Services-Hands-on-labs/blob/main/E4/Submit%20Spark%20Job
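
For anyone who wants the shape of it without clicking through, here is a rough sketch of the kind of submission that works (application ID, role ARN, bucket, and JAR path are placeholders; the exact script is in the repo above):

import uuid
import boto3

client = boto3.client("emr-serverless", region_name="us-east-1")

# Sketch only: placeholders throughout; see the linked repo for the exact working job.
jar_path = "s3://<your-bucket>/jar/hudi-utilities-bundle_2.12-0.13.0.jar"
spark_submit_parameters = (
    " --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
    " --class org.apache.hudi.utilities.HoodieCompactor"
)

response = client.start_job_run(
    applicationId="<application-id>",
    clientToken=str(uuid.uuid4()),
    executionRoleArn="<execution-role-arn>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": jar_path,              # run HoodieCompactor from the utilities bundle
            "entryPointArguments": [
                "--spark-memory", "5g",
                "--parallelism", "2",
                "--mode", "scheduleAndExecute",  # schedule a compaction plan and run it in one job
                "--base-path", "s3://<your-bucket>/hudi/",
                "--table-name", "employees",
                "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1",
            ],
            "sparkSubmitParameters": spark_submit_parameters,
        },
    },
    executionTimeoutMinutes=600,
    name="hudi_compaction_employees",
)
print(response)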