SeaQL / sea-streamer

🌊 A real-time stream processing toolkit for Rust
https://www.sea-ql.org/SeaStreamer/
Apache License 2.0

Owned consumer stream #27

Open: carlocorradini opened this issue 4 months ago

carlocorradini commented 4 months ago

Motivation

Since the stream and its messages borrow the consumer (they are behind a lifetime), it is currently not possible to return impl Stream<Item = ...> from a function that uses a consumer's stream.

The following does not compile (error: "cannot return value referencing temporary value"; the function "returns a value referencing data owned by the current function"):

async fn my_stream(&self, options: SeaConsumerOptions) -> impl Stream<Item = String> {
    self
        .streamer // SeaStreamer
        .create_consumer(&[StreamKey::new("my_key").unwrap()], options)
        .await
        .unwrap()
        // stream() borrows the consumer, which is a temporary here, so the
        // returned stream would outlive the value it borrows from
        .stream()
        .filter_map(|message| {
            message.ok().and_then(|message| {
                message
                    .message()
                    .as_str()
                    .ok()
                    .map(|message| message.to_string())
            })
        })
}

Proposed Solutions

I would like something similar to the two methods redis provides for Pub/Sub streams (PubSub::into_on_message and PubSub::on_message in redis-rs): one that consumes the object (self) and returns a plain Stream with no borrowed lifetime (into_stream or similar), and another that takes &mut self and returns Stream + '_.

tyt2y3 commented 4 months ago

Thank you for the suggestion. It is definitely doable with a custom struct that owns the consumer and implements Stream, similar to how flume does it.
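A minimal synchronous sketch of that idea, assuming Iterator as a stand-in for Stream so it needs only std: a hypothetical OwnedOrRef enum lets one iterator type either borrow the consumer (stream(&mut self)) or own it (into_stream(self), returning ConsumerIter<'static>). flume's Receiver::stream and Receiver::into_stream expose a similar borrowed/owned pair. MockConsumer, recv, OwnedOrRef and ConsumerIter are illustrative names, not SeaStreamer API.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: holds the consumer either by value or by mutable reference.
enum OwnedOrRef<'a, T> {
    Owned(T),
    Ref(&'a mut T),
}

impl<'a, T> OwnedOrRef<'a, T> {
    /// Access the consumer the same way regardless of how it is held.
    fn get_mut(&mut self) -> &mut T {
        match self {
            OwnedOrRef::Owned(t) => t,
            OwnedOrRef::Ref(t) => &mut **t,
        }
    }
}

/// Hypothetical stand-in for a backend consumer, backed by an in-memory queue.
struct MockConsumer {
    queue: VecDeque<String>,
}

impl MockConsumer {
    fn recv(&mut self) -> Option<String> {
        self.queue.pop_front()
    }

    /// Borrowing form: the iterator cannot outlive `self`.
    fn stream(&mut self) -> ConsumerIter<'_> {
        ConsumerIter { con: OwnedOrRef::Ref(self) }
    }

    /// Owning form: no borrowed lifetime, so it can be returned from a function.
    fn into_stream(self) -> ConsumerIter<'static> {
        ConsumerIter { con: OwnedOrRef::Owned(self) }
    }
}

/// One iterator type serves both the borrowed and the owned case.
struct ConsumerIter<'a> {
    con: OwnedOrRef<'a, MockConsumer>,
}

impl Iterator for ConsumerIter<'_> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        self.con.get_mut().recv()
    }
}

/// Analogue of the my_stream function from the issue; this version compiles.
fn my_stream() -> impl Iterator<Item = String> {
    let con = MockConsumer {
        queue: VecDeque::from(vec!["a".to_string(), "b".to_string()]),
    };
    con.into_stream()
}
```

The synchronous case is easy because nothing has to survive between calls to next. An async Stream must keep the in-flight future returned by the consumer's next() alive across polls, and that future borrows the consumer, which is where the self-referencing problem comes from.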

carlocorradini commented 4 months ago

Do you have an idea or implementation in mind? 🤗🤯

tyt2y3 commented 4 months ago

I just realized it's very difficult because of a self-referencing lifetime: the struct would have to store the consumer together with a future that borrows it.

The following does not compile:

#[derive(Debug)]
pub struct ConsumerStream<'a, C: Consumer + 'static> {
    con: C,
    fut: Option<C::NextFuture<'a>>,
}

/// Common interface of consumers, to be implemented by all backends.
pub trait Consumer: Sized + Send + Sync {
    ...
    fn into_stream<'a>(self) -> ConsumerStream<'a, Self> {
        ConsumerStream {
            con: self,
            fut: None,
        }
    }
}

impl<'a, C: Consumer> Stream for ConsumerStream<'a, C>
where
    Self: 'a,
{
    type Item = StreamResult<C::Message<'a>, C::Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll::{Pending, Ready};
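        if self.fut.is_none() {
            // Rejected by the borrow checker: self.con.next() borrows self only for
            // this call, yet the stored future must live for 'a, i.e. it would
            // borrow a sibling field of the same struct
            self.fut = Some(self.con.next());
        }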
        match &mut self.fut {
            Some(fut) => match std::pin::Pin::new(fut).poll(cx) {
                Ready(res) => {
                    self.fut = None;
                    Ready(Some(res))
                }
                Pending => Pending,
            },
            None => unreachable!(),
        }
    }
}
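One common way around the self-referencing lifetime, sketched here as an assumption rather than SeaStreamer's design: instead of storing the consumer next to a future that borrows it, move the consumer into a boxed future and have that future hand it back together with an owned item. MockConsumer, NextFut, OwnedStream, NoopWaker and drain are hypothetical; poll_next mirrors the shape of Stream::poll_next but is an inherent method so the example needs only std, and items are String because a borrowed C::Message<'a> could not leave the future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a consumer whose `next` borrows `&mut self`.
struct MockConsumer {
    queue: Vec<String>,
}

impl MockConsumer {
    async fn next(&mut self) -> Option<String> {
        if self.queue.is_empty() { None } else { Some(self.queue.remove(0)) }
    }
}

/// The in-flight future owns the consumer and returns it with an owned item.
type NextFut = Pin<Box<dyn Future<Output = (MockConsumer, Option<String>)>>>;

/// The consumer lives either in `idle` or inside `fut`, never both,
/// so no field borrows from another field.
struct OwnedStream {
    idle: Option<MockConsumer>,
    fut: Option<NextFut>,
}

impl OwnedStream {
    fn new(con: MockConsumer) -> Self {
        OwnedStream { idle: Some(con), fut: None }
    }

    /// Same shape as Stream::poll_next, minus Pin<&mut Self> (this struct is Unpin).
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<String>> {
        if self.fut.is_none() {
            let mut con = self.idle.take().expect("consumer is idle when no future is in flight");
            let fut: NextFut = Box::pin(async move {
                // The borrow of `con` stays inside this heap-pinned async block.
                let item = con.next().await;
                (con, item) // hand the consumer back with an owned item
            });
            self.fut = Some(fut);
        }
        let fut = self.fut.as_mut().expect("future was just set");
        match fut.as_mut().poll(cx) {
            Poll::Ready((con, item)) => {
                self.fut = None;
                self.idle = Some(con);
                Poll::Ready(item)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Minimal waker for the demo; a real runtime (e.g. tokio) supplies its own.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll until end of stream (or the first Pending, which the mock never produces).
fn drain(mut s: OwnedStream) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(msg)) = s.poll_next(&mut cx) {
        out.push(msg);
    }
    out
}

/// Compiles: the returned value owns the consumer instead of borrowing a temporary.
fn my_stream() -> OwnedStream {
    OwnedStream::new(MockConsumer { queue: vec!["a".into(), "b".into()] })
}
```

The futures crate's stream::unfold packages the same state-passing idea. The cost is one heap allocation per message, and a real implementation would probably also need the boxed future to be Send for multi-threaded runtimes.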

carlocorradini commented 4 months ago

I know this is not simple, but I believe it is necessary: many libraries support it, and many patterns require a function that returns a stream (for example, async-graphql subscriptions). Let's try; I'm willing to help, though I'm not as experienced as you are 🤗🥳🤯

tyt2y3 commented 3 months ago

I have a design that works with the Redis backend. Sadly, it has to be implemented per backend, and it relies on flume's into_stream.

I am not sure this will be as easy for the Kafka backend.
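Why a channel helps, as a hedged std-only sketch: if a background task pushes owned messages into a channel, the owned stream is simply the receiving end moved out of the consumer. Here std::sync::mpsc and a thread stand in for flume and an async task, and ChannelConsumer, spawn and into_stream are hypothetical names. A backend whose messages borrow from the client cannot do this without first copying each message into an owned value, which may be part of why Kafka is harder.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical channel-backed consumer (std mpsc stands in for flume here).
struct ChannelConsumer {
    rx: mpsc::Receiver<String>,
}

impl ChannelConsumer {
    /// Spawns a producer thread that sends owned messages, like a backend task would.
    fn spawn(messages: Vec<String>) -> Self {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            for m in messages {
                // Stop early if the receiving side has been dropped.
                if tx.send(m).is_err() {
                    break;
                }
            }
            // `tx` is dropped when this closure returns, which ends the receiver's iterator.
        });
        ChannelConsumer { rx }
    }

    /// Owning form: the receiver is moved out, so the iterator carries no borrowed lifetime.
    fn into_stream(self) -> mpsc::IntoIter<String> {
        self.rx.into_iter()
    }
}

/// Analogue of the issue's my_stream; compiles because nothing borrows a local.
fn my_stream() -> impl Iterator<Item = String> {
    ChannelConsumer::spawn(vec!["x".to_string(), "y".to_string()]).into_stream()
}
```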