geektutu / blog

极客兔兔的博客,Coding Coding 创建有趣的开源项目。
https://geektutu.com
Apache License 2.0
167 stars 21 forks source link

动手写RPC框架 - GeeRPC第二天 支持并发与异步的客户端 | 极客兔兔 #92

Open geektutu opened 3 years ago

geektutu commented 3 years ago

https://geektutu.com/post/geerpc-day2.html

7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。第二天实现了一个支持异步(asynchronous)和并发(concurrent)的客户端。

furthergo commented 3 years ago
// client.go
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
    if done == nil {
        done = make(chan *Call, 10)
    } else if cap(done) == 0 {
        log.Panic("rpc client: done channel is unbuffered")
    }

done = make(chan *Call, 10) 官方库这里对chan的处理感觉很奇怪呀,为啥要buffered大小是10个,会有这种场景么

geektutu commented 3 years ago

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

yangchen97 commented 3 years ago

client.GO这个异步接口该怎么调用呢

geektutu commented 3 years ago

@yangchen97

Go 返回了 call,call.Done 是一个信道(chan),没有什么特别的地方,按照普通信道来处理就好了。比如直接阻塞就是同步调用,类似于 Call 里的做法,如果不想阻塞,新启动一个协程等待结果,其他函数继续往下执行。

比如:

call := client.Go( ... )
# 新启动协程,异步等待
go func(call *Call) {
    select {
        <-call.Done:
            # do something
        <-otherChan:
            # do something
    }
}(call)

otherFunc() # 不阻塞,继续执行其他函数。
yangchen97 commented 3 years ago

@geektutu 明白了。还想再问一个问题, client.Go() 函数里的 client.send() ,是否应该为 go client.send() ?我认为返回 call 不需要等待 client.send() 执行完。

geektutu commented 3 years ago

@yangchen97

我觉得你的理解是对的,我看了下,确实没有等待 send() 完成的必要。我给 golang 标准库提了一个 PR,看看官方的解释是什么样的~

qingyunha commented 3 years ago

@yangchen97

我觉得你的理解是对的,我看了下,确实没有等待 send() 完成的必要。我给 golang 标准库提了一个 PR,看看官方的解释是什么样的~

go的网络IO本身就是异步的,加上这个go 提前返回也没必要吧。如果真想这么做, go func() { call := client.Go(...) } () ?

geektutu commented 3 years ago

@yangchen97 @qingyunha 在这个地方,网络IO是同步等待的,不是异步的。请求发送成功,send() 才会返回。不过 golang 的回复是 net/rpc 已经冻结了,不再接受新的特性了。但是就这个点而言,如果网络情况不太好的情况下,发送请求耗时很长,确实对性能会有一定的影响。

Rob Pike Patchset 1 15:54 As the documentation says, The net/rpc package is frozen and is not accepting new features.

异步请求确实有很多种其他的方式,但是 client.Go 的好处在于参数 done chan *Call 可以自定义缓冲区的大小,可以给多个 client.Go 传入同一个 chan 对象,从而控制异步请求并发的数量。其他方式就需要自己控制和实现了。

andcarefree commented 3 years ago

client.Dial方法中的defer不是很明白,此处的执行顺序是1.NewClient 2.defer 3.return 这个顺序吗

geektutu commented 3 years ago

@andcarefree 这是 Go defer 的运行机制,在 return 语句之后,函数退出之前执行,defer 执行时,返回值已经被赋值了。

你可以写个简单的函数验证下:

func test() (ans int) {
    defer func() {
        fmt.Println(ans)
    }()
    return 10
}

func main() {
    test()
}

输出是 10。

andcarefree commented 3 years ago

@andcarefree 这是 Go defer 的运行机制,在 return 语句之后,函数退出之前执行,defer 执行时,返回值已经被赋值了。

你可以写个简单的函数验证下:

func test() (ans int) {
  defer func() {
      fmt.Println(ans)
  }()
  return 10
}

func main() {
  test()
}

输出是 10。

多谢解惑

chinawilon commented 3 years ago

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。

chenshuidejidan commented 3 years ago

为什么方法的返回值(仅单个返回值)不需要的时候要用_抛弃,而不是直接调用不取返回值呢

echo-li1024 commented 3 years ago

seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return }

请问这个seq不是当前call的seq加1吗 这样的话 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) 这里remove的seq不应该是seq-1吗

JesseStutler commented 3 years ago
type Call struct {
    Seq           uint64
    ServiceMethod string      // format "<service>.<method>"
    Args          interface{} // arguments to the function
    Reply         interface{} // reply from the function
    Error         error       // if error occurs, it will be set
    Done          chan *Call  // Strobes when call is complete.
}

type Header struct {
    ServiceMethod string 
    Seq           uint64 
    Error         string 

想问一下,为什么Call与Header有公共字段,而不直接在Call中用Header呢

IAOTW commented 3 years ago

我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀

wilgx0 commented 3 years ago

唉 决定了 下个月就转行卖烧烤去了 再会了 同志们

wilgx0 commented 3 years ago

@yudidi 请问五个条件的出处是哪里呢?

不要纠结了 一起卖烧烤去

wilgx0 commented 3 years ago

@ls8725 我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀

因为每个请求的ServiceMethod和Seq是不一样的,一方面服务端需要根据ServiceMethod来调用相应的方法 另一面 客户端中的func (client *Client) receive() 接收功能 需要根据Seq来判断服务器端的响应消息

junxxie commented 3 years ago
//registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq
func (c *Client) registerCall(call *Call) (uint64, error) {

    // todo client前面被锁住了,这里为什么还需要再锁一次呢?
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.closing || c.shutdown {
        return 0, ErrShutDown
    }
    call.Seq = c.seq
    c.pending[call.Seq] = call
    c.seq++
    return call.Seq, nil
}

调用registerCall时,client刚被锁住,在registerCall中再锁一次是否多余?

GodXuebi commented 3 years ago

func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }

请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。

receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。

我的理解是client向远端发送报文的时候才需要加sending锁

shengxiang19 commented 3 years ago

@ls8725 我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀

我觉得client.header字段的目的是减少内存申请。一般来说,每次调用cc.write都需要新建一个Header类型的临时变量。尽管客户端是多协程的,但sending互斥量确保了每次只有一个协程可以调用cc.write方法,因此可以直接使用client.header字段。这也就是sending注释中protect following的含义。

gu18168 commented 3 years ago

@JesseStutler

type Call struct {
  Seq           uint64
  ServiceMethod string      // format "<service>.<method>"
  Args          interface{} // arguments to the function
  Reply         interface{} // reply from the function
  Error         error       // if error occurs, it will be set
  Done          chan *Call  // Strobes when call is complete.
}

type Header struct {
  ServiceMethod string 
  Seq           uint64 
  Error         string 

想问一下,为什么Call与Header有公共字段,而不直接在Call中用Header呢

我对于这里的理解是, 确实存在公共字段, 不重用 Header 理由不是代码原因, 而是语义原因, 因为 Header 的存在是为了通讯, 而 Call 本身不涉及通讯, 他只是客户端发起的调用, 客户端理论上来说是不知道底层是如何调用的, 所以这里没有使用 Header , 而且使用了 Header 的话, 在初始化 Call 的时候, 代码也会复杂一点.

cuglaiyp commented 3 years ago

@chinawilon

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。 基于同一个chan的话就需要客户端传入chan,客户端如果传入了chan,就不会走初始化为10的逻辑了,所以我觉得你这个逻辑不太成立

nanfeng1999 commented 3 years ago
        call := client.removeCall(h.Seq)
        switch {
        case call == nil:
            // it usually means that Write partially failed
            // and call was already removed.
            err = client.cc.ReadBody(nil)
        case h.Error != "":
            call.Error = fmt.Errorf(h.Error)
            err = client.cc.ReadBody(nil)
            call.done()
        default:
            err = client.cc.ReadBody(call.Reply)
            if err != nil {
                call.Error = errors.New("reading body " + err.Error())
            }
            call.done()
        }

有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊

walkmiao commented 3 years ago

@yzy-github

      call := client.removeCall(h.Seq)
      switch {
      case call == nil:
          // it usually means that Write partially failed
          // and call was already removed.
          err = client.cc.ReadBody(nil)
      case h.Error != "":
          call.Error = fmt.Errorf(h.Error)
          err = client.cc.ReadBody(nil)
          call.done()
      default:
          err = client.cc.ReadBody(call.Reply)
          if err != nil {
              call.Error = errors.New("reading body " + err.Error())
          }
          call.done()
      }

有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊

这里只是从服务器返回的信息当中拿到第一个被处理的call 解码后call.done是为了通知调用者这个call已经完成了

Jaime1129 commented 3 years ago

@chinawilon

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。

@GodXuebi func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }

请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。

receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。

我的理解是client向远端发送报文的时候才需要加sending锁

这里获取sending锁可以防止新的Call被注册到pending map中。

zyb284629791 commented 3 years ago

var _ io.Closer = (*Client)(nil) 你好,这段代码是什么意思呢?在网上也没找到太多的信息。

zyb284629791 commented 3 years ago

另外,client中的mu锁和sending为什么要分开,也没有太搞懂。我理解大多数时候都是用mu锁是因为client可以复用,为了避免相互干扰。但在send的时候使用sending锁,如果这时候有其他协程调用了close之类的方法又怎么处理呢?

andcarefree commented 3 years ago

var _ io.Closer = (*Client)(nil) 你好,这段代码是什么意思呢?在网上也没找到太多的信息。

断言client实现了closer接口,go里面这个写法很常见

zyb284629791 commented 3 years ago

@andcarefree

var _ io.Closer = (*Client)(nil) 你好,这段代码是什么意思呢?在网上也没找到太多的信息。

断言client实现了closer接口,go里面这个写法很常见

好的。谢谢!

zyb284629791 commented 3 years ago

请问服务端和客户端是怎么协商编解码方式的? 在客户端启动的时候通过json.NewEncoder().encode()向服务端发送了option参数,此时服务端如何处理呢?服务端在Accept之后首先要解option没问题,但后续要继续readRequest啊,在读不到request之后并不会给客户端返回任何消息,那客户端是如何知道应该使用哪种编码方式呢。 其次,客户端在初始化NewClient的时候并未向penging中注册任何call,但是又在receive中从client中removeCall,必然后出现nil的情况导致receive失败。那又如何正常协商成功呢?

   //服务端serverCodec
   func (s *Server) serverCodec(c codec.Codec) {
     sending := new(sync.Mutex)
     wg := new(sync.WaitGroup)
     for {
        req, err := s.ReadRequest(c)
        if err != nil {
            if req == nil {
                break // it's not possible to recover, so close the connection
            }
            req.header.Error = err.Error()
            s.sendResponse(c, req.header, invalidRequest, sending)
            continue
        }
        wg.Add(1)
        s.handleRequest(c, req, sending, wg)
     }
   }

   //客户端receive    
   call := client.removeCall(h.Seq)
   switch {
      case call == nil:
      // it usually means that Write partially failed
      // and call was already removed.
      err = client.cc.ReadBody(nil)
zyb284629791 commented 3 years ago

请问服务端和客户端是怎么协商编解码方式的? 在客户端启动的时候通过json.NewEncoder().encode()向服务端发送了option参数,此时服务端如何处理呢?服务端在Accept之后首先要解option没问题,但后续要继续readRequest啊,在读不到request之后并不会给客户端返回任何消息,那客户端是如何知道应该使用哪种编码方式呢。 其次,客户端在初始化NewClient的时候并未向penging中注册任何call,但是又在receive中从client中removeCall,必然后出现nil的情况导致receive失败。那又如何正常协商成功呢?

   //服务端serverCodec
   func (s *Server) serverCodec(c codec.Codec) {
     sending := new(sync.Mutex)
   wg := new(sync.WaitGroup)
   for {
      req, err := s.ReadRequest(c)
      if err != nil {
          if req == nil {
              break // it's not possible to recover, so close the connection
          }
          req.header.Error = err.Error()
          s.sendResponse(c, req.header, invalidRequest, sending)
          continue
      }
      wg.Add(1)
      s.handleRequest(c, req, sending, wg)
   }
   }

   //客户端receive    
   call := client.removeCall(h.Seq)
   switch {
      case call == nil:
      // it usually means that Write partially failed
      // and call was already removed.
      err = client.cc.ReadBody(nil)
if err := json.NewEncoder(conn).Encode(opt); err != nil {
    log.Println("rpc client: options error: ", err)
    _ = conn.Close()
    return nil, err
}

又看了一遍代码,难道上面的代码意思是每次发送请求之前都带上option吗?如果是这样的话,那客户端协商通过后马上创建协程调用receive方法,此时读不到任何数据,也没看到哪里会阻塞,那不是会造成for循环无限空转吗- - 刚开始学go,希望大佬能多多帮助。

zyb284629791 commented 3 years ago

请问服务端和客户端是怎么协商编解码方式的? 在客户端启动的时候通过json.NewEncoder().encode()向服务端发送了option参数,此时服务端如何处理呢?服务端在Accept之后首先要解option没问题,但后续要继续readRequest啊,在读不到request之后并不会给客户端返回任何消息,那客户端是如何知道应该使用哪种编码方式呢。 其次,客户端在初始化NewClient的时候并未向penging中注册任何call,但是又在receive中从client中removeCall,必然后出现nil的情况导致receive失败。那又如何正常协商成功呢?

   //服务端serverCodec
   func (s *Server) serverCodec(c codec.Codec) {
     sending := new(sync.Mutex)
   wg := new(sync.WaitGroup)
   for {
      req, err := s.ReadRequest(c)
      if err != nil {
          if req == nil {
              break // it's not possible to recover, so close the connection
          }
          req.header.Error = err.Error()
          s.sendResponse(c, req.header, invalidRequest, sending)
          continue
      }
      wg.Add(1)
      s.handleRequest(c, req, sending, wg)
   }
   }

   //客户端receive    
   call := client.removeCall(h.Seq)
   switch {
      case call == nil:
      // it usually means that Write partially failed
      // and call was already removed.
      err = client.cc.ReadBody(nil)
if err := json.NewEncoder(conn).Encode(opt); err != nil {
    log.Println("rpc client: options error: ", err)
    _ = conn.Close()
    return nil, err
}

又看了一遍代码,难道上面的代码意思是每次发送请求之前都带上option吗?如果是这样的话,那客户端协商通过后马上创建协程调用receive方法,此时读不到任何数据,也没看到哪里会阻塞,那不是会造成for循环无限空转吗- - 刚开始学go,希望大佬能多多帮助。

zyb284629791 commented 3 years ago
func (client *Client) send(call *Call) {
    ....
    // prepare request header
    client.header.ServiceMethod = call.ServiceMethod
    client.header.Seq = seq
    client.header.Error = ""
    ....
}

请问上面client.header的时候不会空指针吗?在创建client的时候并没有对header初始化啊

zyb284629791 commented 3 years ago

不好意思还有个问题,请问IsAvailable这个方法在什么时候调用呢? 我在registerCall中调用registerCall会导致获取锁阻塞,查了资料发现Mutex是不可重入锁,那这样的话IsAvailable这个方法会在什么场景下使用呢?

func (client *Client) registerCall(call *Call) (uint64, error) {
    client.mu.Lock()
    defer client.mu.Unlock()
    if client.closing || client.shutdown //这一行调用IsAvailable
}
zyb284629791 commented 3 years ago
func (client *Client) send(call *Call) {
    ....
    // prepare request header
  client.header.ServiceMethod = call.ServiceMethod
  client.header.Seq = seq
  client.header.Error = ""
    ....
}

请问上面client.header的时候不会空指针吗?在创建client的时候并没有对header初始化啊

刚才对比了代码,发现是因为我的代码在Client中对Header变量类型与源码不同,我设置成了指针类型,因此不会自动初始化,而源码中是原生结构体,在创建Client对象时会自动初始化为对应类型的0值类型。 想请问以下一般什么时候使用结构体本身类型,什么时候选择指针类型呢- - 我现在写代码的时候一般看到结构体类型变量都会使用指针类型来声明,因为一般传参结构体是值类型会copy一份新的出去,所以为了传引用就直接采用指针类型了。。。

zyb284629791 commented 3 years ago

请问服务端和客户端是怎么协商编解码方式的? 在客户端启动的时候通过json.NewEncoder().encode()向服务端发送了option参数,此时服务端如何处理呢?服务端在Accept之后首先要解option没问题,但后续要继续readRequest啊,在读不到request之后并不会给客户端返回任何消息,那客户端是如何知道应该使用哪种编码方式呢。 其次,客户端在初始化NewClient的时候并未向penging中注册任何call,但是又在receive中从client中removeCall,必然后出现nil的情况导致receive失败。那又如何正常协商成功呢?

   //服务端serverCodec
   func (s *Server) serverCodec(c codec.Codec) {
     sending := new(sync.Mutex)
     wg := new(sync.WaitGroup)
     for {
        req, err := s.ReadRequest(c)
        if err != nil {
            if req == nil {
                break // it's not possible to recover, so close the connection
            }
            req.header.Error = err.Error()
            s.sendResponse(c, req.header, invalidRequest, sending)
            continue
        }
        wg.Add(1)
        s.handleRequest(c, req, sending, wg)
     }
   }

   //客户端receive    
   call := client.removeCall(h.Seq)
   switch {
      case call == nil:
      // it usually means that Write partially failed
      // and call was already removed.
      err = client.cc.ReadBody(nil)
if err := json.NewEncoder(conn).Encode(opt); err != nil {
  log.Println("rpc client: options error: ", err)
  _ = conn.Close()
  return nil, err
}

又看了一遍代码,难道上面的代码意思是每次发送请求之前都带上option吗?如果是这样的话,那客户端协商通过后马上创建协程调用receive方法,此时读不到任何数据,也没看到哪里会阻塞,那不是会造成for循环无限空转吗- - 刚开始学go,希望大佬能多多帮助。

抱歉,刚才测试了以下,在encoder .read的时候是会自己阻塞。但是还有一个问题没想明白,就是optionencode了一次,但是每次write的时候都会把缓冲区flush掉,那下次写入的时候为什么option会自动添加进去呢?

cyj19 commented 3 years ago

@zyb284629791

请问服务端和客户端是怎么协商编解码方式的? 在客户端启动的时候通过json.NewEncoder().encode()向服务端发送了option参数,此时服务端如何处理呢?服务端在Accept之后首先要解option没问题,但后续要继续readRequest啊,在读不到request之后并不会给客户端返回任何消息,那客户端是如何知道应该使用哪种编码方式呢。 其次,客户端在初始化NewClient的时候并未向penging中注册任何call,但是又在receive中从client中removeCall,必然后出现nil的情况导致receive失败。那又如何正常协商成功呢?

   //服务端serverCodec
   func (s *Server) serverCodec(c codec.Codec) {
     sending := new(sync.Mutex)
   wg := new(sync.WaitGroup)
   for {
      req, err := s.ReadRequest(c)
      if err != nil {
          if req == nil {
              break // it's not possible to recover, so close the connection
          }
          req.header.Error = err.Error()
          s.sendResponse(c, req.header, invalidRequest, sending)
          continue
      }
      wg.Add(1)
      s.handleRequest(c, req, sending, wg)
   }
   }

   //客户端receive    
   call := client.removeCall(h.Seq)
   switch {
      case call == nil:
      // it usually means that Write partially failed
      // and call was already removed.
      err = client.cc.ReadBody(nil)
if err := json.NewEncoder(conn).Encode(opt); err != nil {
    log.Println("rpc client: options error: ", err)
    _ = conn.Close()
    return nil, err
}

又看了一遍代码,难道上面的代码意思是每次发送请求之前都带上option吗?如果是这样的话,那客户端协商通过后马上创建协程调用receive方法,此时读不到任何数据,也没看到哪里会阻塞,那不是会造成for循环无限空转吗- - 刚开始学go,希望大佬能多多帮助。

抱歉,刚才测试了以下,在encoder .read的时候是会自己阻塞。但是还有一个问题没想明白,就是optionencode了一次,但是每次write的时候都会把缓冲区flush掉,那下次写入的时候为什么option会自动添加进去呢?

只有在客户端和服务端建立连接的时候才会处理一次报文头option;服务端建立连接后在serverCodec中循环处理报文体(header+body),所以服务端每次write的都是报文体,不会添加报文头的

cyj19 commented 3 years ago

@yzy-github

      call := client.removeCall(h.Seq)
      switch {
      case call == nil:
          // it usually means that Write partially failed
          // and call was already removed.
          err = client.cc.ReadBody(nil)
      case h.Error != "":
          call.Error = fmt.Errorf(h.Error)
          err = client.cc.ReadBody(nil)
          call.done()
      default:
          err = client.cc.ReadBody(call.Reply)
          if err != nil {
              call.Error = errors.New("reading body " + err.Error())
          }
          call.done()
      }

有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊

因为在异步调用Go方法中会把这个call返回给调用者

func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
    if done == nil {
        done = make(chan *Call, 10)
    } else if cap(done) == 0 {
        log.Panic("rpc client: done channel is unbuffered")
    }

    call := &Call{
        ServiceMethod: serviceMethod,
        Args:          args,
        Reply:         reply,
        Done:          done,
    }
    client.send(call)
    return call
}
cyj19 commented 3 years ago

@andyli722 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return }

请问这个seq不是当前call的seq加1吗 这样的话 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) 这里remove的seq不应该是seq-1吗

registerCall返回的是call.Seq,加1的是client.seq

func (client *Client) registerCall(call *Call) (uint64, error) {
    client.mu.Lock()
    defer client.mu.Unlock()
    if client.closing || client.shutdown {
        return 0, ErrShutdown
    }
    call.Seq = client.seq
    client.pending[call.Seq] = call
    client.seq++
    // 这里返回的是call.Seq,不是client.seq
    return call.Seq, nil
}
cyj19 commented 3 years ago

@Jaime1129

@chinawilon

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。

@GodXuebi func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }

请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。

receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。

我的理解是client向远端发送报文的时候才需要加sending锁

这里获取sending锁可以防止新的Call被注册到pending map中。

有个疑问,registerCall中是用client.mu来锁的,即使在terminaterCalls中不加sending锁,新的Call也不会被注册到pending map中吧?

cuglaiyp commented 2 years ago

@junxxie

//registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq
func (c *Client) registerCall(call *Call) (uint64, error) {

  // todo client前面被锁住了,这里为什么还需要再锁一次呢?
  c.mu.Lock()
  defer c.mu.Unlock()
  if c.closing || c.shutdown {
      return 0, ErrShutDown
  }
  call.Seq = c.seq
  c.pending[call.Seq] = call
  c.seq++
  return call.Seq, nil
}

调用registerCall时,client刚被锁住,在registerCall中再锁一次是否多余?

这个方法有临界代码,那么就得上锁。不能因为调用它的方法已经上了锁,就不用加了,因为万一有其他没上锁的方法调用了这个方法,就会导致并发问题

cuglaiyp commented 2 years ago

@chenshuidejidan 为什么方法的返回值(仅单个返回值)不需要的时候要用_抛弃,而不是直接调用不取返回值呢

同问

cuglaiyp commented 2 years ago

@cuglaiyp

@chenshuidejidan 为什么方法的返回值(仅单个返回值)不需要的时候要用_抛弃,而不是直接调用不取返回值呢

同问

翻到一篇文章,简单来说,使用_显示的丢弃返回值,会让代码意图更明显、也更加安全。

niconical commented 2 years ago

@echo-li1024 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return }

请问这个seq不是当前call的seq加1吗 这样的话 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) 这里remove的seq不应该是seq-1吗

registerCall seq返回的是call.Seq

niconical commented 2 years ago

@wilgx0

@yudidi 请问五个条件的出处是哪里呢?

不要纠结了 一起卖烧烤去

https://github.com/golang/go/blob/5c8ec89cb53025bc76b242b0d2410bf5060b697e/src/net/rpc/server.go#L13

pleasedance commented 1 year ago

客户端go client.receive() 服务端readRequest 如果保证client.Call("Foo.Sum", args, &reply)的时候肯定先到服务端而不是客户端 客户端同时也阻塞了呀

SharkLJ commented 1 year ago

请问这个并发能力体现在哪?

tonexue1 commented 1 year ago

@SharkLJ 请问这个并发能力体现在哪?

我觉得体现在send和receive,calls可以被client并发调用send,并且被receive后可以通知推出