influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License

Programming Interface for `Source` #131

Open aanthony1243 opened 5 years ago

aanthony1243 commented 5 years ago

Currently the Source interface only requires two methods:

type Node interface {
    AddTransformation(t Transformation)
}

type Source interface {
    Node
    Run(ctx context.Context)
}

In flux a source function is one that interfaces directly with a data source to create an initial supply of data for a query. In principle, a Source could just stream the entire data source to the flux runtime. However, the scope of many data sources is far too large for this to be a practical outcome. We already have the concept of a PushDown operation that can encode certain filtering and aggregation conditions so that they can be executed by the storage engine, thereby reducing the amount of data that is returned to flux.

The current implementation of pushdowns is cumbersome for a few reasons:

  1. The rule and the execution of the rule are defined in the context of the filter type operation, instead of the source.
  2. Translating the filter logic to something understandable to the storage engine is largely a manual task, with little re-usable code for different filter/source type combinations.

In our development, we have observed a pattern of pushdowns such that it makes sense to add them to the source interface. If they are properly defined and implemented, the planner can be made aware of these options, and use them during its optimization routine.

We can implement each pushdown optimization as its own interface so that the planner can use a Go type assertion to determine whether a source pushdown optimization may be applied.

NOTE: these interfaces must be implemented on the ProcedureSpec type so that the planner can access them.

// FilteredSource can evaluate a row predicate inside the data source.
type FilteredSource interface {
    PushDownFilter(fn *execute.RowPredicateFn) error
}

// RangedSource can restrict the time range of the data it produces.
type RangedSource interface {
    PushDownRange(bounds execute.Bounds) error
}

// AggregatedSource can compute an aggregate over the given columns inside the data source.
type AggregatedSource interface {
    PushDownAggregate(kind flux.OperationKind, columns []string) error
}

// GroupedSource can group its output by the given columns.
type GroupedSource interface {
    PushDownGroup(columns []string)
}

// SortedSource can return its output sorted on the given columns.
type SortedSource interface {
    PushDownSort(columns []string, desc bool)
}
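
For illustration, here is a rough sketch of how a hypothetical SQL-backed source might implement two of these interfaces on its ProcedureSpec. The SQLFromProcedureSpec name and its fields are invented for this sketch, and the usual ProcedureSpec methods (Kind, Copy, etc.) as well as the predicate translation are elided:

// SQLFromProcedureSpec is a hypothetical ProcedureSpec for a SQL-backed source.
type SQLFromProcedureSpec struct {
    Query  string
    Bounds execute.Bounds
    Where  string
}

// PushDownRange records the time bounds so the engine can apply them itself.
func (s *SQLFromProcedureSpec) PushDownRange(bounds execute.Bounds) error {
    s.Bounds = bounds
    return nil
}

// PushDownFilter translates the predicate into a WHERE clause, or reports that it
// cannot (for example, if the predicate uses a function the engine does not have).
func (s *SQLFromProcedureSpec) PushDownFilter(fn *execute.RowPredicateFn) error {
    where, err := predicateToWhere(fn) // hypothetical helper, see items 3 and 4 below
    if err != nil {
        return err
    }
    s.Where = where
    return nil
}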

There are a few items to address/implement to make this interface most useful to source implementers:

  1. Should these functions self-modify the Source node, or should they return a modified copy?
  2. Is an error return type sufficient, or are there cases where no error occurs, but the optimization is not applied?
  3. NEED: some helper utility that makes it simpler for users to translate a flux.ColMeta to a field/type for their source.
  4. NEED: some helper utility that can help convert an execute.RowPredicateFn to a valid filter implementation for the Source (e.g., to easily generate a SQL-compatible WHERE clause as a string; a rough sketch follows this list).
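
As a very rough sketch of what the helper in item 4 could look like, the following works over a simplified, hypothetical Comparison type rather than the real predicate representation. An actual helper would first walk the semantic graph behind execute.RowPredicateFn, extract clauses like these, and return an error for any node the target engine cannot express:

import (
    "fmt"
    "strings"
)

// Comparison is a hypothetical, simplified view of one predicate clause,
// e.g. host == "server01" or _value > 100.
type Comparison struct {
    Column string
    Op     string // "==", "!=", ">", "<", ">=", "<="
    Value  interface{}
}

// whereFromClauses renders a conjunction of comparisons as a SQL WHERE clause.
// Note: real code must escape or parameterize values rather than interpolate them.
func whereFromClauses(clauses []Comparison) (string, error) {
    if len(clauses) == 0 {
        return "", nil
    }
    sqlOps := map[string]string{"==": "=", "!=": "<>", ">": ">", "<": "<", ">=": ">=", "<=": "<="}
    parts := make([]string, 0, len(clauses))
    for _, c := range clauses {
        op, ok := sqlOps[c.Op]
        if !ok {
            return "", fmt.Errorf("operator %q cannot be pushed down", c.Op)
        }
        switch v := c.Value.(type) {
        case string:
            parts = append(parts, fmt.Sprintf("%s %s '%s'", c.Column, op, v))
        default:
            parts = append(parts, fmt.Sprintf("%s %s %v", c.Column, op, v))
        }
    }
    return "WHERE " + strings.Join(parts, " AND "), nil
}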

All of this can also be put into the context of the recently written Fetch/Decode/Execute interface. We can easily duplicate the interfaces above for the sourceIterator type, so that all of the code for retrieving data from a source can be written in a single file.

pauldix commented 5 years ago

I think having all the code for a source in a single file would be great. So having the source be responsible for pushing down logic makes sense. One thing that I think is worth discussing is what happens when part of an operation can be pushed down but another part can't. For example, what if there was a filter where part of the filter could be pushed down, but some of the filter would have to be executed in the runtime? Would it make more sense to have the return be a filter or range or whatever object and an error? Then the source could return nil, nil if it is able to push down the entire thing, or some object and nil where the object is what the runtime would have to execute, or an error if there's something else going on.
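
For concreteness, the return style Paul describes might look something like this (a sketch only, not a settled API):

type FilteredSource interface {
    // PushDownFilter attempts to push fn into the source. It returns the part of the
    // predicate the source could NOT handle (nil if everything was pushed down), or an
    // error if something else went wrong.
    PushDownFilter(fn *execute.RowPredicateFn) (remainder *execute.RowPredicateFn, err error)
}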

Other than filter, range and aggregate, what else can be pushed down? Group? Are there situations where some pushdowns might result in multiple round-trip requests to the target source? I thought there were going to be cases like this for InfluxDB.

Overall I like this direction. Another way to think about this that I came to a week or two ago is that in some cases flux will basically act like an ORM. Where the flux query gets translated into a query against the target source.

aanthony1243 commented 5 years ago

Updated to include GroupedSource. It also just occurred to me that some engines may happily sort the data before they send it to us (particularly in indexed relations).

Yeah, I didn't surface the ORM concept in my thinking, but that's pretty accurate for what I'm thinking of here. Basically, we are taking some subset of operations from a flux query and translating them to something the source engine can understand. Acknowledging that the concepts of Filter/Aggregate/Group are fairly universal, we should be able to implement some helpers to speed up the transformation.

On the flip side, we want the planner to be totally ignorant of ORM logic. This interface insulates the planner as much as possible by only using flux types as input and output.

I think we'd get a TON of mileage if we merely implemented a ToSQLClause method in the RowPredicateFn type that returns a valid WHERE expression.

I like the idea that we can 'split' an expression if there are some operations that are not compatible.

wolffcm commented 5 years ago

@aanthony1243 I like this proposal.

One thing to note is that Source is defined in the execute package which already imports plan. So the plan package (as it is now and also in the new planner) isn't aware of sources or transformations themselves, but rather their corresponding ProcedureSpec instances. So the interfaces that determine what can be pushed down would need to be part of procedures.

My hope is that the planner modifies the plan only by invoking rewrite rules, but one can imagine making rules that match the pattern filterable_source->filter that invoke the PushDownFilter method, and so on.
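
To make that concrete, a built-in pushdown rule for the filterable_source->filter pattern could look roughly like the sketch below. PlanNode, FilterProcedureSpec, and the rule signature are placeholders for whatever the new planner ends up exposing; the point is only that the rule, not the planner core, knows about FilteredSource:

// PlanNode is a placeholder for a node in the plan graph; the real planner will define its own.
type PlanNode interface {
    ProcedureSpec() interface{}
    Predecessors() []PlanNode
}

// FilterProcedureSpec is a stand-in for the procedure spec of a filter operation.
type FilterProcedureSpec struct {
    Fn *execute.RowPredicateFn
}

// PushDownFilterRule rewrites source -> filter into a single source node when the
// source's ProcedureSpec implements FilteredSource.
type PushDownFilterRule struct{}

// Rewrite is called on a filter node that the planner has already matched as having a
// source as its only predecessor. It returns the replacement node and whether a rewrite
// happened.
func (PushDownFilterRule) Rewrite(filterNode PlanNode) (PlanNode, bool, error) {
    sourceNode := filterNode.Predecessors()[0]
    src, ok := sourceNode.ProcedureSpec().(FilteredSource)
    if !ok {
        return filterNode, false, nil // this source cannot accept a filter pushdown
    }
    spec := filterNode.ProcedureSpec().(*FilterProcedureSpec)
    if err := src.PushDownFilter(spec.Fn); err != nil {
        return filterNode, false, nil // could not translate the predicate; leave the plan as-is
    }
    return sourceNode, true, nil // the filter is now evaluated inside the source
}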

nathanielc commented 5 years ago

> My hope is that the planner modifies the plan only by invoking rewrite rules, but one can imagine making rules that match the pattern filterable_source->filter that invoke the PushDownFilter method, and so on.

This is my hope as well. I want there to be a single way the planner rewrites the plans, which means it needs to be generic enough to handle all the edge cases discussed above about whether a partial rewrite happens, etc. So in my mind these interfaces are not something the planner would understand directly; rather, there would be a builtin rewrite rule that knows how to do the happy path for these kinds of sources. Does that make sense?

Overall I like the design and intent of making it easy for new sources to be added and interact with the planner.

aanthony1243 commented 5 years ago

We had a call yesterday about this. My thinking is that these re-write opportunities are so universal to most sources that we can use a default re-write rule for any arbitrary source that tries any of the above pushdowns. This way, a source implementor does not need to consider the planner at all.

It actually occurred to me this morning that these operations could apply to some transformations as well, e.g., a transformation that outputs data in sorted order may use ascending by default but could switch to descending. Also, filtering could be applied in-line to something like a Join or a Pivot.

wolffcm commented 5 years ago

I think it would be best to have 5 built-in rules, one for each of the 5 kinds of things we can push down into sources. The logic for any one rule should be pretty simple that way.

As far as inlining operations, this is a great idea. It could decrease memory usage and latency dramatically. However, it would need to be designed carefully such that we avoid an ad hoc approach where we have NxM snippets of code to inline N operations (filter, map) into M other operations (join, sort, pivot). One could imagine having procedures implement an InlineableProcedure or StreamableProcedure interface or something like that, and rules that do the inlining.
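
Purely as a sketch of that idea (the names and signature below are invented):

// RowOperation is a stand-in for a row-wise operation (a filter predicate or a map
// function) that another procedure could apply inline to its own rows.
type RowOperation interface{}

// InlineableProcedure is a hypothetical interface for procedures (join, sort, pivot, ...)
// that can absorb an upstream row-wise operation instead of receiving its output as a
// separate stream.
type InlineableProcedure interface {
    // InlineRowOperation returns false if this particular operation cannot be inlined.
    InlineRowOperation(op RowOperation) bool
}

A generic rule could then match filter -> inlineable_procedure (or map -> inlineable_procedure) and call InlineRowOperation, falling back to the normal plan when it returns false.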

aanthony1243 commented 5 years ago

I think that in general these 5 types of operations may be rewritten into a variety of other operations, so these 5 built-in rules could be applied in a combinatorially large variety of circumstances. This fits really well with a search-based planner like Volcano/Cascades, since there may be several options for rewriting, for example, a filter operation to be part of some other operation.

wolffcm commented 5 years ago

I totally agree with this.

I've been thinking that the DAG-ness of our plans also broadens the search space, since it might sometimes be beneficial to duplicate operations if we can get better optimization.

t = from(bucket: "telegraf") |> range(start: -5m) |> filter(fn: ...)
result1 = filter(table: t, fn: ...) |> yield(name: "result1")
result2 = filter(table: t, fn: ...) |> yield(name: "result2")

The filter assigned to t has two successors: result1's filter and result2's filter. This makes it hard to merge any filters with a straightforward rule. However, if we duplicated t's filter so the graph looks like this

                t-filter --> result1-filter --> yield
              / 
from --> range
              \
                t-filter --> result2-filter --> yield

Then we could apply rules that merge filters. This scenario could apply in lots of situations.

wolffcm commented 5 years ago

Suppose we have a query like this one,

thirdParty.from(...) 
|> filter(...) 
|> group(...) |> max() 
|> filter(...) // filter the result of the aggregate operations e.g., _value > 100

Can this approach be smart enough to know that grouping and aggregation have already been pushed down, and so the filter must be evaluated against the output of max (and not ANDed with the first filter)? Or, if the source can't filter the output of aggregation, how will we know not to push it down?

We can require the source implementor to handle this logic, but we know from experience that it's easy to introduce bugs with respect to push downs.

I describe an approach to handling some of these issues with InfluxDB sources here: https://github.com/influxdata/platform/issues/76#issuecomment-446746000

Perhaps under the hood of the interface presented above we can keep track of what's already been pushed down and be more conservative with regard to issues like these.
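
One way to do that bookkeeping without putting it on every source implementor (sketched here with invented names) is for the built-in rules themselves to record what has already been pushed into a source node and consult that record before pushing anything else:

// PushDownState is a hypothetical record, kept by the built-in pushdown rules, of
// what has already been pushed into a given source node.
type PushDownState struct {
    FilterPushed    bool
    RangePushed     bool
    GroupPushed     bool
    AggregatePushed bool
    SortPushed      bool
}

// CanPushFilter reports whether a further filter may still be pushed down. Once an
// aggregate has been pushed, a later filter applies to the aggregate's output and must
// stay in the runtime (the _value > 100 case above).
func (s PushDownState) CanPushFilter() bool {
    return !s.AggregatePushed
}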