a1akris / page-turner

A generic abstraction of paginated APIs
Apache License 2.0

Unpin streams #7

Open StarlessNights opened 2 weeks ago

StarlessNights commented 2 weeks ago

Currently the returned streams don't implement Unpin.

This limits their usability, as many methods in the StreamExt and TryStreamExt traits have Self: Unpin bounds.

next() is probably the most crucial of these. Idiomatic code like this won't compile:

use futures::StreamExt;

// ...

let mut item_stream = page_client.into_pages(req).items();

while let Some(item) = item_stream.next().await {
  println!("Got item: {item:?}");
}

a1akris commented 1 week ago

The docs don't mention it, but you should use the std::pin::pin! macro for this use case:

let mut item_stream = std::pin::pin!(page_client.into_pages(req).items());

At least the compiler error message mentions it, though the last time I saw it, it wasn't this scary :) :

error[E0277]: `impl std::marker::Send + futures::Stream<Item = std::result::Result<<std::vec::Vec<BlogRecord> as std::iter::IntoIterator>::Item, std::string::String>> + '_` cannot be unpinned
  --> src/bin/check.rs:74:51
   |
74 |         while let Some(item) = item_stream.next().await {
   |                                                  -^^^^^
   |                                                  ||
   |                                                  |the trait `std::marker::Unpin` is not implemented for `impl std::marker::Send + futures::Stream<Item = std::result::Result<<std::vec::Vec<BlogRecord> as std::iter::IntoIterator>::Item, std::string::String>> + '_`, which is required by `futures::stream::Next<'_, impl std::marker::Send + futures::Stream<Item = std::result::Result<<std::vec::Vec<BlogRecord> as std::iter::IntoIterator>::Item, std::string::String>> + '_>: std::future::IntoFuture`
   |                                                  help: remove the `.await`
   |
   = note: consider using the `pin!` macro   <------ this note
           consider using `Box::pin` if you need to access the pinned value outside of the current scope

Previously, streams were Unpin by default, but calling pages() and into_pages() on some fancy client types caused issues, so Unpin was removed in favor of manual stream pinning.
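
The second compiler note above (use `Box::pin` when the pinned value must outlive the current scope) can be illustrated with a small std-only sketch. It uses a plain future instead of a page-turner stream, and the helper names `drive`, `boxed_answer`, and `stack_answer` are hypothetical, chosen only for this illustration. The same trade-off should apply to streams: `pin!` pins on the stack of the current function, while `Box::pin` yields an owned handle that can be returned or stored.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls an `Unpin` future until it is ready.
// Both `Pin<&mut F>` and `Pin<Box<F>>` are `Unpin`, even when `F` is not.
fn drive<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

// `Box::pin` pins on the heap, so the pinned handle can leave this function.
fn boxed_answer() -> Pin<Box<dyn Future<Output = u32>>> {
    Box::pin(async { 42 })
}

// `pin!` pins on the stack, so the handle has to be used in the same scope.
fn stack_answer() -> u32 {
    let fut = pin!(async { 41 + 1 });
    drive(fut)
}
```

Calling `drive(boxed_answer())` and `stack_answer()` both yield 42. The boxed variant costs one heap allocation, which is usually negligible next to the network calls a paginated client makes.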

a1akris commented 1 week ago

Here is a complete working example:

use futures::StreamExt;
use page_turner::prelude::*;

pub struct BlogClient {
    content: Vec<Result<BlogRecord, String>>,
}

impl BlogClient {
    pub fn new(amount: usize) -> Self {
        Self {
            content: (0..amount).map(BlogRecord).map(Ok).collect(),
        }
    }

    pub async fn get_content(&self, req: GetContentRequest) -> Result<GetContentResponse, String> {
        let record = self
            .content
            .get(req.page)
            .ok_or("The page is out of bounds")?
            .clone()?;

        let next_page = (req.page + 1 < self.content.len()).then_some(req.page + 1);
        Ok(GetContentResponse { record, next_page })
    }

    pub fn set_error(&mut self, pos: usize) {
        self.set_error_with_msg(pos, "Custom error");
    }

    pub fn set_error_with_msg(&mut self, pos: usize, msg: &'static str) {
        self.content[pos] = Err(msg.into())
    }
}

impl PageTurner<GetContentRequest> for BlogClient {
    type PageItems = Vec<BlogRecord>;
    type PageError = String;

    async fn turn_page(&self, req: GetContentRequest) -> TurnedPageResult<Self, GetContentRequest> {
        let response = self.get_content(req).await?;

        match response.next_page {
            Some(page) => Ok(TurnedPage::next(
                vec![response.record],
                GetContentRequest { page },
            )),
            None => Ok(TurnedPage::last(vec![response.record])),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlogRecord(pub usize);

#[derive(Debug, Clone)]
pub struct GetContentRequest {
    pub page: usize,
}

pub struct GetContentResponse {
    pub record: BlogRecord,
    pub next_page: Option<usize>,
}

fn main() {
    let page_client = BlogClient::new(100);

    let fut = async {
        let mut item_stream = std::pin::pin!(page_client
            .into_pages(GetContentRequest { page: 0 })
            .items());

        while let Some(item) = item_stream.next().await {
            println!("Got item: {item:?}");
        }
    };

    futures::executor::block_on(fut);
}

This also works:

fn main() {
    let page_client = BlogClient::new(100);

    let item_stream = std::pin::pin!(page_client
        .into_pages(GetContentRequest { page: 0 })
        .items());

    for item in futures::executor::block_on_stream(item_stream) {
        println!("Got item: {item:?}");
    }
}

StarlessNights commented 6 days ago

Thanks for the tips!

Can you provide some background for this? Is it just not possible/practical to make the returned type Unpin, or is it left out for generality?

a1akris commented 6 days ago

It's been 3 years, so I may be wrong, but as far as I remember, some client types made the internal streams !Unpin, which triggered the same !Unpin error with no way to fix it except by changing the client type. On the other hand, you could just std::pin::pin! the stream with the !Unpin client and everything worked, so Unpin was removed.

Of course, I will try to reproduce the exact issue and come up with a correct explanation while working on #8.

a1akris commented 6 days ago

Also, this is not a big deal because when you use stream combinators you don't need to use std::pin::pin!. This generally works:

let results: Vec<_> = client.pages(req).items().try_filter(interesting).try_collect().await?;

Only a few methods like next and try_next require Unpin in the futures lib.
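
Why by-value combinators get away without `Unpin` while `next` does not can be sketched with std alone. `MiniStream` below is a stand-in that only mirrors the shape of `futures::Stream`; `Countdown`, `collect_all`, and `next_item` are likewise hypothetical names, and the sketch assumes a stream that never returns `Poll::Pending`. A consumer that owns the stream can pin it internally, whereas one that borrows it through `&mut` has to re-pin on every call, which is only allowed for `Unpin` types.

```rust
use std::cell::Cell;
use std::marker::PhantomPinned;
use std::ops::DerefMut;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in with the same shape as `futures::Stream`; not the real trait.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A pinned pointer to a stream is itself a stream, and it is `Unpin`.
impl<P> MiniStream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: MiniStream,
{
    type Item = <P::Target as MiniStream>::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.get_mut().as_mut().poll_next(cx)
    }
}

// A stream that is deliberately `!Unpin` because of `PhantomPinned`.
struct Countdown {
    left: Cell<u32>,
    _pin: PhantomPinned,
}

impl Countdown {
    fn new(from: u32) -> Self {
        Countdown { left: Cell::new(from), _pin: PhantomPinned }
    }
}

impl MiniStream for Countdown {
    type Item = u32;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let left = self.left.get();
        if left == 0 {
            return Poll::Ready(None);
        }
        self.left.set(left - 1);
        Poll::Ready(Some(left - 1))
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

// By-value consumer: owns the stream and pins it internally, so no `Unpin` bound.
fn collect_all<S: MiniStream>(stream: S) -> Vec<S::Item> {
    let mut stream = pin!(stream);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        items.push(item);
    }
    items
}

// By-reference consumer: re-pins on each call, which requires `S: Unpin`.
fn next_item<S: MiniStream + Unpin>(stream: &mut S) -> Option<S::Item> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match Pin::new(stream).poll_next(&mut cx) {
        Poll::Ready(item) => item,
        Poll::Pending => None, // simplification: this sketch never waits
    }
}
```

With these definitions, `collect_all(Countdown::new(3))` accepts the `!Unpin` stream directly, while `next_item` only accepts it after it has been wrapped with `pin!` (or `Box::pin`), mirroring the situation with `StreamExt::next`.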

StarlessNights commented 4 days ago

Yeah this is the approach I've taken.

So the reason likely boils down to "It could be Unpin, but then it wouldn't work on !Unpin clients"?

And I'm guessing the type system doesn't let us "pass through" unpinnedness, i.e. return an Unpin stream only if the client is Unpin.

Yeah a doc example of pinning the stream should be enough 👍
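
On the "pass through" question, here is a minimal std-only sketch of how `Unpin` behaves as an auto trait. It says nothing about how page-turner builds its streams internally; `PlainClient`, `PinnedClient`, `PagesOver`, `require_unpin`, and `demo` are hypothetical names. A hand-written wrapper struct is `Unpin` exactly when all of its fields are, so unpinnedness does pass through in that case. Futures produced by `async` blocks, however, are currently never `Unpin`, whatever they capture, so a stream built on top of them cannot inherit `Unpin` from the client.

```rust
use std::marker::PhantomPinned;

// Hypothetical stand-ins; none of these are page-turner types.
struct PlainClient;

#[allow(dead_code)]
struct PinnedClient {
    _pin: PhantomPinned, // opts this type out of `Unpin`
}

// A hand-written wrapper is `Unpin` automatically, but only when `C: Unpin`.
#[allow(dead_code)]
struct PagesOver<C> {
    client: C,
    next_page: usize,
}

// Compiles only for `Unpin` arguments, so it works as a compile-time check.
fn require_unpin<T: Unpin>(value: T) -> T {
    value
}

fn demo() -> usize {
    // Accepted: `PagesOver<PlainClient>` is `Unpin` because `PlainClient` is.
    let plain = require_unpin(PagesOver { client: PlainClient, next_page: 0 });

    // Constructing the pinned variant is fine; it just cannot pass the check.
    let pinned = PagesOver { client: PinnedClient { _pin: PhantomPinned }, next_page: 1 };

    // Either of these lines would be rejected with "cannot be unpinned":
    // require_unpin(pinned);
    // require_unpin(async {});

    plain.next_page + pinned.next_page
}
```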