Tendermint 节点启动源码分析

发布于 2020-11-01 10:15:02 字数 32494 浏览 1243 评论 0

本文以官方示例公链 Basecoin 的 basecoind start 命令为入口,结合日志与源码分析 Tendermint 节点创建、启动及生成区块等过程。

文中代码细节较多,但限于篇幅没有太深入某些细节,比如共识过程。如只想快速了解整体过程,可只阅读有 basecoind start 日志部分的内容。

start 命令入口

执行 basecoind init 命令初始化 genesis 配置、priv-validator 文件及 p2p-node 文件后,在命令行执行 basecoind start 启动节点。在不带有任何参数时,Tendermint 会与 ABCI 应用一起执行:

日志Starting ABCI with Tendermint module=main

func(cmd *cobra.Command, args []string) error {
	if !viper.GetBool(flagWithTendermint) {
		ctx.Logger.Info("Starting ABCI without Tendermint")
		return startStandAlone(ctx, appCreator)
	}
	ctx.Logger.Info("Starting ABCI with Tendermint")
	return startInProcess(ctx, appCreator)
}

创建节点

这一节把 NewNode()

创建三条连接的客户端

startInProcess 会创建 Tendermint 节点,然后启动节点。

创建节点时 Tendermint 与 ABCI 应用会创建建立三条连接所需的客户端:即 querymempool 以及 consensus 连接。

**注意:**所有要启动服务都是通过 BaseService.Start 来执行的。

日志Starting multiAppConn module=proxy impl=multiAppConn

NewNode() 中相关代码:

proxyApp := proxy.NewAppConns(clientCreator, handshaker)

// 实际执行的是 multiAppConn.multiAppConn()
if err := proxyApp.Start(); err != nil {
		return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
	}

三条连接的客户端的建立在 multiAppConn.OnStart 方法中完成,在这里三条连接的客户端都是 localClient 结构,由于 localClient 没有实现 OnStart 方法,它们的 Start 方法调用的都是 cmn.BaseServiceOnStart 方法,也就是 querycli.Start() (另两个也一样)除了打印日志外,什么都不做。

接下来分别把 localClient 封装成了 appConnQueryappConnMempoolappConnConsensus 结构。

日志Starting localClient module=abci-client connection=query impl=localClient

multiAppConn.OnStart() 中相关代码:

// 在创建节点时传入的 clientCreator 的具体实现是 localClientCreator 结构,
// 这里最终返回的是 localClient 结构
querycli, err := app.clientCreator.NewABCIClient()

// 执行的是 cmn.BaseService 的 OnStart 方法,啥都没做
if err := querycli.Start(); err != nil {
		return errors.Wrap(err, "Error starting ABCI client (query connection)")
	}

其它两个连接的客户端与 query 连接客户端的创建相同。

握手同步

在建立完毕以上三条连接的客户端后,会执行 app.handshaker.Handshake(app) 来握手,确保 Tendermint 节点与应用程序的状态是同步的。

日志

ABCI Handshake                     module=consensus appHeight=169313 appHash=DC1ED303D0D1EE403CC010B911D7A991E3EAE7E3

ABCI Replay Blocks                  module=consensus appHeight=169313 storeHeight=169313 stateHeight=169313

Completed ABCI Handshake - Tendermint and App are synced module=consensus appHeight=169313 appHash=DC1ED303D0D1EE403CC010B911D7A991E3EAE7E3

query 连接上通过 ABCI Info 查询 ABCI 应用的 blockstore 中的最新状态,然后将 Tendermint 节点的区块重放到此状态:

multiAppConn.OnStart() 中相关代码:

if app.handshaker != nil {
  return app.handshaker.Handshake(app)
}

Handshaker.Handshake() 中相关代码:

// 从 ABCI 应用的 blockstore 中获取最新状态
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})

// 重放所有区块到最新状态
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)

重放完毕后,Tendermint 节点已经和 ABCI 应用同步到了相同的区块高度。

快速同步设置及验证人节点确认

在进入代码细节之前,先了解一下快速同步的概念。

快速同步(FastSync):在当前节点落后于区块链的最新状态时,需要进行节点间同步,快速同步只下载区块并检查验证人的默克尔树,比运行实时一致性八卦协议快得多。一旦追上其它节点的状态,守护进程将切换出快速同步并进入正常共识模式。在运行一段时间后,如果此节点至少有一个 peer,并且其区块高度至少与最大的报告的 peer 高度一样高,则认为该节点已追上区块链最新状态(即 caught up)。

现在回到 NewNode() 源码,当节点同步到 ABCI 应用的最新状态后,会检查当前状态的验证人集合中是否只有当前节点一个验证人,如果是,则无需快速同步。

// 重新从数据库加载状态,因为可能在握手时有更新
state = sm.LoadState(stateDB)

// Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us.

// 此字段用来指定当此节点在区块链末端有许多区块需要同步时,是否启用快速同步功能,默认为 true
fastSync := config.FastSync
if state.Validators.Size() == 1 {
  addr, _ := state.Validators.GetByIndex(0)
  if bytes.Equal(privValidator.GetAddress(), addr) {
    fastSync = false
  }
}

日志

This node is a validator              module=consensus addr=FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 pubKey=PubKeyEd25519{476CA31AE9AFB1FDC2BE1A00C32796FE5EEDBE3BD4C3C05CD1A76A4E975FB975}

NewNode() 中相关代码,显示当前节点是否是验证人:

// Log whether this node is a validator or an observer
if state.Validators.HasAddress(privValidator.GetAddress()) {
  consensusLogger.Info("This node is a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
} else {
  consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
}

创建各种 Reactor

Reactor 是处理各类传入消息的结构,一共有 5 种类型,通过将其添加到 Switch 中来实现。先熟悉一下 Switch 的结构:

Switch 处理 peer 连接,并暴露一个 API 以在各类 Reactor 上接收传入的消息。每个 Reactor 负责处理一个或多个 “Channels” 的传入消息。因此,发送传出消息通常在 peer 执行,传入的消息在 Reactor 上接收。

type Switch struct {
  cmn.BaseService

  config     *config.P2PConfig
  listeners   []Listener
  // 添加的 Reactor 都存在这里
  reactors    map[string]Reactor
  chDescs    []*conn.ChannelDescriptor
  reactorsByCh map[byte]Reactor
  peers      *PeerSet
  dialing    *cmn.CMap
  reconnecting *cmn.CMap
  nodeInfo    NodeInfo // our node info
  nodeKey    *NodeKey // our node privkey
  addrBook    AddrBook

  filterConnByAddr func(net.Addr) error
  filterConnByID  func(ID) error

  rng *cmn.Rand // seed for randomizing dial times and orders
}

MempoolReactor

Mempool 是一个有序的内存池,交易在被共识提议之前会存储在这里,而在存储到这里之前会通过 ABCI 应用的 CheckTx 方法检查其合法性。

先看 NewNode() 中相关代码:

mempoolLogger := logger.With("module", "mempool")
// 创建 Mempool
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)

// 初始化 Mempool 的  write-ahead log(确保可以从任何形式的崩溃中恢复过来)
mempool.InitWAL() // no need to have the mempool wal during tests
mempool.SetLogger(mempoolLogger)

// 创建 MempoolReactor
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger)

// 这里根据配置,判断是否要等待有交易时才生成新区块
if config.Consensus.WaitForTxs() {
  mempool.EnableTxsAvailable()
}

MempoolReactor 用来在 peer 之间对 mempool 交易进行广播。看一下它的数据结构:

type MempoolReactor struct {
  p2p.BaseReactor
  config  *cfg.MempoolConfig
  Mempool *Mempool
}

func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
	memR := &MempoolReactor{
		config:  config,
		Mempool: mempool,
	}
	memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
	return memR
}

EvidenceReactor

Evidence 是一个接口,表示验证人的任何可证明的恶意活动,主要有 DuplicateVoteEvidence (包含验证人签署两个相互矛盾的投票的证据。)这种实现。

NewNode() 中相关代码:

evidenceDB, err := dbProvider(&DBContext{"evidence", config})
if err != nil {
  return nil, err
}
evidenceLogger := logger.With("module", "evidence")

// EvidenceStore 用来存储见过的所有 Evidence,包括已提交的、已经过验证但没有广播的以及已经广播但未提交的
evidenceStore := evidence.NewEvidenceStore(evidenceDB)
// EvidencePool 在 EvidenceStore 中维护一组有效的 Evidence
evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore)
evidencePool.SetLogger(evidenceLogger)

// 创建 EvidenceReactor
evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger)

EvidenceReactor 用来在 peer 间对 EvidencePoolEvidence 进行广播。

type EvidenceReactor struct {
  p2p.BaseReactor
  evpool  *EvidencePool
   
  eventBus *types.EventBus
}

BlockchainReactor

NewNode() 中相关代码:

blockExecLogger := logger.With("module", "state")

// BlockExecutor 用来处理区块执行和状态更新。
// 它暴露一个 ApplyBlock() 方法,用来验证并执行区块、更新状态和 ABCI 应答,然后以原子方式提交
// 并更新 mempool,最后保存状态。
blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool)

// 创建 BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))

BlockchainReactor 用来处理长期的 catchup 同步。

type BlockchainReactor struct {
	p2p.BaseReactor

	// immutable
	initialState sm.State

	blockExec *sm.BlockExecutor
   
   // 区块的底层存储。主要存储三种类型的信息:BlockMeta、Block part 和 Commit
	store    *BlockStore
   
   // 当加入到 BlockPool 时,peer 自己报告它们的高度。
   // 从当前节点最新的 pool.height 开始,从报告的高于我们高度的 peer 顺序请求区块。
   // 节点经常问 peer 他们当前的高度,这样我们就可以继续前进。
   // 不断请求更高的区块直到到达限制。如果大多数请求没有可用的 peer,并且没有处在 peer 限制,可以
   // 切换到 consensus reactor
	pool    *BlockPool
	fastSync  bool

	requestsCh <-chan BlockRequest
	errorsCh  <-chan peerError
}

ConsensusReactor

NewNode() 中相关代码:

ConsensusState 用来处理共识算法的执行。它处理投票和提案,一旦达成一致,将区块提交给区块链并针对 ABCI 应用执行它们。内部状态机从 peer、内部验证人和定时器接收输入。

// 创建 ConsensusState
consensusState := cs.NewConsensusState(config.Consensus, state.Copy(),
  blockExec, blockStore, mempool, evidencePool)
consensusState.SetLogger(consensusLogger)

if privValidator != nil {
   // 设置 PrivValidator 用来投票
  consensusState.SetPrivValidator(privValidator)
}

// 创建 ConsensusReactor
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync)
consensusReactor.SetLogger(consensusLogger)

ConsensusReactor 用于共识服务。

type ConsensusReactor struct {
  p2p.BaseReactor // BaseService + p2p.Switch

  conS *ConsensusState

  mtx    sync.RWMutex
  fastSync bool
  eventBus *types.EventBus
}

把 reactor 添加到 Switch

NewNode() 中相关代码:

把上面创建的四个 Reactor 添加到 Switch 中。

p2pLogger := logger.With("module", "p2p")

sw := p2p.NewSwitch(config.P2P)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)

PEXReactor(可选的)

看代码之前先了解几个概念:

  • seeds:启动节点时可通过 --p2p.seeds 标签来指定种子节点,可以从中获得许多其它 peer 的地址。
    tendermint node --p2p.seeds "f9baeaa15fedf5e1ef7448dd60f46c01f1a9e9c4@1.2.3.4:46656,0491d373a8e0fcf1023aaf18c51d6a1d0d4f31bd@5.6.7.8:46656"
  • persistent_peers:可指定与当前节点保持持久连接的一组节点。或使用 RPC 端点 /dial_peers 来指定而无需停止 Tendermint 实例。
    tendermint node --p2p.persistent_peers "429fcf25974313b95673f58d77eacdd434402665@10.11.12.13:46656,96663a3dd0d7b9d17d4c8211b191af259621c693@10.11.12.14:46656"
    curl 'localhost:46657/dial_peers?persistent=true&peers=\["429fcf25974313b95673f58d77eacdd434402665@10.11.12.13:46656","96663a3dd0d7b9d17d4c8211b191af259621c693@10.11.12.14:46656"\]'
  • PEX:peer-exchange 协议的缩写,默认是开启的,在第一次启动后通常不需要种子。peer 之间将传播已知的 peer 并形成一个网络。peer 地址存储在 addrbook 中。

如果 PEX 模式是打开的,它应该处理种子的拨号,否则将由 Switch 来处理。

NewNode() 中相关代码:

// 创建 addrBook
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))

// 如果开启了 PEX 模式
if config.P2P.PexReactor {
  // 创建 PEX reactor
  pexReactor := pex.NewPEXReactor(addrBook,
    &pex.PEXReactorConfig{
      Seeds:       cmn.SplitAndTrim(config.P2P.Seeds, ",", " "),
      SeedMode:     config.P2P.SeedMode,
      PrivatePeerIDs: cmn.SplitAndTrim(config.P2P.PrivatePeerIDs, ",", " ")})
  pexReactor.SetLogger(p2pLogger)
  // 添加到 Switch
  sw.AddReactor("PEX", pexReactor)
}

sw.SetAddrBook(addrBook)

PEXReactor 处理 PEX 并保证足够数量的 peer 连接到 Switch。用 AddrBook 存储 peer 的 NetAddress。为防止滥用,只接受来自 peer 的 pexAddrsMsg,我们也发送了相应的 pexRequestMsg。每个 defaultEnsurePeersPeriod 时间段内只接收一个 pexRequestMsg

FilterPeers

配置中的 config.FilterPeers 字段用来指定当连接到一个新 peer 时是否要查询 ABCI 应用,由应用用来决定是否要保持连接。

使用 ABCI 查询通过 addr 或 pubkey 来过滤 peer,如果返回 OK 则添加 peer。

NewNode() 中相关代码:

if config.FilterPeers {
  // 设置两种类型的 Filter
  sw.SetAddrFilter(func(addr net.Addr) error {
    resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
    if err != nil {
      return err
    }
    if resQuery.IsErr() {
      return fmt.Errorf("Error querying abci app: %v", resQuery)
    }
    return nil
  })
  sw.SetIDFilter(func(id p2p.ID) error {
    resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%s", id)})
    if err != nil {
      return err
    }
    if resQuery.IsErr() {
      return fmt.Errorf("Error querying abci app: %v", resQuery)
    }
    return nil
  })
}

设置 EventBus

EventBus 是一个通过此系统的所有事件的事件总线,所有的调用都代理到底层的 pubsub 服务器。所有事件都必须用 EventBus 来发布,以保证正确的数据类型。

type EventBus struct {
	cmn.BaseService
   
   // Server 允许 client 订阅/取消订阅消息,带或不带 tag 发布消息,并管理内部状态
	pubsub *tmpubsub.Server
}

NewNode() 中相关代码:

eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

// 将要发布和/或订阅消息(事件)的服务
// consensusReactor 将在 consensusState 和 blockExecutor 上设置 eventBus
consensusReactor.SetEventBus(eventBus)

设置 TxIndexer

TxIndexer 是一个接口,定义了索引和搜索交易的方法。它有两种实现,一个是 kv.TxIndex,由键值存储支持(levelDB),另一个是 null.TxIndex,即不设置索引(默认的)。

IndexerService 会把 TxIndexerEventBus 连接在一起,以对来自 EventBus 的交易进行索引。

type IndexerService struct {
	cmn.BaseService

	idr    TxIndexer
	eventBus *types.EventBus
}

NewNode() 中相关代码:

var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
// 键值存储索引
case "kv":
  // 创建 DB
  store, err := dbProvider(&DBContext{"tx_index", config})
  if err != nil {
    return nil, err
  }
  // 有指定要索引的标签列表(以逗号分隔)
  if config.TxIndex.IndexTags != "" {
    txIndexer = kv.NewTxIndex(store, kv.IndexTags(cmn.SplitAndTrim(config.TxIndex.IndexTags, ",", " ")))
  // 要索引所有标签
  } else if config.TxIndex.IndexAllTags {
    txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
  // 不索引标签
  } else {
    txIndexer = kv.NewTxIndex(store)
  }
default:
  txIndexer = &null.TxIndex{}
}

// 创建 IndexerService
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

创建节点的 BaseService

节点所需的所有字段内容已在以上部分创建完毕,在创建 BaseService 后,就可以启动节点了。

node.BaseService = *cmn.NewBaseService(logger, "Node", node)

创建节点总结

总结一下 NewNode 函数都做了哪些事情:

  • 创建 Tendermint 与 ABCI 应用建立 mempoolconsensusquery 连接所需的客户端。
  • Tendermint 节点与应用程序执行握手,确保其状态是同步的。
  • 根据配置 config.FastSyncprivValidator.GetAddress() 方法,判断是否需要快速同步,是否是验证人节点。
  • 创建并在 Switch 中设置 Reactor,即 MempoolReactorEvidenceReactorBlockchainReactorConsensusReactor 以及 PEXReactor 这五种。用来在 peer 上接收不同类型的消息。
  • 根据配置 config.FilterPeers 判断是否要用 ABCI 查询通过 addr 或 pubkey 来过滤要新连接的 peer,如果返回 OK 则添加 peer。
  • 创建并设置 EventBus,订阅/发布事件。
  • 设置 TxIndexer,对交易进行索引。

启动节点

回到 startInProcess 函数,在这里创建完毕节点后,执行 Start 方法,实际执行的是节点的 OnStart 方法。

接下来就看这个方法:

// 日志:Starting Node   module=node impl=Node
func (n *Node) OnStart() error {
   // 日志:Starting EventBus   module=events impl=EventBus
	err := n.eventBus.Start()
	if err != nil {
		return err
	}

    // 监听 P2P 连接的端口
	// 日志:Local listener   module=p2p ip=:: port=46656
   // 日志:Could not perform UPNP discover   module=p2p err="write udp4 0.0.0.0:63731->239.255.255.250:1900: i/o timeout"
   // 日志:Starting DefaultListener   module=p2p impl=Listener(@169.254.237.47:46656)
	protocol, address := cmn.ProtocolAndAddress(n.config.P2P.ListenAddress)
	l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
	n.sw.AddListener(l)

	// 生成节点的 PrivKey
   // 日志:P2P Node ID   module=node ID=3158bddb4e03bef94bf4a99543af5beab4733368 file=/Users/LLLeon/.basecoind/config/node_key.json
	nodeKey, err := p2p.LoadOrGenNodeKey(n.config.NodeKeyFile())
	if err != nil {
		return err
	}
	n.Logger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", n.config.NodeKeyFile())

	nodeInfo := n.makeNodeInfo(nodeKey.ID())
	n.sw.SetNodeInfo(nodeInfo)
	n.sw.SetNodeKey(nodeKey)

	// 将自己添加到 addrbook 以防止连接自己
   // 日志: Add our address to book   module=p2p book=/Users/LLLeon/.basecoind/config/addrbook.json addr=3158bddb4e03bef94bf4a99543af5beab4733368@169.254.237.47:46656
	n.addrBook.AddOurAddress(nodeInfo.NetAddress())
   
   // 在 P2P 服务器之前启动 RPC 服务器,所以可以例如:接收第一个区块的交易
   // 日志:Starting RPC HTTP server on tcp://0.0.0.0:46657 module=rpc-server
	if n.config.RPC.ListenAddress != "" {
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	// 启动 switch (P2P 服务器),函数内部依次启动了各 Reactor,
   // Switch 会在 46656 端口(默认的)监听 peer 的连接,连接后会通过 Reactor 的 Receive 方法
   // 接收来自 peer 的消息
   
   // 日志:Starting P2P Switch   module=p2p impl="P2P Switch"
   // 日志:Starting BlockchainReactor   module=blockchain impl=BlockchainReactor
   // 日志:Starting ConsensusReactor   module=consensus impl=ConsensusReactor
   // 日志:ConsensusReactor   module=consensus fastSync=false
   // 日志:Starting ConsensusState   module=consensus impl=ConsensusState
   // 日志:Starting baseWAL   module=consensus wal=/Users/LLLeon/.basecoind/data/cs.wal/wal impl=baseWAL
   // 日志:Catchup by replaying consensus messages   module=consensus height=169315
   // 日志:Replay: Done   module=consensus
   // 日志:Starting EvidenceReactor   module=evidence impl=EvidenceReactor
   // 日志:Starting PEXReactor   module=p2p impl=PEXReactor
   // 日志:Starting AddrBook   module=p2p book=/Users/LLLeon/.basecoind/config/addrbook.json impl=AddrBook
   // 日志:enterNewRound(169315/0). Current: 169315/0/RoundStepNewHeight module=consensus height=169315 round=0
   // 日志:enterPropose(169315/0). Current: 169315/0/RoundStepNewRound module=consensus height=169315 round=0
   // 日志:enterPropose: Our turn to propose        module=consensus height=169315 round=0 proposer=FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 privValidator="PrivValidator{FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 LH:169314, LR:0, LS:3}"
   // 日志:Starting MempoolReactor   module=mempool impl=MempoolReactor
	err = n.sw.Start()
	if err != nil {
		return err
	}

	// 始终连接到持久 peers
	if n.config.P2P.PersistentPeers != "" {
		err = n.sw.DialPeersAsync(n.addrBook, cmn.SplitAndTrim(n.config.P2P.PersistentPeers, ",", " "), true)
		if err != nil {
			return err
		}
	}

	// 启动交易的 indexer
   // 日志:Starting IndexerService   module=txindex impl=IndexerService
	return n.indexerService.Start()
}

至此,节点中各模块已经启动完毕,接下来进入 catchup、共识协议等核心处理逻辑。

Tendermint 核心处理逻辑

这部分包括区块的 catchup、共识协议处理等内容。

这部分逻辑是在 ConsensusReactor 启动时处理的。

日志

Starting ConsensusReactor              module=consensus impl=ConsensusReactor
ConsensusReactor                    module=consensus fastSync=false
Starting ConsensusState               module=consensus impl=ConsensusState
Starting baseWAL                    module=consensus wal=/Users/LLLeon/.basecoind/data/cs.wal/wal impl=baseWAL
Starting TimeoutTicker                module=consensus impl=TimeoutTicker
Catchup by replaying consensus messages    module=consensus height=169315
Replay: Done                      module=consensus

先看 ConsensusReactor 启动的代码:

func (conR *ConsensusReactor) OnStart() error {
  conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
  if err := conR.BaseReactor.OnStart(); err != nil {
    return err
  }

  // 订阅这几种类型的事件:EventNewRoundStep、EventVote 和 EventProposalHeartbeat,一旦接收到这些
  // 类型的消息就会用在 state 中定义的 pubsub 广播给 peer
  conR.subscribeToBroadcastEvents()

   // 由于只有我们一个节点,所以会设置 FastSync = false,会启动 ConsensusState
  if !conR.FastSync() {
    err := conR.conS.Start()
    if err != nil {
      return err
    }
  }

  return nil
}

现在看 ConsensusState 的启动:

func (cs *ConsensusState) OnStart() error {
	if err := cs.evsw.Start(); err != nil {
		return err
	}

	// we may set the WAL in testing before calling Start,
	// so only OpenWAL if its still the nilWAL
	if _, ok := cs.wal.(nilWAL); ok {
		walFile := cs.config.WalFile()
      
      // 这里会启动 baseWAL,它在处理消息之前会将其写入磁盘。可用于崩溃恢复和确定性重放
		wal, err := cs.OpenWAL(walFile)
		if err != nil {
			cs.Logger.Error("Error loading ConsensusState wal", "err", err.Error())
			return err
		}
		cs.wal = wal
	}

	// we need the timeoutRoutine for replay so
	// we don't block on the tick chan.
	// NOTE: we will get a build up of garbage go routines
	// firing on the tockChan until the receiveRoutine is started
	// to deal with them (by that point, at most one will be valid)
   
   // 用来对每一步的超时进行控制
   // 里面是在协程里面执行 timeoutRoutine 方法,内部会监听 timeoutTicker.tickChan,进行到下一步时
   // 会中止并重置旧 timer,并更新 timeoutInfo。tickChan 上超时时间为 0 时,会立即转发到 tockChan
   // 日志:Starting TimeoutTicker   module=consensus impl=TimeoutTicker
	if err := cs.timeoutTicker.Start(); err != nil {
		return err
	}

	// we may have lost some votes if the process crashed
	// reload from consensus log to catchup
   
   // 新建 ConsensusState 时已设置为 true
	if cs.doWALCatchup {
      // catchup:仅重放自上一个区块以来的那些消息
      // 日志:Catchup by replaying consensus messages    module=consensus height=169315
      // 日志:Replay: Done                      module=consensus
		if err := cs.catchupReplay(cs.Height); err != nil {
			cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error())
			// NOTE: if we ever do return an error here,
			// make sure to stop the timeoutTicker
		}
	}

   // 核心处理逻辑,主要的协程,详细解释见下面
	go cs.receiveRoutine(0)

	// schedule the first round!
	// use GetRoundState so we don't race the receiveRoutine for access
	cs.scheduleRound0(cs.GetRoundState())

	return nil
}

这个方法里面主要做了区块的 catchup、在协程中启动 receiveRoutine() 方法来接收并处理消息,以及执行 scheduleRound0() 方法来进入新回合,下面逐一说明。

catchup

在这次启动节点之前,已经运行过一段时间,区块最新高度为 169315。首先看节点在重启后是如何 catchup 到此高度的。

catchupReplay 源码:

func (cs *ConsensusState) catchupReplay(csHeight int64) error {

  // Set replayMode to true so we don't log signing errors.
  cs.replayMode = true
  defer func() { cs.replayMode = false }()

  // Ensure that #ENDHEIGHT for this height doesn't exist.
  // NOTE: This is just a sanity check. As far as we know things work fine
  // without it, and Handshake could reuse ConsensusState if it weren't for
  // this check (since we can crash after writing #ENDHEIGHT).
  //
  // Ignore data corruption errors since this is a sanity check.
  gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
  if err != nil {
    return err
  }
  if gr != nil {
    if err := gr.Close(); err != nil {
      return err
    }
  }
  if found {
    return fmt.Errorf("WAL should not contain #ENDHEIGHT %d", csHeight)
  }

  // Search for last height marker.
  //
  // Ignore data corruption errors in previous heights because we only care about last height
  gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
  if err == io.EOF {
    cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
  } else if err != nil {
    return err
  }
  if !found {
    return fmt.Errorf("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, csHeight-1)
  }
  defer gr.Close() // nolint: errcheck

  cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)

  var msg *TimedWALMessage
  dec := WALDecoder{gr}

  for {
    msg, err = dec.Decode()
    if err == io.EOF {
      break
    } else if IsDataCorruptionError(err) {
      cs.Logger.Debug("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
      panic(fmt.Sprintf("data has been corrupted (%v) in last height %d of consensus WAL", err, csHeight))
    } else if err != nil {
      return err
    }

    // NOTE: since the priv key is set when the msgs are received
    // it will attempt to eg double sign but we can just ignore it
    // since the votes will be replayed and we'll get to the next step
    if err := cs.readReplayMessage(msg, nil); err != nil {
      return err
    }
  }
  cs.Logger.Info("Replay: Done")
  return nil
}

receiveRoutine

核心处理逻辑:需要重点看一下 cs.receiveRoutine() 方法。

这个方法处理可能引起状态转换的消息,参数 maxSteps 表示退出前要处理的消息数量,0 表示永不退出。它持有 RoundState 并且是唯一更新它的地方。更新发生在超时、完成提案和 2/3 多数时。 ConsensusState 在任意内部状态更新之前必须是锁定的。

这个方法在单独的协程里面,接收并处理来自 peer、节点内部或超时消息。

func (cs *ConsensusState) receiveRoutine(maxSteps int) {
  defer func() {
    if r := recover(); r != nil {
      cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
    }
  }()

  for {
    // 到达设定的 maxSteps 时退出
    if maxSteps > 0 {
      if cs.nSteps >= maxSteps {
        cs.Logger.Info("reached max steps. exiting receive routine")
        cs.nSteps = 0
        return
      }
    }
    rs := cs.RoundState
    var mi msgInfo

    select {
    // TxsAvailable 返回一个通道,每添加一个交易到 mempool 后会触发一次,
    // 并且只有在 mempool 中的交易可用时才会触发。
    // 如果没有调用 EnableTxsAvailable,返回的 channel 可能为 nil。
    case height := <-cs.mempool.TxsAvailable():
       
       // 这里会执行 enterPropose(),进入提议环节,完成后会执行 enterPrevote 进入 Prevote 环节
       // 日志:enterPropose(169315/0). Current: 169315/0/RoundStepNewRound module=consensus height=169315 round=0
       // 日志:enterPropose: Our turn to propose        module=consensus height=169315 round=0 proposer=FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 privValidator="PrivValidator{FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 LH:169314, LR:0, LS:3}"
      cs.handleTxsAvailable(height)
       
    // 接收到来自 peer 的消息
    case mi = <-cs.peerMsgQueue:
      cs.wal.Write(mi)
      // handles proposals, block parts, votes
      // may generate internal events (votes, complete proposals, 2/3 majorities)
      // 处理消息
      cs.handleMsg(mi)
       
    // 接收到来自内部的消息
    case mi = <-cs.internalMsgQueue:
      // 发送签名的消息前写入磁盘
      cs.wal.WriteSync(mi) // NOTE: fsync
      // handles proposals, block parts, votes
      cs.handleMsg(mi)
       
    // 接收到超时消息
    case ti := <-cs.timeoutTicker.Chan(): // tockChan:
      cs.wal.Write(ti)
      // if the timeout is relevant to the rs
      // go to the next step
      cs.handleTimeout(ti, rs)
    case <-cs.Quit():

      // NOTE: the internalMsgQueue may have signed messages from our
      // priv_val that haven't hit the WAL, but its ok because
      // priv_val tracks LastSig

      // close wal now that we're done writing to it
      cs.wal.Stop()

      close(cs.done)
      return
    }
  }
}

scheduleRound0

scheduleRound0() 方法会将当前 consensus H/R/S 封装成 timeoutInfo 发送给 tickChan,这就进入了 timeoutTicker.timeoutRoutine 的逻辑。

这里发送的信息第 0 回合,步骤为 RoundStepNewHeight

scheduleRound0 源码:

func (cs *ConsensusState) scheduleRound0(rs *cstypes.RoundState) {
	
	sleepDuration := rs.StartTime.Sub(time.Now())
	cs.scheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}

timeoutTicker.timeoutRoutine 源码:

func (t *timeoutTicker) timeoutRoutine() {
  t.Logger.Debug("Starting timeout routine")
  var ti timeoutInfo
  for {
    select {
    case newti := <-t.tickChan:
      t.Logger.Debug("Received tick", "old_ti", ti, "new_ti", newti)

      // ignore tickers for old height/round/step
      if newti.Height < ti.Height {
        continue
      } else if newti.Height == ti.Height {
        if newti.Round < ti.Round {
          continue
        } else if newti.Round == ti.Round {
          if ti.Step > 0 && newti.Step <= ti.Step {
            continue
          }
        }
      }

      // stop the last timer
      t.stopTimer()

      // update timeoutInfo and reset timer
      // NOTE time.Timer allows duration to be non-positive
      ti = newti
      t.timer.Reset(ti.Duration)
      t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
    case <-t.timer.C:
      t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
      // go routine here guarantees timeoutRoutine doesn't block.
      // Determinism comes from playback in the receiveRoutine.
      // We can eliminate it by merging the timeoutRoutine into receiveRoutine
      // and managing the timeouts ourselves with a millisecond ticker
      go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
    case <-t.Quit():
      return
    }
  }
}

receiveRoutine 中,如果触发了超时,会执行 handleTimeout,进行状态转移相关操作:

func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
	cs.Logger.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)

	// timeouts must be for current height, round, step
	if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) {
		cs.Logger.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
		return
	}

	// the timeout will now cause a state transition
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	switch ti.Step {
	case cstypes.RoundStepNewHeight:
		// NewRound event fired from enterNewRound.
		// XXX: should we fire timeout here (for timeout commit)?
		cs.enterNewRound(ti.Height, 0)
	case cstypes.RoundStepNewRound:
		cs.enterPropose(ti.Height, 0)
	case cstypes.RoundStepPropose:
		cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent())
		cs.enterPrevote(ti.Height, ti.Round)
	case cstypes.RoundStepPrevoteWait:
		cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
		cs.enterPrecommit(ti.Height, ti.Round)
	case cstypes.RoundStepPrecommitWait:
		cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
		cs.enterNewRound(ti.Height, ti.Round+1)
	default:
		panic(cmn.Fmt("Invalid timeout step: %v", ti.Step))
	}

}

共识协议处理

共识投票和生成区块等环节是在 receiveRoutine 中收到消息或超时后由 handleMsghandleTimeout 执行相应方法进行处理。

本文只提供一个阅读 Tendermint 源码的入口,其它核心逻辑,比如创建区块等逻辑,需要自己深入 consensus 包。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

JSmiles

生命进入颠沛而奔忙的本质状态,并将以不断告别和相遇的陈旧方式继续下去。

0 文章
0 评论
84960 人气
更多

推荐作者

lorenzathorton8

文章 0 评论 0

Zero

文章 0 评论 0

萧瑟寒风

文章 0 评论 0

mylayout

文章 0 评论 0

tkewei

文章 0 评论 0

17818769742

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文