apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

Support partitioned writes #208

Open Fokko opened 9 months ago

Fokko commented 9 months ago

Feature Request / Improvement

Support partitioned writes

So I think we want to tackle the static overwrite first, and then we can compute the predicate for the dynamic overwrite to support that. We can come up with a separate API. I haven't really thought this through, and we can still change this. I think the most important step is the breakdown of the work. There is a lot involved, but luckily we already get the test suite from the full overwrite.

Steps I can see:

Other things on my mind:

The good part:

jqin61 commented 8 months ago

Hi @Fokko and the Iceberg community, @syun64 and I are continuing to work on testing the write capability in the write support PR. We are excited about it, as it will help a lot with our use case.

Our use case also includes overwriting partitions of tables, so I am highly interested in contributing to this issue. Would it be alright for me to start working on it, based on the write support PR, if no one else has already begun?

Fokko commented 8 months ago

Hey @jqin61, thanks for replying here. I'm not aware of anyone having started on this already. It would be great if you could take a stab at it 🚀

jqin61 commented 8 months ago

In Iceberg it can be that some files are still on an older partitioning; we should make sure that we handle those correctly based on the spec that we provide.

It seems Spark's iceberg support has such overwrite behaviors under partition scheme evolution:

As Fokko mentioned, we need to make sure the implementation uses the latest partition spec_id when overwriting partitions, so that data written under an old partition spec is not touched.
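
A minimal sketch of what that means in PyIceberg terms (assuming the Table.spec() and Table.specs() accessors used elsewhere in this thread; illustrative only):

# New data files should be written with the table's current (latest) partition spec;
# files written under older specs are left untouched.
current_spec = table.spec()              # the table's current PartitionSpec
current_spec_id = current_spec.spec_id
all_specs = table.specs()                # {spec_id: PartitionSpec}, including historical specs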

jqin61 commented 8 months ago

How are we going to fan out the writing of the data? We have an Arrow table; what is an efficient way to compute the partitions and scale out the work? For example, are we going to sort the table on the partition column and do a full pass through it? Or are we going to compute all the affected partitions, and then scale out?

It just occurred to me that when Spark writes to Iceberg, it requires the input dataframe to be sorted by the partition value, otherwise an error is raised during writing. Do we want to make the same assumption for PyIceberg?

If not, and we have to use arrow.compute.filter() to extract each partition before serialization, it seems a global sort of the entire table before the filter() is unnecessary, since filter() makes no assumption about how the array is organized.

To extract the partitions by filter(), would it be helpful if we first built an API in pyarrow which does a full scan of the array, bucket-sorts it into partitions, and returns the buckets (partitions) as a list of arrow arrays? These arrays could then be passed as input to writing jobs executed in a multi-threaded way.

Fokko commented 8 months ago

I currently see two approaches:

I'm not sure which is best. I think the first one works better if you have few partitions, and the latter one is more efficient when you have many partitions.

To extract the partitions by filter(), would it be helpful if we first built an API in pyarrow which does a full scan of the array, bucket-sorts it into partitions, and returns the buckets (partitions) as a list of arrow arrays? These arrays could then be passed as input to writing jobs executed in a multi-threaded way.

Starting with the API is always a great idea. My only concern is that we make sure that we don't take copies of the data, since that might blow up the memory quite quickly.
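
For reference, a small sketch of the difference (standard pyarrow behavior: slice() is zero-copy, while filter() materializes new arrays):

import pyarrow as pa
import pyarrow.compute as pc

arrow_table = pa.table({'part': ['A', 'A', 'B'], 'value': [1, 2, 3]})

# Zero-copy: the slice shares buffers with the original table
first_partition = arrow_table.slice(0, 2)

# Copies: filter() materializes new arrays for the matching rows
also_first_partition = arrow_table.filter(pc.equal(arrow_table['part'], 'A'))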

Hope this helps!

Fokko commented 8 months ago

@jqin61 I did some more thinking over the weekend, and I think the approach you suggested is the most flexible. I forgot about the sort order that we also want to add at some point; then we would need to sort twice 👎

jqin61 commented 8 months ago

Based on the existing discussion, there are 3 major possible directions for detecting partitions and writing each partition in a multi-threaded way to maximize I/O. It seems there isn't any approach simple enough that we could purely leverage the existing PyArrow APIs in PyIceberg. I marshalled Fokko's suggestions and list these approaches for discussion purposes:

Filter Out Partitions: As Fokko suggested, we could filter the table to get partitions before writing, but we will need an API on Arrow to get unique partition values (e.g. extend compute.unique() from array/scalar to table).

partitions: list[dict] = pyarrow.compute.unique(arrow_table)

With it, we could filter the table to get partitions and provide them as inputs to concurrent jobs.

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

arrow_table = pa.table({
    'column1': ['A', 'B', 'A', 'C', 'B'],
    'column2': [1, 2, 1, 3, 2],
    'column3': ['X', 'Y', 'X', 'Z', 'Y']
})
partition_keys = arrow_table.select(['column1', 'column2'])
# The existing pc.unique() does not support tables; we would need to add such an API on the Arrow side (hypothetical):
partitions: list[dict] = pc.unique(partition_keys)
# = [
#     {'column1': 'A', 'column2': 1},
#     {'column1': 'B', 'column2': 2},
#     {'column1': 'C', 'column2': 3}
# ]

def filter_and_write_table(partition, index):
    # Create a boolean mask for rows that match the partition values
    mask = reduce(pc.and_, (pc.equal(arrow_table[col], val) for col, val in partition.items()))

    # Filter the table
    filtered_table = arrow_table.filter(mask)

    # Write the filtered table to a Parquet file
    parquet_file = f'output_partition_{index}.parquet'
    pq.write_table(filtered_table, parquet_file)

with ThreadPoolExecutor() as executor:
    for i, partition in enumerate(partitions):
        executor.submit(filter_and_write_table, partition, i)

Sort and Single-Direction Writing: As Fokko suggested, we could sort the table first. We then slice the table and do a one-direction scan over each slice to write out partitioned files. Suppose we have an Arrow API that takes a sorted table, scans through it, creates a new file whenever it encounters a row with a new partition value, and raises an error if it encounters a partition value it has already passed, just like how Spark writes to an Iceberg table.

def write_table_partitions(sorted_table, partition_columns, directory_path): ...

Then we could do

partition_columns = ['column1', 'column2']
sorted_table = arrow_table.sort_by([('column1', 'ascending'), ('column2', 'ascending')])
directory_path = '/path/to/output/directory'

# Break the sorted table down into slices with zero-copy (slice_table is the hypothetical API above)
slices = slice_table(sorted_table, slice_options)

# Call the (hypothetical) API for each slice
with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    for table_slice in slices:
        executor.submit(write_table_partitions, table_slice, partition_columns, directory_path)

Bucketing: We could create an Arrow API that returns partitioned tables/record batches given a table and a list of partition columns, where the algorithm does a full scan of the Arrow table in O(table_length) time, bucket-sorts it, and creates a table/record batch for each bucket:

table_partitions = pyarrow.compute.partition(arrow_table, partition_columns)

We could write each batch:

def write_table_to_parquet(table, directory_path, file_suffix):
    # Construct the full path for the Parquet file
    file_path = os.path.join(directory_path, f'record_batch_{file_suffix}.parquet')

    # Write the table to a Parquet file
    pq.write_table(table, file_path)

with ThreadPoolExecutor() as executor:
    for i, partition_as_table in enumerate(table_partitions):
        executor.submit(write_table_to_parquet, partition_as_table, directory_path, i)

As Fokko pointed out, the filter method will not be efficient if there are many partitions: each filter takes O(table_length) time, and although each thread can filter on its own, on a single node the total work is O(table_length * number_of_partitions) across all the jobs. Technically we only need a single scan to get all the buckets.

It seems the sort method is less efficient than the bucketing method: the relative order of partitions does not matter, so a general sort on the partition columns is overkill compared with bucketing.

I feel like all 3 directions require some implementation on Arrow itself (I did not find any approach simple enough that we could implement purely with the existing PyArrow APIs). I want to get opinions on whether pursuing Arrow-API-level utilities sounds like a good idea. Thank you!

Specifically, for the third direction of bucketing and returning materialized tables/batches: since Arrow has dataset.write_dataset(), which supports partition-aware writing, I did some reading to see how it partitions and whether we could leverage anything from it.

https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118 is where the partitioning happens. The partition algorithm is a full scan with a bucket sort, leveraging the Grouper class utilities in Arrow's compute component. Specifically:

  1. Grouper.consume() initiates the groups based on the present partition columns: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L422
  2. Grouper.MakeGroupings() builds a ListArray where each list represents a partition and each element in the list represents the row_id of the original table: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L886C45-L886C58
  3. Grouper.ApplyGroupings() efficiently converts the grouped representation of row_ids into actual rows: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L875

Other than being used in dataset writing, the Grouper from Arrow's compute component is used to support other exposed compute APIs such as the aggregation functions. At the end of the day, what we want (in order to support PyIceberg's partitioned write) is an API that returns record batches/tables based on an input table and an input partition scheme, so maybe we could expose such a new API under compute, leveraging Grouper.
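
As a point of reference, the "unique partition keys" step can already be approximated with pyarrow's group_by (an aggregate with no aggregations yields one row per distinct key); whether that, combined with per-partition filters, is efficient enough is exactly the open question above. A minimal sketch:

import pyarrow as pa

arrow_table = pa.table({
    'column1': ['A', 'B', 'A', 'C', 'B'],
    'column2': [1, 2, 1, 3, 2],
    'column3': ['X', 'Y', 'X', 'Z', 'Y']
})

# Grouping on the partition columns with no aggregations yields one row per distinct key
unique_keys = arrow_table.group_by(['column1', 'column2']).aggregate([])
partitions = unique_keys.to_pylist()
# e.g. [{'column1': 'A', 'column2': 1}, {'column1': 'B', 'column2': 2}, {'column1': 'C', 'column2': 3}]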

asheeshgarg commented 8 months ago

@jqin61 just wondering if we can use this directly https://arrow.apache.org/docs/python/generated/pyarrow.dataset.partitioning.html

jqin61 commented 8 months ago

@jqin61 just wondering if we can use this directly https://arrow.apache.org/docs/python/generated/pyarrow.dataset.partitioning.html

Thank you Ashish! I overlooked it. As you mention, we could just use write_dataset() with the partitioning and basename_template arguments specified to write out the partitioned data files as Iceberg needs.

asheeshgarg commented 8 months ago

@Fokko @jqin61 I am also interested in moving this forward, as we also deal with a lot of writes that involve partitions. Happy to collaborate on this. For write_dataset() we might need to look at whether we need to add field metadata (field-id etc.) to the Parquet files when committing data files.

sungwy commented 8 months ago

@Fokko @jqin61 I am also interested in moving this forward, as we also deal with a lot of writes that involve partitions. Happy to collaborate on this. For write_dataset() we might need to look at whether we need to add field metadata (field-id etc.) to the Parquet files when committing data files.

Yes - I think we learned this from our earlier attempts: https://github.com/apache/iceberg-python/pull/41/files/1398a2fb01341087a1334482db84a193843a2362#r1427302782

As @jqin61 pointed out in a previous PR, adding these to the schema should output parquet files with the correct field_id.
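
For illustration, attaching the field IDs as Parquet field metadata on an Arrow schema looks roughly like this (PyIceberg's schema_to_pyarrow performs this conversion; the field names here are just taken from the snippet further below):

import pyarrow as pa

schema_with_field_ids = pa.schema([
    pa.field("key", pa.string(), nullable=True, metadata={"PARQUET:field_id": "1"}),
    pa.field("value_1", pa.int64(), nullable=True, metadata={"PARQUET:field_id": "2"}),
    pa.field("value_2", pa.string(), nullable=True, metadata={"PARQUET:field_id": "3"}),
])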

asheeshgarg commented 8 months ago

@Fokko @jqin61 Today I tried a basic example of a partitioned write:

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType
import pyarrow as pa
import pyarrow.dataset  # needed for pa.dataset.partitioning
from pyarrow import parquet as pq

data = {'key': ['001', '001', '002', '002'],
        'value_1': [10, 20, 100, 200],
        'value_2': ['a', 'b', 'a', 'b']}
my_partitioning = pa.dataset.partitioning(pa.schema([pa.field("key", pa.string())]), flavor='hive')
TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="key", field_type=StringType(), required=False),
    NestedField(field_id=2, name="value_1", field_type=StringType(), required=False),
    NestedField(field_id=3, name="value_2", field_type=StringType(), required=False),
)
schema = schema_to_pyarrow(TABLE_SCHEMA)
patbl = pa.Table.from_pydict(data)
pq.write_to_dataset(patbl,'partitioned_data',partitioning=my_partitioning,schema=schema)

If I don't use the schema in the write, it works fine. But if I pass the schema created with

schema = schema_to_pyarrow(TABLE_SCHEMA)

It fails with

ArrowTypeError: Item has schema
key: string
value_1: int64
value_2: string
which does not match expected schema
key: string
  -- field metadata --
  PARQUET:field_id: '1'
value_1: string
  -- field metadata --
  PARQUET:field_id: '2'
value_2: string
  -- field metadata --
  PARQUET:field_id: '3'

I also tried the parquet write the way we are doing currently:

writer = pq.ParquetWriter("test", schema=schema, version="1.0") 
writer.write_table(patbl)
ValueError: Table schema does not match schema used to create file: 
table:
key: string
value_1: int64
value_2: string vs. 
file:
key: string
  -- field metadata --
  PARQUET:field_id: '1'
value_1: string
  -- field metadata --
  PARQUET:field_id: '2'
value_2: string
  -- field metadata --
  PARQUET:field_id: '3

Do we do any other transformation of the schema before writing in the current write support?

Fokko commented 8 months ago

Hey @jqin61

Thanks for the elaborate post, and sorry for my slow reply. I did want to take the time to write a good answer.

Probably the following statement needs another map step:

partitions: list[dict] = pyarrow.compute.unique(arrow_table)

The above is true for an identity partition, but often we truncate a field to the month, day, or hour and use that as a partition. Another example is the bucket partition, where we hash the field and determine which bucket it falls into.
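
For example, Iceberg's day transform encodes a value as days since the Unix epoch. A hedged sketch of deriving that as a helper column with plain pyarrow (it relies on the date32 -> int32 cast reinterpreting the underlying days-since-epoch storage; illustrative, not PyIceberg's actual implementation):

import pyarrow as pa
from datetime import date

table = pa.table({'event_date': pa.array([date(2024, 1, 1), date(2024, 1, 2), date(2024, 2, 15)], type=pa.date32())})

# date32 values are stored as days since the Unix epoch, which is the value Iceberg's day transform produces
day_partition = table['event_date'].cast(pa.int32())

# Append the helper column for grouping/sorting, and drop it again before writing the data files
table_with_helper = table.append_column('event_date_day', day_partition)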

With regard to utilizing the Arrow primitives that are already there: I think that's a great idea, we just have to make sure that they are flexible enough for Iceberg. There are a couple of questions that pop into my mind:

@asheeshgarg Thanks for giving it a try. Looking at the schema, there is a discrepancy: the test data that you generate has value_1 as an int64, while the table expects a string. I think the error is correct here.

asheeshgarg commented 8 months ago

@Fokko thanks for pointing out the mismatch. After modifying the datatype it worked.

jqin61 commented 8 months ago

@Fokko Thank you! These 2 points of supporting hidden partitioning and extracting metrics efficiently during writing are very insightful!

Regarding pyarrow.dataset.write_dataset(): its behavior removes the partition columns from the written-out Parquet files. I think this is the deal breaker for using write_dataset(). So we either extend pyarrow.dataset.write_dataset() or fall back to the Arrow API direction.

Some findings from chasing a solution based on dataset.write_dataset():

  1. pyarrow.dataset.partitioning() only supports static values in the column, so on the Iceberg side we might need to add a transformed column (this column will be dropped into the directory path when write_dataset() uses hive partitioning).
  2. write_dataset() takes a custom callable to collect file paths and file metadata as files are written; this metadata could be leveraged when we create the DataFile objects (see the sketch after this list):

    visited_paths = []
    metadata_list = []

    def file_visitor(written_file):
        visited_paths.append(written_file.path)
        metadata_list.append(written_file.metadata)
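
A hedged sketch of wiring that visitor into pyarrow.dataset.write_dataset (file_visitor is an actual write_dataset argument; arrow_table, my_partitioning, and the paths here are illustrative, as in the earlier snippets):

import pyarrow.dataset as ds

ds.write_dataset(
    arrow_table,
    base_dir="partitioned_data",
    format="parquet",
    partitioning=my_partitioning,   # e.g. hive partitioning on the (transformed) partition column
    file_visitor=file_visitor,      # collects written paths and Parquet metadata for DataFile creation
)
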
sungwy commented 8 months ago

Right, as @jqin61 mentioned, if we only had to support transformed partitions, we could have employed a hack of adding the partition column to the dataset, which gets consumed by the write_dataset API when we pass the column in pyarrow.dataset.partitioning.

But we can't apply the same hack with Identity Partitions, where the HIVE partition scheme on the file path shares the same name as the partition column that needs to be persisted into the data file. Arrow does not allow two columns to share the same name, and this hack will lead to an exception on write_dataset.

So it sounds like we might be running out of options in using the existing APIs...

If we are in agreement that we need a new PyArrow API to optimally bucket sort the partitions and produce partitioned pyarrow tables or record batches to pass into WriteTask, do we see any value in introducing a simpler PyIceberg feature in the interim, where write_file can support partitioned tables as long as the provided arrow_table only has a single partition of data?

I think introducing this first would have two upsides:

  1. We decouple the work of supporting writes to a partitioned table (like handling partitions in file paths on write, and adding partition metadata to manifests) from the work of optimally sorting and bucketing an arrow table into target partitions.
  2. If a user really needs to break down their in memory pyarrow table into partitions, they can do so, using existing methods to filter on the partition column and producing a new pyarrow.Table. This isn't optimal, especially if they have many partitions within the in-memory table, and is precisely the reason why @jqin61 is investigating the different options in bucket sorting by partition within Arrow/Arrow Datasets.
sungwy commented 8 months ago

Maybe another approach we could take if we want to use existing PyArrow functions is:

  1. table.sort_by (all partitions)
  2. figure out the row index for each permutation of partition groups by taking another pass through the table
  3. Use table.slice(index, length) with indexes we generated above to write out the tables using List[WriteTask] in write_file

If there was an existing PyArrow API that gave us the outcome of (1) + (2) in one pass, it would have been the most optimal, but it seems like there isn't... so I think taking just one more pass to find the indices is maybe not the worst idea.
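
One possible shape for step (2), as a hedged sketch: a single pass over the already-sorted key columns in plain Python. A vectorized version would be preferable; this only illustrates the idea.

def partition_slices(sorted_table, partition_columns):
    # Given a table already sorted by the partition columns, return (offset, length)
    # pairs for each partition group, found in a single pass.
    keys = sorted_table.select(partition_columns).to_pylist()
    slices, start = [], 0
    for i in range(1, len(keys)):
        if keys[i] != keys[i - 1]:
            slices.append((start, i - start))
            start = i
    if keys:
        slices.append((start, len(keys) - start))
    return slices

# Each (offset, length) pair can then be fed to table.slice(offset, length), which is zero-copy.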

We could also argue that (1) should be a requirement that we check on the provided PyArrow table, rather than running the sort within the PyIceberg API.

Please let me know your thoughts!

asheeshgarg commented 8 months ago

@jqin61 I have also seen this behavior with pyarrow.dataset.write_dataset(): it removes the partition columns from the written-out parquet files. @syun64 the above approach looks reasonable to me.

It would have been ideal if the partitioned write could be done directly using the Arrow dataset API, with metadata-based hidden partitioning handled by the PyIceberg API, but we would need to do a good amount of lifting to get there. I haven't seen support for bucket partitioning either.

I think we can add the write directly using the PyArrow API as suggested above.

asheeshgarg commented 8 months ago

@Fokko @syun64 another option I can think of is to use Polars to do it; a simple example is below, with hashing and sorting within each partition, where all the partitioning is handled by the Rust layer in Polars and we write Parquet based on the returned Arrow tables. Not sure if we want to add it as a dependency? We could also easily do custom transforms like the hours transform we have in Iceberg.

import pyarrow as pa
import pyarrow.compute as pc
import polars as pl

t = pa.table({'strings': ["A", "A", "B", "A"], 'ints': [2, 1, 3, 4]})
df = pl.from_arrow(t)

N = 2
tables = (
    df.with_columns([(pl.col("strings").hash() % N).alias("partition_id")])
    .partition_by("partition_id")
)

for tbl in tables:
    print(tbl.to_arrow().sort_by("ints"))

sungwy commented 8 months ago

@jqin61 and I discussed this offline, and just wanted to follow up on possible options for step (2). If we wanted to use existing PyArrow functions, I think we could use a two-pass algorithm to figure out the row indices of each partition group on a partition-sorted pyarrow table:

import pyarrow as pa
import pyarrow.dataset
import pyarrow.compute

# pyarrow table, which is already sorted by partitions 'year' and 'animals'
pylist = [{'year': 2021, 'animals': 'Bear'}, {'year': 2021, 'animals': 'Bear'}, {'year': 2021, 'animals': 'Centipede'}, {'year': 2022, 'animals': 'Cat'}, {'year': 2022, 'animals': 'Flamingo'},{'year': 2023, 'animals': 'Dog'}]
table = pa.Table.from_pylist(pylist)

# assert that the table is sorted by checking sort_indices are in order
pa.compute.sort_indices(table, sort_keys=[('year', "ascending"), ("animals", "ascending")])
<pyarrow.lib.UInt64Array object at 0x7fe9b0f4c340>
[
  0,
  1,
  2,
  3,
  4,
  5
]

# Then sort on the same partition keys in the opposite (descending) order and check the indices
# to figure out the offsets and lengths of each partition group: when a larger index comes before
# a smaller one, that smaller index is the starting offset of a partition group, and the number of
# consecutive indices in ascending order after it is the length of that group.
pa.compute.sort_indices(table, sort_keys=[('year', "descending"), ("animals", "descending")])
<pyarrow.lib.UInt64Array object at 0x7fe9b0f3ff40>
[
  5,
  4,
  3,
  2,
  0,
  1
]

# from above, we get the following partition groups as (offset, length) pairs:
partition_slices = [(0, 2), (2, 1), (3, 1), (4, 1), (5, 1)]

for offset, length in partition_slices:
    write_file(iceberg_table, iter([WriteTask(write_uuid, next(counter), table.slice(offset, length))]))

Then, how do we handle transformed partitions? I think going back to the previous idea, we could create intermediate helper columns to generate the transformed partition values in order to use them for sorting. We can keep track of these columns and ensure that we drop the column after we use the above algorithm to split up the table by partition slices.

Regardless of whether we choose to support sorting within the PyIceberg write API or have it as a requirement, maybe we can create a helper function that takes the PartitionSpec of the iceberg table and the pyarrow table and makes sure that the table is sorted by partition using the above method.

sungwy commented 8 months ago

The design document on data file writes was discussed during the monthly sync; it summarizes all of the approaches discussed above.

jqin61 commented 8 months ago

I have an incoming PR with working code samples that conform to the design above and cover the identity transform + append as the first step of supporting partitioned writes. During implementation, I found that if the partition column has nulls, the code breaks. This issue is the same as with the existing write, where append() or overwrite() would break for any arrow table with a column consisting only of nulls. So I opened Issue #348 to track it separately.

jqin61 commented 7 months ago

Opened draft PR with working code samples (it supports partitioned append with identity transform for now): https://github.com/apache/iceberg-python/pull/353

jqin61 commented 5 months ago

Updates for monthly sync:

  1. Working on dynamic overwrite, which is unblocked by partial deletes: https://github.com/apache/iceberg-python/pull/569
  2. For the transform functions, we could convert the arrow column to a Python list and feed that to the transform function to generate transformed pyarrow columns for grouping partitions using the existing algorithm. But there are efficiency concerns, since the transform functions can only take Python data types and we have to convert from arrow to Python and back to arrow. Also, the types in arrow and iceberg are quite different, and sometimes we need to call utility functions; for example, a timestamp is converted to a datetime in Python, and we have to call an existing utility function to convert it to micros (int) before feeding it into the transform functions. Another option is to create an Arrow UDF for the partition transforms, which might parallelize better. (The round trip is roughly illustrated below.)
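
A rough illustration of the round trip described in point 2 (datetime_to_micros here is a stand-in for the existing utility; purely illustrative):

import pyarrow as pa
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def datetime_to_micros(dt: datetime) -> int:
    # stand-in for the existing utility that converts a datetime to microseconds since epoch
    return (dt - EPOCH) // timedelta(microseconds=1)

arrow_column = pa.array([datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)], type=pa.timestamp('us', tz='UTC'))

python_values = arrow_column.to_pylist()                    # Arrow -> Python datetimes
micros = [datetime_to_micros(v) for v in python_values]     # Python datetimes -> micros for the transform
# ... apply the Iceberg transform to `micros`, then rebuild an Arrow array from the results
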
sungwy commented 5 months ago

Idea from @Fokko - support day/month/year transforms first

jaychia commented 5 months ago

Idea from @Fokko - support day/month/year transforms first

You can also try using the transforms that Daft has already implemented. Full list of transforms:

There were a lot of intricacies in these transforms that we had to make sure to get exactly right, so as to be compatible with the existing Java implementations. Especially wrt hashing.

Should be zero-copy conversions between arrow and Daft as well (cheap!):

import pyarrow as pa
from daft import Series

pyarrow_array = pa.array(list(range(10000)))

# Should be very cheap! Under the hood just uses the same arrow buffers
daft_series = Series.from_arrow(pyarrow_array)

print(daft_series)
╭──────────────╮
│ arrow_series │
│ ---          │
│ Int64        │
╞══════════════╡
│ 0            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 3            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ …            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9995         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9996         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9997         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9998         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999         │
╰──────────────╯

partitioned = daft_series.partitioning.iceberg_bucket(32)
print(partitioned)

╭─────────────────────╮
│ arrow_series_bucket │
│ ---                 │
│ Int32               │
╞═════════════════════╡
│ 28                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 20                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 19                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 6                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ …                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 10                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 28                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 7                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 13                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 28                  │
╰─────────────────────╯

# Convert back to arrow
partitioned_arrow_arr = partitioned.to_arrow()
deepika094 commented 3 months ago

Hi, do we have any way to write to a partitioned table yet?

ppasquet commented 3 months ago

Curious as well where you guys are standing on partitioned writes.

Fokko commented 3 months ago

Hey everyone, the support for partitioned writes is coming along pretty nicely. We're missing some of the transforms, such as the bucket transform. Most of the stuff is on the main branch waiting for a release.

If you want to check it out, you can also install it from source: https://py.iceberg.apache.org/contributing/

mike-luabase commented 3 months ago

@Fokko I installed from source, but I'm hitting this error.

Does anyone have a minimal example of writing to a partitioned table?

mike-luabase commented 3 months ago

Here's what I've been trying (sorry for the long example, but I thought the context would help):

iowa_sales_df = pcsv.read_csv("/Users/mritchie712/blackbird/demoData/Iowa_Liquor_Sales_20240607.csv")
date_column = pc.strptime(iowa_sales_df['date'], format='%m/%d/%Y', unit='us')

# Replace the old 'date' column with the new date column
iowa_sales_df = iowa_sales_df.set_column(iowa_sales_df.schema.get_field_index('date'), 'date', date_column)

iceberg_schema = Schema(
    NestedField(1, "invoice_item_number", StringType(), required=True),
    NestedField(2, "date", TimestampType(), required=True),
    NestedField(3, "store_number", LongType(), required=True),
    NestedField(4, "store_name", StringType(), required=True),
    NestedField(5, "address", StringType(), required=True),
    NestedField(6, "city", StringType(), required=True),
    NestedField(7, "zip_code", StringType(), required=True),
    NestedField(8, "store_location", StringType(), required=True),
    NestedField(9, "county_number", LongType(), required=True),
    NestedField(10, "county", StringType(), required=True),
    NestedField(11, "category", LongType(), required=True),
    NestedField(12, "category_name", StringType(), required=True),
    NestedField(13, "vendor_number", LongType(), required=True),
    NestedField(14, "vendor_name", StringType(), required=True),
    NestedField(15, "item_number", StringType(), required=True),
    NestedField(16, "item_description", StringType(), required=True),
    NestedField(17, "pack", LongType(), required=True),
    NestedField(18, "bottle_volume_ml", LongType(), required=True),
    NestedField(19, "state_bottle_cost", DoubleType(), required=True),
    NestedField(20, "state_bottle_retail", DoubleType(), required=True),
    NestedField(21, "bottles_sold", LongType(), required=True),
    NestedField(22, "sale_dollars", DoubleType(), required=True),
    NestedField(23, "volume_sold_liters", DoubleType(), required=True),
    NestedField(24, "volume_sold_gallons", DoubleType(), required=True)
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
PARTITION_FIELD_ID_START = 1000

partition_spec = PartitionSpec(
    PartitionField(
        source_id=2,  # Iceberg field ID of the "date" field in iceberg_schema above (not the 0-based Arrow index)
        field_id=PARTITION_FIELD_ID_START,  # Unique ID for the partition field, starting from 1000
        transform=DayTransform(),
        name="date_day"
    )
)

table = catalog.create_table(
    "default.iowa_liquor_sales",
    schema=iceberg_schema,
    partition_spec=partition_spec,
)

table.append(iowa_sales_df)
len(table.scan().to_arrow())
{
    "name": "ValueError",
    "message": "Cannot write to partitioned tables",
    "stack": "---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[17], line 1
----> 1 table.overwrite(iowa_sales_df)
      2 # table.append(iowa_sales_df)
      3 len(table.scan().to_arrow())

File ~/blackbird/notebooks/.venv/lib/python3.12/site-packages/pyiceberg/table/__init__.py:1094, in Table.overwrite(self, df, overwrite_filter)
   1091     raise NotImplementedError(\"Cannot overwrite a subset of a table\")
   1093 if len(self.spec().fields) > 0:
-> 1094     raise ValueError(\"Cannot write to partitioned tables\")
   1096 from pyiceberg.io.pyarrow import schema_to_pyarrow
   1098 _check_schema_compatible(self.schema(), other_schema=df.schema)

ValueError: Cannot write to partitioned tables"
}
mike-luabase commented 3 months ago

update: I uninstalled pyiceberg and installed with:

pip install "git+https://github.com/apache/iceberg-python.git#egg=pyiceberg[s3fs,gcsfs,hive,sql-sqlite,duckdb,pandas]"

It's working now!

Thanks @syun64 !

Pilipets commented 3 months ago

The PR says "Support partitioned writes", but on version 0.6.1 it still fails for me with "ValueError: Cannot write to partitioned tables".

    schema = Schema(
        NestedField(1, "city", StringType(), required=False),
        NestedField(2, "lat", DoubleType(), required=False),
        NestedField(3, "long", DoubleType(), required=False),
    )
    partition_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=1)

    table2 = catalog.create_table(
        "whatever.table_partition_spec",
        schema=schema,
        partition_spec = partition_spec
    )

    table2.append(df1)
Fokko commented 3 months ago

@Pilipets that's correct. The partitioned writes are not yet released. You can use it by installing directly from Github as @mike-luabase suggested above, or wait for 0.7.0 to be released, which will happen soon.

Pilipets commented 3 months ago

@Fokko, also note that pyiceberg doesn't write the partition specification in the manifest files the way spark-sql does, which results in parsing errors when attempting to read the manifest files with the Java SDK.

I don't know if it's a bug or not, but I'm sharing it with you as part of the Iceberg community.

java.lang.IllegalArgumentException: Cannot parse partition spec fields, not an array: {"spec-id":0,"fields":[]}
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
    at org.apache.iceberg.PartitionSpecParser.buildFromJsonFields(PartitionSpecParser.java:123)
...
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:111)
...
Cannot parse partition spec fields, not an array: {"spec-id":0,"fields":[]}

In particular, pyiceberg doesn't write "partition-spec". There is a workaround for this, though: request the table metadata and use the table's specs. The error above is for a non-partitioned table created with pyiceberg.

Fokko commented 3 months ago

@Pilipets That looks like a bug, let me check if I can reproduce this. Thanks for sharing!

Pilipets commented 3 months ago

@Fokko, spark-sql can still read it, so maybe it's expected, I don't know.

In order to read it with the Java SDK, I need to provide table.specs() to ManifestFiles.read, which is actually empty ({0=[]}). Without it, it fails to parse with the error I shared above. For the same table created with no partitioning from spark-sql, I don't need to specify table.specs(), because of the additional fields generated in the manifest files. Reading with Java:

Snapshot snapshot = table.currentSnapshot();
FileIO fileIO = table.io();
for (ManifestFile manifest : snapshot.dataManifests(fileIO))
{
    ManifestReader<DataFile> reader = ManifestFiles.read(manifest, fileIO, table.specs());

What happens in Java SDK

protected ManifestReader(
    InputFile file,
    int specId,
    Map<Integer, PartitionSpec> specsById,
    InheritableMetadata inheritableMetadata,
    FileType content) {
  this.file = file;
  this.inheritableMetadata = inheritableMetadata;
  this.content = content;

  if (specsById != null) {
    this.spec = specsById.get(specId);
  } else {
    this.spec = readPartitionSpec(file); // THIS FAILS
  }

For the repro, create a table with no partitioning as below.

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

def catalog_sqlite(catalog_name: str, warehouse_path: str):
    props = {
        "uri": f"sqlite:///{warehouse_path}/sql-catalog2.db",
        "warehouse": f"file:{warehouse_path}",
    }
    catalog = SqlCatalog(catalog_name, **props)
    catalog.destroy_tables()
    catalog.create_tables()
    return catalog

catalog = catalog_sqlite("local", "/tmp/warehouse")

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

catalog.create_namespace("whatever")
table = catalog.create_table(
    "whatever.my_table",
    schema=df.schema,
)

table.append(df)
Fokko commented 3 months ago

Thanks again for the code! The json that's being generated through the SQLCatalog/SQLite is:

{
    "location": "file:/tmp/whatever.db/my_table",
    "table-uuid": "d394d593-ed82-4c3b-8c58-ad3405965407",
    "last-updated-ms": 1718918248213,
    "last-column-id": 3,
    "schemas": [
        {
            "type": "struct",
            "fields": [
                {
                    "id": 1,
                    "name": "city",
                    "type": "string",
                    "required": false
                },
                {
                    "id": 2,
                    "name": "lat",
                    "type": "double",
                    "required": false
                },
                {
                    "id": 3,
                    "name": "long",
                    "type": "double",
                    "required": false
                }
            ],
            "schema-id": 0,
            "identifier-field-ids": []
        }
    ],
    "current-schema-id": 0,
    "partition-specs": [
        {
            "spec-id": 0,
            "fields": []
        }
    ],
    "default-spec-id": 0,
    "last-partition-id": 1000,
    "properties": {},
    "current-snapshot-id": 2465573600625458700,
    "snapshots": [
        {
            "snapshot-id": 2465573600625458700,
            "sequence-number": 1,
            "timestamp-ms": 1718918248213,
            "manifest-list": "file:/tmp/whatever.db/my_table/metadata/snap-2465573600625458552-0-c5284c9e-77bc-4a94-b412-1a3a9db4100d.avro",
            "summary": {
                "operation": "append",
                "added-files-size": "1656",
                "added-data-files": "1",
                "added-records": "4",
                "total-data-files": "1",
                "total-delete-files": "0",
                "total-records": "4",
                "total-files-size": "1656",
                "total-position-deletes": "0",
                "total-equality-deletes": "0"
            },
            "schema-id": 0
        }
    ],
    "snapshot-log": [
        {
            "snapshot-id": 2465573600625458700,
            "timestamp-ms": 1718918248213
        }
    ],
    "metadata-log": [],
    "sort-orders": [
        {
            "order-id": 0,
            "fields": []
        }
    ],
    "default-sort-order-id": 0,
    "refs": {
        "main": {
            "snapshot-id": 2465573600625458700,
            "type": "branch"
        }
    },
    "format-version": 2,
    "last-sequence-number": 1
}

And for comparison, the metadata generated on the Spark (Java) side:

{
  "format-version" : 2,
  "table-uuid" : "70ed598e-01e2-48d2-98c4-33396b3b4bc4",
  "location" : "s3://warehouse/nyc/taxis",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1718918405513,
  "last-column-id" : 19,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "VendorID",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "tpep_pickup_datetime",
      "required" : false,
      "type" : "timestamptz"
    }, {
      "id" : 3,
      "name" : "tpep_dropoff_datetime",
      "required" : false,
      "type" : "timestamptz"
    }, {
      "id" : 4,
      "name" : "passenger_count",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 5,
      "name" : "trip_distance",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 6,
      "name" : "RatecodeID",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 7,
      "name" : "store_and_fwd_flag",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 8,
      "name" : "PULocationID",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 9,
      "name" : "DOLocationID",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 10,
      "name" : "payment_type",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 11,
      "name" : "fare_amount",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 12,
      "name" : "extra",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 13,
      "name" : "mta_tax",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 14,
      "name" : "tip_amount",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 15,
      "name" : "tolls_amount",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 16,
      "name" : "improvement_surcharge",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 17,
      "name" : "total_amount",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 18,
      "name" : "congestion_surcharge",
      "required" : false,
      "type" : "double"
    }, {
      "id" : 19,
      "name" : "airport_fee",
      "required" : false,
      "type" : "double"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root",
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "partition-statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}

I don't see any discrepancies with the sort-orders, but I can continue tomorrow. @adrianqin feel free to jump in here :)

Pilipets commented 3 months ago

"partition-spec" : [ ], is missing. static final String PARTITION_SPEC = "partition-spec";

Try generating a table with spark-sql to compare and it will be there. Just use any catalog...

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.3.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=/tmp/warehouse

create database local.test_db;
create table local.test_db.table (col int) using iceberg;
insert into local.test_db.table values (1), (2), (3);
select * from local.test_db.table;
hpylypets-ua@hpylypets-1:~/Downloads$ cat /tmp/warehouse/test_db/table/metadata/v1.metadata.json
{
  "format-version" : 1,
  "table-uuid" : "666947c3-b711-4fa3-bdff-f554ed5b14d5",
  "location" : "/tmp/warehouse/test_db/table",
  "last-updated-ms" : 1718919955993,
  "last-column-id" : 1,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "col",
      "required" : false,
      "type" : "int"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "col",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "partition-spec" : [ ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "hpylypets-ua"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
Fokko commented 3 months ago

Thanks @Pilipets, again, really appreciate the examples.

So the difference is the format version: I was generating V2 tables, and you have a V1 table (Java 1.3.0 defaults to V1, which should be changed later on).


I think we need to fix this on the Java side

Pilipets commented 3 months ago

Yeah, I actually don't know if partition-spec is the culprit, but that's the difference I spotted. When it's missing, it fails for me with the error below.

Lake Formation, spark-sql, Snowflake, and other writers probably add it, so the read succeeds. Anyway, the workaround is to provide the partition specs map to ManifestFiles.read:

  /**
   * Return a map of {@link PartitionSpec partition specs} for this table.
   *
   * @return this table's partition specs map
   */
  Map<Integer, PartitionSpec> specs();

Below is the full error stack.

java.lang.IllegalArgumentException: Cannot parse partition spec fields, not an array: {"spec-id":0,"fields":[]}
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
    at org.apache.iceberg.PartitionSpecParser.buildFromJsonFields(PartitionSpecParser.java:123)
    at org.apache.iceberg.PartitionSpecParser.fromJsonFields(PartitionSpecParser.java:114)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJsonFields$4(PartitionSpecParser.java:119)
    at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:95)
    at org.apache.iceberg.PartitionSpecParser.fromJsonFields(PartitionSpecParser.java:119)
    at org.apache.iceberg.ManifestReader.readPartitionSpec(ManifestReader.java:130)
    at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:114)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:131)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:111
Fokko commented 3 months ago

I should have gone deeper into the stack trace; it looks like the partition-spec header is malformed.

Python:

➜  docker-spark-iceberg git:(main) avro-tools getmeta ~/Desktop/45c184f4-4550-4f6e-a99c-496d93671653-m0.avro | grep -i partition-spec
24/06/21 17:28:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
partition-spec {"spec-id":1,"fields":[{"name":"tpep_pickup_datetime_day","transform":"day","source-id":2,"field-id":1000}]}
partition-spec-id 1

Java:

➜  docker-spark-iceberg git:(main) avro-tools getmeta ~/Desktop/793ff997-d03b-4992-a477-2cf9dbe3cb6b-m0.avro | grep -i partition-spec
24/06/21 17:31:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
partition-spec-id 1
partition-spec [{"name":"tpep_pickup_datetime_day","transform":"day","source-id":2,"field-id":1000}]

Thanks again @Pilipets for raising this, I'll come up with a fix shortly.

RLashofRegas commented 1 month ago

@Fokko any expected timeline you can share on support for bucket transform? Is there a separate issue I can follow for that? Thanks for all the hard work so far!!