ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

bug: cannot directly execute an expression connected to DuckDB table on PySpark backend #9602

Closed: chloeh13q closed this issue 2 weeks ago

chloeh13q commented 1 month ago

What happened?

Following the Ibis tutorial here, I want to do something similar by first writing and executing an expression on DuckDB and then connecting to PySpark and executing the same expression on PySpark.

However, the SQL generated from the DuckDB expression throws an error when executed on PySpark: the table name is generated as "memory.main.payments", which is not valid syntax in PySpark:

AnalysisException: [REQUIRES_SINGLE_PART_NAMESPACE] spark_catalog requires a single-part namespace, but got `memory`.`main`.
>>> import ibis
>>> import pandas as pd
>>>
>>> # df is a pandas DataFrame holding the payments data (constructed earlier, omitted here)
>>> con = ibis.get_backend()
>>> t = con.create_table("payments", df)
>>> t
DatabaseTable: memory.main.payments
  createTime  timestamp(6)
  orderId     int64
  payAmount   float64
  payPlatform int64
  provinceId  int64
>>> provinces = (
...     "Beijing",
...     "Shanghai",
...     "Hangzhou",
...     "Shenzhen",
...     "Jiangxi",
...     "Chongqing",
...     "Xizang",
... )
>>> province_id_to_name_df = pd.DataFrame(
...     enumerate(provinces), columns=["provinceId", "province"]
... )
>>> province_id_to_name_df
   provinceId   province
0           0    Beijing
1           1   Shanghai
2           2   Hangzhou
3           3   Shenzhen
4           4    Jiangxi
5           5  Chongqing
6           6     Xizang
>>> from pyspark.sql import SparkSession
>>>
>>> session = SparkSession.builder \
...     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
...     .config("spark.sql.streaming.schemaInference", True)\
...     .config("spark.ui.port","4050")\
...     .getOrCreate()
>>> schema = ibis.schema(
...     {
...         "createTime": "timestamp(3)",
...         "orderId": "int64",
...         "payAmount": "float64",
...         "payPlatform": "int32",
...         "provinceId": "int32",
...     }
... )
>>>
>>> con = ibis.pyspark.connect(session, mode="streaming")
>>>
>>> con.read_kafka(
...     "payment_msg",
...     watermark=ibis.watermark(time_col="createTime", allowed_delay=ibis.interval(seconds=10)),
...     schema=schema,
...     auto_parse=True,
...     options={"kafka.bootstrap.servers": "localhost:9092", "subscribe": "payment_msg", "startingOffsets": "earliest"}
... )
DatabaseTable: payment_msg
  createTime  timestamp
  orderId     int64
  payAmount   float64
  payPlatform int32
  provinceId  int32
>>> con.compile(t.join(province_id_to_name_df, ["provinceId"]))
'SELECT `t2`.`createTime`, `t2`.`orderId`, `t2`.`payAmount`, `t2`.`payPlatform`, `t2`.`provinceId`, `t3`.`province` FROM `memory`.`main`.`payments` AS `t2` INNER JOIN `ibis_pandas_memtable_g4tatyyimzhs7kevu5uxwl3lwu` AS `t3` ON `t2`.`provinceId` = `t3`.`provinceId`'
>>> t = con.table("payment_msg")
>>> con.compile(t.join(province_id_to_name_df, ["provinceId"]))
'SELECT `t2`.`createTime`, `t2`.`orderId`, `t2`.`payAmount`, `t2`.`payPlatform`, `t2`.`provinceId`, `t3`.`province` FROM `payment_msg` AS `t2` INNER JOIN `ibis_pandas_memtable_kut46v76lzblhemiwkjnrme6km` AS `t3` ON `t2`.`provinceId` = `t3`.`provinceId`'

What version of ibis are you using?

main

What backend(s) are you using, if any?

DuckDB, PySpark

Relevant log output

No response

chloeh13q commented 1 month ago

Did some digging with @ncclementi -

The issue here is that when we create an in-memory table in DuckDB, Ibis uses the current catalog and database to generate a complete name, which becomes memory.main.table_name. And memory.main is not a valid catalog/database in other backends.

The question is: where should in-memory tables live when we create them in different backends? If they always live in the default catalog/database of the specific backend, then perhaps expressions created on one backend should not be executable on a different backend, because even though the table name is the same, the table lives in a different catalog/database (so it's technically a different table). But if all the user cares about is an in-memory table, should they need to worry about catalog/database at all?

Separately, there also seems to be some inconsistency in how we deal with in-memory table names, since the name of the in-memory table in PySpark is just table_name, not prefixed by catalog or database.
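
To make the DuckDB side of this concrete, a minimal sketch (assuming an in-memory DuckDB connection and the payments DataFrame `df` from the repro) of where the location ends up on the table op:

```python
import ibis

con = ibis.duckdb.connect()            # in-memory DuckDB connection
t = con.create_table("payments", df)   # df: the payments DataFrame from the repro

op = t.op()
print(op.name)        # payments
print(op.namespace)   # a Namespace carrying catalog='memory', database='main'
# the PySpark backend, by contrast, leaves catalog/database unset here
```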

@gforsyth It seems that you have worked on a large chunk of this code and may have some thoughts here. Tagging for visibility.

gforsyth commented 1 month ago

Hey @chloeh13q and @ncclementi -- thanks for digging in here!

We don't necessarily have a choice about where in-memory tables live -- they are temporary tables and there are restrictions on where they can be created, dependent on the backend. Hmm, although I interpret "in-memory table" as a memtable. Anyway, moving on...

This appears to be a weird hiccup in how we're handling prepending catalog + database location in table expressions.

I have three takeaways here (I think, maybe more?)

  1. A workaround: if you use the DatabaseTable that is the output of create_table in DuckDB, it has the catalog and database location prepended to it. However, if you then grab a separate reference to the same table (via e.g. con.table), it doesn't:
    
    [ins] In [39]: con.create_table("penguins", t, overwrite=True)
    Out[39]: 
    DatabaseTable: memory.main.penguins
    species           string
    island            string
    bill_length_mm    float64
    bill_depth_mm     float64
    flipper_length_mm int64
    body_mass_g       int64
    sex               string
    year              int64

    [ins] In [40]: t = con.tables.penguins

    [ins] In [41]: t
    Out[41]: 
    DatabaseTable: penguins
    species           string
    island            string
    bill_length_mm    float64
    bill_depth_mm     float64
    flipper_length_mm int64
    body_mass_g       int64
    sex               string
    year              int64



2. What I just wrote in 1 is obviously a usability bug and should be fixed.
3. Currently `unbind` strips out backend-specific bindings from a table expression, but we may also want to have it "sanitize" the table names, to allow for execution between backends that have nested table hierarchies and those that don't.
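
For context, the cross-backend flow that `unbind` targets looks roughly like this (a sketch; per point 3, the `memory.main` location can still leak into the compiled SQL today, which is what the sanitizing step would address):

```python
# duck_expr: the join expression from the repro, built against the DuckDB table
unbound = duck_expr.unbind()                     # drop the DuckDB backend binding
print(ibis.to_sql(unbound, dialect="pyspark"))   # compile for PySpark instead
```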

ncclementi commented 1 month ago

Thanks @gforsyth for looking into this.

Couple more notes/questions:

When we get into create_table in DuckDB, catalog and database are memory and main, respectively; see https://github.com/ibis-project/ibis/blob/3b2a7ecfd3d7625bc316ee20ee1157348db410b4/ibis/backends/duckdb/__init__.py#L173-L174

When that is passed along in table() as namespace=ops.Namespace(catalog=catalog, database=database), the table ends up qualified as memory.main.my_table.

But for PySpark we don't have a catalog or a database; here these are both None: https://github.com/ibis-project/ibis/blob/3b2a7ecfd3d7625bc316ee20ee1157348db410b4/ibis/backends/pyspark/__init__.py#L505-L506

Then namespace=ops.Namespace(catalog=catalog, database=database) leaves just the table name, without the memory.main prefix.

Is this expected?

cc @chloeh13q: for your demo, you can grab the table reference as Gil showed in his case (1).
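
Applied to the repro, that would look roughly like this (a sketch; `spark_con` stands for the PySpark connection created there):

```python
duck = ibis.get_backend()    # the default DuckDB backend from the repro
t = duck.tables.payments     # re-fetched reference, no memory.main prefix attached

sql = spark_con.compile(t.join(province_id_to_name_df, ["provinceId"]))
# the compiled SQL should now reference `payments` rather than `memory`.`main`.`payments`
```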

cpcloud commented 1 month ago

> We noticed that when you create an in-memory table in PySpark, for example, it doesn't append the memory.main. I wonder if, in case 1, this is just a naming issue.

No, this is working as expected. Prefixing with memory.main is a DuckDB implementation detail, and doing that for PySpark doesn't make sense.
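
For reference, a quick way to see where that prefix comes from (a sketch, assuming the `current_catalog` / `current_database` properties on the DuckDB backend):

```python
import ibis

con = ibis.duckdb.connect()    # in-memory DuckDB
print(con.current_catalog)     # memory
print(con.current_database)    # main
```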

cpcloud commented 1 month ago

One option might be to add catalog and database parameters to unbind(), so that you could set them yourself, which would also cover the case of erasing them (setting them to None).
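
A hypothetical sketch of what that might look like (not implemented; parameter names are illustrative only):

```python
# hypothetical API: None would erase the location, strings would override it
expr.unbind(catalog=None, database=None)                    # drop memory.main entirely
expr.unbind(catalog="spark_catalog", database="default")    # retarget explicitly
```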

gforsyth commented 1 month ago

Let me throw up a PR adding in catalog and database to unbind -- I think that makes sense.

gforsyth commented 1 month ago

And actually, as a default, stripping namespace information in unbind makes sense, too

gforsyth commented 1 month ago

Ok, there were some issues with my initial PR to handle this.

There are two chunks of work here:

  1. Make Ibis self-consistent in how we attach <catalog>.<database> locations to tables (and make sure that information gets passed everywhere, so we aren't implicitly relying on tables existing in the current namespace).
  2. Once we are self-consistent, allow passing a mapping to unbind, a la rename, so a user can explicitly retarget a query by updating table locations in some target backend B
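
Hypothetically, step 2 could look something like this (purely illustrative, not an existing API; the mapping mirrors the `rename`-style interface mentioned above):

```python
# hypothetical API: retarget table locations when moving a query to backend B
retargeted = expr.unbind(
    {"memory.main.payments": "spark_catalog.default.payments"}
)
```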

cpcloud commented 2 weeks ago

I think this issue may be conflating two things:

No-code-change expression execution

This works, even when temp tables are in use, so the inability to do this with SQL doesn't have much bearing on the explicit goals of Ibis.
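
That pattern looks roughly like this (a sketch, assuming a `payments` table exists on each connection; `duck_con` and `spark_con` are the two backend connections):

```python
def payments_by_province(con):
    # Same Ibis code against any backend: build the query on whatever
    # connection is passed in and execute it there.
    payments = con.table("payments")
    expr = payments.join(province_id_to_name_df, ["provinceId"])
    return con.execute(expr)

duck_result = payments_by_province(duck_con)     # runs on DuckDB
spark_result = payments_by_province(spark_con)   # runs on PySpark
```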

No-SQL-change execution

This is definitely not something that is guaranteed in any way, nor do I think we should or are able to guarantee it. No amount of SQL massaging will in general get you the ability to generate cross-backend compatible SQL that uses temp tables.

Closing as wontfix.