dmitryvk / async-fn-stream

Lightweight implementation of `async-stream` without macros
MIT License
9 stars 1 forks source link

Adding support emit error in try_fn_stream #2

Closed negezor closed 6 months ago

negezor commented 11 months ago

Hi! Could we add emitter.emit_err(error) for try_fn_stream? I want to transfer control over error handling to the consumer, and they, in turn, decide whether to break the loop or not. Right now the only option is to return an error from the handler, which in turn prevents it from continuing to consume the thread in the event of an error. However, if I wanted to throw an error now, I could solve it this way

impl Stream<Item = Result<Result<ListResponse, Error>, Error>>

But this is cumbersome and duplicates the logic of the current result.

dmitryvk commented 10 months ago

Hi,

Sorry for the late response and thanks for submitting the issue.

Could you describe what are you trying to achieve?

As far as I can see, you can use fn_stream and produce a stream of impl Stream<Item = Result<ListResponse, Error>> like so:

use async_fn_stream::fn_stream;
use futures::{pin_mut, Stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = get_stream();
    pin_mut!(stream);
    while let Some(res) = stream.next().await {
        println!("{res:?}");
    }
}

fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
    fn_stream(|emitter| async move {
        emitter.emit(Err(MyError {})).await;
        emitter.emit(Ok(0)).await;
    })
}

#[derive(Debug)]
struct MyError {}
negezor commented 6 months ago

That's a really long answer already from me 😄. Indeed I can use fn_stream, but I expected about the same behavior from try_fn_stream. The inconvenience is that I can't use ? operator when using fn_stream.

Let me show you with your example, I would like to use the following:

use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = get_stream();
    pin_mut!(stream);
    while let Some(res) = stream.next().await {
        println!("{res:?}");
    }
}

fn get_payload() -> Result<i32, MyError> {
    Err(MyError {})
}

fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
    try_fn_stream(|emitter| async move {
        // End the stream immediately
        let payload = get_payload()?;

        // Handle
        emitter.emit(Err(MyError {})).await;
        emitter.emit(Ok(0)).await;

        Ok(())
    })
}

#[derive(Debug)]
struct MyError {}

But I get the following when building it:

error[E0271]: type mismatch resolving `<TryFnStream<Result<{integer}, MyError>, _, {async block@src/bin/test.rs:18:29: 27:6}> as Stream>::Item == Result<i32, MyError>`
  --> src/bin/test.rs:17:20
   |
17 | fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
   |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected `Result<Result<..., ...>, ...>`, found `Result<i32, MyError>`
   |
   = note: expected enum `Result<Result<{integer}, MyError>, _>`
              found enum `Result<i32, MyError>`

For more information about this error, try `rustc --explain E0271`.
error: could not compile `async-fn-stream` (bin "test") due to 1 previous error

However, I think try_fn_stream would be easier to use if there was something like this:

fn get_stream() -> impl Stream<Item = Result<i32, MyError>> {
    try_fn_stream(|emitter| async move {
        // End the stream immediately
        let payload = get_payload()?;

        // Handle
        emitter.emit(0).await;
        emitter.emit_err(MyError {}).await;

        Ok(())
    })
}

We'd keep the same function signature and give more control when we don't necessarily want to get out of the stream on an error

dmitryvk commented 6 months ago

OK, now I see what you mean :)

I like the emit_err method - it seems to be backwards-compatible and concise.

I'll try to come up with the implementation, or feel free to make a pull request.