当前位置:首页区块链hyperledger fabric 结构分析(三)

hyperledger fabric 结构分析(三)

本文解决的问题是:本Peer节点如何接收其他节点的数据,接到数据如何处理?

之前两节的分析是命令来源是CLI client,如何连接到Deops服务器、如何发送给Consensus模块、如何发送给ChainCodeSupportClient等。

接下来分析本文要讨论的问题

1)在进行网络初始化的过程中执行以下内容,在创建节点Engine过程中该节点作为客户端的身份连接到其他Peer

[cpp] 代码
peerSerer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)


[cpp] 代码

func (p *Impl) chatWithPeer(address string) error { 


peerLogger.Debugf(“Initiating Chat with peer address: %s”, address)
conn, err := NewPeerClientConnectionWithAddress(address)
if err != nil {
peerLogger.Errorf(“Error creating connection to peer address %s: %s”, address, err)
return err
}
sererClient := pb.NewPeerClient(conn)
ctx := context.Background()
stream, err := sererClient.Chat(ctx)
if err != nil {
peerLogger.Errorf(“Error establishing chat with peer address %s: %s”, address, err)
return err
}
peerLogger.Debugf(“Established Chat with peer address: %s”, address)
err = p.handleChat(ctx, stream, true)
stream.CloseSend()
if err != nil {
peerLogger.Errorf(“Ending Chat with peer address %s due to error: %s”, address, err)
return err
}
return nil
}

2.在handleChat执行过程中,建立消息循环,而这里的handler.HandleMessage。这个handler之前介绍过,是Engine的消息响应句柄,该消息响应处理来自于Consensus模块

[cpp] 代码
func (p *Impl) handleChat(ctx context.Context, stream ChatStream, initiatedStream bool) error {
deadline, ok := ctx.Deadline()
peerLogger.Debugf(“Current context deadline = %s, ok = %”, deadline, ok)
handler, err := p.handlerFactory(p, stream, initiatedStream, nil)
if err != nil {
return fmt.Errorf(“Error creating handler during handleChat initiation: %s”, err)
}
DeFier handler.Stop()
for {
in, err := stream.Rec()
if err == io.EOF {
peerLogger.Debug(“Receied EOF, ending Chat”)
return nil
}
if err != nil {
e := fmt.Errorf(“Error during Chat, stopping handler: %s”, err)
peerLogger.Error(e.Error())
return e
}
err = handler.HandleMessage(in)
if err != nil {
peerLogger.Errorf(“Error handling message: %s”, err)
//return err
}
}
}

3.HandleMessage函数consenterChan 这个channel比较重要,该写入操作会触发engine.consensusFan的消息循环

[cpp] 代码
func (handler *ConsensusHandler) HandleMessage(msg *pb.Message) error {
if msg.Type == pb.Message_CONSENSUS {
senderPE, _ := handler.To()
select {
case handler.consenterChan lt;- AMPLutil.Message{
Msg: msg,
Sender: senderPE.ID,
}:
return nil
DeFiault:
err := fmt.Errorf(“Message channel for % full, rejecting”, senderPE.ID)
logger.Errorf(“Failed to queue consensus message because: %”, err)
return err
}
}

if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf(“Did not handle message of type %s, passing on to next MessageHandler”, msg.Type)
}
return handler.MessageHandler.HandleMessage(msg)
}

4.看到RecMsg这个函数是不是有点眼熟,这个操作和 hyperledger fabric 结构分析 **一个流程是一样的。

[cpp] 代码
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
ar err error
engineOnce.Do(func() {
engine = new(EngineImpl)
engine.helper = NewHelper(coord)
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()

go func() {
logger.Debug(“Starting up message thread for consenter”)

// The channel neer closes, so this should neer break
for msg := range engine.consensusFan.GetOutChannel() {
engine.consenter.RecMsg(msg.Msg, msg.Sender)
}
}()
})
return engine, err
}

5.再往下的流程与 hyperledger fabric 结构分析(二)中的一致。

温馨提示:

文章标题:hyperledger fabric 结构分析(三)

文章链接:https://www.btchangqing.cn/2764.html

更新时间:2020年12月02日

本站大部分内容均收集于网络,若内容若侵犯到您的权益,请联系我们,我们将第一时间处理。

区块链

区块链核心技术:委任权益证明算法DPoS

2020-4-6 17:24:10

区块链

IBM HyperLedger fabric 详解

2020-4-6 17:25:57

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索