ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0
5.21k stars 591 forks source link

feat(duckdb/pyspark): load data from duckdb into pyspark #8440

Closed lostmygithubaccount closed 7 months ago

lostmygithubaccount commented 7 months ago

What happened?

trying to create example data in a PySpark connection and running into errors

repro:

[ins] In [1]: import ibis

[ins] In [2]: from pyspark.sql import SparkSession
         ...:
         ...: spark = SparkSession \
         ...:     .builder \
         ...:     .appName("PySpark2") \
         ...:     .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/24 13:42:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[ins] In [3]: con = ibis.pyspark.connect(spark)

[ins] In [4]: t = ibis.examples.penguins.fetch()

[ins] In [5]: con.create_table("penguins", t)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[5], line 1
----> 1 con.create_table("penguins", t)

File ~/repos/ibis/ibis/backends/pyspark/__init__.py:395, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
    393     with self._active_database(database):
    394         self._run_pre_execute_hooks(table)
--> 395         df = self._session.sql(query)
    396         df.write.saveAsTable(name, format=format, mode=mode)
    397 elif schema is not None:

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1631, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1627         assert self._jvm is not None
   1628         litArgs = self._jvm.PythonUtils.toArray(
   1629             [_to_java_column(lit(v)) for v in (args or [])]
   1630         )
-> 1631     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1632 finally:
   1633     if len(kwargs) > 0:

File ~/repos/ibis/venv/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `penguins` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 3 pos 5;
'Project [*]
+- 'UnresolvedRelation [penguins], [], false

try with to_pyarrow():

[ins] In [6]: con.create_table("penguins", t.to_pyarrow())
---------------------------------------------------------------------------
PySparkTypeError                          Traceback (most recent call last)
Cell In[6], line 1
----> 1 con.create_table("penguins", t.to_pyarrow())

File ~/repos/ibis/ibis/backends/pyspark/__init__.py:394, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, format)
    392 mode = "overwrite" if overwrite else "error"
    393 with self._active_database(database):
--> 394     self._run_pre_execute_hooks(table)
    395     df = self._session.sql(query)
    396     df.write.saveAsTable(name, format=format, mode=mode)

File ~/repos/ibis/ibis/backends/__init__.py:877, in BaseBackend._run_pre_execute_hooks(self, expr)
    875 self._define_udf_translation_rules(expr)
    876 self._register_udfs(expr)
--> 877 self._register_in_memory_tables(expr)

File ~/repos/ibis/ibis/backends/sql/__init__.py:195, in SQLBackend._register_in_memory_tables(self, expr)
    193 def _register_in_memory_tables(self, expr: ir.Expr) -> None:
    194     for memtable in expr.op().find(ops.InMemoryTable):
--> 195         self._register_in_memory_table(memtable)

File ~/repos/ibis/ibis/backends/pyspark/__init__.py:242, in Backend._register_in_memory_table(self, op)
    240 def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
    241     schema = PySparkSchema.from_ibis(op.schema)
--> 242     df = self._session.createDataFrame(data=op.data.to_frame(), schema=schema)
    243     df.createOrReplaceTempView(op.name)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1440, in SparkSession.createDataFrame(self, data, schema, samplingRatio, verifySchema)
   1436     data = pd.DataFrame(data, columns=column_names)
   1438 if has_pandas and isinstance(data, pd.DataFrame):
   1439     # Create a DataFrame from pandas DataFrame.
-> 1440     return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
   1441         data, schema, samplingRatio, verifySchema
   1442     )
   1443 return self._create_dataframe(
   1444     data, schema, samplingRatio, verifySchema  # type: ignore[arg-type]
   1445 )

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:363, in SparkConversionMixin.createDataFrame(self, data, schema, samplingRatio, verifySchema)
    361             raise
    362 converted_data = self._convert_from_pandas(data, schema, timezone)
--> 363 return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1485, in SparkSession._create_dataframe(self, data, schema, samplingRatio, verifySchema)
   1483     rdd, struct = self._createFromRDD(data.map(prepare), schema, samplingRatio)
   1484 else:
-> 1485     rdd, struct = self._createFromLocal(map(prepare, data), schema)
   1486 assert self._jvm is not None
   1487 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1090, in SparkSession._createFromLocal(self, data, schema)
   1088 # make sure data could consumed multiple times
   1089 if not isinstance(data, list):
-> 1090     data = list(data)
   1092 if schema is None or isinstance(schema, (list, tuple)):
   1093     struct = self._inferSchemaFromList(data, names=schema)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/session.py:1459, in SparkSession._create_dataframe.<locals>.prepare(obj)
   1457 @no_type_check
   1458 def prepare(obj):
-> 1459     verify_func(obj)
   1460     return obj

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2187, in _make_type_verifier.<locals>.verify(obj)
   2185 def verify(obj: Any) -> None:
   2186     if not verify_nullability(obj):
-> 2187         verify_value(obj)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2160, in _make_type_verifier.<locals>.verify_struct(obj)
   2150         raise PySparkValueError(
   2151             error_class="LENGTH_SHOULD_BE_THE_SAME",
   2152             message_parameters={
   (...)
   2157             },
   2158         )
   2159     for v, (_, verifier) in zip(obj, verifiers):
-> 2160         verifier(v)
   2161 elif hasattr(obj, "__dict__"):
   2162     d = obj.__dict__

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2187, in _make_type_verifier.<locals>.verify(obj)
   2185 def verify(obj: Any) -> None:
   2186     if not verify_nullability(obj):
-> 2187         verify_value(obj)

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2094, in _make_type_verifier.<locals>.verify_long(obj)
   2092 def verify_long(obj: Any) -> None:
   2093     assert_acceptable_types(obj)
-> 2094     verify_acceptable_types(obj)
   2095     if obj < -9223372036854775808 or obj > 9223372036854775807:
   2096         raise PySparkValueError(
   2097             error_class="VALUE_OUT_OF_BOUND",
   2098             message_parameters={
   (...)
   2103             },
   2104         )

File ~/repos/ibis/venv/lib/python3.11/site-packages/pyspark/sql/types.py:2006, in _make_type_verifier.<locals>.verify_acceptable_types(obj)
   2003 def verify_acceptable_types(obj: Any) -> None:
   2004     # subclass of them can not be fromInternal in JVM
   2005     if type(obj) not in _acceptable_types[_type]:
-> 2006         raise PySparkTypeError(
   2007             error_class="CANNOT_ACCEPT_OBJECT_IN_TYPE",
   2008             message_parameters={
   2009                 "data_type": str(dataType),
   2010                 "obj_name": str(obj),
   2011                 "obj_type": type(obj).__name__,
   2012             },
   2013         )

PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `LongType()` can not accept object `181.0` in type `float`.

What version of ibis are you using?

main

What backend(s) are you using, if any?

duckdb + pyspark

Relevant log output

see above

Code of Conduct

cpcloud commented 7 months ago

We don't support this kind of cross backend data loading, not even with two different instances of the same backend.

Don't we already have an issue for this?

cpcloud commented 7 months ago

If you can add a use case that would also be helpful. It's not trivial to make this work, so having some rationale might help justify the effort.

lostmygithubaccount commented 7 months ago

I just ran into this in getting demo data for #8090, I don't consider that worth this effort. the overall issue is in #8115, which I do think is worth prioritizing. similar to #8426 I think it adds a lot of value to be able to easily (and ideally efficiently) move data cross all the systems Ibis supports for a few use cases:

for this issue, feel free to just close in favor of #8115, though I also think it'd be good to have a better error message here ("Error: cannot transfer data between DuckDB and PySpark backends") -- I'm not sure how difficult detecting and adding that is

for the purposes of the tutorial I'll just write to CSV/Parquet and read from that

cpcloud commented 7 months ago

Converting to a feature request given the above.

gforsyth commented 7 months ago

I think it adds a lot of value to be able to easily (and ideally efficiently) move data cross all the systems Ibis supports for a few use cases:

I agree. And also, this is a monstrous problem for anything that doesn't have native arrow support. If we plan to try to recreate odo we should first check in with the ibis devs who used to work on it, and then second, make a different plan.

lostmygithubaccount commented 7 months ago

limiting to backends that have native arrow support seems fine to me. perhaps w/ exceptions for postgres and sqlite given how common they are for source data into analytics (idk if they support Arrow but I assume not)

gforsyth commented 7 months ago

(idk if they support Arrow but I assume not)

they do not. we could accomplish this in the short-term by using duckdb's ATTACH features, although that will make duckdb a dependency of the sqlite and postgres backends (although could be an optional dependency). medium-term, I think we should use ADBC for this.

lostmygithubaccount commented 7 months ago

oh yeah I like using DuckDB for that -- would vote for optional dependency

jcrist commented 7 months ago

I think this issue issue is a duplicate of #4800?

Implementation-wise, the common case would be iterating over to_pyarrow_batches() and inserting each batch in turn (possibly inside a transaction so we can roll it back on failure :shrug:). The trick here would be exposing fast paths for backends like duckdb that include native support for reading-from/writing-to another backend. AFAICT duckdb is unique among our backends in this ability (it includes native readers/writes for sqlite/mysql/postgres).

I'd vote to close this in favor of #4800, but with a focus on designing #4800 so we can handle the fast path support in duckdb.