risingwavelabs / risingwave

SQL stream processing, analytics, and management. We decouple storage and compute to offer efficient joins, instant failover, dynamic scaling, speedy bootstrapping, and concurrent query serving.
https://www.risingwave.com/slack
Apache License 2.0

refactor(stream executor): adopt the `ExecutorInner` + `ExecutionVars` fashion in all stream executors #8882

Open stdrc opened 1 year ago

stdrc commented 1 year ago

In `HashAggExecutor` we started writing the executor in a new style:

pub struct XxxExecutor {
    input: BoxedExecutor,
    inner: ExecutorInner, // everything other than the input executors lives in `inner`
}

struct ExecutorInner {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    // ...Other executor arguments passed in via `XxxExecutor::new`.
    // These are expected to be immutable except for state tables.
}

struct ExecutionVars {
    xxx_cache: ExecutorCache,
    xxx_builder: ChunkBuilder,
    buffered_xxx: Foo,
    // etc

    // These are mutable variables that are modified at runtime
    // when handling incoming chunks.
}

// just like before
impl Executor for XxxExecutor {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }

    fn schema(&self) -> &Schema {
        &self.inner.info.schema
    }

    fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.inner.info.pk_indices
    }

    fn identity(&self) -> &str {
        &self.inner.info.identity
    }
}

impl XxxExecutor {
    pub fn new(args: XxxExecutorArgs) -> StreamResult<Self> {
        Ok(Self {
            input: args.input,
            inner: ExecutorInner {
                // ...
            }
        })
    }

    async fn handle_chunk(
        this: &mut ExecutorInner,
        vars: &mut ExecutionVars,
        chunk: StreamChunk
    ) -> StreamExecutorResult<()> {
        // ...Process `chunk`, updating `vars` (and the state tables in `this`).
        Ok(())
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let XxxExecutor {
            input,
            inner: mut this,
        } = self;

        let mut input = input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        // ...Initialize epoch for state tables.
        yield Message::Barrier(barrier);

        // Create execution variables according to `this`.
        let mut vars = ExecutionVars {
            // ...
        };

        #[for_await]
        for msg in input {
            match msg? {
                Message::Chunk(chunk) => {
                    Self::handle_chunk(&mut this, &mut vars, chunk).await?;
                }
                // ...
            }
        }
    }
}

I think other stream executors can adopt this style as well, which should improve code readability and extensibility.
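To make the split concrete, here is a minimal, self-contained sketch of the same shape using only the standard library. It is not RisingWave code: it is synchronous, and `FilterSumExecutor`, the simplified `Message` enum, and the `threshold`, `running_sum`, and `dirty` fields are all hypothetical names chosen for illustration. The point is only that construction-time arguments sit in `ExecutorInner`, per-run mutable state sits in `ExecutionVars`, and `handle_chunk` receives both explicitly.

```rust
// Toy, synchronous model of the pattern; every name below is hypothetical.

/// Simplified stand-in for the real stream `Message` type.
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    Chunk(Vec<i64>),
    Barrier(u64),
}

/// Arguments fixed at construction time; not mutated during execution.
struct ExecutorInner {
    identity: String,
    threshold: i64,
}

/// Mutable variables created when execution starts and updated per chunk.
struct ExecutionVars {
    running_sum: i64,
    dirty: bool,
}

pub struct FilterSumExecutor {
    input: Box<dyn Iterator<Item = Message>>,
    inner: ExecutorInner,
}

impl FilterSumExecutor {
    pub fn new(
        input: Box<dyn Iterator<Item = Message>>,
        identity: &str,
        threshold: i64,
    ) -> Self {
        Self {
            input,
            inner: ExecutorInner {
                identity: identity.to_string(),
                threshold,
            },
        }
    }

    pub fn identity(&self) -> &str {
        &self.inner.identity
    }

    /// Reads configuration from `this` and mutates only `vars`.
    fn handle_chunk(this: &ExecutorInner, vars: &mut ExecutionVars, chunk: Vec<i64>) {
        for value in chunk {
            if value >= this.threshold {
                vars.running_sum += value;
                vars.dirty = true;
            }
        }
    }

    /// Consumes the executor, splitting it into `input` and `this`.
    pub fn execute(self) -> Vec<Message> {
        let FilterSumExecutor { input, inner: this } = self;

        // Create execution variables; in a real executor these may depend on `this`.
        let mut vars = ExecutionVars {
            running_sum: 0,
            dirty: false,
        };

        let mut output = Vec::new();
        for msg in input {
            match msg {
                Message::Chunk(chunk) => Self::handle_chunk(&this, &mut vars, chunk),
                Message::Barrier(epoch) => {
                    // Emit the updated sum (if it changed) before forwarding the barrier.
                    if vars.dirty {
                        output.push(Message::Chunk(vec![vars.running_sum]));
                        vars.dirty = false;
                    }
                    output.push(Message::Barrier(epoch));
                }
            }
        }
        output
    }
}
```

Because `handle_chunk` is an associated function rather than a method, the borrow checker sees `this` and `vars` as two independent borrows, so a helper can read configuration while mutating runtime state without fighting over `&mut self`.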

github-actions[bot] commented 1 year ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.