apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Iceberg newbie questions #2173

Closed kkishoreyadav closed 3 years ago

kkishoreyadav commented 3 years ago

Hello Community, I am solving the problem of handling late-arriving data in one of our systems. Currently, we wait 8 hours for late data to arrive before we start processing the current hour's data.

We have three stages in our pipeline, A -> B -> C, where B waits 8 hours for A's hourly data to complete, and C likewise waits 8 hours for B's data to complete.

A writes Avro data to GCS (in batches of 1000 files); B and C write Parquet data to GCS.

A current sample path for A, B, and C looks like gs://data/year=2021/month=1/day=27/hour=0/user_bucket=22/filename.deflate.avro

The solution that we are currently considering is:

a) A continues to write Avro data to GCS, but also records the filenames + stats in Iceberg.
b) B runs an initial run R1 after A's on-time data is complete, without waiting for 8 hours: B queries Iceberg for hour H1 data and writes the on-time output to GCS/Iceberg.
c) Similarly, C runs an initial run R1 to consume B's on-time data for hour H1 and writes the on-time output to GCS/Iceberg.
d) Consumers of C run an initial run R1 to consume C's on-time data as well.
e) After 8 hours, B runs a second run R2 to process A's late data: B queries Iceberg for hour H1 data committed after R1's runtime and writes the late data to GCS/Iceberg.
f) C and its consumers repeat step (e).
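The two-run pattern above can be sketched in a few lines. This is a toy model of the questioner's proposal, not the Iceberg API: stage A records each file with a commit timestamp, R1 consumes everything committed up to its start time, and R2 consumes only what arrived after R1 (all names here, like `FileLog`, are illustrative).

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class FileEntry:
    path: str
    commit_ts: int  # epoch seconds at which stage A recorded the file

class FileLog:
    """Hypothetical stand-in for 'A records filename + stats in Iceberg'."""
    def __init__(self):
        self.entries = []

    def record(self, path, commit_ts):
        self.entries.append(FileEntry(path, commit_ts))

    def files_between(self, start_ts, end_ts):
        # Files committed in (start_ts, end_ts] -- the input window for one run.
        return [e.path for e in self.entries if start_ts < e.commit_ts <= end_ts]

log = FileLog()
log.record("gs://data/hour=1/ontime-1.avro", 100)
log.record("gs://data/hour=1/ontime-2.avro", 200)
r1_ts = 300                                        # B's initial run R1
log.record("gs://data/hour=1/late-1.avro", 500)    # a late arrival
r2_ts = 600                                        # B's late-data run R2

ontime = log.files_between(0, r1_ts)     # R1 input: the two on-time files
late = log.files_between(r1_ts, r2_ts)   # R2 input: only the late file
```

The key property is that R2's window starts exactly where R1's ended, so every file is consumed exactly once across the two runs.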

Could you please clarify the following questions.

1) Table level question

What is the best practice for creating the Iceberg table: should we create one table for all days, or one table per day? One day's worth of our data is around 100TB, and I am concerned that we might run into scalability issues if we keep a year's worth of data in a single Iceberg table. So far we have never needed to query across multiple days, since all of our queries span a single day, but that may well change. Currently, we pre-create the schema two days ahead of the current day, the schema is locked for a day, and we create a new schema for our data every day. If we keep one table for all days, how hard is it to update the schema every day?
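One relevant point for the daily-schema concern: in Iceberg, a schema update is a metadata-only commit, so existing data files are never rewritten. The sketch below is a simplified toy model of that idea (it does not reproduce Iceberg's actual metadata layout), showing schema versions accumulating in table metadata while data files keep the schema id they were written with.

```python
class TableMetadata:
    """Toy model: schema changes append a version; data files are untouched."""
    def __init__(self, schema):
        self.schemas = [schema]           # every schema version ever committed
        self.current_schema_id = 0
        self.data_files = []              # (path, schema_id_at_write_time)

    def evolve_schema(self, new_schema):
        self.schemas.append(new_schema)
        self.current_schema_id = len(self.schemas) - 1  # metadata-only change

    def add_file(self, path):
        self.data_files.append((path, self.current_schema_id))

t = TableMetadata(["user_id", "event_ts"])
t.add_file("day1.parquet")
t.evolve_schema(["user_id", "event_ts", "country"])   # next day's schema
t.add_file("day2.parquet")
# day1.parquet still carries schema id 0; no data was rewritten.
```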

2) Partition level question

If we have one Iceberg table per day, when we query the table for a snapshot, can we also provide a filter/regex on the file path names, e.g. passing hour H and a bucket number? If Iceberg supports this, then we don't need any partitions on the event hour/bucket number. If partitioning on event hour is a requirement for our use case, we have the following challenge: a) currently our data does not contain the bucket number, and our event hour is not a field in the data; it is a derivation (if field f1 has a non-null value, pick that value, otherwise fall back to the f2 value). Can the partition spec support these two cases out of the box? If not, is there a workaround other than writing the bucket number and event hour into the data files?
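The derivation rule in the question is essentially hour(coalesce(f1, f2)). Iceberg partition transforms operate on a single source column, so a rule like this would normally require materializing the derived value as a column first; the sketch below only illustrates the rule itself (the function name and inputs are hypothetical).

```python
from datetime import datetime, timezone

def event_hour(f1_ts, f2_ts):
    """Pick f1 when it is non-null, otherwise fall back to f2,
    then truncate the chosen epoch-seconds timestamp to its UTC hour."""
    ts = f1_ts if f1_ts is not None else f2_ts
    return datetime.fromtimestamp(ts, tz=timezone.utc).hour

h1 = event_hour(None, 3600)   # f1 is null -> falls back to f2 (01:00 UTC)
h2 = event_hour(7200, 3600)   # f1 present -> wins over f2 (02:00 UTC)
```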

3) GCS related question

Does Iceberg support writing data to GCS? For Iceberg's atomicity to work according to https://iceberg.apache.org/java-api-quickstart/, GCS would need to support atomic rename; however, per https://cloud.google.com/storage/docs/gsutil/commands/mv, GCS renames are not atomic. What are the workarounds to get atomicity if Iceberg doesn't support GCS?
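For context on why this is workable: atomic rename is only one way to get atomic commits. A catalog can instead provide an atomic compare-and-swap of the "current metadata" pointer, so the object store itself never needs atomic rename. The sketch below models that idea in miniature (it is not Iceberg's implementation; the lock stands in for whatever atomic primitive the catalog backend offers).

```python
import threading

class CatalogPointer:
    """Toy catalog: commits race on one compare-and-swap of the metadata location."""
    def __init__(self, location):
        self._location = location
        self._lock = threading.Lock()

    def compare_and_swap(self, expected, new):
        # Atomically advance the pointer only if no one else committed first.
        with self._lock:
            if self._location != expected:
                return False   # lost the race; the writer must refresh and retry
            self._location = new
            return True

ptr = CatalogPointer("metadata/v1.json")
first = ptr.compare_and_swap("metadata/v1.json", "metadata/v2.json")   # wins
second = ptr.compare_and_swap("metadata/v1.json", "metadata/v3.json")  # stale, loses
```

Only one writer can move the pointer away from a given value, so concurrent commits serialize without any rename on the data path.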

4) Confirming delta b/w two snapshots

Does Iceberg return different results if I query the table for the same timestamp at two different points in time? Also, does Iceberg support computing the delta between two timestamps/snapshots? If so, are the delta files distributed across tasks in the Spark job, or are they only accessible at the driver level?
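Conceptually (this is not the Iceberg API), each snapshot references an immutable set of data files, so the delta between two snapshots of an append-only table reduces to a set difference, and the resulting file list is just data that a query engine can split across tasks:

```python
# Hypothetical file sets for the R1 and R2 snapshots from the pipeline above.
snapshot_r1 = {"f1.parquet", "f2.parquet"}
snapshot_r2 = {"f1.parquet", "f2.parquet", "late1.parquet", "late2.parquet"}

# Files appended between the two snapshots -- the "late data" delta.
delta_files = sorted(snapshot_r2 - snapshot_r1)
```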

5) Hadoop table vs Hive table

According to https://iceberg.apache.org/java-api-quickstart/, Iceberg supports two catalogs, i.e., the Hive catalog and the Hadoop catalog. Can someone explain, like I am five, the differences between them? When should we use one vs the other?

6) Idempotent

Let's say we append 1000 GCS files into the Iceberg table from a job, but the job is restarted after committing to Iceberg. Our jobs do not produce strictly immutable data. On retry, the job appends the same 998 files from the previous commit, plus two new files. What is the behavior here?
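One common workaround for this retry scenario (sketched here as an assumption, not a built-in Iceberg feature) is for the caller to filter the candidate batch against the file paths the table already references before appending, so a restarted job commits only the genuinely new files:

```python
def files_to_append(existing_paths, candidate_paths):
    """Drop candidates whose paths the table already references."""
    existing = set(existing_paths)
    return [p for p in candidate_paths if p not in existing]

# 998 files committed before the restart, then a retry batch that repeats
# them and adds two new files (names are illustrative).
committed = [f"file-{i}.avro" for i in range(998)]
retry_batch = committed + ["new-1.avro", "new-2.avro"]

new_files = files_to_append(committed, retry_batch)  # only the two new files
```

Without such a check, re-appending the same paths would simply add duplicate references, so deduplicating by path before the commit is the caller's responsibility in this sketch.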

rdblue commented 3 years ago

This was also sent to the dev list: https://lists.apache.org/thread.html/rd15bf1db711b1a31f39d4b98776f29753b544fa3a496111d3460e11e%40%3Cdev.iceberg.apache.org%3E

Let's use the discussion there, since this isn't really a focused issue. Thanks!