apache / gravitino

World's most powerful open data catalog for building a high-performance, geo-distributed and federated metadata lake.
https://gravitino.apache.org
Apache License 2.0

[Bug report] Can't load filesystem 'gs' when use spark to access Gravitino GCS bundles #5609

Open yuqi1129 opened 2 days ago


Version

main branch

Describe what's wrong

When I run Spark to access GCS, the following error occurs:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 955, in csv
    self._jwrite.csv(path)
  File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o104.csv.
: org.apache.gravitino.exceptions.GravitinoRuntimeException: Exception occurs when create new FileSystem for actual uri: gs://example_qazwsx/example/people, msg: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
    at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.lambda$getFilesetContext$2(GravitinoVirtualFileSystem.java:431)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
    at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
    at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFilesetContext(GravitinoVirtualFileSystem.java:386)
    at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFileStatus(GravitinoVirtualFileSystem.java:547)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1862)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:117)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:839)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)

Error message and/or stacktrace

Please see the stack trace above.

How to reproduce

  1. Create a metalake named `test`, a GCS catalog named `gcs_catalog`, a schema named `schema`, and a fileset named `example`.
  2. Install PySpark 3.2.0 and the Gravitino Python client:
     pip install apache-gravitino==0.7.0
     pip install pyspark==3.2.0
  3. Use the following code to access GCS:
    
```python
import logging
logging.basicConfig(level=logging.INFO)

from gravitino import NameIdentifier, GravitinoClient, Catalog, Fileset, GravitinoAdminClient

gravitino_url = "http://localhost:8090"
metalake_name = "test"

catalog_name = "gcs_catalog"
schema_name = "schema"
fileset_name = "example"

fileset_ident = NameIdentifier.of(schema_name, fileset_name)

gravitino_admin_client = GravitinoAdminClient(uri=gravitino_url)
gravitino_client = GravitinoClient(uri=gravitino_url, metalake_name=metalake_name)

from pyspark.sql import SparkSession
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars /Users/yuqi/project/gravitino/bundles/gcp-bundle/build/libs/gravitino-gcp-bundle-0.8.0-incubating-SNAPSHOT.jar,/Users/yuqi/project/gravitino/clients/filesystem-hadoop3-runtime/build/libs/gravitino-filesystem-hadoop3-runtime-0.8.0-incubating-SNAPSHOT.jar --master local[1] pyspark-shell"

spark = SparkSession.builder \
    .appName("s3_fielset_test") \
    .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs") \
    .config("spark.hadoop.fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem") \
    .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090") \
    .config("spark.hadoop.fs.gravitino.client.metalake", "test") \
    .config("spark.hadoop.gcs-service-account-file", "/Users/yuqi/Downloads/silken-physics-431108-g3-30ab3d97bb60.json") \
    .config("spark.hadoop.fs.gvfs.filesystem.providers", "gcs") \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.port", "2048") \
    .config("spark.driver.extraClassPath", "/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar") \
    .config("spark.executor.extraClassPath", "/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("DEBUG")

data = [("Alice", 25), ("Bob", 30), ("Cathy", 45)]
columns = ["Name", "Age"]
spark_df = spark.createDataFrame(data, schema=columns)
gvfs_path = f"gvfs://fileset/{catalog_name}/{schema_name}/{fileset_name}/people"

spark_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(gvfs_path)

spark.stop()
```


Additional context

![image](https://github.com/user-attachments/assets/3106c621-065e-4c79-a7a5-8798e9324f47)

![image](https://github.com/user-attachments/assets/929b08ad-a1fa-4484-a590-b2ebbfd0af68)

Reason: the value of `FILE_SYSTEMS_LOADED` in `FileSystem` is already `true` before `provider.getFileSystem(filePath, maps);` runs in `GravitinoVirtualFileSystem`, so the GCS filesystem never gets loaded. Compare the value of `SERVICE_FILE_SYSTEMS` before and after the call:

Before:

```
{viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, file=class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, http=class org.apache.hadoop.fs.http.HttpFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, nullscan=class org.apache.hadoop.hive.ql.io.NullScanFileSystem, https=class org.apache.hadoop.fs.http.HttpsFileSystem}
```

After:

```
{viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, file=class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, http=class org.apache.hadoop.fs.http.HttpFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, nullscan=class org.apache.hadoop.hive.ql.io.NullScanFileSystem, https=class org.apache.hadoop.fs.http.HttpsFileSystem}
```
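
The two maps are identical, and neither contains a `gs` entry: the GCS implementation shipped in the bundle is never discovered. The underlying cause is that Hadoop's `FileSystem` performs its `ServiceLoader` scan only once per JVM, guarded by `FILE_SYSTEMS_LOADED`. A simplified paraphrase of that guard (condensed from the Hadoop 3.3.x source for illustration; not a verbatim copy):

```java
// Simplified paraphrase of org.apache.hadoop.fs.FileSystem.loadFileSystems()
// in Hadoop 3.3.x. Field names match the real source; the body is condensed.
private static void loadFileSystems() {
  synchronized (FileSystem.class) {
    if (!FILE_SYSTEMS_LOADED) {                     // checked once per JVM
      ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
      for (FileSystem fs : serviceLoader) {
        SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
      }
      FILE_SYSTEMS_LOADED = true;                   // map is never refreshed again
    }
  }
}
```

Once the flag flips, a `FileSystem` implementation that was not visible to that first scan (here, the GCS one from the GCP bundle) can never be added to `SERVICE_FILE_SYSTEMS` afterwards.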



We need to run the `ServiceLoader` again so that all `FileSystem` implementations on the current classpath, including the GCS one, are loaded.
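
A minimal sketch of that idea, assuming we re-run the `ServiceLoader` against the class loader that can see the bundle jars and register the discovered schemes in the Hadoop `Configuration`. The helper name `reloadFileSystemServices` is illustrative, not an existing Gravitino or Hadoop API:

```java
import java.util.ServiceLoader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public final class FileSystemReloader {
  // Re-discover FileSystem implementations that Hadoop's one-time scan missed
  // and make them resolvable by registering fs.<scheme>.impl, which
  // FileSystem.getFileSystemClass consults before SERVICE_FILE_SYSTEMS.
  static void reloadFileSystemServices(Configuration conf, ClassLoader loader) {
    for (FileSystem fs : ServiceLoader.load(FileSystem.class, loader)) {
      try {
        conf.setClass("fs." + fs.getScheme() + ".impl", fs.getClass(), FileSystem.class);
      } catch (UnsupportedOperationException e) {
        // Implementations that do not declare a scheme are skipped.
      }
    }
  }
}
```

Registering `fs.<scheme>.impl` works because `FileSystem.getFileSystemClass` checks that configuration key before falling back to the `SERVICE_FILE_SYSTEMS` map. For the same reason, explicitly setting `spark.hadoop.fs.gs.impl` in the Spark session (typically to `com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem`, though the Gravitino GCP bundle may relocate that class) should work around the problem until a proper fix lands.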