level 7
兄弟连区块链教程Fabric1.0源代码分析流言算法Gossip服务端二
# Fabric 1.0源代码笔记 之 gossip(流言算法):GossipServer(Gossip服务端)
### 5.2、commImpl结构体方法
```go
//Calls conn.serviceConnection() to start servicing the connection
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
//return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)
func (c *commImpl) GetPKIid() common.PKIidType
//Send a message to the specified peers
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//Probe whether the remote peer responds: _, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
//Handshake to verify the remote peer: _, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()
//Create and start the gRPC server, and register the GossipServer instance
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
//Register the GossipServer instance with peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error)
//Create the gRPC server: grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)
//Create a connection to the server side
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
//Send a message to the specified peer
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
//return atomic.LoadInt32(&c.stopping) == int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
//Code is in gossip/comm/comm_impl.go
```
#### 5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)
创建并启动gRPC Server,以及注册GossipServer实例
```go
// NewCommInstanceWithServer builds a commImpl and returns it as a Comm.
//
// When port is positive it additionally creates a gRPC server and listener
// through createGRPCLayer, registers the new instance as the Gossip service
// on that server, and starts serving in a background goroutine tracked by
// the instance's stopWG. In that case the secureDialOpts argument is
// replaced by the value createGRPCLayer returns.
//
// When no dialOpts are supplied, a single dial-timeout option is used; its
// duration is read from "peer.gossip.dialTimeout" and falls back to
// defDialTimeout.
//
// The returned error is always nil in this implementation.
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
	secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {
	var ll net.Listener
	var s *grpc.Server
	var certHash []byte

	if len(dialOpts) == 0 {
		// peer.gossip.dialTimeout: timeout for dialing gRPC connections.
		dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}
	}

	if port > 0 {
		// Create the gRPC server and its listener for this port.
		s, ll, secureDialOpts, certHash = createGRPCLayer(port)
	}

	commInst := &commImpl{
		selfCertHash:   certHash,
		PKIID:          idMapper.GetPKIidOfCert(peerIdentity),
		idMapper:       idMapper,
		logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
		peerIdentity:   peerIdentity,
		opts:           dialOpts,
		secureDialOpts: secureDialOpts,
		port:           port,
		lsnr:           ll,
		gSrv:           s,
		msgPublisher:   NewChannelDemultiplexer(),
		lock:           &sync.RWMutex{},
		deadEndpoints:  make(chan common.PKIidType, 100),
		stopping:       int32(0),
		exitChan:       make(chan struct{}, 1),
		subscriptions:  make([]chan proto.ReceivedMessage, 0),
	}
	commInst.connStore = newConnStore(commInst, commInst.logger)

	if port > 0 {
		// Register the service BEFORE the serving goroutine is launched:
		// gRPC requires all services to be registered prior to Serve, so
		// registering afterwards would race with the goroutine below.
		proto.RegisterGossipServer(s, commInst)

		commInst.stopWG.Add(1)
		go func() {
			defer commInst.stopWG.Done()
			// NOTE(review): the error returned by Serve is dropped here, as
			// in the original snippet — confirm whether it should be logged.
			s.Serve(ll)
		}()
	}

	return commInst, nil
}
//代码在gossip/comm/comm_impl.go
```
#### 5.2.2、func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)
将GossipServer实例注册至peerServer
```go
// NewCommInstance builds a Comm on top of an existing gRPC server s instead
// of creating its own: it constructs the instance through
// NewCommInstanceWithServer with port -1 (so no server is created there) and
// then registers it as the Gossip service on s.
//
// A dial-timeout option, read from "peer.gossip.dialTimeout" with
// defDialTimeout as fallback, is always appended to dialOpts.
// When cert is non-nil, the instance's selfCertHash is set from the first
// certificate in cert.Certificate.
//
// Any error from NewCommInstanceWithServer is returned as-is with a nil Comm.
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
	peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
	dialOpts ...grpc.DialOption) (Comm, error) {
	dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))

	// Construct the commImpl without a dedicated gRPC server.
	commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)
	if err != nil {
		// Bail out before the type assertion below, which would panic on a
		// nil Comm.
		return nil, err
	}

	inst := commInst.(*commImpl)
	if cert != nil {
		inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])
	}

	proto.RegisterGossipServer(s, inst)
	return commInst, nil
}
2018年10月31日 07点10分