
[SUPPORT] s3 list cost increases exponentially when using COW table #11742

Open ankit0811 opened 1 month ago

ankit0811 commented 1 month ago

We started ingesting data from Kafka using Spark (Java) and wanted to understand the storage cost associated with creating a table. Interestingly, we see that our s3 listing cost increases exponentially, so we wanted to understand if we are missing something that could help us reduce this cost.

We do have the metadata table enabled, so the assumption was that the s3 cost would be reduced, but we don't see that.

When the Spark job starts, we see that the embedded timeline server starts and then errors out after processing the first micro-batch. (Not sure if this is causing the listing cost; I believe the embedded timeline server was disabled as per this issue.)

[Screenshot: 2024-08-08 at 08:59:57]

Environment Description

Stacktrace

24/08/08 16:07:56 INFO TimelineService: Starting Timeline server on port :45043
24/08/08 16:07:56 INFO EmbeddedTimelineService: Started embedded timeline server at <......>
24/08/08 16:07:56 INFO BaseHoodieClient: Timeline Server already running. Not restarting the service
24/08/08 16:07:56 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240808160601237__commit__COMPLETED__20240808160625000]}
24/08/08 16:08:02 INFO BaseHoodieClient: Embedded Timeline Server is disabled. Not starting timeline service
24/08/08 16:08:02 INFO BaseHoodieClient: Embedded Timeline Server is disabled. Not starting timeline service

Hudi config

df.writeStream()
    .format("hudi")
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.delete.shuffle.parallelism", "2")
    .option(EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "true")

    .option(HoodieWriteConfig.TBL_NAME.key(), newTName)
    .option("hoodie.datasource.write.table.type", HoodieTableType.COPY_ON_WRITE.name())
    .option("hoodie.datasource.write.operation", WriteOperationType.INSERT.value())
    .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "ts_date")
    .option("checkpointLocation", newCheckPoint)

    .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "col1,col2,col3,col4")

    .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "time_stamp")
    .option(INDEX_TYPE.key(), GLOBAL_BLOOM.name())

     // Clustering + Compaction config
    .option(ASYNC_CLUSTERING_ENABLE.key(), "true")
    .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "524288000")

     // Metadata Config
    .option(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true")
    .option(ASYNC_INDEX_ENABLE.key(), "true")

    .option(ASYNC_CLEAN.key(), "true")

    .outputMode(OutputMode.Append());
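
For reference, the two settings discussed above (metadata table and embedded timeline server) are not pinned explicitly in this block. A minimal sketch of setting them, assuming the standard Hudi 0.15 config keys, would look like this:

// Sketch only: explicitly pin the metadata table and embedded timeline server,
// rather than relying on defaults, so their effect on S3 LIST calls is easy to A/B test.
// Key names assumed from the Hudi 0.15.x config reference.
df.writeStream()
    .format("hudi")
    .option("hoodie.metadata.enable", "true")        // metadata table (file listings served from it)
    .option("hoodie.embed.timeline.server", "true"); // embedded timeline server on the driver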
soumilshah1995 commented 1 month ago

resolved and answered on slack https://apache-hudi.slack.com/archives/C4D716NPQ/p1723225490926329

ankit0811 commented 1 month ago

Thanks for responding @soumilshah1995. The index change did help a bit, but we are still seeing a significant cost from s3 bucket listing API calls.

KnightChess commented 1 month ago

@ankit0811 does s3 have any more detailed logs showing which type of file the cost comes from?

KnightChess commented 1 month ago

@ankit0811 like an audit log?

ankit0811 commented 1 month ago

@KnightChess unfortunately no.

soumilshah1995 commented 1 month ago

Hi there! It's fantastic to hear that the cost has dropped by 10%. Could you clarify the current cost? Are we looking at something in the hundreds of dollars, or is it less than $100?

We also need a bit more detail to assist you better: Are you running this on AWS Glue or EMR? What version of Apache Hudi are you using? Which version of Spark are you working with? Could you provide the full script, including the code and configurations? What type of data are you working with, and which indexing option are you currently using?

ankit0811 commented 1 month ago

Hey, since this is just a POC, it's less than $100. However, the storage cost is half of the listing cost, so we want to understand whether there's an incorrect config we are using or whether this is generally expected.

Are you running this on AWS Glue or EMR?

We have our own local clusters and deploy spark using spark-operator (k8s)

What version of Apache Hudi are you using?

0.15.0

Which version of Spark are you working with?

3.4.3

Could you provide the full script, including the code and configurations?

These are the relevant reader and Hudi writer configs:


df = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-host-port")
        .option("subscribe", "topicName")
        .option("auto.offset.reset", "latest")
        .option("failOnDataLoss", "false")
        .load();

df = df
    .selectExpr("CAST(value AS STRING) as value")
    .select(
        col("value"),
        from_json(col("value"), schema).alias("__rows"))
    .select(
        col("__rows.time_stamp").alias("time_stamp"),
        col("..."),
        col("..."),
        col("..."),,
        explode(col("__rows.nestedColumn")).alias("__data")
    )
    .select(
        col("time_stamp"),
        col("...."),
        col("...."),
        col("....");

DataStreamWriter<Row> streamWriter = df.writeStream()
                                           .format("hudi")
                                           .option("hoodie.insert.shuffle.parallelism", "2")
                                           .option("hoodie.upsert.shuffle.parallelism", "2")
                                           .option("hoodie.delete.shuffle.parallelism", "2")
                                           .option(EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "true")
                                           .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "col1, col2, col3, col4")

                                           .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "time_stamp")

                                           // Clustering + Compaction config
                                           .option(ASYNC_CLUSTERING_ENABLE.key(), "true")
                                           .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "524288000")

                                           // Metadata Config
                                           .option(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true")

                                           .option(ASYNC_CLEAN.key(), "true")

                                           .option(HoodieWriteConfig.TBL_NAME.key(), newTName)
                                           .option(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.name())
                                           .option("hoodie.datasource.write.operation", WriteOperationType.INSERT.value())
                                           .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "ts_date")
                                           .option("checkpointLocation", newCheckPoint)

                                           .outputMode(OutputMode.Append());

streamWriter.trigger(new ProcessingTimeTrigger(120000)).start(newBasePath).awaitTermination();

I have pasted the relevant piece of code only. Let me know if this helps.

What type of data are you working with, and which indexing option are you currently using? The source is a kafka stream where we expect IoT-related data. In terms of volume, each device sends some data at least twice a minute.

The current volume for the above topic is ~11 GB of compressed data per day. (FYI, this is only dev data and we expect it to be at least 50 times that in prod.)

We plan to partition this by date. As of now we haven't configured any indexing and are using the default options only (post your suggestion).

soumilshah1995 commented 1 month ago

To provide you with the best suggestions, we'll need a sample schema or some mock data. This will help us recommend the most suitable indexing options and the optimal approach for achieving the fastest writes (UPSERT).

Whether you require uniqueness across all partitions or within a single partition, we can fine-tune the script to ensure maximum performance for both reads and writes. For instance, if you need fast UPSERTs on a primary key, a record-level index might be ideal. Alternatively, a bucket index can offer faster writes since it avoids index lookups. However, the best option also depends on the query engine you're using. For example, Athena does not support metadata and data skipping.

To give you the best possible guidance, please share more details about your workload, data, and schema. This information will help us recommend the most effective settings and options available.
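
To make the trade-off concrete, here is a rough sketch of how the two index options mentioned above could be expressed as writer options (key names assumed from the Hudi 0.15 config reference; the bucket count and hash fields are placeholders, not recommendations):

DataStreamWriter<Row> writer = df.writeStream()
    .format("hudi")
    // Option A: record-level index -- point lookups on the record key during upserts;
    // requires the metadata table to be enabled.
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.metadata.record.index.enable", "true")
    .option("hoodie.index.type", "RECORD_INDEX")
    // Option B (an alternative to A, not combined with it): bucket index -- avoids the
    // index lookup entirely, but the bucket count is fixed up front.
    // .option("hoodie.index.type", "BUCKET")
    // .option("hoodie.bucket.index.num.buckets", "32")                 // placeholder value
    // .option("hoodie.bucket.index.hash.field", "col1,col2,col3,col4") // placeholder fields
    .outputMode(OutputMode.Append());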

ankit0811 commented 1 month ago

Sure. Please find the schema below (due to security reasons, I am not allowed to share the actual schema, but this is the closest representation without giving away much detail).

The workload is approx 11 GB (compressed) of streamed data. This is a consistent stream and we do not expect spiky traffic.

Also curious: how will the schema dictate the listing cost?


StructType nestedCol1 = new StructType(new StructField[] {
        DataTypes.createStructField("col1", DataTypes.StringType, true),
        DataTypes.createStructField("col2", DataTypes.StringType, true),
        DataTypes.createStructField("col3", DataTypes.StringType, true),
        DataTypes.createStructField("col4", DataTypes.StringType, true),
        DataTypes.createStructField("col5", DataTypes.StringType, true),
        DataTypes.createStructField("col6", DataTypes.StringType, true),
        DataTypes.createStructField("col7", DataTypes.StringType, true),
        DataTypes.createStructField("col8", DataTypes.StringType, true),
        DataTypes.createStructField("nested_metric1", DataTypes.IntegerType, true),
        DataTypes.createStructField("nested_metric2", DataTypes.IntegerType, true)
    });

    StructType schema = new StructType(new StructField[] {
        DataTypes.createStructField("time_stamp", DataTypes.DoubleType, false),
        DataTypes.createStructField("id_part1", DataTypes.StringType, false),
        DataTypes.createStructField("id_part2", DataTypes.StringType, false),
        DataTypes.createStructField("id_part3", DataTypes.StringType, false),
        DataTypes.createStructField("id_part4", DataTypes.StringType, false),
        DataTypes.createStructField("id_part5", DataTypes.StringType, false),
        DataTypes.createStructField("attribute1", DataTypes.StringType, true),
        DataTypes.createStructField("attribute2", DataTypes.StringType, true),
        DataTypes.createStructField("attribute3", DataTypes.StringType, true),
        DataTypes.createStructField("attribute4", DataTypes.StringType, true),
        DataTypes.createStructField("attribute5", DataTypes.BooleanType, true),
        DataTypes.createStructField("metric1", DataTypes.DoubleType, true),
        DataTypes.createStructField("metric2", DataTypes.DoubleType, true),
        DataTypes.createStructField("metric3", DataTypes.DoubleType, true),
        DataTypes.createStructField("nested_col1", DataTypes.createArrayType(nestedCol1), true)
    });

sample data


{
    "id_part1": "4a7545a1-9f0d-4038-a8c0-444b55a579fe",
    "id_part2": "cf7755cb-faaf-4805-986c-69f1cd625754",
    "time_stamp": 1723250086660005600,
    "id_part3": "7fbf2e0d-0567-475e-bc16-cf115f2af435",
    "id_part4": "327da669-a0e9-442b-b423-0d0481688b00",
    "id_part5": "device_type1",
      "attribute1": "device_attribute1",
    "attribute2": "device_attribute2",
    "attribute3": "device_attribute3",
    "attribute4": "device_attribute4",
    "attribute5": false,
    "metric1": 10234.24,
    "metric2": 1.99,
    "metric3": -156.98,
    "etl_out": 1723250087328.89,

    "nested_col1": [
        {
              "col0": "b9e102ac-d00c-410e-b5c1-d2ada5f53933",
              "col1": "74db6f8b-d99e-4483-b82f-9efaebaf17e8",
              "col2": "478c6f35-e2ac-410b-a390-517f308d8a43",
              "col3": "d6bf71d0-ee70-42df-937e-d9f26280a66a",
              "col4": "45c94e8f-3ac0-4cbd-8284-13808a65846c",
              "col5": "653256d6-bccd-49ac-9256-bae53219db3f",
              "col6": "c953409e-9bf0-43fd-af2f-22751cdbeeb6",
              "col7": "6371c7a4-6b97-4925-8f37-5e0dd474a8b7",
            "nested_metric1": "0",
            "nested_metric2": "-190"
        },
        {
              "col0": "98d52ad0-1482-4ed8-be7b-3f6d81eac4de",
              "col1": "d1022ef4-100b-4c7e-96f5-19f60426a73a",
              "col2": "067a5291-7038-492f-9776-5fb87f673f6c",
              "col3": "601ac625-f7ee-487d-b473-535c09103018",
              "col4": "21a07cf2-633e-478f-a218-9e3ce0564f84",
              "col5": "d7b66f65-388b-45a8-b1ca-ff9ad3a90750",
              "col6": "bfc026d2-f5f4-4a61-9015-113e6464f630",
              "col7": "5870169b-0f2f-4989-b42c-d470e3912277",
              "nested_metric1": "-90",
            "nested_metric2": "-190"
        }
    ]
}
soumilshah1995 commented 1 month ago

I'll review the schema and get back to you on this GitHub issue.

What are you planning to use as the primary key? Are you considering a composite key? Also, where is the data being ingested from? Are you pulling it from Kafka topics? And are you following a medallion architecture?

Gatsby-Lee commented 1 month ago

Hi @ankit0811, I just happened to see your case. (I am using 100% CoW on prod + Spark micro-batching.)

If I were you, I would create a baseline with this. You need baseline stats.

And play with "ProcessingTimeTrigger(120000)".
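
For example (illustrative only; the 10-minute interval is just a placeholder, not a recommendation from the thread), lengthening the micro-batch trigger reduces the number of commits per hour, and with it the per-commit timeline and listing activity on S3:

import org.apache.spark.sql.streaming.Trigger;

// Sketch: fewer, larger micro-batches => fewer Hudi commits => fewer timeline
// reads and S3 LIST calls on the write path. The 120000 ms in the original code is 2 minutes.
streamWriter
    .trigger(Trigger.ProcessingTime("10 minutes"))  // placeholder interval
    .start(newBasePath)
    .awaitTermination();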

ankit0811 commented 3 weeks ago

@Gatsby-Lee thanks for the suggestions. I have disabled the metadata, clustering and cleaner services atm. Will let it run for a day or so and update with the changes, if any. If this doesn't help, I will play with the trigger time.
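
(For clarity, the toggles in question are roughly the following; key names assumed from the Hudi 0.15 config reference, shown here only as a sketch.)

// Sketch of the services being switched off above; verify keys against your Hudi version.
df.writeStream()
    .format("hudi")
    .option("hoodie.metadata.enable", "false")           // metadata table off
    .option("hoodie.clustering.async.enabled", "false")  // async clustering off
    .option("hoodie.clean.async", "false")               // async cleaning off
    .option("hoodie.clean.automatic", "false");          // automatic cleaning off (not advisable long-term)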

@soumilshah1995 the source is a kafka stream, and since this is just a POC we haven't finalized the architecture yet. All we are trying to do is create a Hudi table from our raw kafka stream and take it forward from there. We are using a composite key as our primary key.

ankit0811 commented 3 weeks ago

Update: The listing cost did drop by 50% post disabling metadata, clustering and clean-up.

FYI, this cost is in the 10s of $ per day. The storage cost is also in the same range. So I wanted to understand: is this a constant cost independent of the storage volume, and do you see the same kind of cost behavior for your prod workflows? @Gatsby-Lee

Gatsby-Lee commented 3 weeks ago

Hi @ankit0811

That's great news that the cost dropped. (BTW, you need the cleaner at a minimum.) The cost is not constant. As you can imagine, there are many factors.

In short, I always think this way. If ETL does less work, the cost is less.

I don't use clustering since it can add extra complexity. I only use CoW + Hudi optimizations (like lazy listing), and this is already fast enough without using the Metadata Table. I've experimented with the Metadata Table, and I think it is NOT always good for everything. If you don't need it, then you don't need to enable it. It can slow down ETL and increase cost.

parisni commented 2 weeks ago

The listing cost did drop by 50% post disabling metadata, clustering and clean-up

Would you mind disabling only cleaning? This reminds me of this issue with the same exponential listing behavior: #6373

ankit0811 commented 2 weeks ago

@parisni sure, will try that as well. As of now I have enabled the cleaner + clustering services and have been tweaking the trigger time. So far it looks like it's moving in the right direction, so I will keep playing with this. I want this to run for some time to get a fair idea about the cost part.

Will target experimenting further with the cleaner service in a few weeks and keep the thread updated.

ankit0811 commented 4 days ago

@parisni I did disable the cleaner but it did not result in any further decrease in listing cost. So most likely the metadata service + the triggering frequency was causing this.

One thing that's still not clear to me: wasn't the metadata service's role to avoid such increases by serving listings from the metadata table instead of going to the file system?

parisni commented 3 days ago

Thanks, can you confirm the usage pattern on this table is write-only? So the listings are related to the writer, and not to select queries?

True, the metadata table's goal is to drastically reduce listing, which happens mostly during reads, but AFAIK it is also used during the write path to identify the files to copy on write.

Gatsby-Lee commented 3 days ago

@parisni I did disable the cleaner but it did not result in any further decrease in listing cost. So most likely the metadata service + the triggering frequency was causing this.

One thing that's still not clear to me: wasn't the metadata service's role to avoid such increases by serving listings from the metadata table instead of going to the file system?

You need the cleaner at a minimum. Recently, I completed my experimentation with the Hudi Metadata Table. Before you use the Hudi Metadata Table (indexing), please think twice. It comes with some cost.