Open: alamb opened this issue 1 month ago
Could potentially be related to https://github.com/apache/datafusion/pull/11535
Note the test passes on re-run for PR #11527:
Failure: https://github.com/apache/datafusion/actions/runs/10001535587/job/27645407724?pr=11527
Pass (same code): https://github.com/apache/datafusion/actions/runs/10001535587/job/27681929975?pr=11527
I think the cause may be similar to https://github.com/apache/datafusion/pull/11041/files#r1648318160
I'll work on a fix. Thanks @alamb for reporting it.
🤔 it seems to have happened again on main right after https://github.com/apache/datafusion/pull/11604 was merged:
https://github.com/apache/datafusion/actions/runs/10046115241/job/27764888311
I have disabled the test for now. I'll spend more time investigating why this happens.
I'm still on it. It depends on a pretty tricky condition involving matches that span multiple buffered batches.
UPD: I built a repro and am working on a solution.
I found that the problem happens when a single streamed row has multiple matching buffered rows, but those buffered rows are in separate batches. In this case the DataFusion SMJ reacts to the first batch without knowing that the next one is coming. I'm still experimenting to find a solution, even a hacky one.
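To illustrate this failure mode, here is a toy model in plain Rust (not DataFusion's SMJ code). Buffered batches are modeled as `Vec<(key, value)>`, and the join filter is assumed to be `streamed.value < buffered.value` purely for illustration; `naive_anti_emits` and `correct_anti_emits` are hypothetical helpers. Deciding the anti-join output on the first batch that contains the key wrongly emits the streamed row when the only passing match sits in a later batch.

```rust
/// Toy model: a buffered batch is a list of (join key, value) rows.
type Batch = Vec<(i32, i32)>;

/// Assumed join filter, for illustration only: streamed.value < buffered.value.
fn passes_filter(streamed_value: i32, buffered_value: i32) -> bool {
    streamed_value < buffered_value
}

/// Naive: decides on the first buffered batch that contains `key`, without
/// knowing whether later batches hold more rows for the same key.
fn naive_anti_emits(key: i32, value: i32, batches: &[Batch]) -> bool {
    for batch in batches {
        let matches: Vec<&(i32, i32)> = batch.iter().filter(|(k, _)| *k == key).collect();
        if matches.is_empty() {
            continue;
        }
        // Emit the streamed row if no match in *this* batch passes the filter.
        return !matches.iter().any(|(_, v)| passes_filter(value, *v));
    }
    // No buffered row with this key at all: the anti join emits the row.
    true
}

/// Correct: emit only if no matching row in *any* batch passes the filter.
fn correct_anti_emits(key: i32, value: i32, batches: &[Batch]) -> bool {
    !batches
        .iter()
        .flatten()
        .any(|(k, v)| *k == key && passes_filter(value, *v))
}

fn main() {
    // One streamed row (key 1, value 10); its matches span two buffered batches.
    let batches: Vec<Batch> = vec![vec![(1, 5)], vec![(1, 20)]];
    println!("naive emits:   {}", naive_anti_emits(1, 10, &batches)); // true (wrong)
    println!("correct emits: {}", correct_anti_emits(1, 10, &batches)); // false
}
```

In this model the naive version only sees `(1, 5)`, which fails the filter, and emits the row before `(1, 20)` (which passes) has been read.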
The repro test case:
#[tokio::test]
async fn test_cross_b_bb() {
    // Only used to obtain the fuzz-test schema.
    let left: Vec<RecordBatch> = make_staggered_batches(1);
    let left = vec![RecordBatch::try_new(
        left[0].schema().clone(),
        vec![
            Arc::new(Int32Array::from(vec![2])),
            Arc::new(Int32Array::from(vec![8])),
            Arc::new(Int32Array::from(vec![-18656817])),
            Arc::new(Int32Array::from(vec![2133711598])),
        ],
    )
    .unwrap()];
    let right = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![2])),
                Arc::new(Int32Array::from(vec![8])),
                Arc::new(Int32Array::from(vec![1875176725])),
                Arc::new(Int32Array::from(vec![728454380])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![2])),
                Arc::new(Int32Array::from(vec![8])),
                Arc::new(Int32Array::from(vec![102493212])),
                Arc::new(Int32Array::from(vec![161372512])),
            ],
        )
        .unwrap(),
    ];
    JoinFuzzTestCase::new(
        left,
        right,
        JoinType::LeftAnti,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[JoinTestType::HjSmj], false)
    .await;
}
Attached is a more accurate test case:
#[tokio::test]
async fn test_cross() {
    // Only used to obtain the fuzz-test schema.
    let left: Vec<RecordBatch> = make_staggered_batches(1);
    let left = vec![
        RecordBatch::new_empty(left[0].schema().clone()),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 1, 1])),
                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
                Arc::new(Int32Array::from(vec![1001, 1002, 1003, 1004])),
                Arc::new(Int32Array::from(vec![10011, 10021, 10031, 10051])),
            ],
        )
        .unwrap(),
    ];
    let right = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])),
                Arc::new(Int32Array::from(vec![1, 2, 3, 1, 2, 3])),
                Arc::new(Int32Array::from(vec![2001, 2002, 2003, 2004, 2005, 2006])),
                Arc::new(Int32Array::from(vec![20011, 20021, 20031, 20041, 20051, 20061])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 1, 1, 1, 2, 2, 2])),
                Arc::new(Int32Array::from(vec![3, 3, 4, 5, 6, 7, 8])),
                Arc::new(Int32Array::from(vec![3000, 3001, 3002, 3003, 3004, 3005, 3006])),
                Arc::new(Int32Array::from(vec![30001, 30011, 30021, 30031, 30041, 30051, 30061])),
            ],
        )
        .unwrap(),
    ];
    JoinFuzzTestCase::new(
        left,
        right,
        JoinType::LeftAnti,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[JoinTestType::HjSmj], false)
    .await;
}
Well, the problem is that the anti join needs to wait until the very last matching right batch has been read for the respective left row.
I tried a couple of options to identify the very last right batch:
self.buffered_data.scanning_offset == 0
or
self.buffered_data.scanning_finished()
But each of them has its own false positives or false negatives. Perhaps we need a separate function or index to determine the very last batch. @korowa, do you have any other ideas? You have contributed a lot to SMJ, so I'd appreciate your help.
From what I remember, doesn't SMJ already fetch the buffered side until it meets the first key that is not equal to the current streamed-side value (the PollingRest buffered state)? That seems to solve waiting for the last batch for the key: for any join type (normal/semi/anti), when processing of a streamed row starts, all required rows from the buffered side should already be read and stored in memory.
Or maybe I've misunderstood the problem?
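A minimal sketch of the PollingRest idea, assuming buffered batches sorted ascending by a composite `(a, b)` key and modeled as plain vectors; `poll_rest` is a hypothetical helper, not DataFusion's implementation. The data mirrors the first two columns and third-column values of the right side of `test_cross` above, where key `(1, 3)` spans both batches. The run of rows for a key is known to be complete only once a greater key, or the end of input, has been seen.

```rust
/// Toy model: the join key is (a, b) and each row carries one value.
type Key = (i32, i32);
type Row = (Key, i32);
type Batch = Vec<Row>;

/// Hypothetical helper: collects every buffered row whose key equals `key`,
/// reading across batch boundaries until the first row with a greater key
/// or the end of input. Assumes batches are sorted ascending by key.
/// Returns the rows found and how many batches had to be read.
fn poll_rest(batches: &[Batch], key: Key) -> (Vec<Row>, usize) {
    let mut rows = Vec::new();
    let mut batches_read = 0;
    for batch in batches {
        batches_read += 1;
        for &(k, v) in batch {
            if k == key {
                rows.push((k, v));
            } else if k > key {
                // A greater key proves no more rows for `key` can follow.
                return (rows, batches_read);
            }
            // Smaller keys belong to earlier streamed rows and are skipped.
        }
    }
    // Input exhausted: the run for `key` is complete as well.
    (rows, batches_read)
}

fn main() {
    // Keys and third-column values from the right side of `test_cross`.
    let batches: Vec<Batch> = vec![
        vec![((0, 1), 2001), ((0, 2), 2002), ((0, 3), 2003), ((1, 1), 2004), ((1, 2), 2005), ((1, 3), 2006)],
        vec![((1, 3), 3000), ((1, 3), 3001), ((1, 4), 3002), ((1, 5), 3003), ((2, 6), 3004), ((2, 7), 3005), ((2, 8), 3006)],
    ];
    let (rows, batches_read) = poll_rest(&batches, (1, 3));
    // Only at this point should the anti-join decision for a streamed (1, 3) row be made.
    println!("{} rows for (1, 3) after reading {} batches", rows.len(), batches_read);
}
```

Under these assumptions, the three `(1, 3)` rows are only all available after the second batch is read and `(1, 4)` is seen.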
Thanks @korowa, that is something I'm also trying. I hope to open a PR soon and add you as a reviewer.
I think this local test may cover many cases:
#[tokio::test]
async fn test_cross_1() {
    // Only used to obtain the fuzz-test schema.
    let left: Vec<RecordBatch> = make_staggered_batches(1);
    let left = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0, 0, 0])),
                Arc::new(Int32Array::from(vec![10, 11, 15, 20, 30, 30, 30])),
                Arc::new(Int32Array::from(vec![110, 111, 115, 120, 129, 130, 131])),
                Arc::new(Int32Array::from(vec![1100, 1110, 1150, 1200, 1290, 1300, 1310])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 0])),
                Arc::new(Int32Array::from(vec![30, 40, 50, 70])),
                Arc::new(Int32Array::from(vec![132, 140, 150, 170])),
                Arc::new(Int32Array::from(vec![1320, 1400, 1500, 1700])),
            ],
        )
        .unwrap(),
    ];
    let right = vec![
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0])),
                Arc::new(Int32Array::from(vec![10])),
                Arc::new(Int32Array::from(vec![1100])),
                Arc::new(Int32Array::from(vec![11011])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0])),
                Arc::new(Int32Array::from(vec![20])),
                Arc::new(Int32Array::from(vec![2100])),
                Arc::new(Int32Array::from(vec![21011])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0])),
                Arc::new(Int32Array::from(vec![30, 30, 30])),
                Arc::new(Int32Array::from(vec![3100, 3101, 3102])),
                Arc::new(Int32Array::from(vec![31001, 31011, 31021])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0, 0, 0])),
                Arc::new(Int32Array::from(vec![30, 30, 30, 40])),
                Arc::new(Int32Array::from(vec![3110, 3111, 3112, 4099])),
                Arc::new(Int32Array::from(vec![31101, 31111, 31121, 40991])),
            ],
        )
        .unwrap(),
        RecordBatch::try_new(
            left[0].schema().clone(),
            vec![
                Arc::new(Int32Array::from(vec![0, 0])),
                Arc::new(Int32Array::from(vec![40, 49])),
                Arc::new(Int32Array::from(vec![4100, 6100])),
                Arc::new(Int32Array::from(vec![41011, 61011])),
            ],
        )
        .unwrap(),
    ];
    JoinFuzzTestCase::new(
        left,
        right,
        JoinType::LeftAnti,
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[JoinTestType::HjSmj], false)
    .await;
}
My initial thought was to track freeze_streamed calls until we hit the matched batches length, at which point all the data would presumably have been processed. But this approach has a flaw: for a given streamed row, like the (0, 30) key in the test above, there are 2 matched batches, but the way freeze_streamed is called is not deterministic.
So for (0, 30) there are 2 batches with 3 matched rows each. Sometimes they are processed in 6 calls, 1 buffered row per call; sometimes in 1 call for 3 rows followed by 3 calls of 1 buffered row each.
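The non-determinism can be modeled with a toy example in plain Rust (hypothetical helpers, not DataFusion code). The same 6 matched rows for key (0, 30) arrive either as six 1-row chunks or as one 3-row chunk followed by three 1-row chunks. After 2 calls (the matched batches length) the two orderings have processed different numbers of rows, and neither has seen all 6, while counting rows reaches completion in both.

```rust
/// Rows processed after the first `calls` invocations, given the chunk size
/// handled by each invocation.
fn rows_after_calls(chunks: &[usize], calls: usize) -> usize {
    chunks.iter().take(calls).sum()
}

/// The 1-based call after which all `total` matched rows have been seen.
fn call_that_completes(chunks: &[usize], total: usize) -> Option<usize> {
    let mut seen = 0;
    for (i, &n) in chunks.iter().enumerate() {
        seen += n;
        if seen == total {
            return Some(i + 1);
        }
    }
    None
}

fn main() {
    let matched_batches = 2; // key (0, 30): 2 matched batches
    let total_rows = 6; // 3 matched rows in each batch
    // Two chunkings of the same 6 rows, as observed for (0, 30).
    let chunkings: [&[usize]; 2] = [&[1, 1, 1, 1, 1, 1], &[3, 1, 1, 1]];
    for chunks in chunkings {
        println!(
            "{:?}: rows after {} calls = {}, all rows seen at call {:?}",
            chunks,
            matched_batches,
            rows_after_calls(chunks, matched_batches),
            call_that_completes(chunks, total_rows)
        );
    }
}
```

In this toy model, counting rows is independent of how the rows are chunked into calls; whether an equivalent row count is readily available inside freeze_streamed is an assumption, not something established in this thread.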
Describe the bug
I have seen this test fail twice now on two unrelated PRs:
PR #11540: https://github.com/apache/datafusion/actions/runs/10011021548/job/27673684873?pr=11540
PR #11527: https://github.com/apache/datafusion/actions/runs/10001535587/job/27645407724?pr=11527
And
To Reproduce
Not sure; it happens intermittently on CI.
Expected behavior
No response
Additional context
No response