apache / datafusion-python

Apache DataFusion Python Bindings
https://datafusion.apache.org/python
Apache License 2.0

Can't convert date_bin aggregated with count(*) to arrow if some windows contain null data #862

Open rickspencer3 opened 2 months ago

rickspencer3 commented 2 months ago

Describe the bug
When using count(*) with date_bin to aggregate data where some of the windows have no data, the DataFrame object is created fine, but to_arrow_table() raises the exception below.

To Reproduce
The following code reproduces the error, as I see it.

from datafusion import SessionContext

ctx = SessionContext()

# Four rows: two in minute 10:01, two in minute 10:03, none in 10:02.
test_data = [
    {"id": 1, "created_at": "2024-09-07 10:01:05", "content": "First entry in first minute"},
    {"id": 2, "created_at": "2024-09-07 10:01:45", "content": "Second entry in first minute"},
    {"id": 3, "created_at": "2024-09-07 10:03:10", "content": "First entry in third minute"},
    {"id": 4, "created_at": "2024-09-07 10:03:55", "content": "Second entry in third minute"},
]

# Register the rows as a table named "count_me".
ctx.from_pylist(test_data, "count_me")

# Bucket rows into 1-minute windows and count the rows in each window.
sql = """
SELECT
    DATE_BIN(INTERVAL '1 minute', created_at) AS time_window,
    COUNT(*) AS count
FROM count_me
GROUP BY time_window
ORDER BY time_window;
"""

df = ctx.sql(sql)
df.to_arrow_table()  # raises pyarrow.lib.ArrowInvalid

This results in:

pyarrow.lib.ArrowInvalid: Schema at index 0 was different: 
time_window: timestamp[ns]
count: int64 not null
vs
time_window: timestamp[ns]
count: int64

Expected behavior
I would expect there to be 3 rows of data, with the count column being [2, 0, 2].
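For comparison, here is a pure-Python sketch of what DATE_BIN(INTERVAL '1 minute', created_at) plus GROUP BY computes on this test data. It assumes DATE_BIN's default origin is the Unix epoch (1970-01-01T00:00:00), and the date_bin helper below is hypothetical, not DataFusion's implementation. Because GROUP BY only emits rows for bins that contain data, a [2, 0, 2] result needs an explicit zero-fill step over every minute in the range.

```python
from collections import Counter
from datetime import datetime, timedelta

# Timestamps copied from test_data in the repro above.
created_at = [
    "2024-09-07 10:01:05",
    "2024-09-07 10:01:45",
    "2024-09-07 10:03:10",
    "2024-09-07 10:03:55",
]

ORIGIN = datetime(1970, 1, 1)  # assumed default DATE_BIN origin (Unix epoch)
STRIDE = timedelta(minutes=1)  # INTERVAL '1 minute'


def date_bin(ts: datetime, stride: timedelta = STRIDE, origin: datetime = ORIGIN) -> datetime:
    """Hypothetical helper: floor ts to the start of its stride-sized bin."""
    return origin + ((ts - origin) // stride) * stride


bins = [date_bin(datetime.strptime(s, "%Y-%m-%d %H:%M:%S")) for s in created_at]

# GROUP BY: one row per bin that actually contains data, ordered by bin.
grouped = sorted(Counter(bins).items())

# Zero-fill: walk every bin from the first to the last, defaulting missing ones to 0.
counts = dict(grouped)
filled = []
t = grouped[0][0]
while t <= grouped[-1][0]:
    filled.append((t, counts.get(t, 0)))
    t += STRIDE

print([c for _, c in grouped])  # counts produced by GROUP BY alone
print([c for _, c in filled])   # counts after zero-filling empty windows
```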

rickspencer3 commented 2 months ago

Looking at the DataFrame in the debugger, it seems to have skipped the empty (null) windows correctly:

df
DataFrame()
+---------------------+-------+
| time_window         | count |
+---------------------+-------+
| 2024-09-07T10:01:00 | 2     |
| 2024-09-07T10:03:00 | 2     |
+---------------------+-------+

df.schema()
time_window: timestamp[ns]
count: int64 not null

So I am pretty confused by the error message. The traceback ends with:

    return self.df.to_arrow_table()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 4865, in pyarrow.lib.Table.from_batches
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Schema at index 0 was different: 
time_window: timestamp[ns]
count: int64 not null
vs
time_window: timestamp[ns]
count: int64

timsaucer commented 2 months ago

Another minimal example:

from datafusion import SessionContext, col, functions as F

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3]})
# Group by "a" and count rows per group.
df = df.aggregate([col("a")], [F.count(col("a")).alias("count")])
df.show()            # displays fine
df.to_arrow_table()  # raises the same ArrowInvalid schema mismatch

It looks like the problem comes from a mix of nullable and non-nullable fields being passed when converting to pyarrow. I'll try to investigate what's happening.

Interestingly, on the latest main you can convert it to a pyarrow table directly:

import pyarrow as pa

# Convert directly, bypassing DataFrame.to_arrow_table()
pa.table(df)

This works just fine, so I'm guessing the problem is in to_arrow_table.

timsaucer commented 2 months ago

Also, when I coerced the count field to be nullable, the conversion worked, so I strongly suspect the nullable/non-nullable mix.

rickspencer3 commented 2 months ago

Thank you for considering my issue. I tried various ways of coercing the field to nullable, but I couldn't figure out how from the documentation.

@timsaucer When you get a moment, could you please post the code you used here, so I can put a workaround in place for the time being?

timsaucer commented 2 months ago

It's super hacky:

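from datafusion import col, lit, functions as F

# CASE WHEN count != 0 THEN count END has no ELSE branch, so the result may be
# NULL and the output "count" field becomes nullable.
df = df.select("time_window", F.when(col("count") != lit(0), col("count")).end().alias("count"))

timsaucer commented 2 months ago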

Oh, I just realized that might not work for you, since functions.when was only recently exposed. Instead, you would need to do something like F.case(col("count") != lit(0)).when(lit(True), col("count")).end().

rickspencer3 commented 2 months ago

Thank you so much for the help; much appreciated. Please let me know if there is anything I can do to help. It sounds like, if all else fails, pa.table(df) will provide a workaround once it is released.

mesejo commented 2 months ago

I think issue #534 may be related to this one; it seems to be caused by a problem upstream (see datafusion#12307).