trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Support for `UNLOAD` functionality for Data Lake connectors #19981

Open findinpath opened 8 months ago

findinpath commented 8 months ago

Introduction

Amazon Athena exposes an UNLOAD statement that writes the output of a query to a specified storage location. AWS Athena documentation for UNLOAD: https://docs.aws.amazon.com/athena/latest/ug/unload.html
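For reference, Athena's UNLOAD takes a query, a target location, and optional table properties (example adapted from the AWS documentation; the bucket name and query are placeholders):

    UNLOAD (SELECT * FROM orders)
    TO 's3://bucket/path/'
    WITH (format = 'PARQUET')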

It would be beneficial to implement such functionality in Trino as well.

Implementation roadmap

End state:

    CALL unload(path => …, format => ..., input => TABLE(...))

How do we get there:

First phase

SELECT * FROM TABLE(unload(path => …, input => TABLE (...)))
SELECT * FROM TABLE(hive.system.unload(
  input => TABLE( <subquery> ),
  location => 's3://bucket/path',
  format => 'parquet'))

What would the statement return?

A table function today runs either partitioned or unpartitioned. We need to partition the writes, and therefore the output is partitioned as well - the statement produces multiple values. The statement would therefore potentially return the names of all the files written.
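For illustration, the result could be a table with one row per file written. The exact shape of the result is an open design question; the column name and file names below are only assumptions:

    SELECT * FROM TABLE(hive.system.unload(
      input => TABLE(SELECT * FROM orders),
      location => 's3://bucket/path',
      format => 'parquet'));

    -- hypothetical output: one row per file written
    --  file_name
    -- ------------------------------------------------
    --  s3://bucket/path/20240101_000000_00000.parquet
    --  s3://bucket/path/20240101_000000_00001.parquet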

A table function should be side-effect free. However, calling this function twice would likely write twice to, and potentially overwrite and corrupt (in the case of Iceberg or Delta Lake tables), the content in the specified location. How would it behave in the context of fault-tolerant execution (FTE)?

Limitations:

Second phase

SELECT * FROM TABLE(unload(path => …, input => TABLE (...)))

Aspects that can be improved independently from each other:

End state:

    CALL unload(path => …, format => ... , input => TABLE(...))

Use CALL statement

Initial suggestion from @pettyjamesm

CALL unload(
      location => 's3://bucket/path',
      format => 'parquet',
      input => TABLE (SELECT * FROM orders))

This kind of call would fit the SQL specification - CALL can take a table as a parameter (either a table or a query). Input from @martint:

This would probably be ideal from a syntactic point of view.

CALL today is like a TASK on the coordinator - it is not a "proper" query. It would need to be changed so that it is invoked as a "query". This could be sugared behind the scenes as:

SELECT * FROM TABLE(system.unload(
  input => TABLE( <subquery> ),
  location => 's3://bucket/path',
  format => 'parquet'))

unload would return a status value reporting e.g. how many rows were "affected". The commit protocol should be taken into account.

Other potential ways to implement this functionality

Add support for a UNLOAD in the SQL Syntax

It looks attractive, but it doesn't fit the Trino model. The engine wouldn't know exactly which connector UNLOAD should be routed to. Moreover, there could be scenarios where, depending on the specified location, different connectors would need to be used.

Add an extra table parameter to CTAS statement to avoid registering the table

CREATE TABLE hive.myschema.mytable
WITH (
    location = '....',
    register_table = false)
AS SELECT * FROM tpch.sf1.nation;

The table parameter register_table would signal to the connector that the output table of this statement shouldn't be registered in the metastore. In the example above, however, myschema and mytable are both synthetic - they are not used anywhere.

Create a special connector with the sole purpose of doing CTAS but don't register the table

CREATE TABLE hive.myschema.mytable
WITH (
    location = '....')
AS SELECT * FROM tpch.sf1.nation;

As with the previous option, the connector would completely disregard myschema and mytable.

Create a special connector which interprets differently the concept of schema name and table name

CREATE TABLE hive.bucket."table/location"
WITH (....)
AS SELECT * FROM tpch.sf1.nation;

The connector handling the above query would interpret the schema name as the bucket name and the table name as the path in the bucket to which we'd like to save the table.

Use a reserved schema name in a general purpose data lake connector

CREATE TABLE hive.raw."bucket/some/path"
WITH (....)
AS SELECT * FROM tpch.sf1.nation;

raw would be a reserved schema name in the connector. This pragmatic choice comes with the side effect of preventing users from using that schema name for their own purposes. From a UX perspective it feels rather awkward.

Insert into a table function the output of a SELECT

INSERT INTO TABLE(hive.system.unload(
  location => 's3://bucket/path',
  format => 'parquet'))

The syntax as listed doesn't fit the standard. It could also be misleading for users - how should they distinguish between read-only and writable table functions?
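For comparison, Trino's existing polymorphic table functions, such as the built-in exclude_columns, are read-only transformations, and nothing in the invocation syntax signals whether a function writes data:

    -- today's table functions are read-only, e.g.:
    SELECT *
    FROM TABLE(exclude_columns(
      input => TABLE(orders),
      columns => DESCRIPTOR(comment)));

An INSERT INTO TABLE(...) form would make the same TABLE(...) syntax sometimes denote a side-effecting sink, which is the ambiguity described above.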

martint commented 7 months ago

Let’s keep this open. It’s a good feature to have if someone wants to pick it up.