TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.
MIT License

Improve the precision of `half_join` #386

Closed frankmcsherry closed 1 year ago

frankmcsherry commented 1 year ago

This PR changes the behavior of the `half_join` methods in `dogsdogsdogs` to track more precisely the capabilities required for outstanding work. The previous imprecision could lead to stalls in which work is never done, especially when that work would feed back around to the arrangement the operator uses to unblock its own work.

This has not been tested in any form other than `cargo test` passing. There is a Materialize issue that this may resolve, but I wouldn't be at all surprised if there are other issues in the written code. Up for inspection and discussion.
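As a rough illustration of the idea (a hypothetical, std-only sketch, not the actual operator code, which works with timely's `Capability` and `Antichain` types): tracking capabilities precisely means downgrading the held capability to the minimum time still present in the stash of outstanding work, rather than pinning it at a stale lower bound that blocks downstream progress.

```rust
use std::collections::BTreeMap;

/// Drain stashed work in time order. After finishing each time, record
/// the capability we would now hold: the minimum time still stashed
/// (or `None` once the stash is empty). Downgrading promptly is what
/// lets downstream frontiers advance and batches get sealed.
fn drain_capabilities(mut stash: BTreeMap<u64, Vec<&str>>) -> Vec<(u64, Option<u64>)> {
    let mut log = Vec::new();
    while !stash.is_empty() {
        let time = *stash.keys().next().unwrap(); // minimum stashed time
        stash.remove(&time); // process all work stashed at `time`
        log.push((time, stash.keys().next().copied()));
    }
    log
}

fn main() {
    let mut stash: BTreeMap<u64, Vec<&str>> = BTreeMap::new();
    stash.entry(3).or_default().push("b");
    stash.entry(1).or_default().push("a");
    stash.entry(5).or_default().push("c");
    // After draining time 1 we hold a capability for 3, then 5, then none.
    println!("{:?}", drain_capabilities(stash));
}
```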

cc: @ggevay

ggevay commented 1 year ago

It seems to solve the problem: the original query from https://github.com/MaterializeInc/materialize/issues/18759 no longer hangs! I also ran tpch.td, which has lots of Delta joins, as well as all the SLTs, and all tests went through.

I'll look through the code in detail now.

@philip-stoev, could you please test this with random WMR queries involving Delta joins? (The fixed `half_join` code is part of Delta joins.) It would be great to cover various scenarios for how the input data are presented to the Delta join: the original query has constant inputs, which means a single batch with the input immediately closed, but other variations could perform various insertions into the tables that feed the Delta join dataflow. For example, the following variation of the original query hangs at the last select without the fix:

CREATE TABLE c1_table (f1 INTEGER, f2 INTEGER, f3 INTEGER);

create materialized view view1 as
 WITH MUTUALLY RECURSIVE c1 (f1 INTEGER, f2 INTEGER, f3 INTEGER) AS (
     SELECT *
     FROM c1_table
   UNION ALL (
                SELECT a1.f1 + 1 AS f1,
                       a1.f2 + 1 AS f2,
                       a1.f3 + 1 AS f3
                  FROM c1 AS a1 ,
                       (
                        SELECT *
                          FROM c1 AS a1
                          LEFT JOIN c1 AS a2 USING (f2)
                         WHERE a1.f1 < 100
                           AND a2.f2 IS NULL
                       ) AS a2
                 WHERE a1.f1 < 100
               )
       )
SELECT * FROM c1;

select * from view1;

insert into c1_table values (0, 0, 0);

select * from view1;

(For join inputs that don't come directly from indexed tables, one way to create the arrangement that needs to exist for a Delta join to be planned is to put a Distinct or other Reduce at the root of the join input, keyed on the same columns as the join key.)

ggevay commented 1 year ago

The code makes sense to me, but it would be good if someone who knows more about Timely/Differential would also check it. Maybe @antiguru or @vmarcos. This is the Materialize issue, and a Slack thread with more details of what is happening.

Also, it would be good to somehow test the `else if antichain.len() > 1 {` branch. I think the query in the original issue does not exercise it.
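For context on when that branch is reached: with a multidimensional timestamp (as in iterative/WMR dataflows), the lower bound of the stashed times can be a genuine antichain with more than one mutually incomparable element. A hypothetical, std-only sketch of the minimal-element computation (the real code uses timely's `Antichain`):

```rust
/// Insert `time` into `frontier`, keeping only elements that are
/// minimal under the partial order `le` (a toy stand-in for the
/// insert logic of timely's `Antichain`).
fn insert_minimal<T>(frontier: &mut Vec<T>, time: T, le: impl Fn(&T, &T) -> bool) {
    // Dominated by an existing element: nothing to do.
    if frontier.iter().any(|f| le(f, &time)) {
        return;
    }
    // Otherwise evict elements the new time dominates, then add it.
    frontier.retain(|f| !le(&time, f));
    frontier.push(time);
}

fn main() {
    // Product timestamps (outer time, loop iteration), compared
    // component-wise, as arise in recursive dataflows.
    let le = |a: &(u32, u32), b: &(u32, u32)| a.0 <= b.0 && a.1 <= b.1;
    let mut frontier = Vec::new();
    for t in [(2, 1), (1, 2), (3, 3)] {
        insert_minimal(&mut frontier, t, le);
    }
    // (3, 3) is dominated, but (2, 1) and (1, 2) are incomparable, so
    // the antichain has two elements: the case the
    // `antichain.len() > 1` branch must handle.
    println!("{:?}", frontier);
}
```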

frankmcsherry commented 1 year ago

I'll try and think through a test case for it that forces a multi-element frontier there.

vmarcos commented 1 year ago

I looked this through and it makes sense to me that the changes are safe, since we only enforce higher precision on the lower bounds of times in the stash. However, I must admit I am not 100% confident that I understood the root cause of the original issue. Is it because we have a self-join and the frontiers in the arranged inputs advance at the same rate? To delve deeper, I'd probably need to invest some debugging time here.

frankmcsherry commented 1 year ago

@ggevay or I can explain the issue in more detail, but it stems from being imprecise about which capabilities are required. This blocks downstream batch formation, because the downstream operator believes those capabilities might still change data. As a result, the dataflow stalls at certain times when it should be able to continue. In simple cases (without a dataflow cycle) this was fine, as everything would eventually drain. With a cycle, the held-back capabilities prevent event times from completing, and the whole dataflow seizes up.

frankmcsherry commented 1 year ago

I did not manage to think through a test for the multi-element antichain. Going to merge nonetheless, and will work harder on this in spare moments.

ggevay commented 1 year ago

@vmarcos, let's discuss it in the Wednesday office hours.

vmarcos commented 1 year ago

> @vmarcos, let's discuss it in the Wednesday office hours.

Sounds good, thanks!