aembke / redis-protocol.rs

A Rust implementation of RESP2 and RESP3
Apache License 2.0
39 stars 13 forks source link

cannot decode Hello message #22

Closed stevefan1999-personal closed 6 months ago

stevefan1999-personal commented 2 years ago

Using the streaming codec example:

use bytes::BytesMut;
use redis_protocol::resp3::decode::streaming::decode_mut;
use redis_protocol::resp3::encode::complete::encode_bytes;
use redis_protocol::resp3::types::*;
use redis_protocol::types::{RedisProtocolError, RedisProtocolErrorKind};
use tokio_util::codec::{Decoder, Encoder};

#[derive(Default)]
pub struct RedisCodec {
    decoder_stream: Option<StreamedFrame>,
}

impl Encoder<Frame> for RedisCodec {
    type Error = RedisProtocolError;

    fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // in this example we only show support for encoding complete frames. see the resp3 encoder
        // documentation for examples showing how encode streaming frames
        let _ = encode_bytes(dst, &item)?;
        Ok(())
    }
}

impl Decoder for RedisCodec {
    type Item = Frame;
    type Error = RedisProtocolError;

    // Buffer the results of streamed frame before returning the complete frame to the caller.
    // Callers that want to surface streaming frame chunks up the stack would simply return after calling `decode` here.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        if let Some((frame, _, _)) = decode_mut(src)? {
            if self.decoder_stream.is_some() && frame.is_streaming() {
                // it doesn't make sense to start a stream while inside another stream
                return Err(RedisProtocolError::new(
                  RedisProtocolErrorKind::DecodeError,
                  "Cannot start a stream while already inside a stream.",
                ));
            }

            let result = if let Some(ref mut streamed_frame) = self.decoder_stream {
                // we started receiving streamed data earlier

                // we already checked for streams within streams above
                let frame = frame.into_complete_frame()?;
                streamed_frame.add_frame(frame);

                if streamed_frame.is_finished() {
                    // convert the inner stream buffer into the final output frame
                    Some(streamed_frame.into_frame()?)
                } else {
                    // buffer the stream in memory until it completes
                    None
                }
            } else {
                // we're processing a complete frame or starting a new streamed frame
                if frame.is_streaming() {
                    // start a new stream, saving the internal buffer to the codec state
                    self.decoder_stream = Some(frame.into_streaming_frame()?);
                    // don't return anything to the caller until the stream finishes (shown above)
                    None
                } else {
                    // we're not in the middle of a stream and we found a complete frame
                    Some(frame.into_complete_frame()?)
                }
            };

            if result.is_some() {
                // we're either done with the stream or we found a complete frame. either way clear the buffer.
                let _ = self.decoder_stream.take();
            }

            Ok(result)
        } else {
            Ok(None)
        }
    }
}

With this processor function:

    async fn process(&self, mut stream: TcpStream) -> Result<()> {
        let mut stream = stream;
        let codec = RedisCodec::default();
        let mut conn = codec.framed(stream);
        loop {
            while let Some(message) = conn.next().await {
                if let Ok(message) = message {
                    tracing::info!(message = ?message, "received");
                } else if let Err(message) = message {
                    tracing::error!(message = ?message, "error received");
                }
            }
        }
        Ok(())
    }

I was not able to decode Hello message:

peer connected: 127.0.0.1:52744
  2022-05-21T03:38:25.378855Z TRACE tokio_util::codec::framed_impl: attempting to decode a frame
    at C:\Users\steve\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.7.2\src\codec\framed_impl.rs:197 on tokio-runtime-worker ThreadId(2)

  2022-05-21T03:38:25.379105Z TRACE tokio_util::codec::framed_impl: Got an error, going to errored state
    at C:\Users\steve\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.7.2\src\codec\framed_impl.rs:200 on tokio-runtime-worker ThreadId(2)

  2022-05-21T03:38:25.379334Z ERROR sledis::server: error received, RedisProtocolError { desc: "frame_type: Invalid frame type prefix.", kind: DecodeError }
    at src\server.rs:31 on tokio-runtime-worker ThreadId(2)

  2022-05-21T03:38:25.379489Z TRACE tokio_util::codec::framed_impl: Returning None and setting paused
    at C:\Users\steve\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.7.2\src\codec\framed_impl.rs:163 on tokio-runtime-worker ThreadId(2)

It turns out the message does not have a tag?

  2022-05-21T03:45:18.455055Z  INFO sledis::server: got data, str: Ok("HELLO 3\r\n")

I have to put it in issue because there is no discussion tab, it is way better to put it there imo

stevefan1999-personal commented 2 years ago

...in Redis Hello is actually a command?

stevefan1999-personal commented 2 years ago

so if I read this correctly, this is actually an inline command, which is left unparsed

stevefan1999-personal commented 2 years ago

I was using this for an implementation of Redis, so I suppose you didn't expect to parse inline commands since it will never be sent from server, right? A workaround is to, well convert the inline command to equivalent RESP array of course, but it will be egregious. Basically what you need to do is split string by space associating each split string with length as prefix and tag it as bulk string, then treat the split strings into RESP arrays

aembke commented 2 years ago

Hey @stevefan1999-personal,

Yeah that's correct, I didn't expect to have to parse HELLO since the server never sends it. However, I did end up implementing a silly parsing function for it.

HELLO doesn't have a byte prefix in RESP, which makes detecting it pretty brittle. The parsing logic is written, but it'll never be called. I'll probably add a feature flag for parsing it that just switches on the leading H and go from there.

For what it's worth, if you're using this directly it's probably worth noting that I was only able to get the encoding to work by testing against a live server since the docs are pretty sparse on this. Based on my testing it looks like client should send the command like this:

HELLO [protocol number] [AUTH username [password]]\r\n

Hope that helps, and I'll try to get a new release with this FF up in the next couple days.

aembke commented 6 months ago

Added in 5.0.0