marsupialtail / quokka

Making data lake work for time series
https://marsupialtail.github.io/quokka/
Apache License 2.0
1.14k stars 60 forks source link

Schema not correct after aggregation #23

Closed skrawcz closed 1 year ago

skrawcz commented 1 year ago

what

The "schema" of the DataStream after the aggregation is missing the mean columns and is thus incorrect.

to reproduce

Run the hello world example:

from pyquokka.df import QuokkaContext
qc = QuokkaContext()
lineitem = qc.read_csv("lineitem.tbl.named", sep="|", has_header=True)
d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
print(len(d.schema))
d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})
print(len(d.schema))
d = d.with_column("charge", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]) * (1 + x["l_tax"]), required_columns={"l_extendedprice", "l_discount", "l_tax"})
print(len(d.schema))
f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag","l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "charge":"sum", "l_discount":"avg","*":"count"})
print(f.schema)
print(len(d.schema)) # <--- this is incorrect and only has 8 columns
df = f.collect()
print(df.columns)  
print(len(df.columns)) # <--- this has 10 columns and is correct

what I expect

Is that when I get a DataStream, the "schema" it has should reflect what it would actually produce.

marsupialtail commented 1 year ago

fixed with commit 73d4d3daa7fca2fdee6c4d82a638f95a7ee4581a