influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License

Create a `vector_repeat` operator in semantic graph #4031

Closed wolffcm closed 1 year ago

wolffcm commented 3 years ago

In order to facilitate a vectorization transformation, we should add a new kind of operator expression to our semantic graph representation of Flux.

The idea is that vector_repeat can convert a scalar expression (like a literal) into a vector where each element is the same. For example the type of 3 is int, and the type of vector_repeat(3) is vector(int).
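A minimal sketch of the value-level behavior described above, assuming toy `Scalar` and `Vector` types and an explicit `len` argument (none of these names come from the Flux code base):

```rust
/// Toy scalar values; illustrative only, not Flux's own representation.
#[derive(Clone, Debug, PartialEq)]
enum Scalar {
    Int(i64),
    Float(f64),
}

/// Toy vector values, one variant per element type.
#[derive(Clone, Debug, PartialEq)]
enum Vector {
    Int(Vec<i64>),
    Float(Vec<f64>),
}

/// Repeat a scalar `len` times, producing a vector whose elements are all equal.
/// The `len` parameter is an assumption of this sketch; the issue does not say
/// where the length would come from.
fn vector_repeat(value: &Scalar, len: usize) -> Vector {
    match value {
        Scalar::Int(i) => Vector::Int(vec![*i; len]),
        Scalar::Float(f) => Vector::Float(vec![*f; len]),
    }
}
```

With these definitions, repeating the int `3` four times gives a vector of four `3`s, which mirrors the int to vector(int) typing in the example.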

See here for a discussion of vector_repeat (formerly called expand) and why it helps vectorization.

The new kind of expression can be added here: https://github.com/influxdata/flux/blob/b899d8bd6a35797d52964c71a21d192ceda40611/libflux/flux-core/src/semantic/nodes.rs#L110-L133

It could be added as a special kind of Unary operator, but it might be better to give vector_repeat its own variant of Expression in order to keep it separate from user-facing unary operators like + and -. It will also have different type rules than the arithmetic unary operators.
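To illustrate the "own variant" option, here is a rough sketch with a toy `Expression` enum and a toy `type_of` function. The names `MonoType`, `UnaryOp`, and `type_of` are hypothetical stand-ins, not the definitions in `nodes.rs`, and rejecting nested vectors is an assumption of this sketch:

```rust
/// Toy types; `Vector` wraps an element type.
#[derive(Clone, Debug, PartialEq)]
enum MonoType {
    Int,
    Float,
    Vector(Box<MonoType>),
}

/// User-facing unary operators.
#[derive(Clone, Copy, Debug)]
enum UnaryOp {
    Plus,
    Minus,
}

/// Toy expression tree with a dedicated variant for `vector_repeat`,
/// kept apart from the user-facing unary operators.
#[derive(Clone, Debug)]
enum Expression {
    Integer(i64),
    Float(f64),
    Unary(UnaryOp, Box<Expression>),
    VectorRepeat(Box<Expression>),
}

/// Compute the type of an expression in this toy language.
fn type_of(expr: &Expression) -> Result<MonoType, String> {
    match expr {
        Expression::Integer(_) => Ok(MonoType::Int),
        Expression::Float(_) => Ok(MonoType::Float),
        // Arithmetic unary operators preserve the operand type.
        Expression::Unary(_, operand) => type_of(operand),
        // vector_repeat lifts T to vector(T). Refusing to wrap a vector
        // again is an assumption made here to avoid nested vectors.
        Expression::VectorRepeat(operand) => match type_of(operand)? {
            MonoType::Vector(_) => Err("cannot repeat a vector".to_string()),
            t => Ok(MonoType::Vector(Box::new(t))),
        },
    }
}
```

Keeping the variant separate means the type rule for `VectorRepeat` lives in its own match arm instead of being a special case inside the unary-operator rule.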

DOD:

Marwes commented 2 years ago

Here are some functions that are a bit "weird" (I can't say with 100% certainty that I know what the expected behaviour is). But I am thinking that we essentially "taint" the record taken as argument and the record being returned, such that their fields are now vectors of whatever type they contained before (int => vector(int)).

(r) => {
    x = 1
    return {r with a: r.a, x: x  }
}
// Should be { a: vector(A) | R }  => { a: vector(A), x: vector(int) | R }
// right? So the binding `x` seems to be a binding of `vector(int)`

// However, that seems to imply that the function below has the type
// { a: vector(A) | R }  => { a: vector(A), x: vector(int), y: vector({ z: vector(int) }) | R }
// at least according to type inference if we force the fields to be `vector`
// And that can't be the semantics that we want as we can't (and don't want to) nest vectors
(r) => {
    x = 1
    return {r with a: r.a, x: x, y: { z: x }  }
} 

For this issue we do not allow variable bindings, though, so I guess that example doesn't apply (yet). The same problem can be shown with the following example:

(r) => ({r with a: r.a, x: { z: r.a }})
// { a: vector(A) | R }  => { a: vector(A), x: vector({ z: vector(A) }) | R }
// Which has the issue of nested vectors again
// 
// { a: vector(A) | R }  => { a: vector(A), x: vector({ z: A }) | R }
// would make sense, and is what I assume we want, but it is not what type inference would give us

Which is a bit of a long-winded way of saying that I don't think this touches type inference.

As I understand it, this is solely a matter of which expressions the interpreter can evaluate when it is given vector values instead of scalars, and of telling the interpreter that it is executing on vectors, since that changes what it needs to do for some operations such as record construction. If you construct a record in "vector mode", you must go through each vector value and build up a vector of records: Construct({ x: [1, 2], y: ["a", "b"] }) => [{ x: 1, y: "a" }, { x: 2, y: "b" }].
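The record construction step can be sketched as a column-to-row transposition. This is only an illustration under assumed names (`Value`, `construct`), with `BTreeMap` chosen for deterministic ordering; it also assumes all columns must have the same length:

```rust
use std::collections::BTreeMap;

/// Toy element values; illustrative only.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Int(i64),
    Str(String),
}

/// Turn named columns into a vector of records (column-major to row-major).
/// Returns an error if the columns differ in length.
fn construct(
    columns: &BTreeMap<String, Vec<Value>>,
) -> Result<Vec<BTreeMap<String, Value>>, String> {
    // Use the first column's length as the expected number of rows.
    let len = columns.values().next().map_or(0, |c| c.len());
    if columns.values().any(|c| c.len() != len) {
        return Err("columns have different lengths".to_string());
    }
    // Start with `len` empty records, then fill them one column at a time.
    let mut rows = vec![BTreeMap::new(); len];
    for (key, column) in columns {
        for (row, value) in rows.iter_mut().zip(column) {
            row.insert(key.clone(), value.clone());
        }
    }
    Ok(rows)
}
```

Called with the columns `x: [1, 2]` and `y: ["a", "b"]`, this yields two records, `{ x: 1, y: "a" }` and `{ x: 2, y: "b" }`, matching the Construct example above.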

If we tell the evaluators to run in vector mode, we would just implement the vector_repeat behavior in any evaluator that needs it.

Marwes commented 2 years ago

I tossed together a tiny evaluator to try and figure out where the problems lie with vectorization if it helps

use std::{
    collections::HashMap,
    ops::{Add, Sub},
};

#[derive(Clone, Debug)]
enum Expr {
    Literal(Value),
    Identifier(String),
    Op(Box<Expr>, Op, Box<Expr>),
    Record(HashMap<String, Expr>),
}

#[derive(Clone, Debug)]
enum Op {
    Add,
    Subtract,
}

#[derive(Clone, Debug, PartialEq)]
enum Value {
    Int(i64),
    Float(f64),
    Record(HashMap<String, Value>),
}

impl Add for Value {
    type Output = anyhow::Result<Self>;
    fn add(self, other: Self) -> Self::Output {
        Ok(match (self, other) {
            (Value::Int(l), Value::Int(r)) => Value::Int(l + r),
            (Value::Float(l), Value::Float(r)) => Value::Float(l + r),
            _ => return Err(anyhow::anyhow!("Typemismatch")),
        })
    }
}

impl Sub for Value {
    type Output = anyhow::Result<Self>;
    fn sub(self, other: Self) -> Self::Output {
        Ok(match (self, other) {
            (Value::Int(l), Value::Int(r)) => Value::Int(l - r),
            (Value::Float(l), Value::Float(r)) => Value::Float(l - r),
            _ => return Err(anyhow::anyhow!("Typemismatch")),
        })
    }
}

trait VectorValue:
    Add<Output = anyhow::Result<Self>>
    + Sub<Output = anyhow::Result<Self>>
    + Sized
    + Clone
    + std::iter::FromIterator<(String, Self)>
{
    type Context;
    fn from_value(v: Value, context: &Self::Context) -> Self;
}

impl VectorValue for Value {
    type Context = ();
    fn from_value(v: Value, _: &Self::Context) -> Self {
        v
    }
}

impl std::iter::FromIterator<(String, Self)> for Value {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (String, Self)>,
    {
        Value::Record(iter.into_iter().collect())
    }
}

#[derive(Clone, Debug, PartialEq)]
enum Vector {
    Int(Vec<i64>),
    Float(Vec<f64>),
    Record(Vec<HashMap<String, Value>>),
}

impl VectorValue for Vector {
    // Need to know the length to construct a vector (alternatively, we could have singleton
    // variants in `Vector` which act as a vector of whatever the input length is)
    type Context = usize;
    fn from_value(v: Value, len: &Self::Context) -> Self {
        match v {
            Value::Int(i) => Self::Int(vec![i; *len]),
            Value::Float(i) => Self::Float(vec![i; *len]),
            Value::Record(r) => Self::Record(vec![r; *len]),
        }
    }
}

impl std::iter::FromIterator<(String, Self)> for Vector {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (String, Self)>,
    {
        let mut record_vector = Vec::new();
        for (key, vector) in iter {
            match vector {
                Self::Int(is) => {
                    record_vector.resize(is.len(), HashMap::new());
                    for (record, i) in record_vector.iter_mut().zip(is) {
                        record.insert(key.clone(), Value::Int(i));
                    }
                }
                Self::Float(fs) => {
                    record_vector.resize(fs.len(), HashMap::new());
                    for (record, f) in record_vector.iter_mut().zip(fs) {
                        record.insert(key.clone(), Value::Float(f));
                    }
                }
                Self::Record(rs) => {
                    record_vector.resize(rs.len(), HashMap::new());
                    for (record, r) in record_vector.iter_mut().zip(rs) {
                        record.insert(key.clone(), Value::Record(r));
                    }
                }
            }
        }
        Vector::Record(record_vector)
    }
}

impl Add for Vector {
    type Output = anyhow::Result<Self>;
    fn add(self, other: Self) -> Self::Output {
        Ok(match (self, other) {
            (Self::Int(l), Self::Int(r)) => {
                Self::Int(l.into_iter().zip(r).map(|(l, r)| l + r).collect())
            }
            (Self::Float(l), Self::Float(r)) => {
                Self::Float(l.into_iter().zip(r).map(|(l, r)| l + r).collect())
            }
            _ => return Err(anyhow::anyhow!("Typemismatch")),
        })
    }
}

impl Sub for Vector {
    type Output = anyhow::Result<Self>;
    fn sub(self, other: Self) -> Self::Output {
        Ok(match (self, other) {
            (Self::Int(l), Self::Int(r)) => {
                Self::Int(l.into_iter().zip(r).map(|(l, r)| l - r).collect())
            }
            (Self::Float(l), Self::Float(r)) => {
                Self::Float(l.into_iter().zip(r).map(|(l, r)| l - r).collect())
            }
            _ => return Err(anyhow::anyhow!("Typemismatch")),
        })
    }
}

impl Expr {
    fn int(i: i64) -> Expr {
        Expr::Literal(Value::Int(i))
    }

    fn float(f: f64) -> Expr {
        Expr::Literal(Value::Float(f))
    }

    fn identifier(i: &str) -> Expr {
        Expr::Identifier(i.to_owned())
    }

    fn op(l: impl Into<Box<Expr>>, op: Op, r: impl Into<Box<Expr>>) -> Expr {
        Expr::Op(l.into(), op, r.into())
    }

    fn eval<V>(&self, env: &mut HashMap<String, V>, context: &V::Context) -> anyhow::Result<V>
    where
        V: VectorValue,
    {
        Ok(match self {
            Expr::Literal(v) => V::from_value(v.clone(), context),
            Expr::Identifier(i) => env
                .get(i)
                .ok_or_else(|| anyhow::anyhow!("Missing `{}`", i))?
                .clone(),
            Expr::Op(l, op, r) => match op {
                Op::Add => (l.eval(env, context)? + r.eval(env, context)?)?,
                Op::Subtract => (l.eval(env, context)? - r.eval(env, context)?)?,
            },
            Expr::Record(ctor) => ctor
                .iter()
                .map(|(key, expr)| Ok((key.clone(), expr.eval(env, context)?)))
                .collect::<anyhow::Result<V>>()?,
        })
    }
}

/// Test
pub fn test() {
    macro_rules! collect {
        ($($expr: expr),* $(,)?) => {
            // `std::array::IntoIter::new` is deprecated; this form works in every edition.
            IntoIterator::into_iter([$($expr),*]).collect()
        }
    }

    let int_expr = Expr::op(Expr::int(1), Op::Add, Expr::identifier("x"));
    let float_expr = Expr::op(Expr::float(1.), Op::Subtract, Expr::identifier("x"));
    let record_expr = Expr::Record(collect![
        ("a".into(), Expr::float(1.)),
        ("b".into(), int_expr.clone())
    ]);

    assert_eq!(
        int_expr
            .eval(&mut collect![("x".into(), Value::Int(2))], &())
            .unwrap(),
        Value::Int(3)
    );

    assert_eq!(
        float_expr
            .eval(
                &mut vec![("x".into(), Value::Float(3.))].into_iter().collect(),
                &()
            )
            .unwrap(),
        Value::Float(1. - 3.)
    );

    assert_eq!(
        int_expr
            .eval(
                &mut vec![("x".into(), Vector::Int(vec![2, 3]))]
                    .into_iter()
                    .collect(),
                &2
            )
            .unwrap(),
        Vector::Int(vec![3, 4])
    );

    assert_eq!(
        float_expr
            .eval(
                &mut vec![("x".into(), Vector::Float(vec![3., 4., 5.]))]
                    .into_iter()
                    .collect(),
                &3
            )
            .unwrap(),
        Vector::Float(vec![1. - 3., 1. - 4., 1. - 5.])
    );

    assert_eq!(
        record_expr
            .eval(
                &mut vec![("x".into(), Vector::Int(vec![2, 3]))]
                    .into_iter()
                    .collect(),
                &2
            )
            .unwrap(),
        Vector::Record(collect![
            collect![("a".into(), Value::Float(1.)), ("b".into(), Value::Int(3))],
            collect![("a".into(), Value::Float(1.)), ("b".into(), Value::Int(4))]
        ])
    );
}

Marwes commented 2 years ago

I might already have found some issues with my idea, though I might also have a path towards a more holistic approach to scalar/vector values. Hopefully I'll have something written up tomorrow.

Marwes commented 2 years ago

So the issue with the toy interpreter above is that it assumes that we either have scalars or we have vectors, which isn't actually true.

Consider (r) => ({r with a: r.a, b: r.b, c: r.a + r.b}). r is also a value that can be referenced and it is a scalar value (specifically r: { a: vector(Int), b: vector(Int) | R }). This becomes complicated as how we want to evaluate r changes depending on context.

(r) => ({
    a: r.a, // r should return the scalar record value { a: vector(int) }
    r: r, // r should unwrap itself: { a: vector(int) } => vector({ a: int })
})

Technically we could always do the second case, however it would defeat the purpose of vectorization to do it when it isn't necessary.

I believe what we need is to analyze whether an expression (or evaluator) is evaluated in a context that requires a scalar or a vector value. The default is a scalar value, however any value that is returned as a field of the returned record needs to be a vector value.

(r) => ({
    a: r.a, // r.a is evaluated in a vector context
    // r is evaluated in a scalar context as field projection wants to work with scalars

    r: r, // r is evaluated in a vector context but it is a scalar so we need a scalar => vector transformation inserted

    b: r.a * 2, // r.a * 2 must be a vector so `r.a` and `2` must be vectors
})

This "analysis" might be done beforehand, where we insert nodes which handle the translation between scalars and vectors, though I think it might be possible to do it just inside the evaluators.
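As a rough sketch of the "inside the evaluators" option, an evaluator could coerce each operand to the shape its context requires. Everything here (`Value`, `Context`, `coerce`, `mul_vectorized`, and the explicit `len`) is hypothetical, and treating a vector in a scalar context as an error is an assumption of this sketch:

```rust
/// Toy values: a single int or a vector of ints.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Scalar(i64),
    Vector(Vec<i64>),
}

/// The shape an expression is expected to produce.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Context {
    Scalar,
    Vector,
}

/// Coerce `value` into the shape `context` requires.
/// A scalar in a vector context is repeated `len` times;
/// a vector in a scalar context is rejected in this sketch.
fn coerce(value: Value, context: Context, len: usize) -> Result<Value, String> {
    match (value, context) {
        (Value::Scalar(s), Context::Vector) => Ok(Value::Vector(vec![s; len])),
        (Value::Vector(_), Context::Scalar) => {
            Err("vector used where a scalar is required".to_string())
        }
        // Already in the required shape.
        (v, _) => Ok(v),
    }
}

/// Multiply in a vector context: both operands are coerced first,
/// as in `r.a * 2` where `2` must become a vector.
fn mul_vectorized(l: Value, r: Value, len: usize) -> Result<Value, String> {
    let l = coerce(l, Context::Vector, len)?;
    let r = coerce(r, Context::Vector, len)?;
    match (l, r) {
        (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => Ok(Value::Vector(
            a.into_iter().zip(b).map(|(x, y)| x * y).collect(),
        )),
        _ => Err("operands are not vectors of equal length".to_string()),
    }
}
```

The alternative mentioned above, inserting explicit conversion nodes ahead of time, would move the `coerce` decision out of the evaluator and into a pass over the expression tree.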

wolffcm commented 2 years ago

After some discussion we've revised our approach to vectorization. I tried to capture the results of our discussion here, in a comment on the epic: https://github.com/influxdata/flux/issues/4028#issue-993584462

github-actions[bot] commented 1 year ago

This issue has had no recent activity and will be closed soon.