
[PR4/5]: Add S3FileIO #125

Closed jainlavina closed 3 months ago

jainlavina commented 3 months ago

Summary

This is the fourth PR in a sequence of PRs to add support for S3 storage.

The OpenHouse catalog currently supports HDFS as the storage backend. The end goal of this effort is to add S3 integration so that the storage backend can be configured as either S3 or HDFS, based on the configured storage type.
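As a rough sketch of that end state, the snippet below shows how a catalog could pick a FileIO implementation from the configured storage type. The names here are illustrative only, not the actual OpenHouse classes; it assumes Iceberg's stock S3FileIO and HadoopFileIO implementations:

```scala
import org.apache.iceberg.io.FileIO
import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.hadoop.HadoopFileIO

// Hypothetical dispatch on the configured storage type. OpenHouse's real
// wiring differs; this only illustrates the S3-vs-HDFS configurability
// described above.
object FileIOSelector {
  def forStorageType(storageType: String): FileIO = storageType.toUpperCase match {
    case "S3"   => new S3FileIO()     // AWS SDK v2-backed FileIO
    case "HDFS" => new HadoopFileIO() // Hadoop FileSystem-backed FileIO
    case other  => throw new IllegalArgumentException(s"Unsupported storage type: $other")
  }
}
```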

The entire work is done via a series of PRs:

  1. Add S3 Storage type and S3StorageClient.
  2. Add base class for StorageClient and move common logic like validation of properties there to avoid code duplication.
  3. Add S3Storage implementation that uses S3StorageClient.
  4. Add support for using S3FileIO for S3 storage type.
  5. Add a recipe for end-to-end testing in docker.

This PR addresses item 4 by adding S3FileIO support.

Sushant has already completed item 5, so this PR marks the completion of the S3 integration.
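As a sketch of what item 4 enables, the snippet below initializes Iceberg's S3FileIO from catalog properties. The property keys and values mirror the spark.sql.catalog.openhouse.s3.* settings used in the docker test below (local MinIO endpoint and test credentials); treat it as illustrative, not as OpenHouse's actual initialization path:

```scala
import java.util.{HashMap => JHashMap}
import org.apache.iceberg.aws.s3.S3FileIO

// Same s3.* properties passed via --conf in the spark-shell test below.
val props = new JHashMap[String, String]()
props.put("s3.endpoint", "http://minioS3:9000")
props.put("s3.access-key-id", "admin")
props.put("s3.secret-access-key", "password")
props.put("s3.path-style-access", "true")

val io = new S3FileIO()
io.initialize(props) // the S3 client itself is created lazily on first use
// The region is taken from the environment (AWS_REGION), as in the test below.

// The catalog can now read and write table files via s3:// paths, e.g.:
// io.newInputFile("s3://openhouse-bucket/db/tb-<uuid>/<n>.metadata.json")
io.close()
```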

Changes

Adds support for using S3FileIO when the storage type is S3 (item 4 in the series above).

Testing Done

Tested by running the oh-s3-spark recipe in Docker:

  1. Start the containers and open a shell in the Spark master:

```
$ docker compose up -d
[+] Building 0.0s (0/0)    docker:desktop-linux
[+] Running 16/16
 ✔ Network oh-s3-spark_default  Cre...  0.0s
 ✔ Container local.spark-master  St...  0.2s
 ✔ Container oh-s3-spark-prometheus-1  Started  0.2s
 ✔ Container local.mysql  Started  0.2s
 ✔ Container local.minioS3  Started  0.2s
 ✔ Container local.opa  Started  0.2s
 ! opa  The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested  0.0s
 ! spark-master  The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested  0.0s
 ✔ Container local.spark-livy  Star...  0.1s
 ✔ Container local.spark-worker-a  Started  0.1s
 ✔ Container local.minioClient  Sta...  0.0s
 ✔ Container local.openhouse-housetables  Started  0.0s
 ! spark-worker-a  The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested  0.0s
 ! spark-livy  The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested  0.0s
 ✔ Container local.openhouse-jobs  Started  0.0s
 ✔ Container local.openhouse-tables  Started  0.0s

lajain-mn2:oh-s3-spark lajain$ docker exec -it local.spark-master /bin/bash
```

Screenshot 2024-06-13 at 3 47 32 PM
  2. Log in to MinIO. Screenshot 2024-06-13 at 3 48 36 PM
Screenshot 2024-06-13 at 3 48 59 PM
  3. Run spark-shell:

```
openhouse@cff3c38358c5:/opt/spark$ export AWS_REGION=us-east-1
openhouse@cff3c38358c5:/opt/spark$ bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0,software.amazon.awssdk:bundle:2.20.18,software.amazon.awssdk:url-connection-client:2.20.18 \
    --jars openhouse-spark-runtime_2.12-*-all.jar \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions \
    --conf spark.sql.catalog.openhouse=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.openhouse.catalog-impl=com.linkedin.openhouse.spark.OpenHouseCatalog \
    --conf spark.sql.catalog.openhouse.metrics-reporter-impl=com.linkedin.openhouse.javaclient.OpenHouseMetricsReporter \
    --conf spark.sql.catalog.openhouse.uri=http://openhouse-tables:8080 \
    --conf spark.sql.catalog.openhouse.auth-token=$(cat /var/config/$(whoami).token) \
    --conf spark.sql.catalog.openhouse.cluster=LocalS3Cluster \
    --conf spark.sql.catalog.openhouse.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.openhouse.s3.endpoint=http://minioS3:9000 \
    --conf spark.sql.catalog.openhouse.s3.access-key-id=admin \
    --conf spark.sql.catalog.openhouse.s3.secret-access-key=password \
    --conf spark.sql.catalog.openhouse.s3.path-style-access=true
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/openhouse/.ivy2/cache
The jars for the packages stored in: /home/openhouse/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.1_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9d1e3e8a-c713-44f0-93b3-bd5029daa8e2;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.1_2.12;1.2.0 in central
	found software.amazon.awssdk#bundle;2.20.18 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.20.18 in central
	found software.amazon.awssdk#utils;2.20.18 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found software.amazon.awssdk#annotations;2.20.18 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found software.amazon.awssdk#http-client-spi;2.20.18 in central
	found software.amazon.awssdk#metrics-spi;2.20.18 in central
downloading https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/1.2.0/iceberg-spark-runtime-3.1_2.12-1.2.0.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-spark-runtime-3.1_2.12;1.2.0!iceberg-spark-runtime-3.1_2.12.jar (966ms)
downloading https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar ...
	[SUCCESSFUL ] software.amazon.awssdk#bundle;2.20.18!bundle.jar (8889ms)
downloading https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.20.18/url-connection-client-2.20.18.jar ...
	[SUCCESSFUL ] software.amazon.awssdk#url-connection-client;2.20.18!url-connection-client.jar (66ms)
downloading https://repo1.maven.org/maven2/software/amazon/eventstream/eventstream/1.0.1/eventstream-1.0.1.jar ...
	[SUCCESSFUL ] software.amazon.eventstream#eventstream;1.0.1!eventstream.jar (59ms)
downloading https://repo1.maven.org/maven2/software/amazon/awssdk/utils/2.20.18/utils-2.20.18.jar ...
	[SUCCESSFUL ] software.amazon.awssdk#utils;2.20.18!utils.jar (66ms)
downloading https://repo1.maven.org/maven2/software/amazon/awssdk/annotations/2.20.18/annotations-2.20.18.jar ...
	[SUCCESSFUL ] software.amazon.awssdk#annotations;2.20.18!annotations.jar (62ms)
downloading https://repo1.maven.org/maven2/software/amazon/awssdk/http-client-spi/2.20.18/http-client-spi-2.20.18.jar ...
	[SUCCESSFUL ] software.amazon.awssdk#http-client-spi;2.20.18!http-client-spi.jar (61ms)
downloading https://repo1.maven.org/maven2/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar ...
	[SUCCESSFUL ] org.reactivestreams#reactive-streams;1.0.3!reactive-streams.jar (58ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.30!slf4j-api.jar (59ms)
downloading https://repo1.maven.org/maven2/software/amazon/awssdk/metrics-spi/2.20.18/metrics-spi-2.20.18.jar ...
	[SUCCESSFUL ] software.amazon.awssdk#metrics-spi;2.20.18!metrics-spi.jar (56ms)
:: resolution report :: resolve 153033ms :: artifacts dl 10382ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.1_2.12;1.2.0 from central in [default]
	org.reactivestreams#reactive-streams;1.0.3 from central in [default]
	org.slf4j#slf4j-api;1.7.30 from central in [default]
	software.amazon.awssdk#annotations;2.20.18 from central in [default]
	software.amazon.awssdk#bundle;2.20.18 from central in [default]
	software.amazon.awssdk#http-client-spi;2.20.18 from central in [default]
	software.amazon.awssdk#metrics-spi;2.20.18 from central in [default]
	software.amazon.awssdk#url-connection-client;2.20.18 from central in [default]
	software.amazon.awssdk#utils;2.20.18 from central in [default]
	software.amazon.eventstream#eventstream;1.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   10  |   10  |   10  |   0   ||   10  |   10  |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-9d1e3e8a-c713-44f0-93b3-bd5029daa8e2
	confs: [default]
	10 artifacts copied, 0 already retrieved (480145kB/974ms)
2024-06-14 00:51:50,318 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://cff3c38358c5:4040
Spark context available as 'sc' (master = local[*], app id = local-1718326322630).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_232)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```

The bucket right now is empty:

Screenshot 2024-06-13 at 5 50 19 PM
  4. Create table:

```
scala> spark.sql("CREATE TABLE openhouse.db.tb (ts timestamp, col1 string, col2 string) PARTITIONED BY (days(ts))").show()
++
||
++
++

scala> spark.sql("DESCRIBE TABLE openhouse.db.tb").show()
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|            ts|timestamp|       |
|          col1|   string|       |
|          col2|   string|       |
|              |         |       |
|# Partitioning|         |       |
|        Part 0| days(ts)|       |
+--------------+---------+-------+
```
Screenshot 2024-06-13 at 5 53 55 PM Screenshot 2024-06-13 at 5 54 11 PM Screenshot 2024-06-13 at 5 54 22 PM
  5. Add data:

scala> spark.sql("INSERT INTO TABLE openhouse.db.tb VALUES (current_timestamp(), 'val1', 'val2')") res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("INSERT INTO TABLE openhouse.db.tb VALUES (date_sub(CAST(current_timestamp() as DATE), 30), 'val1', 'val2')") res3: org.apache.spark.sql.DataFrame = []

scala> spark.sql("INSERT INTO TABLE openhouse.db.tb VALUES (date_sub(CAST(current_timestamp() as DATE), 60), 'val1', 'val2')") res4: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SELECT * FROM openhouse.db.tb").show() +--------------------+----+----+
| ts|col1|col2| +--------------------+----+----+ | 2024-05-15 00:00:00|val1|val2| | 2024-04-15 00:00:00|val1|val2| |2024-06-14 00:55:...|val1|val2| +--------------------+----+----+

scala> spark.sql("SHOW TABLES IN openhouse.db").show() +---------+---------+ |namespace|tableName| +---------+---------+ | db| tb| +---------+---------+

Screenshot 2024-06-13 at 5 56 15 PM

Test using the table service (here curlArgs is assumed to hold the standard request headers and auth token from the local docker setup):

```
$ curl "${curlArgs[@]}" -XPOST http://localhost:8000/v1/databases/d3/tables/ \
  --data-raw '{
    "tableId": "t1",
    "databaseId": "d3",
    "baseTableVersion": "INITIAL_VERSION",
    "clusterId": "LocalS3Cluster",
    "schema": "{\"type\": \"struct\", \"fields\": [{\"id\": 1,\"required\": true,\"name\": \"id\",\"type\": \"string\"},{\"id\": 2,\"required\": true,\"name\": \"name\",\"type\": \"string\"},{\"id\": 3,\"required\": true,\"name\": \"ts\",\"type\": \"timestamp\"}]}",
    "timePartitioning": { "columnName": "ts", "granularity": "HOUR" },
    "clustering": [ { "columnName": "name" } ],
    "tableProperties": { "key": "value" }
  }' | json_pp
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2146    0  1576  100   570   4184   1513 --:--:-- --:--:-- --:--:--  5692
{
   "clusterId" : "LocalS3Cluster",
   "clustering" : [
      {
         "columnName" : "name",
         "transform" : null
      }
   ],
   "creationTime" : 1718327830198,
   "databaseId" : "d3",
   "lastModifiedTime" : 1718327830198,
   "policies" : null,
   "schema" : "{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":true,\"type\":\"string\"},{\"id\":2,\"name\":\"name\",\"required\":true,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":true,\"type\":\"timestamp\"}]}",
   "tableCreator" : "DUMMY_ANONYMOUS_USER",
   "tableId" : "t1",
   "tableLocation" : "s3://openhouse-bucket/d3/t1-394d8186-143f-482a-b5e5-e6aa6e382556/00000-8e498f9d-153e-412f-bb0e-476cfcab926d.metadata.json",
   "tableProperties" : {
      "key" : "value",
      "openhouse.clusterId" : "LocalS3Cluster",
      "openhouse.creationTime" : "1718327830198",
      "openhouse.databaseId" : "d3",
      "openhouse.lastModifiedTime" : "1718327830198",
      "openhouse.tableCreator" : "DUMMY_ANONYMOUS_USER",
      "openhouse.tableId" : "t1",
      "openhouse.tableLocation" : "s3://openhouse-bucket/d3/t1-394d8186-143f-482a-b5e5-e6aa6e382556/00000-8e498f9d-153e-412f-bb0e-476cfcab926d.metadata.json",
      "openhouse.tableType" : "PRIMARY_TABLE",
      "openhouse.tableUUID" : "394d8186-143f-482a-b5e5-e6aa6e382556",
      "openhouse.tableUri" : "LocalS3Cluster.d3.t1",
      "openhouse.tableVersion" : "INITIAL_VERSION",
      "policies" : "",
      "write.format.default" : "orc",
      "write.metadata.delete-after-commit.enabled" : "true",
      "write.metadata.previous-versions-max" : "28"
   },
   "tableType" : "PRIMARY_TABLE",
   "tableUUID" : "394d8186-143f-482a-b5e5-e6aa6e382556",
   "tableUri" : "LocalS3Cluster.d3.t1",
   "tableVersion" : "INITIAL_VERSION",
   "timePartitioning" : {
      "columnName" : "ts",
      "granularity" : "HOUR"
   }
}
```

Screenshot 2024-06-13 at 6 17 46 PM

$ curl "${curlArgs[@]}" -XGET http://localhost:8000/v1/databases/d3/tables/t1 | json_pp % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 1576 0 1576 0 0 9477 0 --:--:-- --:--:-- --:--:-- 9493 { "clusterId" : "LocalS3Cluster", "clustering" : [ { "columnName" : "name", "transform" : null } ], "creationTime" : 1718327830198, "databaseId" : "d3", "lastModifiedTime" : 1718327830198, "policies" : null, "schema" : "{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":true,\"type\":\"string\"},{\"id\":2,\"name\":\"name\",\"required\":true,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":true,\"type\":\"timestamp\"}]}", "tableCreator" : "DUMMY_ANONYMOUS_USER", "tableId" : "t1", "tableLocation" : "s3://openhouse-bucket/d3/t1-394d8186-143f-482a-b5e5-e6aa6e382556/00000-8e498f9d-153e-412f-bb0e-476cfcab926d.metadata.json", "tableProperties" : { "key" : "value", "openhouse.clusterId" : "LocalS3Cluster", "openhouse.creationTime" : "1718327830198", "openhouse.databaseId" : "d3", "openhouse.lastModifiedTime" : "1718327830198", "openhouse.tableCreator" : "DUMMY_ANONYMOUS_USER", "openhouse.tableId" : "t1", "openhouse.tableLocation" : "s3://openhouse-bucket/d3/t1-394d8186-143f-482a-b5e5-e6aa6e382556/00000-8e498f9d-153e-412f-bb0e-476cfcab926d.metadata.json", "openhouse.tableType" : "PRIMARY_TABLE", "openhouse.tableUUID" : "394d8186-143f-482a-b5e5-e6aa6e382556", "openhouse.tableUri" : "LocalS3Cluster.d3.t1", "openhouse.tableVersion" : "INITIAL_VERSION", "policies" : "", "write.format.default" : "orc", "write.metadata.delete-after-commit.enabled" : "true", "write.metadata.previous-versions-max" : "28" }, "tableType" : "PRIMARY_TABLE", "tableUUID" : "394d8186-143f-482a-b5e5-e6aa6e382556", "tableUri" : "LocalS3Cluster.d3.t1", "tableVersion" : "INITIAL_VERSION", "timePartitioning" : { "columnName" : "ts", "granularity" : "HOUR" } }

```
$ curl "${curlArgs[@]}" -XGET http://localhost:8000/v1/databases/d3/tables/ | json_pp
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1590    0  1590    0     0  22490      0 --:--:-- --:--:-- --:--:-- 22714
{
   "results" : [
      {
         "clusterId" : "LocalS3Cluster",
         "clustering" : [
            {
               "columnName" : "name",
               "transform" : null
            }
         ],
         "creationTime" : 1718327830198,
         "databaseId" : "d3",
         "lastModifiedTime" : 1718327830198,
         "policies" : null,
         "schema" : "{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":true,\"type\":\"string\"},{\"id\":2,\"name\":\"name\",\"required\":true,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":true,\"type\":\"timestamp\"}]}",
         "tableCreator" : "DUMMY_ANONYMOUS_USER",
         "tableId" : "t1",
         "tableLocation" : "s3://openhouse-bucket/d3/t1-394d8186-143f-482a-b5e5-e6aa6e382556/00000-8e498f9d-153e-412f-bb0e-476cfcab926d.metadata.json",
         "tableProperties" : {
            "key" : "value",
            "openhouse.clusterId" : "LocalS3Cluster",
            "openhouse.creationTime" : "1718327830198",
            "openhouse.databaseId" : "d3",
            "openhouse.lastModifiedTime" : "1718327830198",
            "openhouse.tableCreator" : "DUMMY_ANONYMOUS_USER",
            "openhouse.tableId" : "t1",
            "openhouse.tableLocation" : "s3://openhouse-bucket/d3/t1-394d8186-143f-482a-b5e5-e6aa6e382556/00000-8e498f9d-153e-412f-bb0e-476cfcab926d.metadata.json",
            "openhouse.tableType" : "PRIMARY_TABLE",
            "openhouse.tableUUID" : "394d8186-143f-482a-b5e5-e6aa6e382556",
            "openhouse.tableUri" : "LocalS3Cluster.d3.t1",
            "openhouse.tableVersion" : "INITIAL_VERSION",
            "policies" : "",
            "write.format.default" : "orc",
            "write.metadata.delete-after-commit.enabled" : "true",
            "write.metadata.previous-versions-max" : "28"
         },
         "tableType" : "PRIMARY_TABLE",
         "tableUUID" : "394d8186-143f-482a-b5e5-e6aa6e382556",
         "tableUri" : "LocalS3Cluster.d3.t1",
         "tableVersion" : "INITIAL_VERSION",
         "timePartitioning" : {
            "columnName" : "ts",
            "granularity" : "HOUR"
         }
      }
   ]
}
```

Delete table:

```
$ curl "${curlArgs[@]}" -XDELETE http://localhost:8000/v1/databases/d3/tables/t1
```

Validate that the table is deleted: Screenshot 2024-06-13 at 6 21 03 PM

