geektutu / blog

极客兔兔的博客,Coding Coding 创建有趣的开源项目。
https://geektutu.com
Apache License 2.0
166 stars 21 forks source link

动手写RPC框架 - GeeRPC第一天 服务端与消息编码 | 极客兔兔 #91

Open geektutu opened 3 years ago

geektutu commented 3 years ago

https://geektutu.com/post/geerpc-day1.html

7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。第一天实现了一个简单的服务端和消息的编码与解码。

panjianning commented 2 years ago

学习一下,分享下我画的图解

AveryQi115 commented 2 years ago

我想请问一下main.go中这里client端通过Write函数写request之后就通过ReadHeader,ReadBody读取Response了。但是这里没有任何的同步点,是否可能出现client端ReadBody的时候server端还没有写入Response的情况呢?

for i := 0; i < 5; i++ {
        h := &codec.Header{
            ServiceMethod: "Foo.Sum",
            Seq:           uint64(i),
        }
        _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
        _ = cc.ReadHeader(h)
        var reply string
        _ = cc.ReadBody(&reply)
        log.Println("reply:", reply)
    }
wjh791072385 commented 2 years ago

@Nicola115 我想请问一下main.go中这里client端通过Write函数写request之后就通过ReadHeader,ReadBody读取Response了。但是这里没有任何的同步点,是否可能出现client端ReadBody的时候server端还没有写入Response的情况呢?

for i := 0; i < 5; i++ {
      h := &codec.Header{
          ServiceMethod: "Foo.Sum",
          Seq:           uint64(i),
      }
      _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
      _ = cc.ReadHeader(h)
      var reply string
      _ = cc.ReadBody(&reply)
      log.Println("reply:", reply)
  }

我尝试注销cc.write部分,会发现cc.ReadHeader会阻塞住,应该是conn内如果还没有写入数据的话,就不会读出来,直到有数据写入

AveryQi115 commented 2 years ago

@wjh791072385

@Nicola115 我想请问一下main.go中这里client端通过Write函数写request之后就通过ReadHeader,ReadBody读取Response了。但是这里没有任何的同步点,是否可能出现client端ReadBody的时候server端还没有写入Response的情况呢?

for i := 0; i < 5; i++ {
        h := &codec.Header{
            ServiceMethod: "Foo.Sum",
            Seq:           uint64(i),
        }
        _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
        _ = cc.ReadHeader(h)
        var reply string
        _ = cc.ReadBody(&reply)
        log.Println("reply:", reply)
    }

我尝试注销cc.write部分,会发现cc.ReadHeader会阻塞住,应该是conn内如果还没有写入数据的话,就不会读出来,直到有数据写入

谢谢。 我之前理解错了,conn应该是本身在client端和server端就有独立的数据缓冲区的,所以如果server端还没写入,client端这边就读不到就会被block

Sentiger commented 2 years ago

这里应该存在发送options的时候,服务端读取到options后面header的内容吧。导致再次读取后面的内容是不完整的,导致gob编码失败

callmePicacho commented 1 year ago

请教各位大佬一个问题,例如在 main.go 中,大量存在

_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
_ = cc.ReadHeader(h)

像这样的返回值既然要抛弃,为何不直接写成这样呢?

cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
cc.ReadHeader(h)
ZAKLLL commented 1 year ago

当我尝试将Options 通过gob 编码发送后(服务端也使用 gob 解码)代码会在readRequestHeader 这个函数阻塞,这是为什么呢

main.go

    time.Sleep(time.Second)
    // 告知服务端本次链接的options
    //_ = json.NewEncoder(conn).Encode(server.DefaultOption)
    _ = gob.NewEncoder(conn).Encode(server.DefaultOption)

    //使用GobCodec 作为编辑编解码器
    cc := codec.NewGobCodec(conn)
    // send request & receive response
    for i := 0; i < 5; i++ {
        h := &codec.Header{
            ServiceMethod: "Foo.Sum",
            Seq:           uint64(i),
        }
        _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
        _ = cc.ReadHeader(h)
        var reply string
        _ = cc.ReadBody(&reply)
        log.Println("reply:", reply)
    }

server.go

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
    defer func() { _ = conn.Close() }()
    var opt Option
    //if err := json.NewDecoder(conn).Decode(&opt); err != nil {
    if err := gob.NewDecoder(conn).Decode(&opt); err != nil {
        log.Println("rpc server: options error: ", err)
        return
    }
    if opt.MagicNumber != MagicNumber {
        log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
        return
    }
    f := codec.NewCodecFuncMap[opt.CodecType]
    if f == nil {
        log.Printf("rpc server: invalid codec type %s", opt.CodecType)
        return
    }
    server.serveCodec(f(conn))
}
yikuaibro commented 1 year ago
    _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
    //_ = cc.ReadHeader(h)
    time.Sleep(5 * time.Second)
    var reply string
    _ = cc.ReadBody(&reply)

请教下,为什么注释掉ReadHeader,ReadBody会读不到东西,已经加了time.sleep,我理解的不是很清楚。

        &{Foo.Sum 0 } geerpc req 0
        reply:
        &{Foo.Sum 1 } geerpc req 1
        reply: geerpc resp 0
        &{Foo.Sum 2 } geerpc req 2
        reply:
        &{Foo.Sum 3 } geerpc req 3
        reply: geerpc resp 1
        &{Foo.Sum 4 } geerpc req 4
        reply:
simon12138-code commented 1 year ago

我出现了一个rpc server: invalid codec type 的报错,代码都是一摸一样的,请问这是哪里出现了问题,我调试出来主要是,没有接收到对应默认option的codectype

simon12138-code commented 1 year ago

已经解决, Option属性私有化了,非常的愚蠢

Drum-sys commented 1 year ago

@yikuaibro = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq)) // = cc.ReadHeader(h) time.Sleep(5 * time.Second) var reply string _ = cc.ReadBody(&reply)

请教下,为什么注释掉ReadHeader,ReadBody会读不到东西,已经加了time.sleep,我理解的不是很清楚。

        &{Foo.Sum 0 } geerpc req 0
        reply:
        &{Foo.Sum 1 } geerpc req 1
        reply: geerpc resp 0
        &{Foo.Sum 2 } geerpc req 2
        reply:
        &{Foo.Sum 3 } geerpc req 3
        reply: geerpc resp 1
        &{Foo.Sum 4 } geerpc req 4
        reply:

数据格式|header|body|heder|body Header 类型codec.header raply 类型string cc.ReadBody(&reply) reply字符串应该不能解析到codec.header 结构体当中

把reply类型改一下 var reply codec.Header _ = cc.ReadBody(&reply) 2022/10/17 17:08:36 start rpc server on [::]:62015 2022/10/17 17:08:37 reply: {Foo.Sum 0 } 2022/10/17 17:08:37 reply: { 0 } 2022/10/17 17:08:37 reply: {Foo.Sum 1 } 2022/10/17 17:08:37 reply: { 0 } 2022/10/17 17:08:37 reply: {Foo.Sum 2 }

sunviv commented 1 year ago

time.Sleep(time.Second) 为什么这地方要阻塞 1s

huty1998 commented 1 year ago

@panjianning

学习一下,分享下我画的图解

请问兄弟用的什么软件画的图?

Asolmn commented 1 year ago

@yikuaibro = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq)) // = cc.ReadHeader(h) time.Sleep(5 * time.Second) var reply string _ = cc.ReadBody(&reply)

请教下,为什么注释掉ReadHeader,ReadBody会读不到东西,已经加了time.sleep,我理解的不是很清楚。

        &{Foo.Sum 0 } geerpc req 0
        reply:
        &{Foo.Sum 1 } geerpc req 1
        reply: geerpc resp 0
        &{Foo.Sum 2 } geerpc req 2
        reply:
        &{Foo.Sum 3 } geerpc req 3
        reply: geerpc resp 1
        &{Foo.Sum 4 } geerpc req 4
        reply:

解析是要安装顺序的,序列化的时候是先header然后body。读取的时候进行反序列化,所以也得安装序列化时候的顺序去解析,这就得先读取header再读body

SCUTking commented 7 months ago

@callmePicacho 请教各位大佬一个问题,例如在 main.go 中,大量存在

_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
_ = cc.ReadHeader(h)

像这样的返回值既然要抛弃,为何不直接写成这样呢?

cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
cc.ReadHeader(h)

用_接收不会有警告 为了好看

LumosLiang commented 7 months ago

@sunviv time.Sleep(time.Second) 为什么这地方要阻塞 1s

同问,不过结合之前大佬们的回复,这里是解决粘包的问题吗?

callmePicacho commented 7 months ago

@sunviv time.Sleep(time.Second) 为什么这地方要阻塞 1s

同问,不过结合之前大佬们的回复,这里是解决粘包的问题吗?

确保连接成功建立

taosu0216 commented 5 months ago

画的有点乱,但是大概逻辑是能理清了

TheWaveLab commented 4 months ago

@callmePicacho

@sunviv time.Sleep(time.Second) 为什么这地方要阻塞 1s

同问,不过结合之前大佬们的回复,这里是解决粘包的问题吗?

确保连接成功建立

粘包是靠在header里指定body长度解决

TheWaveLab commented 4 months ago

@callmePicacho 请教各位大佬一个问题,例如在 main.go 中,大量存在

_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
_ = cc.ReadHeader(h)

像这样的返回值既然要抛弃,为何不直接写成这样呢?

cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
cc.ReadHeader(h)

这个只是为了让IDE忽略没有错误处理的警告

TheWaveLab commented 4 months ago

@MoneyHappy day1-codec/main/main.go 文件中这行代码 _ = cc.ReadHeader(h) 的作用是啥,没太理解😢

为了解决tcp粘包,一般报文设计都会有先是header,在里面指定后面body的长度,所以要先读取header,判断报文类型和后续要读取的长度等。其实这个写的不太严谨,这里应该需要判断错误的,而不是直接忽略。

nineth1 commented 2 months ago

想请教一下,将header和body通过buffer缓冲一起进行发送,在client或server中进行读取的时候会出现粘包问题吗? 这里header好像没有固定大小?

vito-go commented 2 months ago

想请教一下,将header和body通过buffer缓冲一起进行发送,在client或server中进行读取的时候会出现粘包问题吗? 这里header好像没有固定大小?

Although the header and body are sent together, when the header and body are sent separately, the length of the bytes is marked. enc.countState.encodeUint(uint64(messageLen))

source code: go/src/encoding/gob/encoder.go

// writeMessage sends the data item preceded by an unsigned count of its length.
func (enc *Encoder) writeMessage(w io.Writer, b *encBuffer) {
    // Space has been reserved for the length at the head of the message.
    // This is a little dirty: we grab the slice from the bytes.Buffer and massage
    // it by hand.
    message := b.Bytes()
    messageLen := len(message) - maxLength
    // Length cannot be bigger than the decoder can handle.
    if messageLen >= tooBig {
        enc.setError(errors.New("gob: encoder: message too big"))
        return
    }
    // Encode the length.
    enc.countState.b.Reset()
    enc.countState.encodeUint(uint64(messageLen))
    // Copy the length to be a prefix of the message.
    offset := maxLength - enc.countState.b.Len()
    copy(message[offset:], enc.countState.b.Bytes())
    // Write the data.
    _, err := w.Write(message[offset:])
    // Drain the buffer and restore the space at the front for the count of the next message.
    b.Reset()
    b.Write(spaceForLength)
    if err != nil {
        enc.setError(err)
    }
}