smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0
427 stars 25 forks source link

map_ok from TryStreamExt and buffered from StreamExt #80

Closed v1gnesh closed 9 months ago

v1gnesh commented 9 months ago

Hello,

Would it be possible to add the following 2 to futures-lite?

https://docs.rs/futures-util/latest/futures_util/stream/trait.TryStreamExt.html#method.map_ok https://docs.rs/futures-util/latest/futures_util/stream/trait.StreamExt.html#method.buffered

I'm trying to see if I can switch from futures-util and tokio to futures-lite and smol..

EDIT: I have to say though, futures-lite is taking 10-20x longer... maybe because I can't use buffered() here.

notgull commented 9 months ago
v1gnesh commented 9 months ago

@notgull, thanks for the detailed blog post. Have to say, the comments in the code block are almost invisible when trying to read with "night sight" on.

EDIT: I don't see a comment section in the blog post, so asking here if that's ok. "Ok" about the creation of Vec<Task>, however, what to do when I can't loop through the Vec because I have to use while let Some(x) because there's write_all to do within the while block.

Example:

let mut stream = my_stream
  .map(|x: Result<Bytes, _>| create_receiver(x, arg2))
  .buffered(NUM);

// let mut stream: Vec<smol::Task<Receiver<(ty1, ty2)>>> = my_stream
//     .map(|x| { ex.spawn(create_receiver(x, arg2)) })
//     .collect()
//     .await;

while let Some(recvd) = stream.next().await {
  let (val1, val2) = recvd.expect("work");
  file.write_all(val1).await?;
}

async fn create_receiver(bytes: Result<Bytes, _>, arg2: u16) -> Receiver<(ty1, ty2)> {
  let (send, recv) = oneshot::channel();
  rayon::spawn(move || {
   do_stuff(send, &bytes)
  });
  recv
}

fn do_stuff(send: Sender<(ty1, ty2)>) {
  // stuff
  send.send()
}