sfu-db / connector-x

Fastest library to load data from DB to DataFrames in Rust and Python
https://sfu-db.github.io/connector-x
MIT License
1.85k stars 146 forks source link

Parallel Data Reading in Pandas Does Not Support Ordering in Queries (PostgreSQL) #633

Closed DentaCool closed 1 month ago

DentaCool commented 1 month ago

What language are you using?

Python.

What version are you using?

0.3.3/0.3.2

What database are you using?

PostgreSQL

What dataframe are you using?

Pandas

Can you describe your bug?

When performing parallel data reading with ordering in queries, the results are incorrect. Specifically, using DESC ordering on the id column causes the output to contain zeroed data.

What are the steps to reproduce the behavior?

Execute a query with ordering by any column, such as DESC ordering on id.

Database setup if the error only happens on specific data or data type

Table schema and example data

CREATE TABLE test_table (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    age INT,
    email VARCHAR(100)
);

INSERT INTO test_table (name, age, email) VALUES
('Alice', 30, 'alice@example.com'),
('Bob', 25, 'bob@example.com'),
('Charlie', 35, 'charlie@example.com'),
('Diana', 28, 'diana@example.com');
In [52]: cx.read_sql(url, "select * from test_table order by id DESC limit 2", partition_on="id", partition_num=2)
Out[52]: 
   id name  age email
0   0    0    0     0
1   0    0    0     0

In [53]: cx.read_sql(url, "select * from test_table order by id ASC limit 2", partition_on="id", partition_num=2)
Out[53]: 
   id   name  age              email
0   1  Alice   30  alice@example.com
1   2    Bob   25    bob@example.com

Postgres logs:

SELECT min(CXTMPTAB_RANGE.id), max(CXTMPTAB_RANGE.id) FROM (SELECT * FROM test_table LIMIT 2) AS CXTMPTAB_RANGE

SELECT count(*) FROM (SELECT * FROM test_table LIMIT 2) AS CXTMPTAB_COUNT

COPY (SELECT * FROM (SELECT * FROM test_table ORDER BY id DESC LIMIT 2) AS CXTMPTAB_PART WHERE 1 <= CXTMPTAB_PART.id AND CXTMPTAB_PART.id < 2) TO STDOUT WITH BINARY

COPY (SELECT * FROM (SELECT * FROM test_table ORDER BY id DESC LIMIT 2) AS CXTMPTAB_PART WHERE 2 <= CXTMPTAB_PART.id AND CXTMPTAB_PART.id < 3) TO STDOUT WITH BINARY

SELECT min(CXTMPTAB_RANGE.id), max(CXTMPTAB_RANGE.id) FROM (SELECT * FROM test_table LIMIT 2) AS CXTMPTAB_RANGE

SELECT count(*) FROM (SELECT * FROM test_table LIMIT 2) AS CXTMPTAB_COUNT

COPY (SELECT * FROM (SELECT * FROM test_table ORDER BY id ASC LIMIT 2) AS CXTMPTAB_PART WHERE 1 <= CXTMPTAB_PART.id AND CXTMPTAB_PART.id < 2) TO STDOUT WITH BINARY

COPY (SELECT * FROM (SELECT * FROM test_table ORDER BY id ASC LIMIT 2) AS CXTMPTAB_PART WHERE 2 <= CXTMPTAB_PART.id AND CXTMPTAB_PART.id < 3) TO STDOUT WITH BINARY
DentaCool commented 1 month ago

Additional Issue: Data Inconsistency with Parallel Reading (without ORDER BY)

When performing parallel reading of queries, there can be instances where data no longer matches the filter criteria. For example, consider a situation where the initial count query returns 100 records with id values ranging from 1 to 101. Before fetching the partition [90..100], the age of the record with id 99 changes from 25 to 26. If the filter condition is WHERE age < 26, this record will no longer match the filter, resulting in df zeroed values like (0, 0, 0, 0).

It also seems that with an increased amount of data on parallel queries, zeros are also possible (I could not check)

WHERE age > 23 and age < 26

count - 20 range - [1...101]

56 id changed from 23 to 24 before own query part

actual_count = 21

In general, I have a problem where I get zeros with parallel reading of database data where there are frequent changes. In most cases, the database can only increase the amount of data for the specified filter

bhelga commented 1 month ago

Same issue

wangxiaoying commented 1 month ago

Hi @DentaCool , thanks for reporting the issue and the reproducible example! This fix will be included in our next release.