trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
10.37k stars 2.98k forks source link

Add support for HDFS only iceberg tables #5571

Open cccs-jc opened 4 years ago

cccs-jc commented 4 years ago

In order to keep list of tables and partitions Iceberg can be configured to use a Hive metastore however it can also be configured to store that information on files and thus not require running a Hive metastore.

This question was asked in a previous post.

@lxynov Is it planned to add support of hdfs only iceberg tables (like in spark https://iceberg.apache.org/spark/ &spark.sql.catalog.hadoop_prod.type = hadoop ) ?

_Originally posted by @AbdullaevAPo in https://github.com/prestosql/presto/issue_comments/707585664_

I've looked at the implementation of the PrestoSQL file IcebergMetadata. That class looks to be the interface which Presto uses to obtain the iceberg tables and schemas. Currently this class only handles retrieving this information from a Hive metastore.

https://github.com/prestosql/presto/blob/master/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java

However it looks like it could be extended to support the "hadoop" mode as well. For instance if you look at the iceberg catalog class you can see how it retrieves tables and schemas directly from files.

https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java

Would it be feasible to implement support for "hadoop" iceberg this way? Is it reasonable/desirable feature?

To simplify the implementation the initial support could be for "read only", no table creation or deletion.

cccs-jc commented 4 years ago

Did some further digging. Here are my notes for possible implementation:

Iceberg classes use to list namespaces and tables from iceberg based on HDFS only

public HadoopCatalog(Configuration conf, String warehouseLocation) {
public List<TableIdentifier> listTables(Namespace namespace) {
public List<Namespace> listNamespaces(Namespace namespace) {
public Table loadTable(TableIdentifier identifier) {
# Table has a schema
Schema <- Table.schema();
# TableIdentifier has
namespace
table name

Interface used by Presto to discover Iceberg namespaces and tables (namespaces are called schema in Presto)

IcebergMetadata

@Override
    public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
    {
        return schemaName.map(Collections::singletonList)
                .orElseGet(metastore::getAllDatabases)
                .stream()
                .flatMap(schema -> metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE).stream()
                        .map(table -> new SchemaTableName(schema, table))
                        .collect(toList())
                        .stream())
                .collect(toList());

        HDFS version would be something like

        new HadoopCatalog()
        List<Namespace> = HadoopCatalog.listSchemas(schemaName)
        for namespaces
          TableIdentifier = HadoopCatalog.listTables(namespace)
          list.add(new SchemaTableName(Table.getNamespace, Table.getName)

    }

IcebergMetadata uses IcebergUtil for example

public static Table getIcebergTable(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, ConnectorSession session, SchemaTableName table)
    {
        HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
        HiveIdentity identity = new HiveIdentity(session);
        TableOperations operations = new HiveTableOperations(metastore, hdfsEnvironment, hdfsContext, identity, table.getSchemaName(), table.getTableName());
        return new BaseTable(operations, quotedTableName(table));

     HDFS version would be something like

        SchemaTableName is a presto schema+table object
        you can create a TableIdentifier using the schema+table (simple string arguments)
        Table t = call HadoopCatalog.loadTalbe(tableIdentifier)
        return t
    }

In IcerbergUtil there are methods like these that don't need to change, for example this method takes an Iceberg table schema and returns implementations of the Presto ColumnHandle

public static List<IcebergColumnHandle> getColumns(Schema schema, TypeManager typeManager)