os-climate / os_c_data_commons

Repository for Data Commons platform architecture overview, as well as developer and user documentation
Apache License 2.0

Guidance needed for using AddFiles with Iceberg #153

Open MichaelTiemannOSC opened 2 years ago

MichaelTiemannOSC commented 2 years ago

This PR allows for easy migration of Hive data into Iceberg: https://github.com/apache/iceberg/pull/2210

I have observed that the current Iceberg data ingestion (processing 1000 or 2000 insert statements per batch) is perhaps 50x slower than what I saw when using Parquet as a format for Hive. While it's mildly annoying to wait almost an hour to load data that used to load in a minute or so, the SEC DERA data is two orders of magnitude larger and would take a whole weekend to load its 200M rows.

I'd like to confirm whether there is a right way to associate Parquet files with Iceberg for the way we plan to use Iceberg, and if so what that pattern looks like. Here's how I did it previously:

# Once we have all our parquet files in place, load up the tables with their directory contents
for tbl in dera_tables:
    if tbl not in dera_df:
        error(f'{tbl} data not found')
    tabledef = osc.unmanaged_parquet_tabledef(dera_df[tbl],
                                              ingest_catalog, ingest_schema, tbl, trino_bucket,
                                              typemap={'int16':'smallint', 'Int16':'smallint'})
    qres = engine.execute(tabledef)
    for row in qres.fetchall():
        print(row)

I know that Iceberg only supports 32- and 64-bit integers, so fixing the typemap is easy. How can we use Parquet data as a source for Iceberg? And can we incrementally add new Parquet data as it becomes available, without needing to reload everything?
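(For the record, a sketch of what that typemap fix might look like; mapping the 16-bit pandas dtypes to Trino's 32-bit integer is my assumption about the appropriate promotion:)

# Assumed promotion: Iceberg only defines 32- and 64-bit integer types,
# so map pandas int16/Int16 columns to Trino's 'integer' rather than 'smallint'.
iceberg_typemap = {'int16': 'integer', 'Int16': 'integer'}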

erikerlandson commented 2 years ago

We can explore writing ingest in Java, if that's what you are asking. A lot of the Iceberg API is Spark-centric: https://iceberg.apache.org/docs/latest/spark-procedures/#add_files

Yeah, looking at the https://github.com/apache/iceberg/pull/2210 code, it's all under Spark, so if we use this at all, it will have to be some kind of Spark hackery.

erikerlandson commented 2 years ago

If Trino SQL supported CALL catalog_name.system.add_files(...), that would be ideal: https://github.com/RussellSpitzer/iceberg/blob/a4279fc5842046043f2afdc90f2428243958574d/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java#L80

https://github.com/trinodb/trino/issues/11744
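(For context, this is roughly what the procedure call looks like when driven from Spark today; it is not available through Trino, which is the point of the issue above. Catalog, table, and path names below are placeholders, and the session configuration is assumed:)

# Sketch only: assumes a Spark session configured with the Iceberg runtime jar,
# spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
# and an Iceberg catalog registered as 'spark_catalog'.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-add-files").getOrCreate()

# Register existing Parquet files with an Iceberg table without rewriting them.
spark.sql("""
    CALL spark_catalog.system.add_files(
        table => 'demo.dera_num',
        source_table => '`parquet`.`s3a://my-trino-bucket/trino/demo/dera_num/`'
    )
""")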

erikerlandson commented 2 years ago

cc @rimolive

erikerlandson commented 2 years ago

Another thing that would probably make df.to_sql faster is if Trino supported executemany: https://github.com/trinodb/trino-python-client/blob/master/trino/dbapi.py#L467

The reason I have to create a literal SQL string for all data rows in TrinoBatchInsert is that executemany isn't supported. https://github.com/trinodb/trino-python-client/issues/169
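(To illustrate the point: because the DBAPI driver has no executemany, each batch ends up rendered as one big multi-row VALUES statement. A deliberately simplified sketch of that pattern, not the actual TrinoBatchInsert code, and with naive literal quoting that real code would need to harden:)

from sqlalchemy import text

def insert_batch_as_literals(engine, table, rows):
    # Render each row as a literal tuple and join them into a single
    # multi-row INSERT ... VALUES statement, since parameterized
    # executemany is not available through the Trino driver.
    # repr() is a stand-in for proper SQL literal quoting.
    values = ", ".join(
        "(" + ", ".join(repr(v) for v in row) + ")" for row in rows
    )
    with engine.connect() as conn:
        conn.execute(text(f"INSERT INTO {table} VALUES {values}"))

# Hypothetical usage with the engine from the earlier snippet:
# insert_batch_as_literals(engine, 'demo.my_table', [(1, 'a'), (2, 'b')])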

MichaelTiemannOSC commented 2 years ago

> We can explore writing ingest in Java, if that's what you are asking. A lot of the Iceberg API is Spark-centric: https://iceberg.apache.org/docs/latest/spark-procedures/#add_files
>
> Yeah, looking at the apache/iceberg#2210 code, it's all under Spark, so if we use this at all, it will have to be some kind of Spark hackery.

To that I say "no sir!" I really don't want to open the Java can of worms myself. I'll go the conventional way and keep my eyes open for when a better path presents itself. Then I will loudly ask for the appropriate new version of Trino ;-)

erikerlandson commented 2 years ago

I know that we decided we did not want to do this, but we could revisit the idea of maintaining a Hive catalog for doing "direct parquet ingest", and then, to get that data into SQL, do an insert-from-select from the hive-temp-table into the main Iceberg table.

I am unsure what the performance of that would be, but Trino would be managing all the data transfer, and I'd expect it to be at least somewhat faster than brute-force pushing raw SQL literals across the network with sqlalchemy.
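(A minimal sketch of that flow, assuming the Parquet files are already exposed as an external table in a Hive catalog, here called hive.demo_temp.dera_num, and that the destination is an Iceberg table in a catalog called osc_datacommons_iceberg; all of those names are placeholders, and engine is the same sqlalchemy Trino engine as in the snippet above:)

from sqlalchemy import text

# Trino performs the copy server-side, so no row data crosses the client
# connection as SQL literals.
with engine.connect() as conn:
    conn.execute(text("""
        INSERT INTO osc_datacommons_iceberg.demo.dera_num
        SELECT * FROM hive.demo_temp.dera_num
    """))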

erikerlandson commented 2 years ago

I don't have much experience with multi-threading, but sqlalchemy does allow thread-local sessions, and it's probably possible to add multi-threading to TrinoBatchInsert: https://docs.sqlalchemy.org/en/14/orm/contextual.html
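(A rough sketch of what multi-threading the batches might look like: SQLAlchemy engines are safe to share across threads as long as each worker checks out its own connection. The table name and batch contents are placeholders, and whether this actually helps depends on how Trino handles the concurrent inserts:)

from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import text

def insert_one_batch(batch_sql):
    # Each worker uses its own connection; the shared engine manages the pool.
    with engine.connect() as conn:
        conn.execute(text(batch_sql))

# Hypothetical pre-rendered multi-row INSERT statements, one per batch.
batches = [
    "INSERT INTO demo.my_table VALUES (1, 'a'), (2, 'b')",
    "INSERT INTO demo.my_table VALUES (3, 'c'), (4, 'd')",
]

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(insert_one_batch, batches))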

MichaelTiemannOSC commented 2 years ago

Just another interesting example of parquet work adjacent to ours: https://github.com/catalyst-cooperative/pudl/issues/1564

MichaelTiemannOSC commented 2 years ago

Well, my investigations thus far lead me to the following conclusion:

We do have users who now want to onboard 50MB-200MB global datasets, so maybe we should revisit a Hive temp-table implementation. Since I'm not that familiar with either Hive or Iceberg, I'd really rather somebody else do the first implementation and then test to see how well it works.

chenwyi2 commented 2 years ago

Is there a way to import a Hive table into a specific partition of an Iceberg table? The partition schemas differ between the Hive table and the Iceberg table: for example, the Hive table has two partition columns but the Iceberg table has three, and we want to import this Hive table into a specific partition of the Iceberg table.

MichaelTiemannOSC commented 2 years ago

The role of this issue was specific to some particular Iceberg functionality (which we cannot yet access, given how much Spark we are willing/able to support in our implementation). As for the question of using Hive: @erikerlandson has implemented a fast path that demonstrates a 70x speedup, which is enough to break through the bottleneck of row-by-row insertion of textual VALUES statements. It's quite likely that an additional 2x-3x of performance is available, since writing Parquet files and then doing an "insert from select" operation still involves extra copying and formatting. See https://github.com/os-climate/osc-ingest-tools/blob/main/osc_ingest_trino/trino_utils.py in version 0.4.1 of osc_ingest_trino.
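(For anyone picking this up later, the fast path is the TrinoBatchInsert helper in the file linked above, used as a pandas to_sql method callable. The sketch below reflects my reading of that module; the import path and constructor arguments shown are assumptions and may differ between versions:)

import pandas as pd
from osc_ingest_trino import TrinoBatchInsert  # assumed import path

df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})  # placeholder data

# to_sql hands batches of rows to the callable, which renders each batch as a
# single multi-row INSERT and executes it against Trino.
df.to_sql(
    "my_table",                      # placeholder table name
    con=engine,                      # the sqlalchemy Trino engine from above
    schema="demo",                   # placeholder schema
    if_exists="append",
    index=False,
    method=TrinoBatchInsert(batch_size=5000, verbose=True),  # assumed arguments
)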