quintoandar / hive-metastore-client

A client for connecting to and running DDL on the Hive Metastore.
Apache License 2.0

Hive Metastore Client Cataloging for Delta #61

Open asafepy opened 3 years ago

asafepy commented 3 years ago

Hi guys, we at CVCCorp have run into a limitation with Hive cataloging for Delta data.

This is an example of what the cataloging model for Delta data should look like:

CREATE EXTERNAL TABLE table_teste (
    tabela STRING,
    data_update STRING,
    count BIGINT
)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 's3://bucket-name/example/table_teste/';

Our motivation for using Delta is that we use Databricks, and in our benchmarks Delta has better performance. We have also centralized all metadata in a Hive cluster for integration with Databricks.

If there are any questions, I will be in contact with Lucas on LinkedIn.

LucasMMota commented 3 years ago

Hi @asafepy, thank you for opening this issue!

I see that Hive provides this STORED BY clause in its syntax, but after digging into the code I did not find any reference to a "STORED BY" equivalent in the Hive Metastore objects provided by the Thrift mapping.

My guess: either this is not supported by the Hive Metastore server yet, or we need to apply this clause in some different way.

For example, maybe you could try using the parameters argument of the StorageDescriptorBuilder or SerDeInfoBuilder to achieve this (I am not sure it will work, though) - we do something similar for creating external tables (example).
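A rough, untested sketch of that idea, based on the DDL in the description: Hive normally records a STORED BY handler as a table-level parameter (I believe under the key storage_handler, but please double-check), so you might try passing it through the TableBuilder parameters. The database name, formats, and the storage_handler key below are placeholders/assumptions on my part:

from hive_metastore_client.builders import (
    ColumnBuilder,
    SerDeInfoBuilder,
    StorageDescriptorBuilder,
    TableBuilder,
)

# columns taken from the CREATE EXTERNAL TABLE statement above
columns = [
    ColumnBuilder(name='tabela', type='string').build(),
    ColumnBuilder(name='data_update', type='string').build(),
    ColumnBuilder(name='count', type='bigint').build(),
]

serde_info = SerDeInfoBuilder(
    serialization_lib='org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
).build()

storage_descriptor = StorageDescriptorBuilder(
    columns=columns,
    location='s3://bucket-name/example/table_teste/',
    input_format='org.apache.hadoop.mapred.SequenceFileInputFormat',
    output_format='org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat',
    serde_info=serde_info,
).build()

table = TableBuilder(
    db_name='example_db',  # placeholder database name
    table_name='table_teste',
    storage_descriptor=storage_descriptor,
    table_type='EXTERNAL_TABLE',
    parameters={
        'EXTERNAL': 'TRUE',
        # assumption: the table parameter Hive uses for STORED BY handlers
        'storage_handler': 'io.delta.hive.DeltaStorageHandler',
    },
).build()

If the metastore server accepts it, the table should show up with the Delta storage handler attached, but I have not verified this end to end.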

jdonnelly-apixio commented 1 year ago

If anyone else comes across this issue, this is how I was able to create a table that uses Delta:

# builders ship with the library
from hive_metastore_client.builders import (
    ColumnBuilder,
    SerDeInfoBuilder,
    StorageDescriptorBuilder,
    TableBuilder,
)

field_schema = ColumnBuilder(name='col',
                             type='array<string>',
                             comment='from deserializer').build()
serde_info = SerDeInfoBuilder(serialization_lib='org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                              parameters={'serialization.format': '1',
                                          'path': 's3a://{data_bucket}/{delta_folder}'}).build()
storage_descriptor = StorageDescriptorBuilder(columns=[field_schema],
                                              location='s3a://{schema_bucket}/{db_name}/{table_name}-__PLACEHOLDER__',
                                              input_format='org.apache.hadoop.mapred.SequenceFileInputFormat',
                                              output_format='org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat',
                                              compressed=False,
                                              num_buckets=-1,
                                              serde_info=serde_info,
                                              bucket_cols=[],
                                              sort_cols=[],
                                              parameters={},
                                              #skewedInfo=SkewedInfo(skewedColNames=[],
                                              #                      skewedColValues=[],
                                              #                      skewedColValueLocationMaps={}),
                                              stored_as_sub_directories=False).build()

table = TableBuilder(db_name="{db_name}",
                     table_name="{table_name}",
                     owner="{owner_name}",
                     storage_descriptor=storage_descriptor,
                     partition_keys=[],
                     # Spark/Delta table properties; 'spark.sql.sources.provider': 'delta'
                     # is what tells Spark readers to treat this as a Delta table
                     parameters={'owner': '{owner_name}',
                                 'EXTERNAL': 'TRUE',
                                 'provider': 'delta',
                                 'spark.sql.sources.schema.part.0': '{"type":"struct","fields":[]}',
                                 'transient_lastDdlTime': '1642628681',
                                 'location': 's3a://{data_bucket}/{delta_folder}',
                                 'spark.sql.partitionProvider': 'catalog',
                                 'spark.sql.sources.schema.numParts': '1',
                                 'spark.sql.sources.provider': 'delta',
                                 'spark.sql.create.version': '3.1.1'},
                     table_type='EXTERNAL_TABLE',
                     temporary=False,
                     rewrite_enabled=False,
                     cat_name='hive',
                     owner_type=1).build()

client.create_table(table)
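For completeness, `client` above is a connected HiveMetastoreClient. If I remember the library's API correctly it can be used as a context manager; the host, port, and database/table names below are placeholders:

from hive_metastore_client import HiveMetastoreClient

HIVE_HOST = 'metastore.example.com'  # placeholder metastore host
HIVE_PORT = 9083                     # default Thrift metastore port

with HiveMetastoreClient(HIVE_HOST, HIVE_PORT) as client:
    client.create_table(table)
    # the Thrift API also exposes get_table, handy to confirm the registration
    print(client.get_table('{db_name}', '{table_name}'))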