influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License

Simplify generation of yield/result nodes #4465

Closed wolffcm closed 2 years ago

wolffcm commented 2 years ago

We have some technical debt with how we generate implicit yield nodes.

Just before we start planning, we add one here if a plan node has no successors and does not produce side effects: https://github.com/influxdata/flux/blob/ee80d230d11c5da9d50b8fa58951d4c61cfc3a8c/plan/logical.go#L265-L278

Later on, when we build the execution graph, we generate an implicit yield if there are no successors and the node does produce a side effect: https://github.com/influxdata/flux/blob/ee80d230d11c5da9d50b8fa58951d4c61cfc3a8c/execute/executor.go#L237-L242 (In this case we call it a result instead of a yield)

It seems clear we need to generate a yield (or whatever) for any node that does not have successors. We should do this in the same place every time. My recollection is that at one point we wanted to implicitly yield only if there were no side effects, but then we decided that we wanted to yield in the side effect case too. It's confusing to have contradictory logic like this.

DOD:

wolffcm commented 2 years ago

It seems like not all code paths generate a yield, and that's why we needed this PR: https://github.com/influxdata/flux/pull/4488 We need a better understanding of this case before we can make this change.

onelson commented 2 years ago

In an effort to better understand how we arrive at each of the two branches, I cooked up a small repro:

import "array"
import "experimental"
import "sql"

emptyFn = () => array.from(rows: [{x: 1}])
write = () => emptyFn() |> sql.to(dataSourceName: ":memory:", driverName: "sqlite3", table: "none")

experimental.chain(first: write(), second: emptyFn())

The executor branch is hit while write() executes, and the planner branch is hit by the chain().

I'm not yet sure how the refactor should actually go, but by tracing through with the debugger I'm hoping to form some opinions.

onelson commented 2 years ago

With breakpoints set in each of the two places we generate implicit yields today, I ran the reproducer (above) through the debugger.

First, we land in the executor branch.

[Screenshot_20220304_135119: debugger paused at the executor-branch breakpoint]

Frames of (my) interest for this might be:

In this case, the final frame where we hit the breakpoint is inside createExecutionNodeVisitor.Visit.


Second, we land in the planner branch.

[Screenshot_20220304_111918: debugger paused at the planner-branch breakpoint]

The logicalNode here is named array.from0, produced by emptyFn. My belief is this is the one returned by experimental.chain() (which will spit out the second parameter directly after trying to block on the stream argument passed as first).

In future runs, I may experiment with wrapping the array.from calls in differently named functions to help keep better track of which expressions are being visited, and when. I'm sure the detail I need is already available, but I'm still working to better understand the execution path so more landmarks might be helpful.

Frames of (my) interest here are:

The visitor is used to walk nodes in the plan, and the branch where we insert the yield happens inside the call to fluxSpecVisitor.visitOperation.

My thinking is I'll set some breakpoints around logical.go:186 to see if the visitor can see the other node that would get an implicit yield as it goes.


The common root for both branches is the call to REPL.executeLine, defined at repl.go:175.


Questions I have currently:

Separately, I think we had some discussion at one point of improving the names we generate for implicit yields. An idea was to give them distinct (numbered) names automatically like we do for nodes.

Part of what I'm wondering is if the only way to land in the executor branch is via the "sub program" used for experimental.chain() which has been shown to be problematic for flux test in #4302.

If experimental.chain() is the only way to trigger this branch, I'd perhaps look to refactor it such that it doesn't.

onelson commented 2 years ago

I've been working towards proving or debunking the notion of the branch we land in being determined by the use of a "sub program" or not.

So far, signs point to this not being the reason. During the standup today, tableFind was offered as a 2nd place in the stdlib where we compile a "sub program" and execute it. With tableFind, we don't land in either branch -- no implicit yield is generated. This could be just because tableFind doesn't advertise side-effects, or I'm not sure what else.

Refining my repro program:

import "array"
import "experimental"
import "sql"

read = () => array.from(rows: [{x: 1}]) |> group(columns: ["x"])
write = () =>
    array.from(rows: [{x: 1}])
        |> group(columns: ["x"])
        |> sql.to(dataSourceName: ":memory:", driverName: "sqlite3", table: "none")

// read and write each hit their respective branches ....
// r => planner
read()
// w => executor
write()

// hits the planner branch twice
// experimental.chain(first: read(), second: read())

// hits the executor branch twice
// experimental.chain(first: write(), second: write())

// r,w or w,r in chain lands us in each branch once

As such, it seems the branch selection is entirely based on whether or not the ops in question advertise side-effects.
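To make that observation concrete, here is a minimal Go sketch of the current split, built only from what the repro shows. The `leaf` type and `implicitResultSite` function are hypothetical names for illustration and are not part of the Flux codebase; the real checks live in `plan/logical.go` and `execute/executor.go`.

```go
package main

import "fmt"

// leaf is a hypothetical, simplified stand-in for a plan node that has
// no successors. It is not a Flux type.
type leaf struct {
	ID            string
	IsYield       bool
	HasSideEffect bool
}

// implicitResultSite models which of the two code paths attaches an
// implicit result to a successor-less node, per the repro above.
func implicitResultSite(n leaf) string {
	switch {
	case n.IsYield:
		// An explicit yield already names its result; nothing is generated.
		return "none"
	case n.HasSideEffect:
		// Observed for write(): the executor adds a result.
		return "executor"
	default:
		// Observed for read(): the planner adds a generated yield.
		return "planner"
	}
}

func main() {
	for _, n := range []leaf{
		{ID: "read", HasSideEffect: false},
		{ID: "write", HasSideEffect: true},
	} {
		fmt.Printf("%s => %s\n", n.ID, implicitResultSite(n))
	}
}
```

The point of the sketch is that the two sites are selected by a single boolean, which is why folding them into one place looks plausible.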

Making a change like the following in the planner branch seems to allow it to cover both cases, rather than letting the side-effecting sql.to slip through to the executor branch.

diff --git a/plan/logical.go b/plan/logical.go
index 91227343..b832901a 100644
--- a/plan/logical.go
+++ b/plan/logical.go
@@ -263,7 +263,7 @@ func (v *fluxSpecVisitor) visitOperation(o *flux.Operation) error {

        // no children => no successors => root node
        if len(v.spec.Children(o.ID)) == 0 {
-               if isYield || HasSideEffect(procedureSpec) {
+               if isYield && HasSideEffect(procedureSpec) {
                        v.plan.Roots[logicalNode] = struct{}{}
                } else {
                        // Generate a yield node

Seems like a change similar to this (not this -- this is incorrect), combined with removal of the other block might satisfy the DOD. I'm not sure how to best prove this is adequate outside of just showing the existing test suite passes (assuming it does).

onelson commented 2 years ago

> I'm not sure how to best prove this is adequate outside of just showing the existing test suite passes (assuming it does).

The existing suite does not pass.

--- FAIL: TestPlan_LogicalPlanFromSpec (0.20s)
    --- FAIL: TestPlan_LogicalPlanFromSpec/Non-yield_side_effect (0.02s)
        logical_test.go:325: plans have 3 and 4 nodes respectively
    --- FAIL: TestPlan_LogicalPlanFromSpec/Multiple_non-yield_side_effect (0.03s)
        logical_test.go:316: found more than one call to yield() with the name "_result"
    --- FAIL: TestPlan_LogicalPlanFromSpec/side_effect_and_a_generated_yield (0.02s)
        logical_test.go:316: found more than one call to yield() with the name "_result"

The executor branch looks like it has some different code for generating names for the result based on the node leading to the yield being inserted.
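The duplicate-name failures are what one would expect if every successor-less node falls through to the same default name. Below is a simplified Go model of that collision. The `yieldNames` registry and the leaf IDs `w1` and `w2` are hypothetical; only the `_result` name and the error text are taken from the output above, and Flux's actual bookkeeping may differ.

```go
package main

import "fmt"

// defaultYieldName mirrors the "_result" name seen in the test output.
const defaultYieldName = "_result"

// yieldNames is a hypothetical registry of yield names already used in
// a plan. It is an illustration, not Flux's real data structure.
type yieldNames map[string]struct{}

// add records a name and reports an error if it was already present.
func (y yieldNames) add(name string) error {
	if _, ok := y[name]; ok {
		return fmt.Errorf("found more than one call to yield() with the name %q", name)
	}
	y[name] = struct{}{}
	return nil
}

func main() {
	names := yieldNames{}
	// Two side-effecting leaves (hypothetical IDs) that both receive the
	// default name, as happens when the side-effect case is not special-cased.
	for _, leafID := range []string{"w1", "w2"} {
		if err := names.add(defaultYieldName); err != nil {
			fmt.Printf("%s: %v\n", leafID, err)
		}
	}
}
```

Naming each generated yield after its predecessor node, as the executor branch does for results, would avoid the collision in this model.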

I'll keep poking at this. The solution might be to fold the code from both branches together but we may need to revise some tests to get this to work. If assertions were made along the way with the assumption that some yields would not be generated, but they were being generated (somewhere else, out of view of the test) then the tests could be wrong...

onelson commented 2 years ago

Weakly feeling this out. Here's a patch that accounts for the dynamic naming of yields for the case where we see side-effecting nodes without successors, but continues to use the default name of _result for the else case:

diff --git a/plan/logical.go b/plan/logical.go
index 91227343..505860a4 100644
--- a/plan/logical.go
+++ b/plan/logical.go
@@ -211,8 +211,9 @@ func (v *fluxSpecVisitor) addYieldName(pn Node) error {
        return nil
 }

-func generateYieldNode(pred Node) Node {
-       yieldSpec := &GeneratedYieldProcedureSpec{Name: DefaultYieldName}
+func generateYieldNode(pred Node, name string) Node {
+
+       yieldSpec := &GeneratedYieldProcedureSpec{Name: name}
        yieldNode := CreateLogicalNode(NodeID("generated_yield"), yieldSpec)
        pred.AddSuccessors(yieldNode)
        yieldNode.AddPredecessors(pred)
@@ -263,17 +264,25 @@ func (v *fluxSpecVisitor) visitOperation(o *flux.Operation) error {

        // no children => no successors => root node
        if len(v.spec.Children(o.ID)) == 0 {
-               if isYield || HasSideEffect(procedureSpec) {
+               if isYield {
                        v.plan.Roots[logicalNode] = struct{}{}
+               } else if HasSideEffect(procedureSpec) {
+                       // Generate a yield node
+                       name := string(logicalNode.ID())
+                       generatedYield := generateYieldNode(logicalNode, name)
+                       err = v.addYieldName(generatedYield)
+                       if err != nil {
+                               return err
+                       }
+                       v.plan.Roots[generatedYield] = struct{}{}
                } else {
                        // Generate a yield node
-                       generateYieldNode := generateYieldNode(logicalNode)
-                       err = v.addYieldName(generateYieldNode)
+                       generatedYield := generateYieldNode(logicalNode, DefaultYieldName)
+                       err = v.addYieldName(generatedYield)
                        if err != nil {
                                return err
                        }
-                       v.plan.Roots[generateYieldNode] = struct{}{}
-
+                       v.plan.Roots[generatedYield] = struct{}{}
                }
        }

With this patch applied, and running the following flux:

import "array"
import "sql"

read = () => array.from(rows: [{x: 1}]) |> group(columns: ["x"])
write = () =>
    array.from(rows: [{x: 1}])
        |> group(columns: ["x"])
        |> sql.to(dataSourceName: ":memory:", driverName: "sqlite3", table: "none")

read()
write()

The output shows two results:

Result: _result
Table: keys: [x]
                     x:int  
--------------------------  
                         1  
Result: toSQL2
Table: keys: [x]
                     x:int  
--------------------------  
                         1 

The read() call lands in the else branch since it isn't a yield and doesn't have side-effects, earning it the name _result. The write() call gets a result named for the side-effecting node, toSQL2. I'm not clear on why it's toSQL2 and not toSQL1 as I might have expected.
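The naming rule the patch applies can be summarized in a few lines of Go. This is a sketch of the decision only, not the patch itself: `leaf` and `generatedYieldName` are hypothetical names, and the ID `group1` is an assumed placeholder for whatever ID the planner assigns to the last node of read().

```go
package main

import "fmt"

// defaultYieldName mirrors plan.DefaultYieldName ("_result") from the patch.
const defaultYieldName = "_result"

// leaf is a hypothetical stand-in for a logical node with no successors.
type leaf struct {
	ID            string
	IsYield       bool
	HasSideEffect bool
}

// generatedYieldName returns the name a generated yield would receive
// under the patch, and false when no yield should be generated.
func generatedYieldName(n leaf) (string, bool) {
	switch {
	case n.IsYield:
		return "", false // explicit yield: becomes a root as-is
	case n.HasSideEffect:
		return n.ID, true // named after the side-effecting node
	default:
		return defaultYieldName, true
	}
}

func main() {
	for _, n := range []leaf{
		{ID: "group1"},                      // assumed ID for the tail of read()
		{ID: "toSQL2", HasSideEffect: true}, // ID seen in the output above
	} {
		if name, ok := generatedYieldName(n); ok {
			fmt.Printf("%s => Result: %s\n", n.ID, name)
		}
	}
}
```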

The original handling for the "has side-effects and no successors" case in executor.go adds a new result without adding a yield, so this naturally will cause some ripples in the test suite as node counts change and so on.

The block in executor.go I'm trying to reproduce in the new branch here in logical.go also does something about skipYields, and I'm not yet sure how I'd do this over on the other side.

Specifically:

if plan.HasSideEffect(spec) && len(node.Successors()) == 0 {
    name := string(node.ID())
    r := newResult(name)
    v.es.results[name] = r
    v.nodes[skipYields(node)][i].AddTransformation(r) // XXX: how to achieve this?
}

In this case, nodes is a field on the execution state visitor so I guess I'll need to figure out where the plan and the outcome of this visitor are combined.

onelson commented 2 years ago

Noting that Adrian makes mods to the executor branch in https://github.com/influxdata/flux/pull/4536. So far I've been looking at moving this logic up to the planner, but the other option (I plan to look at today) is to move it all down to the executor. When #4536 merges, it might make a stronger case for all this being in the executor.

onelson commented 2 years ago

Refactoring the other way (i.e., moving all the logic down into the executor) seems doable; however, there's some bookkeeping code somewhere that is no longer being satisfied.

A patch like

diff --git a/execute/executor.go b/execute/executor.go
index eba8b1f5..f8b6bb4a 100644
--- a/execute/executor.go
+++ b/execute/executor.go
@@ -287,11 +287,19 @@ func (v *createExecutionNodeVisitor) Visit(node plan.Node) error {
                }
            }

-           if (plan.HasSideEffect(spec) || isParallelMerge) && len(node.Successors()) == 0 {
-               name := string(node.ID())
-               r := newResult(name)
-               v.es.results[name] = r
-               v.nodes[skipYields(node)][i].AddTransformation(r)
+           if len(node.Successors()) == 0 {
+               _, isYield := spec.(plan.YieldProcedureSpec)
+               if plan.HasSideEffect(spec) || isParallelMerge {
+                   name := string(node.ID())
+                   r := newResult(name)
+                   v.es.results[name] = r
+                   v.nodes[skipYields(node)][i].AddTransformation(r)
+               } else if !isYield {
+                   name := plan.DefaultYieldName
+                   r := newResult(name)
+                   v.es.results[name] = r
+                   v.nodes[skipYields(node)][i].AddTransformation(r)
+               }
            }
        }
    }
diff --git a/plan/logical.go b/plan/logical.go
index 91227343..41237586 100644
--- a/plan/logical.go
+++ b/plan/logical.go
@@ -265,15 +265,6 @@ func (v *fluxSpecVisitor) visitOperation(o *flux.Operation) error {
    if len(v.spec.Children(o.ID)) == 0 {
        if isYield || HasSideEffect(procedureSpec) {
            v.plan.Roots[logicalNode] = struct{}{}
-       } else {
-           // Generate a yield node
-           generateYieldNode := generateYieldNode(logicalNode)
-           err = v.addYieldName(generateYieldNode)
-           if err != nil {
-               return err
-           }
-           v.plan.Roots[generateYieldNode] = struct{}{}
-
        }
    }

diff --git a/plan/physical.go b/plan/physical.go
index 926d8f0a..9cfdb2b0 100644
--- a/plan/physical.go
+++ b/plan/physical.go
@@ -97,6 +97,10 @@ func (pp *physicalPlanner) Plan(ctx context.Context, spec *Spec) (*Spec, error)
    // Update concurrency quota
    if transformedSpec.Resources.ConcurrencyQuota == 0 {
        transformedSpec.Resources.ConcurrencyQuota = len(transformedSpec.Roots)
+       // FIXME(onelson): hack to ensure we have at least `1` when Roots is empty.
+       if transformedSpec.Resources.ConcurrencyQuota == 0 {
+           transformedSpec.Resources.ConcurrencyQuota = 1
+       }

        // If the query concurrency limit is greater than zero,
        // we will use the new behavior that sets the concurrency

seems as though it should behave roughly the same as before. However, without the plan/physical.go hack included above, running my repro fails with:

failed to initialize execute state: invalid plan: plan must have a non-zero concurrency quota

Whatever leads us to this error message causes a slew of failures in the test suite so this is my next area of investigation.

It seems as though the concurrency quota is tied to the number of "roots" on the plan, which would normally be the responsibility of the planner branch I removed in this patch. I've sidestepped the issue for now by clamping the quota to a minimum of 1.
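The clamping amounts to the following, shown here as a standalone Go sketch. `defaultConcurrencyQuota` is a hypothetical helper that mirrors only the lines touched by the hack; the surrounding logic in `physicalPlanner.Plan` (for example, the query concurrency limit handling) is not modeled.

```go
package main

import "fmt"

// defaultConcurrencyQuota is a hypothetical helper mirroring the hack:
// when no quota is configured it falls back to the number of plan roots,
// and clamps that fallback to a minimum of 1.
func defaultConcurrencyQuota(configured, numRoots int) int {
	if configured != 0 {
		return configured // an explicit quota is left untouched
	}
	if numRoots == 0 {
		// With the planner branch removed, a read-only plan ends up with
		// no roots, which would otherwise yield a zero quota.
		return 1
	}
	return numRoots
}

func main() {
	fmt.Println(defaultConcurrencyQuota(0, 0)) // no roots: clamped
	fmt.Println(defaultConcurrencyQuota(0, 3)) // falls back to root count
	fmt.Println(defaultConcurrencyQuota(4, 0)) // configured value wins
}
```

This only hides the symptom. Anything else that assumes generated yields appear in the plan's roots would still see an empty set.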

With this in place, the test suite is in sort of a sorry state :cry:

Whereas refactoring towards the planner gave us failures related to the overall shape of the plan (thanks to results being upgraded to yields), this refactor gives panics (nil pointer derefs, etc.) in addition to assertion failures around the anticipated names of nodes.

I'll dig into the panics to see if there's something I overlooked. If the panics can be resolved, the two refactors will be closer to an even footing in terms of the required test refactors.

onelson commented 2 years ago

I'm not making much progress looking into the panics. Here's a sample:

--- FAIL: TestSlackPost (0.03s)
    --- FAIL: TestSlackPost/.... (0.03s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
        panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x144e532]

goroutine 11 [running]:
testing.tRunner.func1.2({0x18ba580, 0x38c9190})
        /usr/local/go/src/testing/testing.go:1209 +0x24e
testing.tRunner.func1()
        /usr/local/go/src/testing/testing.go:1212 +0x218
panic({0x18ba580, 0x38c9190})
        /usr/local/go/src/runtime/panic.go:1038 +0x215
github.com/influxdata/flux/stdlib/slack_test.TestSlackPost.func1(0xc0018929c0)
        /home/onelson/Projects/flux/stdlib/slack/slack_test.go:200 +0x4f2
testing.tRunner(0xc0018929c0, 0xc0003fa8c0)
        /usr/local/go/src/testing/testing.go:1259 +0x102
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:1306 +0x35a
FAIL    github.com/influxdata/flux/stdlib/slack 0.289s

slack_test.go:200 points to a call to res.Tables().Do(), but it's unclear exactly where the panic is coming from.

Aside from the panics, there are other test failures related to tableFind "not finding a table." Refactoring down to the executor definitely feels like a heavier lift, just to sort through this.

onelson commented 2 years ago

So, it looks like there was an early return for yields way up at the top of this method. I need to take another look top-down to make sure this executor-focused refactor really does approximate the work formerly done in the planner. That'll be my first thing tomorrow.