ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

bug: investigate discrepancies between Polars Python native code vs Ibis w/ the Polars backend #8050

Open kunishou opened 9 months ago

kunishou commented 9 months ago

edit from @lostmygithubaccount: we'll re-purpose this issue to investigate the q2 performance issue noticed below and the "one billion row challenge" performance issue noticed w/ the Polars backend. I may or may not investigate myself, otherwise we should dig into why these queries are slower on Ibis

What happened?

Hello. I recently started using Ibis. I'm interested in whether there is a significant difference in processing speed between a backend used natively and the same backend driven through Ibis. To investigate this, I rewrote the Polars queries from the pola-rs/tpch benchmark for Ibis, set the backend to Polars, and executed six queries. As a result, the processing speed with Ibis-Polars was significantly slower than with native Polars. Could this be due to the process of converting the Ibis API calls into Polars operations? If there is any mistake in how I'm using Ibis, please point it out.

What version of ibis are you using?

7.2.0

What backend(s) are you using, if any?

Polars


lostmygithubaccount commented 9 months ago

Ibis converts its expressions to code the backend can execute -- generally SQL, but for the Polars backend, native Polars Python code. The overhead of this conversion is minimal.

Ibis is likely generating poorly performing Polars code somewhere. Thanks for sharing your examples! I had recently done some TPC-H benchmarking and didn't notice much difference between Ibis with the Polars backend and native Polars code, but did notice a massive slowdown for the one billion row challenge code here: https://github.com/ibis-project/ibis/pull/8004

So I suspect this is specific to the Polars backend; we need to investigate.

cpcloud commented 9 months ago

TL;DR: the native Polars queries appear faster because they are running against in-memory data, while the Ibis queries are running against pl.scan_parquet.

The code in these notebooks is not equivalent enough, across multiple dimensions, to support meaningful statements about performance.

Let's look at query 1.

Ibis code

var_1 = datetime(1998, 9, 2)

q = line_item_ds

q = q.mutate(
    disc_price=q["l_extendedprice"] * (1 - q["l_discount"]),
    charge=q["l_extendedprice"] * (1 - q["l_discount"]) * (1 + q["l_tax"]),
).cache()

q_final = (
    q.filter(q["l_shipdate"] <= var_1)
    .group_by(["l_returnflag", "l_linestatus"])
    .agg(
        [
            q["l_quantity"].sum().name("sum_qty"),
            q["l_extendedprice"].sum().name("sum_base_price"),
            q["disc_price"].sum().name("sum_disc_price"),
            q["charge"].sum().name("sum_charge"),
            q["l_quantity"].mean().name("avg_qty"),
            q["l_extendedprice"].mean().name("avg_price"),
            q["l_discount"].mean().name("avg_disc"),
            q.count().name("count_order"),
        ]
    )
    .order_by(["l_returnflag", "l_linestatus"])
)

q_final.execute().head()
  1. There's a mutate call here; there is no equivalent call in the Polars code.
  2. You're using cache() in the Ibis code but not in the Polars code.

All of these cache() calls seem premature. You're caching both the input dataset and, in the Ibis case, the intermediate expression as well.

After doing the following I get similar performance for query 1:

  1. Remove the cache() call on utils.get_line_item_ds() in the Ibis notebook.
  2. Convert the Ibis expression to match the Polars query as closely as possible: inline the mutate expressions into the aggregation, which removes the intermediate cache() call.

Here's the Ibis code for query 1 after those changes:

var_1 = datetime(1998, 9, 2)

q = line_item_ds

q_final = (
    q.filter(q["l_shipdate"] <= var_1)
    .group_by(["l_returnflag", "l_linestatus"])
    .agg(
        [
            q["l_quantity"].sum().name("sum_qty"),
            q["l_extendedprice"].sum().name("sum_base_price"),
            (q["l_extendedprice"] * (1 - q["l_discount"])).sum().name("sum_disc_price"),
            (q["l_extendedprice"] * (1 - q["l_discount"]) * (1 + q["l_tax"])).sum().name("sum_charge"),
            q["l_quantity"].mean().name("avg_qty"),
            q["l_extendedprice"].mean().name("avg_price"),
            q["l_discount"].mean().name("avg_disc"),
            q.count().name("count_order"),
        ]
    )
    .order_by(["l_returnflag", "l_linestatus"])
)

q_final.execute().head()

In general, you can see the difference between the Polars code produced by Ibis and handwritten Polars code by comparing the results of calling explain():

print(ibis_expr.compile().explain())
print(polars_expr.explain())

We can now compare plans.

Polars:

SORT BY [col("l_returnflag"), col("l_linestatus")]
  AGGREGATE
    [col("l_quantity").sum().alias("sum_qty"), col("l_extendedprice").sum().alias("sum_base_price"), col("__POLARS_CSER_10189943427343226660").sum().alias("sum_disc_price"), [(col("__POLARS_CSER_10189943427343226660")) * ([(1.0) + (col("l_tax"))])].sum().alias("sum_charge"), col("l_quantity").mean().alias("avg_qty"), col("l_extendedprice").mean().alias("avg_price"), col("l_discount").mean().alias("avg_disc"), count().alias("count_order")] BY [col("l_returnflag"), col("l_linestatus")] FROM
     WITH_COLUMNS:
     [[(col("l_extendedprice")) * ([(1.0) - (col("l_discount"))])].alias("__POLARS_CSER_10189943427343226660")]
      DF ["l_orderkey", "l_partkey", "l_suppkey", "l_linenumber"]; PROJECT 7/17 COLUMNS; SELECTION: "[(col(\"l_shipdate\")) <= (1998-09-02 00:00:00)]"

Ibis:

SORT BY [col("l_returnflag"), col("l_linestatus")]
  AGGREGATE
    [col("l_quantity").filter(col("__POLARS_CSER_5052675655408460014")).sum().strict_cast(Int64).alias("sum_qty").alias("sum_qty"), col("l_extendedprice").filter(col("__POLARS_CSER_8603379535699628203")).sum().strict_cast(Float64).alias("sum_base_price").alias("sum_base_price"), col("__POLARS_CSER_6060913848753859363").filter(col("__POLARS_CSER_6060913848753859363").is_not_null()).sum().strict_cast(Float64).alias("sum_disc_price").alias("sum_disc_price"), col("__POLARS_CSER_7981173579976008998").filter(col("__POLARS_CSER_7981173579976008998").is_not_null()).sum().strict_cast(Float64).alias("sum_charge").alias("sum_charge"), col("l_quantity").filter(col("__POLARS_CSER_5052675655408460014")).mean().strict_cast(Float64).alias("avg_qty").alias("avg_qty"), col("l_extendedprice").filter(col("__POLARS_CSER_8603379535699628203")).mean().strict_cast(Float64).alias("avg_price").alias("avg_price"), col("l_discount").filter(col("l_discount").is_not_null()).mean().strict_cast(Float64).alias("avg_disc").alias("avg_disc"), count().strict_cast(Int64).alias("count_order").alias("count_order")] BY [col("l_returnflag"), col("l_linestatus")] FROM
     WITH_COLUMNS:
     [col("l_quantity").is_not_null().alias("__POLARS_CSER_5052675655408460014"), [(col("l_extendedprice")) * ([(1.0) - (col("l_discount"))])].alias("__POLARS_CSER_6060913848753859363"), col("l_extendedprice").is_not_null().alias("__POLARS_CSER_8603379535699628203"), [([(col("l_extendedprice")) * ([(1.0) - (col("l_discount"))])]) * ([(col("l_tax")) + (1.0)])].alias("__POLARS_CSER_7981173579976008998")]
       WITH_COLUMNS:
       [col("l_returnflag"), col("l_linestatus")]

          Parquet SCAN /content/tpch_tables_scale_1/tables_scale_1/lineitem.parquet
          PROJECT 7/17 COLUMNS
          SELECTION: [(col("l_shipdate")) <= (1998-09-02 00:00:00.cast(Datetime(Microseconds, None)))]

It looks like Ibis is generating a bit of unnecessary code (the aggregation filter() calls and the doubled alias() calls), but that doesn't seem to affect the performance of query 1.

@kunishou What's going on with that initial scan?

Are these even running against the same kind of input? It doesn't look like it: the Ibis code is scanning a Parquet file, while the Polars code is running against in-memory data.

We're happy to look into this more if you can make these queries more comparable. The first and likely most impactful step would be to adjust

def _scan_ds(path: str):
    path = f"{path}.{FILE_TYPE}"
    if FILE_TYPE == "parquet":
        scan = pl.scan_parquet(path)
    elif FILE_TYPE == "feather":
        scan = pl.scan_ipc(path)
    else:
        raise ValueError(f"file type: {FILE_TYPE} not expected")
    if INCLUDE_IO:
        return scan
    return scan.collect().rechunk().lazy()

so that it always returns scan.

cpcloud commented 9 months ago

I just ran all of these after converting _scan_ds to just return scan, and the only query that was more than a factor of 2 slower was query 2, which differed by a factor of around 8-10x. Not sure what's going on there.

Some queries were faster with Ibis than with native Polars, though there's probably nothing systematic there.

kunishou commented 9 months ago

@lostmygithubaccount Thank you for your comments and for sharing #8004! It seems there is a flaw in my code for comparing native Polars and Ibis-Polars.

cpcloud commented 9 months ago

Thanks for bringing this to our attention! It looks like there's something to look at for query 2!

kunishou commented 9 months ago

@cpcloud Thank you for showing me the correct way to use Ibis and for providing a revised query! I also appreciate your pointing out that the scan conditions were not the same. Recently in Japan, the number of Polars enthusiasts has been gradually increasing. Amid this, I want to spread the word about how great Ibis is. I believe processing speed is something everyone pays attention to, and I wanted to show that Ibis-Polars is not inferior to native Polars, which is why I was doing this verification. I will take your advice into consideration and review the queries and other conditions once again (and I will also study more about how to use Ibis).

Thank you very much!

cpcloud commented 9 months ago

@kunishou Awesome, great to hear! Let us know how we can help.