sourcegraph / jsonrpc2

Package jsonrpc2 provides a client and server implementation of JSON-RPC 2.0 (http://www.jsonrpc.org/specification)
MIT License

PlainObjectCodec rejects additional messages #57

Closed mnowotnik closed 2 years ago

mnowotnik commented 2 years ago

PlainObjectCodec creates a new JSON encoder or decoder on every call, reads from the stream, and returns a single object. I think this is why subsequent calls to ReadObject may return empty results even though multiple objects were sent.

// WriteObject implements ObjectCodec.
func (PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
    return json.NewEncoder(stream).Encode(v)
}

// ReadObject implements ObjectCodec.
func (PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
    return json.NewDecoder(stream).Decode(v)
}

To fix this, an object stream needs to store an encoder and a decoder:

type PlainObjectStream struct {
    conn    net.Conn
    decoder *json.Decoder
    encoder *json.Encoder
    r       *bufio.Reader
    w       *bufio.Writer
    mu      sync.Mutex
}

func NewPlainObjectStream(conn net.Conn) ObjectStream {
    os := &PlainObjectStream{conn: conn, r: bufio.NewReader(conn), w: bufio.NewWriter(conn)}
    os.encoder = json.NewEncoder(os.w)
    os.decoder = json.NewDecoder(os.r)
    return os
}

func (os *PlainObjectStream) ReadObject(v interface{}) error {
    return os.decoder.Decode(v)
}

func (os *PlainObjectStream) WriteObject(v interface{}) error {
    os.mu.Lock()
    defer os.mu.Unlock()
    err := os.encoder.Encode(v)
    if err != nil {
        return err
    }
    // Propagate flush errors instead of silently dropping them.
    return os.w.Flush()
}

func (os *PlainObjectStream) Close() error {
    return os.conn.Close()
}

In jsonrpc2_test.go, the TestClientServer test fails with PlainObjectCodec but passes with the PlainObjectStream implementation above.

The issue doesn't occur for the other codecs because, as far as I can tell, they hand the decoder a stream wrapped in io.LimitReader, which prevents the decoder from reading past the current message.
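To illustrate why a length limit shields the stream, here is a minimal sketch. It assumes a made-up framing ("<length>\n" followed by that many bytes of JSON), which is a simplification for illustration and not the wire format of any codec in jsonrpc2; readFramed, readTwo, and msg are hypothetical names.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// msg is a hypothetical payload type used only for this illustration.
type msg struct {
	A int `json:"a"`
}

// readFramed reads one "<length>\n<json>" frame (an assumed, simplified framing).
func readFramed(stream *bufio.Reader, v interface{}) error {
	line, err := stream.ReadString('\n')
	if err != nil {
		return err
	}
	n, err := strconv.ParseInt(strings.TrimSpace(line), 10, 64)
	if err != nil {
		return err
	}
	// The throwaway decoder can see at most n bytes, so anything it buffers
	// belongs to this frame only; the next frame stays in stream.
	return json.NewDecoder(io.LimitReader(stream, n)).Decode(v)
}

// readTwo reads two frames that arrive back to back in a single stream.
func readTwo() (a, b msg, err error) {
	stream := bufio.NewReader(strings.NewReader("7\n{\"a\":1}7\n{\"a\":2}"))
	if err = readFramed(stream, &a); err != nil {
		return a, b, err
	}
	err = readFramed(stream, &b)
	return a, b, err
}

func main() {
	a, b, err := readTwo()
	fmt.Println(a.A, b.A, err) // prints "1 2 <nil>"
}
```

Even though a new decoder is created for every frame, both objects are read, because no decoder is ever allowed to pull bytes of the following frame into its private buffer.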

keegancsmith commented 2 years ago

Ouch nice find. Just to confirm my understanding, this is because NewDecoder in PlainObjectCodec.ReadObject can read more than just that object (eg due to buffering)?

cc @samherrmann who introduced PlainObjectStream in https://github.com/sourcegraph/jsonrpc2/pull/45. Have you run into this in your use of this?

mnowotnik commented 2 years ago

@keegancsmith Actually PlainObjectStream is my example of solving this issue, not included in jsonrpc2.

> Just to confirm my understanding, this is because NewDecoder in PlainObjectCodec.ReadObject can read more than just that object (eg due to buffering)?

Yes, that's exactly my suspicion. I believe the decoder "consumes" additional JSON objects into its internal buffer, and those are lost when the decoder is discarded after returning only the first one.

samherrmann commented 2 years ago

That is a nice find! I just did some digging in the application where we've used jsonrpc2. It turns out that we haven't experienced this issue because we're using unixpacket (SOCK_SEQPACKET) for the underlying connection. If you modify TestClientServer in jsonrpc2_test.go from tcp to unixpacket, the existing PlainObjectCodec implementation passes. It's not actually correct to use json.Decoder (used by PlainObjectCodec) with packet-type connections, so it's interesting that our improper use shielded us from this issue.

samherrmann commented 2 years ago

@mnowotnik, are you planning on making a pull request? If not then let me know, I'd be happy to do it.

mnowotnik commented 2 years ago

@samherrmann I wanted to add it, but I don't see a way to do that cleanly without backwards-incompatible changes. Backwards compatibility can be preserved in a somewhat hackish way:

func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
    switch codec.(type) {
    case PlainObjectCodec:
        return NewPlainObjectStream(conn)
    }
    return &bufferedObjectStream{
        conn:  conn,
        w:     bufio.NewWriter(conn),
        r:     bufio.NewReader(conn),
        codec: codec,
    }
}

That's why I created this issue to discuss it.

samherrmann commented 2 years ago

That makes sense. An alternative backwards compatible implementation I can think of is this:

type PlainObjectCodec struct {
    decoder *json.Decoder
    encoder *json.Encoder
}

func (c *PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
    if c.encoder == nil {
        c.encoder = json.NewEncoder(stream)
    }
    return c.encoder.Encode(v)
}

func (c *PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
    if c.decoder == nil {
        c.decoder = json.NewDecoder(stream)
    }
    return c.decoder.Decode(v)
}

Then use it with the existing NewBufferedStream like before:

jsonrpc2.NewBufferedStream(conn, &jsonrpc2.PlainObjectCodec{})

Of course this is not ideal either, but maybe it's acceptable as long as the WriteObject and ReadObject methods document that they only use the stream given on the first call?

mnowotnik commented 2 years ago

Unfortunately, this solution breaks the tests, which assume that WriteObject and ReadObject have value receivers rather than pointer receivers, and it is not thread-safe.

I changed it a bit:

func NewBufferedStream(conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
    switch v := codec.(type) {
    case PlainObjectCodec:
        v.decoder = json.NewDecoder(conn)
        v.encoder = json.NewEncoder(conn)
        codec = v
    }
    return &bufferedObjectStream{
        conn:  conn,
        w:     bufio.NewWriter(conn),
        r:     bufio.NewReader(conn),
        codec: codec,
    }
}

// PlainObjectCodec reads/writes plain JSON-RPC 2.0 objects without a header.
type PlainObjectCodec struct {
    decoder *json.Decoder
    encoder *json.Encoder
}

// WriteObject implements ObjectCodec.
func (c PlainObjectCodec) WriteObject(stream io.Writer, v interface{}) error {
    if c.encoder != nil {
        return c.encoder.Encode(v)
    }
    return json.NewEncoder(stream).Encode(v)
}

// ReadObject implements ObjectCodec.
func (c PlainObjectCodec) ReadObject(stream *bufio.Reader, v interface{}) error {
    if c.decoder != nil {
        return c.decoder.Decode(v)
    }
    return json.NewDecoder(stream).Decode(v)
}

Tests pass with this change.
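As a rough illustration of why this works even with value receivers, the sketch below keeps one decoder alive across calls. The codec type is a hypothetical stand-in for the modified PlainObjectCodec, and readBoth and msg are made-up names; the decoder is set up front here instead of inside NewBufferedStream.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// msg is a hypothetical payload type used only for this illustration.
type msg struct {
	A int `json:"a"`
}

// codec is a hypothetical stand-in for the modified PlainObjectCodec.
type codec struct {
	decoder *json.Decoder
}

// ReadObject has a value receiver. Each call works on a copy of c, but the
// copy holds the same *json.Decoder, so buffered input survives between calls.
func (c codec) ReadObject(stream *bufio.Reader, v interface{}) error {
	if c.decoder != nil {
		return c.decoder.Decode(v)
	}
	// Fallback: the old per-call behaviour for a zero-value codec.
	return json.NewDecoder(stream).Decode(v)
}

// readBoth reads two objects that arrive in a single chunk.
func readBoth() (a, b msg, err error) {
	stream := bufio.NewReader(strings.NewReader("{\"a\":1}\n{\"a\":2}\n"))
	c := codec{decoder: json.NewDecoder(stream)}
	if err = c.ReadObject(stream, &a); err != nil {
		return a, b, err
	}
	err = c.ReadObject(stream, &b)
	return a, b, err
}

func main() {
	a, b, err := readBoth()
	fmt.Println(a.A, b.A, err) // prints "1 2 <nil>"
}
```

Because the pointer fields are populated before the struct is copied, the existing method signatures stay unchanged while the buffered second object is no longer lost.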

I would also mark PlainObjectCodec as Deprecated in favour of PlainObjectStream:

type plainObjectStream struct {
    conn    io.Closer
    decoder *json.Decoder
    encoder *json.Encoder
    mu      sync.Mutex
}

func NewPlainObjectStream(conn io.ReadWriteCloser) ObjectStream {
    os := &plainObjectStream{conn: conn}
    os.encoder = json.NewEncoder(conn)
    os.decoder = json.NewDecoder(conn)
    return os
}

func (os *plainObjectStream) ReadObject(v interface{}) error {
    return os.decoder.Decode(v)
}

func (os *plainObjectStream) WriteObject(v interface{}) error {
    os.mu.Lock()
    defer os.mu.Unlock()
    return os.encoder.Encode(v)
}

func (os *plainObjectStream) Close() error {
    return os.conn.Close()
}

samherrmann commented 2 years ago

As a user of the library, this looks like a good solution that offers a smooth transition.