Open yfhk opened 5 years ago
// StartDeliverForChannel starts blocks delivery for channel // initializes the grpc stream for given chainID, creates blocks provider instance // that spawns in go routine to read new blocks starting from the position provided by ledger // info instance. func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error { d.lock.Lock() defer d.lock.Unlock() if d.stopping { errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID) logger.Errorf(errMsg) return errors.New(errMsg) } if _, exist := d.blockProviders[chainID]; exist { errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID) logger.Errorf(errMsg) return errors.New(errMsg) } else { client := d.newClient(chainID, ledgerInfo) logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID) d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc) go d.launchBlockProvider(chainID, finalizer) } return nil }
case *orderer.DeliverResponse_Block:
errorStatusCounter = 0
statusCounter = 0
seqNum := t.Block.Header.Number
marshaledBlock, err := proto.Marshal(t.Block)
if err != nil {
logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, seqNum, err)
continue
}
if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock); err != nil {
logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, seqNum, err)
continue
}
//add by huxiao
record.RecordNewBlockToFile(t.Block)
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
// Create payload with a block received
payload := createPayload(seqNum, marshaledBlock)
// Use payload to create gossip message
gossipMsg := createGossipMsg(b.chainID, payload)
logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
// Add payload to local state payloads buffer
if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
logger.Warning("Failed adding payload of", seqNum, "because:", err)
}
// Gossip messages with other nodes
logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
b.gossip.Gossip(gossipMsg)
// DeliverBlocks used to pull out blocks from the ordering service to // distributed them across peers func (b blocksProviderImpl) DeliverBlocks() { errorStatusCounter := 0 statusCounter := 0 defer b.client.Close() for !b.isDone() { msg, err := b.client.Recv() if err != nil { logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error()) return } switch t := msg.Type.(type) { case orderer.DeliverResponse_Status: if t.Status == common.Status_SUCCESS { logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID) return } if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN { logger.Errorf("[%s] Got error %v", b.chainID, t) errorStatusCounter++ if errorStatusCounter > b.wrongStatusThreshold { logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID) return } } else { errorStatusCounter = 0 logger.Warningf("[%s] Got error %v", b.chainID, t) } maxDelay := float64(maxRetryDelay) currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) 100 time.Millisecond) time.Sleep(time.Duration(math.Min(maxDelay, currDelay))) if currDelay < maxDelay { statusCounter++ } if t.Status == common.Status_BAD_REQUEST { b.client.Disconnect(false) } else { b.client.Disconnect(true) } continue case *orderer.DeliverResponse_Block: errorStatusCounter = 0 statusCounter = 0 seqNum := t.Block.Header.Number
}