func loadState(db dbm.DB) State {
stateBytes := db.Get(stateKey)
var state State
if len(stateBytes) != 0 {
err := json.Unmarshal(stateBytes, &state)
if err != nil {
panic(err)
}
}
state.db = db
return state
}
服务端处理请求及应答细节:
重点看接受连接的函数:
func (s *SocketServer) acceptConnectionsRoutine() {
for {
// 接受连接,下面这些日志就是命令行启动 kvstore 后看到的信息
s.Logger.Info("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
s.Logger.Error("Failed to accept connection: " + err.Error())
continue
}
s.Logger.Info("Accepted a new connection")
// 可以接受多条连接并记录
connID := s.addConn(conn)
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
// 从连接读取请求并处理
go s.handleRequests(closeConn, conn, responses)
// 从 'responses' 获取应答并写到连接中
go s.handleResponses(closeConn, conn, responses)
// 等待信号来关闭连接
go s.waitForClose(closeConn, connID)
}
}
先看处理请求的 handleRequests 函数:
func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
var count int
var bufReader = bufio.NewReader(conn)
for {
var req = &types.Request{}
// 从连接上读取请求消息,读取完毕或出错后要通知 waitForClose 协程来关闭连接
err := types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- err
} else {
closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
}
return
}
s.appMtx.Lock()
count++
// 处理请求时要加锁,这个函数会根据请求的类型调用具体函数来处理,
// 比如 types.Request_DeliverTx 类型时就会调用 KVStoreApplication.DeliverTx 函数来处理,
// 应答会写入 responses 通道,以便 handleResponses 函数处理
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}
现在看 handleResponses 函数:
func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
var count int
var bufWriter = bufio.NewWriter(conn)
for {
// 从 responses 通道读取应答并写入连接。同样,出错时要通知 waitForClose 来关闭连接
var res = <-responses
err := types.WriteMessage(res, bufWriter)
if err != nil {
closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
return
}
// flush 类型的应答是哪里来的?
// 与客户端处理类似,如果是此类型要进行 Flush 处理,把缓冲的数据写入连接
if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
return
}
}
count++
}
}
Tendermint abci 项目主页
这篇文章以 ABCI 示例
KVStore
应用及默认的socket
连接为例说明 ABCI 应用的启动及 abci-cli 客户端与其交互的过程,以加深开发 ABCI 应用的模式及源码组织方式的理解。整体流程说明
abci-cli kvstore
启动应用后,它会在 46658 端口等待客户端的 TCP 连接。echo
、info
和deliverTx
等子命令。执行这些命令会建立一条与 ABCI 应用服务端的 TCP 连接,并将子命令后面的参数当作请求消息发送给应用服务端进行处理(这里的 ABCI 应用与 Tendermint 节点绑定在一起)。构建命令过程
程序入口在
abci/cmd/abci-cli/main.go
的Execute
函数。这里面做事情有:
构建
RootCmd
命令,即abci-cli
命令及各子命令。注册全局 Flags,主要包括:
flagAddress
,默认为:tcp://0.0.0.0:46658
flagAbci
,默认为:socket
flagLogLevel
,默认为:debug
添加 ABCI 应用
kvstore
和dummy
以及echoCmd
、infoCmd
、deliverTxCmd
和commitCmd
等客户端命令。RootCmd、kvstore 命令的实现及启动
RootCmd 主命令逻辑
所有子命令都要添加到
RootCmd
主命令下面。执行
abci-cli
命令只会列出其使用文档,在执行具体子命令时才会执行其定义的相应应用逻辑。这里只需看这段代码:
abci-cli 客户端
在命令行执行
abci-cli echo hello
,会与 ABCI 应用服务端建立一条 TCP 连接并将 “abc” 发送到服务端进行处理,收到应后断开连接。但这样比较麻烦,可以使用abci-cli console
命令可以在交互式命令行中与应用服务端交互。通过调用
abcicli.NewClient
函数来创建客户端。返回的是接口
abci/client/Client
,这个接口有socket
、grpc
和local
三种实现,但 flag 只可以指定前两种,默认为socket
。abci/client/Client
接口继承了tmlibs/common/service/Service
接口,可以启动、停止和重置。客户端和服务端都需要这些功能,使用时可以通过把BaseService
作为自定义结构的匿名字段来实现。首先看
socketClient
函数的结构,它实现了abci/client/Client
接口,由于cmn.BaseService
结构是它的匿名字段,也间接实现了tmlibs/common/service/Service
接口:创建 socketClient 的函数:
在命令行执行
abci-cli kvstore
命令时会执行client.Start()
启动服务,这里用默认的socket
连接及kvstore
应用举例说明。这里执行的是 socketClient 结构中匿名字段 cmn.BaseService 的方法:以上就是客户端的启动过程。
ABCI 应用服务端
现在看一下执行
abci-cli kvstore
命令都做了什么。执行此命令时,实际执行的是
cmdKVStore
函数,启动了应用服务端,在tcp://0.0.0.0:46658
监听连接。启动服务端:
NewSocketServer
创建服务端:执行
srv.Start()
函数时,实际执行的是SocketServer
的实现,通过BaseService
结构的Start
方法调用:至此已经把 ABCI 应用服务端是如何启动的说明了,下面的部分会详细说明请求及应答处理的细节。
请求及应答处理
这部分以
deliver_tx
命令为例来进行说明。abci-cli 客户端
为了方便,这里再看一下
socketClient
的数据结构:OnStart 函数所做的就是与服务端建立连接,启动两个协程来处理请求与应答。
先看处理请求的函数
sendRequestsRoutine
:现在看处理应答的
recvResponseRoutine
函数:ABCI 应用服务端
创建应用细节:
先看应用的数据结构
KVStoreApplication
:创建应用:
主要看
loadState
函数,它根据键stateKey
从内存存储MemDB
结构中获取对应状态,因为是初始化,肯定没有对应值,返回的是一个带有新建MemDB
(就是一个带锁的 map)的State
:服务端处理请求及应答细节:
重点看接受连接的函数:
先看处理请求的 handleRequests 函数:
现在看 handleResponses 函数:
deliver_tx 子命令执行过程
现在以此命令为例说明发起请求及收到应答的整体过程。
abci-cli console
进入交互模式,创建客户端并与服务端建立了持久连接。服务端和客户端各启动两个协程,一个处理请求,一个处理应答。deliver_tx "abc"
,客户端会识别子命令,调用cmdDeliverTx
函数,此函数在解析到 tx 后进行编码,随后会调用cli.DeliverTxSync(txBytes)
(同步的) 函数。DeliverTx
函数进行处理。