michel-kraemer / actson-rs

🎬 A reactive (or non-blocking, or asynchronous) JSON parser
MIT License
31 stars 3 forks source link

AsyncStream is closed with streaming mode active #37

Open sandercm opened 1 month ago

sandercm commented 1 month ago

Hi I noticed that the parser would stop parsing more json thinking it reached the end of the file.

My code is using the example

        let feeder = AsyncBufReaderJsonFeeder::new(BufReader::new(reader));
        let parser = JsonParser::new_with_options(
            feeder,
            JsonParserOptionsBuilder::default()
                .with_streaming(true) // this enables our weird json stream usecase
                .build(),
        );

My use case is listening on a UDS for json events to come in, but currently it seems to shutdown the parser after receiving a full message. I suspect because the BufReader is empty waiting for more data. I fixed the issue by adding the following && self.streaming == false in the parser.rs file.

I don't know if this is the correct approach to fixing this issue?

in parser.rs : 339 if self.feeder.is_done() -> if self.feeder.is_done() && self.streaming == false

michel-kraemer commented 1 month ago

The problem is not the parser but the feeder. AsyncBufReaderJsonFeeder#isDone() returns true as soon as the underlying BufReader has reached the end of file, which is usually indicated by returning an empty buffer. I guess in your case, your stream can return an empty buffer even though more data will follow.

The easiest way to deal with this is to implement a custom JsonFeeder whose isDone() method never returns true. You could even write a small wrapper like so:

pub struct StreamingFeeder<T>
where
    T: AsyncRead + Unpin,
{
    delegate: AsyncBufReaderJsonFeeder<T>,
}

impl<T> StreamingFeeder<T>
where
    T: AsyncRead + Unpin,
{
    pub fn new(delegate: AsyncBufReaderJsonFeeder<T>) -> Self {
        StreamingFeeder { delegate }
    }

    pub async fn fill_buf(&mut self) -> Result<(), FillError> {
        self.delegate.fill_buf().await
    }
}

impl<T> JsonFeeder for StreamingFeeder<T>
where
    T: AsyncRead + Unpin,
{
    fn has_input(&self) -> bool {
        self.delegate.has_input()
    }

    fn is_done(&self) -> bool {
        // THIS FEEDER IS NEVER DONE!
        false
    }

    fn next_input(&mut self) -> Option<u8> {
        self.delegate.next_input()
    }
}
sandercm commented 3 weeks ago

The problem is not the parser but the feeder. AsyncBufReaderJsonFeeder#isDone() returns true as soon as the underlying BufReader has reached the end of file, which is usually indicated by returning an empty buffer. I guess in your case, your stream can return an empty buffer even though more data will follow.

The easiest way to deal with this is to implement a custom JsonFeeder whose isDone() method never returns true. You could even write a small wrapper like so:

pub struct StreamingFeeder<T>
where
    T: AsyncRead + Unpin,
{
    delegate: AsyncBufReaderJsonFeeder<T>,
}

impl<T> StreamingFeeder<T>
where
    T: AsyncRead + Unpin,
{
    pub fn new(delegate: AsyncBufReaderJsonFeeder<T>) -> Self {
        StreamingFeeder { delegate }
    }

    pub async fn fill_buf(&mut self) -> Result<(), FillError> {
        self.delegate.fill_buf().await
    }
}

impl<T> JsonFeeder for StreamingFeeder<T>
where
    T: AsyncRead + Unpin,
{
    fn has_input(&self) -> bool {
        self.delegate.has_input()
    }

    fn is_done(&self) -> bool {
        // THIS FEEDER IS NEVER DONE!
        false
    }

    fn next_input(&mut self) -> Option<u8> {
        self.delegate.next_input()
    }
}

ill try this out thanks