apache / datafusion-python

Apache DataFusion Python Bindings
https://datafusion.apache.org/python
Apache License 2.0

ORDER BY is ignored when COPYing from a pyarrow table to a csv file #609

Open progval opened 3 months ago

progval commented 3 months ago

Describe the bug

ORDER BY is ignored when COPYing from a pyarrow table to a CSV file.

This happens both for tables created with pyarrow.Table.from_pydict and for tables read from ORC files.

To Reproduce

With datafusion 35.0.0 installed from PyPI:

from pathlib import Path

import datafusion
import pyarrow

ctx = datafusion.SessionContext()
ctx.from_arrow_table(pyarrow.Table.from_pydict({'value': [2, 1, 3]}), "content")

output_path = Path("/tmp/output.csv")

query = f"""
    COPY (SELECT value FROM content ORDER BY value)
    TO '{output_path}' (
        FORMAT CSV,
    )
"""
df = ctx.sql(query)

columns = df.schema().names
assert columns == ["value"], columns

df.count()  # force the query to run

print(output_path.read_text())

$ python3 /tmp/order_arrow_table.py
value
2
1
3

Expected behavior

It should print:

value
1
2
3
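To turn this expectation into an automated check, one option is to parse the written file with the standard library csv module and compare the column against its sorted copy. This is a minimal sketch: the helper name column_is_sorted is hypothetical, and it assumes a header row and integer values, as in the reproduction above.

```python
import csv
import io


def column_is_sorted(csv_text: str, column: str, descending: bool = False) -> bool:
    """Return True if `column` in the CSV text is sorted.

    Hypothetical helper: assumes a header row and integer values.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    values = [int(row[column]) for row in reader]
    return values == sorted(values, reverse=descending)


# The output reported in the issue versus the expected output.
actual = "value\n2\n1\n3\n"
expected = "value\n1\n2\n3\n"
print(column_is_sorted(actual, "value"))    # False
print(column_is_sorted(expected, "value"))  # True
```

In the reproduction script, output_path.read_text() could be passed in place of the literal strings.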

Additional context

I tried to reproduce it directly in Rust (https://github.com/apache/arrow-datafusion/issues/9463), but the following code produces sorted output as expected:

use std::sync::Arc;
use datafusion::arrow::array::PrimitiveArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema, Int64Type};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
use datafusion::datasource::MemTable;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();

    // A single non-nullable Int64 column "value" holding unsorted data.
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let column: PrimitiveArray<Int64Type> = vec![2, 1, 3].into();
    let partition = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
    // Register the batch as an in-memory table named "content".
    let table = MemTable::try_new(schema, vec![vec![partition]]).unwrap();
    ctx.register_table("content", Arc::new(table)).unwrap();

    let df = ctx.sql("COPY (SELECT value FROM content ORDER BY value) TO '/tmp/output.csv'").await.unwrap();
    // Execute the plan with collect() (the Python script used count() instead).
    df.collect().await.unwrap();
}
mesejo commented 3 months ago

This works correctly if you call

df.collect()

as opposed to

df.count()

What is probably happening is that DataFusion optimizes the query and removes the ORDER BY, because the result of count() does not depend on row order.
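The hypothesis can be illustrated with a toy logical plan. This is not DataFusion's optimizer: the Plan class, the rule eliminate_sort_under_count and the respect_sinks flag are all hypothetical. The sketch models df.count() on the COPY query as Count over Copy over Sort over Scan, and shows that removing every sort below a count leaves the count unchanged but also strips the sort that the Copy (file write) depends on.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Plan:
    """Hypothetical toy logical-plan node: an operator name plus one input."""
    op: str
    child: Optional["Plan"] = None

    def ops(self) -> List[str]:
        """List operator names from the root down to the leaf."""
        node, names = self, []
        while node is not None:
            names.append(node.op)
            node = node.child
        return names


# Operators whose consumers can observe the order of their input rows.
# "Copy" writes rows to a file, so the order of its input matters.
ORDER_SENSITIVE = {"Copy"}


def drop_sorts(node: Optional[Plan], respect_sinks: bool) -> Optional[Plan]:
    """Remove Sort nodes; if respect_sinks, stop at order-sensitive nodes."""
    if node is None:
        return None
    if respect_sinks and node.op in ORDER_SENSITIVE:
        return node  # leave everything below the sink untouched
    if node.op == "Sort":
        return drop_sorts(node.child, respect_sinks)
    return Plan(node.op, drop_sorts(node.child, respect_sinks))


def eliminate_sort_under_count(plan: Plan, respect_sinks: bool) -> Plan:
    """Toy rule: a Count's result ignores row order, so sorts below it may go."""
    if plan.op == "Count":
        return Plan(plan.op, drop_sorts(plan.child, respect_sinks))
    return plan


# df.count() on the COPY query, modeled as Count -> Copy -> Sort -> Scan.
plan = Plan("Count", Plan("Copy", Plan("Sort", Plan("Scan"))))

print(eliminate_sort_under_count(plan, respect_sinks=False).ops())
# ['Count', 'Copy', 'Scan']  (the file would be written unsorted)
print(eliminate_sort_under_count(plan, respect_sinks=True).ops())
# ['Count', 'Copy', 'Sort', 'Scan']
```

If the hypothesis is right, the naive variant corresponds to the reported behavior, while collect() avoids the wrapping count and so keeps the sort in the plan.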