FISCO-BCOS Source Code Reading (Part 1): The Consensus Module

1. Version

  • fisco-bcos: 2.1.0
  • commit: cb68124d4fbf3df563a57dfff5f0c6eedc1419cc
  • branch: master

2. Consensus Module Framework

FISCO BCOS implements an extensible consensus framework into which different consensus algorithms can be plugged. It currently supports the PBFT (Practical Byzantine Fault Tolerance) and Raft (Replication and Fault Tolerant) algorithms. The framework of the consensus module is shown below:

(figure: consensus module framework)

Sealer thread

The transaction sealing thread. It fetches transactions from the transaction pool, packs them into a new block on top of the node's highest block, and hands the new block to the Engine thread. The sealing threads of PBFT and Raft are PBFTSealer and RaftSealer respectively.

Engine thread

The consensus thread. It receives new blocks locally or over the network, runs the consensus protocol over the consensus messages it receives, and finally writes blocks that reached consensus into the blockchain (BlockChain). Once a block is on chain, its transactions are removed from the transaction pool. The consensus threads of PBFT and Raft are PBFTEngine and RaftEngine respectively.
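
As a rough mental model, the two threads form a producer/consumer pair connected by a block hand-off. A minimal sketch (not FISCO BCOS code; all names below are illustrative):

// The Sealer thread packs transactions into candidate blocks; the Engine
// thread "reaches consensus" on them and commits the winners.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct Block
{
    int height;
    std::vector<int> txs;
};

std::queue<Block> g_pending;  // hand-off queue from Sealer to Engine
std::mutex g_mutex;
std::condition_variable g_cv;

void sealerThread()  // cf. PBFTSealer / RaftSealer
{
    for (int h = 1; h <= 3; ++h)
    {
        Block b{h, {h * 10, h * 10 + 1}};  // "fetch" transactions from the pool
        {
            std::lock_guard<std::mutex> lock(g_mutex);
            g_pending.push(b);  // hand the sealed block to the Engine
        }
        g_cv.notify_one();
    }
}

void engineThread()  // cf. PBFTEngine / RaftEngine
{
    for (int handled = 0; handled < 3; ++handled)
    {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_cv.wait(lock, [] { return !g_pending.empty(); });
        Block b = g_pending.front();
        g_pending.pop();
        lock.unlock();
        // the consensus rounds would run here; on success the block is
        // written to the chain and its transactions dropped from the pool
        std::cout << "committed block " << b.height << std::endl;
    }
}

int main()
{
    std::thread sealer(sealerThread);
    std::thread engine(engineThread);
    sealer.join();
    engine.join();
    return 0;
}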

3. Dependent Modules

  1. sync
  2. blockverifier
  3. p2p
  4. security

4. File Structure of the Consensus Module

.
├── CMakeLists.txt          # cmake build configuration
├── Common.h                # basic data structures and exception handling
├── ConsensusEngineBase.cpp # implementation of the base consensus engine
├── ConsensusEngineBase.h   # header of the base consensus engine
├── ConsensusInterface.h    # consensus interface / base class
├── Sealer.cpp              # implementation of block sealing and submission
├── Sealer.h                # interface of block sealing and submission
├── pbft                    # PBFT consensus module
│   ├── Common.h
│   ├── PBFTEngine.cpp
│   ├── PBFTEngine.h
│   ├── PBFTMsgCache.h
│   ├── PBFTReqCache.cpp
│   ├── PBFTReqCache.h
│   ├── PBFTSealer.cpp
│   ├── PBFTSealer.h
│   └── TimeManager.h
└── raft                    # Raft consensus module
    ├── Common.h
    ├── RaftEngine.cpp
    ├── RaftEngine.h
    ├── RaftSealer.cpp
    └── RaftSealer.h

ConsensusInterface

ConsensusInterface.h defines the top-level base class; the other classes inherit from it and override its virtual functions.

The ConsensusInterface class mainly contains the following two groups of methods:

  1. Consensus module control
// switch the consensus module on and off
virtual void start() = 0;
virtual void stop() = 0;
// status of the consensus module
virtual const std::string consensusStatus() = 0;
// protocol ID, used when the handler is registered with the p2p module
virtual PROTOCOL_ID const& protocolId() const = 0;

/// update the context of PBFT after committing a block into the blockchain
virtual void reportBlock(dev::eth::Block const& block) = 0;
// group ID
virtual GROUP_ID groupId() const { return 0; };

// maximum number of transactions in one block (default 1000)
virtual uint64_t maxBlockTransactions() { return 1000; }

// view / view change
virtual VIEWTYPE view() const { return 0; }
virtual VIEWTYPE toView() const { return 0; }
  2. Consensus node control
// list of consensus nodes (sealers)
virtual h512s sealerList() const = 0;
virtual void appendSealer(h512 const& _sealer) = 0;

// node type: consensus node / observer node
virtual NodeAccountType accountType() = 0;
virtual void setNodeAccountType(NodeAccountType const&) = 0;
virtual IDXTYPE nodeIdx() const = 0;
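
A minimal sketch of what plugging in a new algorithm looks like (trimmed stand-in types, not the real header):

// Stand-in for the interface above; the real header uses the dev:: typedefs.
#include <cstdint>
#include <string>

using PROTOCOL_ID = int16_t;  // stand-in typedef

class ConsensusInterfaceSketch
{
public:
    virtual ~ConsensusInterfaceSketch() = default;
    virtual void start() = 0;
    virtual void stop() = 0;
    virtual const std::string consensusStatus() = 0;
    virtual PROTOCOL_ID const& protocolId() const = 0;
};

// hypothetical plug-in algorithm overriding the pure virtuals
class MyConsensus : public ConsensusInterfaceSketch
{
public:
    void start() override { m_running = true; }
    void stop() override { m_running = false; }
    const std::string consensusStatus() override
    {
        return m_running ? "running" : "stopped";
    }
    PROTOCOL_ID const& protocolId() const override { return m_protocolId; }

private:
    bool m_running = false;
    PROTOCOL_ID m_protocolId = 1;
};

int main()
{
    MyConsensus consensus;
    consensus.start();
    return consensus.consensusStatus() == "running" ? 0 : 1;
}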

Common

Common.h mainly defines, in the consensus namespace, the consensus and node status codes and exception handling, as well as the node type NodeAccountType and the Sealing struct that represents a block being sealed.

struct Sealing
{
    dev::eth::Block block;
    /// hash set for filtering fetched transactions
    h256Hash m_transactionSet;
    dev::blockverifier::ExecutiveContext::Ptr p_execContext;
};

ConsensusEngineBase

Defines the ConsensusEngineBase class, which inherits from both the Worker class and the ConsensusInterface class:

1. Constructor

// constructor of ConsensusEngineBase
ConsensusEngineBase(std::shared_ptr<dev::p2p::P2PInterface> _service,  // p2p service
    std::shared_ptr<dev::txpool::TxPoolInterface> _txPool,             // transaction pool
    std::shared_ptr<dev::blockchain::BlockChainInterface> _blockChain, // blockchain
    std::shared_ptr<dev::sync::SyncInterface> _blockSync,              // block synchronization
    std::shared_ptr<dev::blockverifier::BlockVerifierInterface> _blockVerifier,
    PROTOCOL_ID const& _protocolId,                                    // protocol id
    KeyPair const& _keyPair,                                           // key pair
    dev::h512s const& _sealerList = dev::h512s())                      // sealer list, empty by default
  : Worker("ConsensusEngine", 0),  // initialize the Worker base class (background thread)
    m_service(_service),           // assign the protected members
    m_txPool(_txPool),
    m_blockChain(_blockChain),
    m_blockSync(_blockSync),
    m_blockVerifier(_blockVerifier),
    m_consensusBlockNumber(0),
    m_protocolId(_protocolId),
    m_keyPair(_keyPair),
    m_sealerList(_sealerList)
{
    assert(m_service && m_txPool && m_blockChain && m_blockSync && m_blockVerifier);
    if (m_protocolId == 0)
        BOOST_THROW_EXCEPTION(dev::eth::InvalidProtocolID()
                              << errinfo_comment("Protocol id must be larger than 0"));
    m_groupId = dev::eth::getGroupAndProtocol(m_protocolId).first;
    std::sort(m_sealerList.begin(), m_sealerList.end());  // keep the sealer list sorted
}
// destructor
virtual ~ConsensusEngineBase() { stop(); }

2. Overridden virtual functions

// sealer (consensus node) operations
dev::h512s sealerList() const override;
void appendSealer(h512 const& _sealer) override;
const std::string consensusStatus() override;
PROTOCOL_ID const& protocolId() const override { return m_protocolId; }
GROUP_ID groupId() const override { return m_groupId; }

// node type: consensus node / observer node
NodeAccountType accountType() override { return m_accountType; }
void setNodeAccountType(dev::consensus::NodeAccountType const& _accountType) override;

// index of this node in the sealer list
IDXTYPE nodeIdx() const override { return m_idx; }
virtual void reportBlock(dev::eth::Block const&) override {}

void start() override;
void stop() override;

3. Other member functions

// build a JSON object holding the basic consensus status
void getBasicConsensusStatus(Json::Value& status_obj) const;
// set whether future blocks are allowed
void setAllowFutureBlocks(bool isAllowFutureBlocks);
// minimum number of valid nodes required (m_nodeNum is the total node count,
// m_f the number of faulty nodes tolerated)
IDXTYPE minValidNodes() const { return m_nodeNum - m_f; }
// refresh the sealer node count
virtual void resetConfig() { m_nodeNum = m_sealerList.size(); }
// remove transactions that are already on chain from the transaction pool
void dropHandledTransactions(dev::eth::Block const& block);
// look up a sealer by its index
inline h512 getSealerByIndex(size_t const& index) const

// message/session/data -> request
inline bool decodeToRequests(T& req, std::shared_ptr<dev::p2p::P2PMessage> message,
    std::shared_ptr<dev::p2p::P2PSession> session)
inline bool decodeToRequests(T& req, bytesConstRef data)

// execute the transactions of a sealed block
dev::blockverifier::ExecutiveContext::Ptr executeBlock(dev::eth::Block& block);

// check whether a block is valid: transaction count, timestamp, block height,
// existence of the parent block
virtual void checkBlockValid(dev::eth::Block const& block);

// update the consensus node list / the p2p node list / the max transaction count per block
virtual void updateConsensusNodeList();
virtual void updateNodeListInP2P();
virtual void updateMaxBlockTransactions();

private:
// check whether a block already exists, queried by hash
bool blockExists(h256 const& blockHash)

4. Protected data members

std::atomic<uint64_t> m_maxBlockTransactions = {1000};
/// main collaborators
std::shared_ptr<dev::p2p::P2PInterface> m_service;                  // p2p service handler
std::shared_ptr<dev::txpool::TxPoolInterface> m_txPool;             // transaction pool handler
std::shared_ptr<dev::blockchain::BlockChainInterface> m_blockChain; // blockchain handler
std::shared_ptr<dev::sync::SyncInterface> m_blockSync;              // block sync handler
std::shared_ptr<dev::blockverifier::BlockVerifierInterface> m_blockVerifier; // block verifier
PROTOCOL_ID m_protocolId;              // which protocol is used
KeyPair m_keyPair;                     // key pair (private key / public key / address)
mutable SharedMutex m_sealerListMutex; // lock protecting the sealer list
dev::h512s m_sealerList;               // sealer list (consensus nodes)

/// other state
// block bookkeeping
std::atomic<int64_t> m_consensusBlockNumber = {0}; // number of the block currently under consensus
dev::eth::BlockHeader m_highestBlock;              // highest block
// node bookkeeping
std::atomic<IDXTYPE> m_nodeNum = {0}; // total node count
std::atomic<IDXTYPE> m_f = {0};       // number of faulty nodes tolerated
GROUP_ID m_groupId;                   // group id, usually derived from the protocol id
std::atomic<NodeAccountType> m_accountType = {NodeAccountType::ObserverAccount}; // node type
std::atomic<IDXTYPE> m_idx = {0};     // index of this node

// flags
bool m_allowFutureBlocks = true;      // whether future blocks are allowed
bool m_startConsensusEngine = false;  // whether the consensus engine has started
bool m_omitEmptyBlock = true;         // whether empty blocks are omitted
std::atomic_bool m_cfgErr = {false};  // whether the configuration is faulty
/// node list recorded when P2P last updated
std::string m_lastNodeList;           // node list from the last P2P update
std::atomic<IDXTYPE> m_connectedNode = {0}; // number of connected nodes

Sealer

Defines the Sealer class, which inherits from Worker and enables shared_ptr support.
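
The "shared_ptr support" refers to the standard std::enable_shared_from_this idiom; a minimal sketch (assuming that mechanism; names are illustrative):

#include <memory>

// An object that can hand out shared_ptrs to itself, e.g. when registering
// itself in callbacks, without creating a second control block.
class SealerSketch : public std::enable_shared_from_this<SealerSketch>
{
public:
    std::shared_ptr<SealerSketch> self() { return shared_from_this(); }
};

int main()
{
    auto sealer = std::make_shared<SealerSketch>();
    auto alias = sealer->self();  // shares ownership with `sealer`
    return alias.use_count() == 2 ? 0 : 1;
}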

1. Constructor / destructor

// constructor
Sealer(std::shared_ptr<dev::txpool::TxPoolInterface> _txPool,          // transaction pool module
    std::shared_ptr<dev::blockchain::BlockChainInterface> _blockChain, // blockchain module
    std::shared_ptr<dev::sync::SyncInterface> _blockSync)              // block sync module
  : Worker("Sealer", 0),  // initialize the Worker base class
    m_txPool(_txPool),
    m_blockSync(_blockSync),
    m_blockChain(_blockChain),
    m_consensusEngine(nullptr)
{
    assert(m_txPool && m_blockSync && m_blockChain);
    if (m_txPool->status().current > 0)
    {
        m_syncTxPool = true;
    }
    // register handlers that fire when new transactions or new blocks arrive
    m_tqReady = m_txPool->onReady([=]() { this->onTransactionQueueReady(); });
    m_blockSubmitted = m_blockChain->onReady([=](int64_t) { this->onBlockChanged(); });
}
virtual ~Sealer() noexcept { stop(); }

2. Member functions

// switch the Sealer module on and off
virtual void start();
virtual void stop();

// callbacks that notify all waiting threads
virtual void onTransactionQueueReady();
virtual void onBlockChanged();

// attach extra data when sealing
void setExtraData(std::vector<bytes> const& _extra) { m_extraData = _extra; }
std::vector<bytes> const& extraData() const { return m_extraData; }

// decide whether sealing needs to be reset
virtual bool shouldResetSealing();
// reset the block being sealed
inline void resetSealingBlock(h256Hash const& filter, bool resetNextLeader);
void resetSealingBlock(Sealing& sealing, h256Hash const& filter, bool resetNextLeader);
void resetBlock(dev::eth::Block& block, bool resetNextLeader = false);

// return the consensus engine
std::shared_ptr<dev::consensus::ConsensusInterface> const consensusEngine();

void reportNewBlock();
// seal now or keep waiting
virtual bool shouldSeal();
virtual bool shouldWait(bool const& wait) const;

// fetch transactions from the transaction pool
void loadTransactions(uint64_t const& transToFetch);
virtual bool checkTxsEnough(uint64_t maxTxsCanSeal)

// guard against the next leader sealing before the latest block reaches consensus
virtual bool canHandleBlockForNextLeader() { return true; }
virtual bool reachBlockIntervalTime() { return false; }
virtual void handleBlock() {}
virtual bool shouldHandleBlock() { return true; }
virtual void doWork(bool wait);
void doWork() override { doWork(true); }
bool isBlockSyncing();

3. Protected data members

/// main collaborators
std::shared_ptr<dev::txpool::TxPoolInterface> m_txPool;             // transaction pool module
std::shared_ptr<dev::sync::SyncInterface> m_blockSync;              // block sync module
std::shared_ptr<dev::blockchain::BlockChainInterface> m_blockChain; // blockchain module
std::shared_ptr<dev::consensus::ConsensusInterface> m_consensusEngine; // owning consensus engine

/// other state
Sealing m_sealing;              // the block being sealed: header, transactions, execution context
mutable SharedMutex x_sealing;  // lock protecting the sealing state
std::vector<bytes> m_extraData; // extra data

// synchronization
std::condition_variable m_signalled;      // signals all waiting threads
std::condition_variable m_blockSignalled; // signals all threads waiting on blocks
Mutex x_signalled;       // lock for m_signalled
Mutex x_blocksignalled;  // lock for m_blockSignalled
std::atomic<bool> m_syncTxPool = {false}; // whether syncTransactionQueue is being called
std::atomic<bool> m_syncBlock = {false};  // whether a new block has been submitted to the chain

bool m_remoteWorking = false;  // whether a remote worker has been reset
bool m_startConsensus = false; // whether consensus has started

Handler<> m_tqReady;               // transaction queue is ready
Handler<int64_t> m_blockSubmitted; // a block has been submitted

uint64_t m_maxBlockCanSeal = 1000;     // max transactions a single block may seal
mutable SharedMutex x_maxBlockCanSeal; // lock protecting m_maxBlockCanSeal

5. How the PBFT Module Uses These Interfaces

First look at the file structure of the PBFT module. It mirrors the consensus template: a Common header, an Engine, and a Sealer.

pbft
├── Common.h
├── PBFTEngine.cpp
├── PBFTEngine.h    # consensus engine
├── PBFTMsgCache.h  # message cache
├── PBFTReqCache.cpp
├── PBFTReqCache.h  # request cache
├── PBFTSealer.cpp
├── PBFTSealer.h    # block sealing
└── TimeManager.h   # time management: mainly records the time of the last consensus round / signature

PBFT consensus consists of three phases, Pre-prepare, Prepare, and Commit (the quorum arithmetic behind the 2*f+1 thresholds is sketched after this list):

  • Pre-prepare: executes the block, produces a signature packet, and broadcasts it to all consensus nodes;
  • Prepare: collects signature packets; once a node has collected 2*f+1 of them it knows it has reached the state where the block can be committed, and starts broadcasting Commit packets;
  • Commit: collects Commit packets; once a node has collected 2*f+1 of them, it commits the locally cached latest block directly to the database.
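
The quorum arithmetic behind those thresholds, as a runnable illustration (minValidNodes() = m_nodeNum - m_f appears in ConsensusEngineBase above; deriving f as (n-1)/3 is the standard PBFT convention and is assumed here):

#include <cstdio>

// For n = 3f+1 nodes, tolerating f faulty ones, the quorum n - f equals 2f+1.
int main()
{
    for (int n = 4; n <= 13; n += 3)
    {
        int f = (n - 1) / 3;  // max faulty nodes tolerated
        std::printf("n=%2d  f=%d  quorum=%d (2f+1=%d)\n", n, f, n - f, 2 * f + 1);
    }
    return 0;
}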

Common

Common.h defines the PBFT message types: PBFTMsgPacket, PBFTMsg, PrepareReq, SignReq, CommitReq, and ViewChangeReq.

  • PBFTMsgPacket defines the envelope of a PBFT message: the sending node's id, its index, the packet type (prepare, commit, sign), TTL, data, and timestamp. It also defines message encoding and decoding using the RLP type from libdevcore;
  • PBFTMsg is the base message structure: block height, current view, sender index, timestamp, block hash, signatures, etc. It also defines encoding and decoding using the RLP type from libdevcore;
  • PrepareReq extends PBFTMsg and provides four different constructors (a skeleton of the constructor chain is sketched after this list):
    • from the PBFTMsg fields: keyPair, block height, current view, node index, block hash;
    • from another specific PrepareReq: the other prepareReq, keyPair, current view, node index;
    • from a given block, keyPair, view, and idx;
    • updating a PrepareReq from a given block and its execution result;
  • SignReq extends PBFTMsg and defines one constructor taking a PrepareReq, keyPair, and idx;
  • CommitReq extends PBFTMsg and defines one constructor taking a PrepareReq, keyPair, and idx;
  • ViewChangeReq extends PBFTMsg and defines one constructor taking keyPair, block height, view, idx, and block hash;
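
A skeleton of that constructor chain (stand-in types only, not the real headers; it shows how a replica derives its SignReq/CommitReq from a PrepareReq):

#include <cstdint>

// Stand-in types; the real ones carry heights, hashes and signatures.
struct KeyPair {};
struct Block {};

struct PrepareReq
{
    PrepareReq(Block const&, KeyPair const&, uint64_t view, uint16_t idx) {}
};
struct SignReq
{
    SignReq(PrepareReq const&, KeyPair const&, uint16_t idx) {}
};
struct CommitReq
{
    CommitReq(PrepareReq const&, KeyPair const&, uint16_t idx) {}
};

int main()
{
    KeyPair key;
    Block block;
    PrepareReq prepare(block, key, /*view=*/0, /*idx=*/1);  // built by the leader
    SignReq sign(prepare, key, /*idx=*/2);     // broadcast during the Prepare phase
    CommitReq commit(prepare, key, /*idx=*/2); // broadcast during the Commit phase
    return 0;
}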

PBFTMsgCache

Defines the message cache structure PBFTMsgCache. Each cache is implemented as a queue-backed set, together with the insertion logic. Its members are:

/// caches for the four message types
QueueSet<std::string> m_knownPrepare;
QueueSet<std::string> m_knownSign;
QueueSet<std::string> m_knownCommit;
QueueSet<std::string> m_knownViewChange;

// read-write locks for the caches
mutable SharedMutex x_knownPrepare;
mutable SharedMutex x_knownSign;
mutable SharedMutex x_knownCommit;
mutable SharedMutex x_knownViewChange;

// cache capacity limits
static const unsigned c_knownPrepare = 1024;
static const unsigned c_knownSign = 1024;
static const unsigned c_knownCommit = 1024;
static const unsigned c_knownViewChange = 1024;

In addition, it defines the broadcast cache class PBFTBroadcastCache, which stores received message caches in a hash map keyed by the sending node's id.
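
A sketch of the bounded, de-duplicating insert that such a QueueSet cache performs (an assumption based on the capacities above; the real QueueSet comes from libdevcore):

#include <cstddef>
#include <deque>
#include <string>
#include <unordered_set>

// Remember message keys, reject duplicates, and evict the oldest entry once
// the capacity is reached.
class BoundedKeySet
{
public:
    explicit BoundedKeySet(std::size_t cap) : m_cap(cap) {}

    bool insert(std::string const& key)
    {
        if (m_set.count(key))
            return false;  // already known: do not re-handle or re-broadcast
        if (m_queue.size() >= m_cap)
        {
            m_set.erase(m_queue.front());  // evict the oldest key
            m_queue.pop_front();
        }
        m_queue.push_back(key);
        m_set.insert(key);
        return true;
    }

private:
    std::size_t m_cap;
    std::deque<std::string> m_queue;
    std::unordered_set<std::string> m_set;
};

int main()
{
    BoundedKeySet known(1024);
    return known.insert("sig-key") && !known.insert("sig-key") ? 0 : 1;
}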

PBFTReqCache

Defines the request cache class PBFTReqCache with the following members:

/// cache for the prepare request
PrepareReq m_prepareCache = PrepareReq();

/// cache for the raw prepare request
PrepareReq m_rawPrepareCache;

/// cache for sign requests (maps between hash and sign requests)
std::unordered_map<h256, std::unordered_map<std::string, SignReq>> m_signCache;

/// cache for received viewChange requests (maps between view and view change requests)
std::unordered_map<VIEWTYPE, std::unordered_map<IDXTYPE, ViewChangeReq>> m_recvViewChangeReq;

/// cache for committed requests (maps between hash and committed requests)
std::unordered_map<h256, std::unordered_map<std::string, CommitReq>> m_commitCache;

/// cache for the prepare request that needs to be backed up and saved
PrepareReq m_committedPrepareCache;

/// cache for future prepare requests
/// key: block height, value: the cached future prepare
std::unordered_map<uint64_t, std::shared_ptr<PrepareReq>> m_futurePrepareCache;

It also implements methods to add, delete, update, and query these request caches, and to decide whether a view change should be triggered.
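
Because m_signCache and m_commitCache are keyed by block hash, the quorum checks used later (getSigCacheSize / getCommitCacheSize in checkAndCommit and checkAndSave) reduce to the size of the inner map. A simplified illustration (the helper below is hypothetical):

#include <cstddef>
#include <string>
#include <unordered_map>

struct SignReqSketch { int idx; };

// block hash -> (signature -> sign request), mirroring m_signCache's shape
using SignCache =
    std::unordered_map<std::string, std::unordered_map<std::string, SignReqSketch>>;

std::size_t sigCountFor(SignCache const& cache, std::string const& blockHash)
{
    auto it = cache.find(blockHash);
    return it == cache.end() ? 0 : it->second.size();
}

int main()
{
    SignCache cache;
    cache["0xabc"]["sig-from-node-1"] = SignReqSketch{1};
    cache["0xabc"]["sig-from-node-2"] = SignReqSketch{2};
    return sigCountFor(cache, "0xabc") == 2 ? 0 : 1;
}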

TimeManager

This module mainly records timestamps and sets the timeout.
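
A sketch of the kind of timeout check this enables (field names are partly hypothetical; the doubling change cycle mirrors the m_changeCycle handling seen later in checkAndChangeView):

#include <cstdint>

// If no consensus progress happened within the allowed window, the node
// should trigger a view change; the window grows with every failed view.
struct TimeManagerSketch
{
    uint64_t m_lastConsensusTime = 0;  // ms timestamp of the last progress
    uint64_t m_viewTimeout = 3000;     // base timeout in ms (assumed default)
    unsigned m_changeCycle = 0;        // incremented on every failed view

    bool isTimeout(uint64_t nowMs) const
    {
        uint64_t window = m_viewTimeout * (uint64_t(1) << m_changeCycle);
        return nowMs - m_lastConsensusTime >= window;
    }
};

int main()
{
    TimeManagerSketch tm;
    tm.m_lastConsensusTime = 1000;
    return tm.isTimeout(5000) && !tm.isTimeout(2000) ? 0 : 1;
}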

PBFTSealer

Defines the PBFTSealer class, which inherits directly from Sealer.

Besides what it inherits from Sealer, it adds several new member variables:

std::shared_ptr<PBFTEngine> m_pbftEngine; // the concrete PBFT consensus engine
uint64_t m_lastTimeoutTx = 0;   // smallest transaction count that caused a timeout
uint64_t m_maxNoTimeoutTx = 0;  // largest transaction count that did not cause a timeout

int64_t m_timeoutCount = 0;     // how many timeouts have occurred
uint64_t m_lastBlockNumber = 0; // number of the last block
bool m_enableDynamicBlockSize = true; // whether block size is adjusted dynamically
float m_blockSizeIncreaseRatio = 0.5; // block size growth ratio

1. Constructor

The constructor calls the base class Sealer's constructor and initializes the PBFTEngine that the PBFTSealer points to.

// function name and parameters omitted
{
    // create the concrete engine
    m_consensusEngine = std::make_shared<PBFTEngine>(_service, _txPool, _blockChain,
        _blockSync, _blockVerifier, _protocolId, _baseDir, _key_pair, _sealerList);
    m_pbftEngine = std::dynamic_pointer_cast<PBFTEngine>(m_consensusEngine);

    /// bind the callback that resets the block on a timeout (view change)
    m_pbftEngine->onViewChange(boost::bind(&PBFTSealer::resetBlockForViewChange, this));

    /// bind the callback that resets the block when the next leader receives a prepare block
    m_pbftEngine->onNotifyNextLeaderReset(
        boost::bind(&PBFTSealer::resetBlockForNextLeader, this, _1));

    /// set thread name for PBFTSealer
    std::string threadName = "PBFTSeal-" + std::to_string(m_pbftEngine->groupId());
    setName(threadName);
}

2. Overridden member functions

The inherited Sealer member functions are overridden here:

void start() override {
    // decide whether dynamic block size adjustment is enabled
    // start the Engine
    // start the Sealer
}
void stop() override {
    // stop the Sealer
    // stop the Engine
}

bool shouldResetSealing() override {
    // only the leader needs to reset sealing
}

/// key entry point
void handleBlock() override {
    // 1. if too many transactions were sealed, reset sealing and notify the other threads
    // 2. setBlock: fill in the block header and compute the Merkle root
    // 3. call the Engine's generatePrepare method, i.e. the Pre-prepare phase of PBFT
}
bool shouldSeal() override {
    // whether this node may produce a block
}
bool shouldHandleBlock() override {
    // only the leader node may produce the latest block
}
bool reachBlockIntervalTime() override;
bool canHandleBlockForNextLeader() override;

3. Other important member functions

// callback functions
void resetBlockForViewChange() {
    /// reset the block on a view change
}

void resetBlockForNextLeader(dev::h256Hash const& filter) {
    /// reset the block for the next leader
}

PBFTEngine

Defines the PBFTEngine class, which inherits directly from the consensus template class ConsensusEngineBase and adds several member variables:

protected:
std::atomic<VIEWTYPE> m_view = {0};
std::atomic<VIEWTYPE> m_toView = {0};
std::string m_baseDir;
std::atomic_bool m_leaderFailed = {false};
std::atomic_bool m_notifyNextLeaderSeal = {false};

// database for backing up consensus messages
std::shared_ptr<dev::db::LevelDB> m_backupDB = nullptr;

/// static vars
static const std::string c_backupKeyCommitted;
static const std::string c_backupMsgDirName;
static const unsigned c_PopWaitSeconds = 5;

// broadcast message / request caches
std::shared_ptr<PBFTBroadcastCache> m_broadCastCache;
std::shared_ptr<PBFTReqCache> m_reqCache;
TimeManager m_timeManager;
PBFTMsgQueue m_msgQueue;  // message queue
mutable Mutex m_mutex;

// thread signalling
std::condition_variable m_signalled;
Mutex x_signalled;

// callbacks
std::function<void()> m_onViewChange = nullptr;
std::function<void(dev::h256Hash const& filter)> m_onNotifyNextLeaderReset = nullptr;
std::function<void(uint64_t const& sealingTxNumber)> m_onTimeout = nullptr;
std::function<void(
    uint64_t const& blockNumber, uint64_t const& sealingTxNumber, unsigned const& changeCycle)>
    m_onCommitBlock = nullptr;

/// for reporting time-out-caused view changes:
/// when m_fastViewChange is false, output a viewchange warning to indicate a PBFT consensus timeout
std::atomic_bool m_fastViewChange = {false};

uint8_t maxTTL = MAXTTL;

/// map between nodeIdx and view
mutable SharedMutex x_viewMap;
std::map<IDXTYPE, VIEWTYPE> m_viewMap;

std::atomic<uint64_t> m_sealingNumber = {0};

1. Constructor

The constructor calls the ConsensusEngineBase constructor:

// function name and parameters omitted
{
    PBFTENGINE_LOG(INFO) << LOG_DESC("Register handler for PBFTEngine");
    // bind the onRecvPBFTMessage callback to the p2p service
    m_service->registerHandlerByProtoclID(
        m_protocolId, boost::bind(&PBFTEngine::onRecvPBFTMessage, this, _1, _2, _3));
    // initialize the broadcast / request caches
    m_broadCastCache = std::make_shared<PBFTBroadcastCache>();
    m_reqCache = std::make_shared<PBFTReqCache>();
    /// set thread name for PBFTEngine
    std::string threadName = "PBFT-" + std::to_string(m_groupId);
    setName(threadName);
    /// register checkSealerList with blockSync so the sealer list is verified during sync
    m_blockSync->registerConsensusVerifyHandler(boost::bind(&PBFTEngine::checkBlock, this, _1));
}

2. Other important functions

As seen from the PBFTSealer module, the Sealer base class overrides Worker's doWork method; for a block that shouldHandleBlock accepts, it calls handleBlock, which in turn calls the Engine's generatePrepare method to send the Prepare message. Only the leader node can call this method.

/// build a PrepareReq message and broadcast it
bool PBFTEngine::generatePrepare(Block const& block)
{
    Guard l(m_mutex);
    m_notifyNextLeaderSeal = false;
    PrepareReq prepare_req(block, m_keyPair, m_view, nodeIdx());
    bytes prepare_data;
    prepare_req.encode(prepare_data);  // serialize to bytes

    /// broadcast the Prepare request
    bool succ = broadcastMsg(PrepareReqPacket, prepare_req.uniqueKey(), ref(prepare_data));
    if (succ)
    {
        // for an empty block, trigger a view change
        if (prepare_req.pBlock->getTransactionSize() == 0 && m_omitEmptyBlock)
        {
            m_leaderFailed = true;
            changeViewForFastViewChange();
            m_timeManager.m_changeCycle = 0;
            return true;
        }
        // for a non-empty block, the leader handles the Prepare message itself
        handlePrepareMsg(prepare_req);
    }
    /// reset the block according to the broadcast result
    PBFTENGINE_LOG(INFO) << LOG_DESC("generateLocalPrepare")
                         << LOG_KV("hash", prepare_req.block_hash.abridged())
                         << LOG_KV("H", prepare_req.height) << LOG_KV("nodeIdx", nodeIdx())
                         << LOG_KV("myNode", m_keyPair.pub().abridged());
    m_signalled.notify_all();
    return succ;
}

Continuously listening to the message queue

After the leader broadcasts the Prepare message, the other nodes receive and process messages via PBFTEngine::workLoop, which loops over the message queue m_msgQueue. Messages are pushed onto m_msgQueue by PBFTEngine::onRecvPBFTMessage, the callback bound to the p2p service when the PBFTEngine constructor runs:

// workLoop overrides the Worker class's workLoop method
void PBFTEngine::workLoop()
{
    while (isWorking())
    {
        try
        {
            // pop from the m_msgQueue message queue
            std::pair<bool, PBFTMsgPacket> ret = m_msgQueue.tryPop(c_PopWaitSeconds);
            if (ret.first)
            {
                PBFTENGINE_LOG(TRACE)
                    << LOG_DESC("workLoop: handleMsg")
                    << LOG_KV("type", std::to_string(ret.second.packet_id))
                    << LOG_KV("fromIdx", ret.second.node_idx) << LOG_KV("nodeIdx", nodeIdx())
                    << LOG_KV("myNode", m_keyPair.pub().abridged());
                /// handle the received message
                handleMsg(ret.second);
            }
            /// avoid busy-looping on the CPU
            else if (m_reqCache->futurePrepareCacheSize() == 0)
            {
                std::unique_lock<std::mutex> l(x_signalled);
                m_signalled.wait_for(l, std::chrono::milliseconds(5));
            }
            // as long as this node is a sealer
            if (nodeIdx() != MAXIDX)
            {
                // check for a timeout; a timeout means the leader crashed or
                // was compromised, so a view change is needed
                checkTimeout();
                handleFutureBlock();
                // clear the request cache
                collectGarbage();
            }
        }
        catch (std::exception& _e)
        {
            LOG(ERROR) << _e.what();
        }
    }
}

handleMsg dispatches on the request type

As the workLoop above shows, each node processes incoming messages with the handleMsg method:

void PBFTEngine::handleMsg(PBFTMsgPacket const& pbftMsg)
{
    Guard l(m_mutex);
    PBFTMsg pbft_msg;
    std::string key;
    bool succ = false;
    switch (pbftMsg.packet_id)
    {
    case PrepareReqPacket:  // Prepare message
    {
        PrepareReq prepare_req;
        succ = handlePrepareMsg(prepare_req, pbftMsg);
        key = prepare_req.uniqueKey();  // returns the signature
        pbft_msg = prepare_req;
        break;
    }
    case SignReqPacket:
    {
        succ = handleSignMsg(req, pbftMsg);
        // rest as above
    }
    case CommitReqPacket:
    {
        succ = handleCommitMsg(req, pbftMsg);
        // rest as above
    }
    case ViewChangeReqPacket:
    {
        succ = handleViewChangeMsg(req, pbftMsg);
        // rest as above
    }
    default:
    {
        // omitted: error handling
    }
    }

    if (pbftMsg.ttl == 1) { return; }
    bool height_flag = (pbft_msg.height > m_highestBlock.number()) ||
                       (m_highestBlock.number() - pbft_msg.height < 10);
    if (succ && key.size() > 0 && height_flag)  // message handled successfully and height acceptable
    {
        std::unordered_set<h512> filter;
        filter.insert(pbftMsg.node_id);
        /// get the origin node id of the request
        h512 gen_node_id = getSealerByIndex(pbft_msg.idx);
        if (gen_node_id != h512())
        {
            filter.insert(gen_node_id);
        }
        // decrement the TTL and re-broadcast
        unsigned current_ttl = pbftMsg.ttl - 1;
        broadcastMsg(pbftMsg.packet_id, key, ref(pbftMsg.data), filter, current_ttl);
    }
}

The execution flow of handleMsg is shown in the figure below:

The following sections walk through the four methods handlePrepareMsg, handleSignMsg, handleCommitMsg, and handleViewChangeMsg:

Handling Prepare requests
bool PBFTEngine::handlePrepareMsg(PrepareReq const& prepareReq, std::string const& endpoint)
{
    Timer t;
    std::ostringstream oss;
    /// oss logging omitted
    /// check whether the Prepare request is valid
    auto valid_ret = isValidPrepare(prepareReq, oss);
    if (valid_ret == CheckResult::INVALID)
    {
        return false;
    }
    /// update the current view
    updateViewMap(prepareReq.idx, prepareReq.view);

    if (valid_ret == CheckResult::FUTURE)  // a future prepare request is deferred to keep time order
    {
        return true;
    }
    /// cache the current prepareReq in the reqCache
    m_reqCache->addRawPrepare(prepareReq);
    // execute the transactions in the block
    Sealing workingSealing;
    try
    {
        execBlock(workingSealing, prepareReq, oss);
        // old block (has already been executed correctly by block sync)
        if (workingSealing.p_execContext == nullptr &&
            workingSealing.block.getTransactionSize() > 0)
        {
            return false;
        }
    }
    catch (std::exception& e)
    {
        // block execution error, omitted
        return true;
    }
    /// if this would produce an empty block, decide whether to omit it
    if (needOmit(workingSealing))
    {
        changeViewForFastViewChange();
        m_timeManager.m_changeCycle = 0;
        return true;
    }

    /// update the request with the block execution result, sign it, and build a new PrepareReq
    PrepareReq sign_prepare(prepareReq, workingSealing, m_keyPair);
    m_reqCache->addPrepareReq(sign_prepare);
    // logging omitted

    // from the new PrepareReq, build a SignReq, broadcast it, and add it to the signCache
    if (!broadcastSignReq(sign_prepare))
    {
        // broadcast failed, logging omitted
    }
    // check whether the signed SignReqs in the signCache reach 2/3; if so, we can commit
    checkAndCommit();

    return true;
}

The processing flow of handlePrepareMsg is shown in the figure below:

Handling Sign requests
bool PBFTEngine::handleSignMsg(SignReq& sign_req, PBFTMsgPacket const& pbftMsg)
{
    Timer t;
    bool valid = decodeToRequests(sign_req, ref(pbftMsg.data));
    if (!valid)
    {
        return false;
    }
    std::ostringstream oss;
    // oss logging omitted
    // check whether the SignReq is valid
    auto check_ret = isValidSignReq(sign_req, oss);
    if (check_ret == CheckResult::INVALID) { return false; }
    // update the current view
    updateViewMap(sign_req.idx, sign_req.view);

    if (check_ret == CheckResult::FUTURE) { return true; }
    // add to the signCache
    m_reqCache->addSignReq(sign_req);

    // check whether 2/3 has been reached
    checkAndCommit();
    return true;
}

The execution flow of handleSignMsg is shown below:

Handling Commit requests
bool PBFTEngine::handleCommitMsg(CommitReq& commit_req, PBFTMsgPacket const& pbftMsg)
{
    Timer t;
    bool valid = decodeToRequests(commit_req, ref(pbftMsg.data));
    if (!valid)
    {
        return false;
    }
    std::ostringstream oss;
    /// oss logging omitted
    // check whether the CommitReq is valid
    auto valid_ret = isValidCommitReq(commit_req, oss);
    if (valid_ret == CheckResult::INVALID) { return false; }
    /// update the current view
    updateViewMap(commit_req.idx, commit_req.view);

    if (valid_ret == CheckResult::FUTURE) { return true; }
    // add to the commitCache
    m_reqCache->addCommitReq(commit_req);
    // check whether 2/3 has been reached
    checkAndSave();

    return true;
}

The processing flow of handleCommitMsg is shown in the figure below:

Handling ViewChange requests
bool PBFTEngine::handleViewChangeMsg(ViewChangeReq& viewChange_req, PBFTMsgPacket const& pbftMsg)
{
    bool valid = decodeToRequests(viewChange_req, ref(pbftMsg.data));
    if (!valid) { return false; }
    std::ostringstream oss;
    /// oss logging omitted
    // check the validity of the view change request
    valid = isValidViewChangeReq(viewChange_req, pbftMsg.node_idx, oss);
    if (!valid) { return false; }
    // add to the viewchangeCache
    m_reqCache->addViewChangeReq(viewChange_req);
    // if the requested view is exactly the next view
    if (viewChange_req.view == m_toView)
    {
        // check whether 2/3 of the viewchange requests have arrived
        checkAndChangeView();
    }
    else
    {
        VIEWTYPE min_view = 0;
        bool should_trigger = m_reqCache->canTriggerViewChange(
            min_view, m_f, m_toView, m_highestBlock, m_consensusBlockNumber);
        if (should_trigger)
        {
            m_toView = min_view - 1;
            changeViewForFastViewChange();
        }
    }
    return true;
}

The processing flow of handleViewChangeMsg is shown in the figure below:

checkAndCommit

The checkAndCommit method is called from both handlePrepareMsg and handleSignMsg. It determines whether the 2/3 threshold has been reached and whether the node can proceed to the next phase of the protocol by broadcasting the Commit message.

void PBFTEngine::checkAndCommit()
{
    size_t sign_size = m_reqCache->getSigCacheSize(m_reqCache->prepareCache().block_hash);
    // must be exactly equal, otherwise the block would be verified multiple times
    if (sign_size == minValidNodes())
    {
        // logging omitted
        if (m_reqCache->prepareCache().view != m_view)
        {
            // views differ: error, return (logging omitted)
            return;
        }
        // update the current committedPrepare message
        m_reqCache->updateCommittedPrepare();
        // back up the committed message to the database (logging omitted)
        backupMsg(c_backupKeyCommitted, m_reqCache->committedPrepareCache());
        // broadcast the Commit message; in the PBFT protocol this corresponds to
        // broadcasting the PREPARE/COMMIT-phase message
        if (!broadcastCommitReq(m_reqCache->prepareCache()))
        {
            // log: broadcast failed
        }
        m_timeManager.m_lastSignTime = utcTime();
        // check whether this message has reached 2/3 and consensus can move
        // on to the final COMMIT stage
        checkAndSave();
    }
}

checkAndSave

The checkAndSave method is called from handleCommitMsg. It checks that both the SignReq and CommitReq counts have reached 2/3 and that the view and block height are correct; if so, the signature list is added to the block and the block is committed to disk.

void PBFTEngine::checkAndSave()
{
    auto start_commit_time = utcTime();
    auto record_time = utcTime();
    size_t sign_size = m_reqCache->getSigCacheSize(prepareCache().block_hash);
    size_t commit_size = m_reqCache->getCommitCacheSize(prepareCache().block_hash);
    //// both signReq and commitReq satisfy >= 2/3
    if (sign_size >= minValidNodes() && commit_size >= minValidNodes())
    {
        /// enough votes collected, logging omitted
        if (m_reqCache->prepareCache().view != m_view)
        {
            // wrong view, log the error and return
            return;
        }
        /// add the sign-list into the block header
        if (m_reqCache->prepareCache().height > m_highestBlock.number())
        {
            /// Block block(m_reqCache->prepareCache().block);
            std::shared_ptr<dev::eth::Block> p_block = m_reqCache->prepareCache().pBlock;

            /// build the signature list <signer idx, signature> and put it into p_block
            m_reqCache->generateAndSetSigList(*p_block, minValidNodes());
            auto genSig_time_cost = utcTime() - record_time;
            record_time = utcTime();
            /// commit the block via the blockchain module
            CommitResult ret = m_blockChain->commitBlock((*p_block),
                std::shared_ptr<ExecutiveContext>(m_reqCache->prepareCache().p_execContext));
            auto commitBlock_time_cost = utcTime() - record_time;
            record_time = utcTime();

            /// on success
            if (ret == CommitResult::OK)
            {
                // drop the handled transactions from the transaction pool
                dropHandledTransactions(*p_block);
                auto dropTxs_time_cost = utcTime() - record_time;
                record_time = utcTime();
                // notify the sync module
                m_blockSync->noteSealingBlockNumber(m_reqCache->prepareCache().height);
                auto noteSealing_time_cost = utcTime() - record_time;
                /// log: consensus succeeded, omitted
                // clear the caches
                m_reqCache->delCache(m_reqCache->prepareCache().block_hash);
                m_reqCache->removeInvalidFutureCache(m_highestBlock);
            }
            else
            {
                /// log: consensus failed
                /// notify the sync module and continue sealing
                m_blockSync->noteSealingBlockNumber(m_blockChain->number());
                m_txPool->handleBadBlock(*p_block);
            }
        }
        else
        {
            // log: the block already exists, consensus error
        }
    }
}

checkAndChangeView

checkAndChangeView is called when the leader times out during consensus (checkTimeout) or when a view change is triggered explicitly (handleViewChangeMsg).

void PBFTEngine::checkAndChangeView()
{
    IDXTYPE count = m_reqCache->getViewChangeSize(m_toView);
    // check whether 2/3 of the viewChangeReq messages have been received
    if (count >= minValidNodes() - 1)
    {
        /// consensus reached due to a fast view change
        if (m_timeManager.m_lastSignTime == 0)
        {
            m_fastViewChange = false;
        }
        auto orgChangeCycle = m_timeManager.m_changeCycle;

        /// faster nodes may already have moved on to the next round
        if (m_toView > m_view + 1)
        {
            m_timeManager.m_changeCycle = 1;
            PBFTENGINE_LOG(INFO) << LOG_DESC("checkAndChangeView, update m_changeCycle to 1");
        }
        /// log: consensus reached
        // view = toView, clear the caches
        m_leaderFailed = false;
        m_timeManager.m_lastConsensusTime = utcTime();
        m_view = m_toView.load();
        m_notifyNextLeaderSeal = false;
        m_reqCache->triggerViewChange(m_view);
        m_blockSync->noteSealingBlockNumber(m_blockChain->number());
    }
}