vmware-archive / database-stream-processor

Streaming and Incremental Computation Framework

Circuit without cycles causes a DBSP panic #390

Open mihaibudiu opened 1 year ago

mihaibudiu commented 1 year ago

Here is the Rust code that triggers the problem. The circuit is source->integral->differential->sink. The error is: panicked at 'called `Result::unwrap()` on an `Err` value: CyclicCircuit { node_id: GlobalNodeId([NodeId(4)]) }', src/lib.rs:2713:8

fn circuit32() -> impl FnMut(OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>) -> (OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>, ) {
    let T = Rc::new(RefCell::<OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>>::new(Default::default()));
    let T_external = T.clone();
    let T = Generator::new(move || T.borrow().clone());
    let V = Rc::new(RefCell::<OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>>::new(Default::default()));
    let V_external = V.clone();
    let root = dbsp::RootCircuit::build(|circuit| {
        // CreateRelationStatement{tableName='T', columns=[#0: COL1 INTEGER, #1: COL2 DOUBLE, #2: COL3 BOOLEAN, #3: COL4 VARCHAR, #4: COL5 INTEGER, #5: COL6 DOUBLE]}
        // DBSPSourceOperator 5013
        // CREATE TABLE T (
        // COL1 INT NOT NULL, COL2 DOUBLE NOT NULL, COL3 BOOLEAN NOT NULL, COL4 VARCHAR NOT NULL, COL5 INT, COL6 DOUBLE)
        let T = circuit.add_source(T);
        // DBSPIntegralOperator 5022
        let stream389: Stream<_, OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>> = T.integrate();
        // DBSPDifferentialOperator 5023
        let stream390: Stream<_, OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>> = stream389.differentiate();
        // CreateRelationStatement{tableName='V', columns=[#0: COL1 INTEGER, #1: COL2 DOUBLE, #2: COL3 BOOLEAN, #3: COL4 VARCHAR, #4: COL5 INTEGER, #5: COL6 DOUBLE]}
        // CREATE VIEW V AS SELECT * FROM T
        // DBSPSinkOperator 5024
        stream390.inspect(move |m| { *V.borrow_mut() = m.clone() });
    }).unwrap();
    return move |T| {
        *T_external.borrow_mut() = T;
        root.0.step().unwrap();
        return (V_external.borrow().clone(), );
    };
}
ryzhyk commented 1 year ago

Copy of a slack discussion that explains why this happens:

I found the problem. It's partially inaccurate error reporting, and partially a real issue that I'm not sure how to best fix.

The unoptimized circuit in this example would look like this:
                       differentiate
                 ┌────────►z^-1───┐
                 │                │
                 │                ▼
─────────► + ────┼──────────────► - ──────────────►
           ▲     │
           │     │
           │     │
         z^-1◄───┘
         integrate
The feedback loop on the left is the integrator, and the z^-1 and - on the right is the differentiator circuit.
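As a rough illustration of what these two sub-circuits compute, here is a minimal sketch that uses plain `i64` values as a stand-in for Z-sets. The functions `integrate` and `differentiate` below are hypothetical free functions written for this example, not the DBSP `Stream` methods of the same name. The sketch shows that differentiating an integrated stream returns the original stream, which is why the circuit in this issue is semantically the identity.

```rust
/// Running sum: out[t] = in[t] + out[t-1], with out[-1] = 0.
/// This mirrors the `+` with a z^-1 feedback loop.
fn integrate(input: &[i64]) -> Vec<i64> {
    let mut acc = 0;
    input
        .iter()
        .map(|x| {
            acc += x;
            acc
        })
        .collect()
}

/// out[t] = in[t] - in[t-1], with in[-1] = 0.
/// This mirrors the z^-1 feeding the `-` operator.
fn differentiate(input: &[i64]) -> Vec<i64> {
    let mut prev = 0;
    input
        .iter()
        .map(|&x| {
            let delta = x - prev;
            prev = x;
            delta
        })
        .collect()
}

fn main() {
    let changes = [3, -1, 4];
    let integral = integrate(&changes);
    println!("integral: {:?}", integral);
    println!("round trip: {:?}", differentiate(&integral));
}
```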
The DBSP scheduler requires circuits to be DAGs, so that it can find an evaluation order in which all predecessors of an operator are evaluated before the operator itself. This circuit looks like it has a cycle, but in the actual implementation the z^-1 feedback operator is split into a pair of operators: one outputs the value stored at the previous clock cycle and the other consumes the new value (denoted p and c below, respectively).
                                    differentiate
                        ┌────────►z^-1───┐
                        │                │
                        │                ▼
─────────► + ───────────┼──────────────► - ──────────────►
           ▲            │
           │            │
           │            │
           └─p    c ◄───┘
            integrate
The remaining z^-1 operator is redundant, since it computes exactly the same value as the first z^-1 (the output of p). DBSP is aware of this and will actually construct an optimized circuit:
─────────► + ───────────┬─────────────► - ──────────────►
           ▲            │               ▲
           │            │               │
           │            │               │
           └─p    c ◄───┘               │
             │                          │
             │                          │
             └──────────────────────────┘
This is still acyclic, so all looks good. However, the integrate operator has an additional requirement: the (+) should consume the output of z^-1 (the accumulated value of the integral) by value, to avoid cloning it. This in turn requires that (+) is evaluated after all other consumers of the same stream, in this case the minus (-). We model this in the scheduler by adding a "dependency" edge from - to +, which expresses the fact that + must be evaluated after -.
           ┌────────────────────────────┐
           │                            │
           ▼                            │
─────────► + ───────────┬─────────────► - ──────────────►
           ▲            │               ▲
           │            │               │
           └─p    c ◄───┘               │
             │                          │
             └──────────────────────────┘
And now we have a cycle between + and -.
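The effect of the extra edge can be reproduced with a small, self-contained sketch. It is not the DBSP scheduler; it is a plain Kahn-style topological sort over four hypothetical node ids (`P`, `PLUS`, `C`, `MINUS`) that stand for the operators in the diagrams above. With only the data edges an evaluation order exists; once the "dependency" edge from - to + is added, no order exists.

```rust
use std::collections::VecDeque;

// Hypothetical node ids for the operators in the optimized circuit.
const P: usize = 0; // output half of z^-1
const PLUS: usize = 1; // the `+` of the integrator
const C: usize = 2; // input half of z^-1
const MINUS: usize = 3; // the `-` of the differentiator

/// Returns a topological order of nodes `0..n`, or `None` if the edges form a cycle.
fn topo_order(n: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
    let mut indegree = vec![0usize; n];
    let mut successors: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(from, to) in edges {
        successors[from].push(to);
        indegree[to] += 1;
    }
    // Start with the nodes that have no unevaluated predecessors.
    let mut ready: VecDeque<usize> = (0..n).filter(|&v| indegree[v] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(v) = ready.pop_front() {
        order.push(v);
        for &w in &successors[v] {
            indegree[w] -= 1;
            if indegree[w] == 0 {
                ready.push_back(w);
            }
        }
    }
    // Nodes left unscheduled are on, or downstream of, a cycle.
    if order.len() == n {
        Some(order)
    } else {
        None
    }
}

/// Data edges of the optimized circuit: p feeds + and -, + feeds c and -.
fn data_edges() -> Vec<(usize, usize)> {
    vec![(P, PLUS), (PLUS, C), (PLUS, MINUS), (P, MINUS)]
}

fn main() {
    let mut edges = data_edges();
    println!("data edges only: {:?}", topo_order(4, &edges));

    // The "consume by value" constraint: `+` must run after `-`.
    edges.push((MINUS, PLUS));
    println!("with dependency edge: {:?}", topo_order(4, &edges));
}
```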

This raises two questions:
1. Why doesn't this happen more often? (In particular, it doesn't happen in circuits generated by the SQL compiler.)
2. Can we make the "consume by value" constraint a soft constraint and relax it when necessary in the scheduler?

To answer the first question, the - operator in this example (which implements the differentiator) happens to consume both the output of + and its delayed version (the output of p), which is what creates the cycle. I don't think there is any other operator that does this, and I don't think we use differentiate in practice: all our circuits are fully incremental, so there is no need to differentiate explicitly.

As for the second question, we may want to come up with a way to relax scheduling constraints in the future. But for now this check serves as a safeguard against accidentally introducing performance anomalies in the circuit. We usually require that the operator consumes its input by value (via the STRONGLY_PREFER_OWNED flag) to avoid very expensive clones, such as cloning the entire accumulated state of a table. So my plan is to keep this constraint, see whether it ever affects real-world circuits we care about, and act based on the findings.
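The cost that the constraint guards against can be sketched with ordinary reference counting. This is an analogy only and makes no claim about how DBSP passes values between operators; `take_or_clone` is a hypothetical helper written for this example. The accumulated state can be taken by value only if the consumer holding it is the last one; otherwise the whole state has to be copied.

```rust
use std::rc::Rc;

/// Hypothetical helper: take the value out of `rc` without copying if this is
/// the last handle, otherwise fall back to cloning the whole vector.
/// Returns the value and a flag saying whether a clone was needed.
fn take_or_clone(rc: Rc<Vec<i64>>) -> (Vec<i64>, bool) {
    match Rc::try_unwrap(rc) {
        Ok(owned) => (owned, false),
        Err(shared) => ((*shared).clone(), true),
    }
}

fn main() {
    // Case 1: another consumer (think of `-`) still holds the state when
    // the owner-preferring consumer (think of `+`) runs, so a clone is needed.
    let state = Rc::new(vec![1, 2, 3]);
    let other_consumer = Rc::clone(&state);
    let (_, cloned) = take_or_clone(state);
    println!("evaluated before the other consumer, cloned: {}", cloned);
    drop(other_consumer);

    // Case 2: the other consumer has already finished, so the state can be
    // moved out without copying.
    let state = Rc::new(vec![1, 2, 3]);
    let other_consumer = Rc::clone(&state);
    drop(other_consumer);
    let (_, cloned) = take_or_clone(state);
    println!("evaluated after the other consumer, cloned: {}", cloned);
}
```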
ryzhyk commented 1 year ago

@mbudiu-vmw , why does the compiler generate I+D instead of just map?

mihaibudiu commented 1 year ago

These are tests that check that circuits before and after optimization are equivalent. I just added select * from input.