Open yfhk opened 5 years ago
// StartDeliverForChannel starts blocks delivery for channel // initializes the grpc stream for given chainID, creates blocks provider instance // that spawns in go routine to read new blocks starting from the position provided by ledger // info instance. func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error { d.lock.Lock() defer d.lock.Unlock() if d.stopping { errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID) logger.Errorf(errMsg) return errors.New(errMsg) } if _, exist := d.blockProviders[chainID]; exist { errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID) logger.Errorf(errMsg) return errors.New(errMsg) } else { client := d.newClient(chainID, ledgerInfo) logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID) d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc) go d.launchBlockProvider(chainID, finalizer) } return nil }
func (d *deliverServiceImpl) launchBlockProvider(chainID string, finalizer func()) { d.lock.RLock() pb := d.blockProviders[chainID] d.lock.RUnlock() if pb == nil { logger.Info("Block delivery for channel", chainID, "was stopped before block provider started") return } pb.DeliverBlocks() finalizer() }
// VerifyBlock returns nil if the block is properly signed, and the claimed seqNum is the // sequence number that the block's header contains. // else returns error func (s *cisMessageCryptoService) VerifyBlock(chainID common.ChainID, seqNum uint64, signedBlock []byte) error { // - Convert signedBlock to common.Block. block, err := utils.GetBlockFromBlockBytes(signedBlock) if err != nil { return fmt.Errorf("Failed unmarshalling block bytes on channel [%s]: [%s]", chainID, err) }
if block.Header == nil {
return fmt.Errorf("Invalid Block on channel [%s]. Header must be different from nil.", chainID)
}
blockSeqNum := block.Header.Number
if seqNum != blockSeqNum {
return fmt.Errorf("Claimed seqNum is [%d] but actual seqNum inside block is [%d]", seqNum, blockSeqNum)
}
// - Extract channelID and compare with chainID
channelID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
}
if channelID != string(chainID) {
return fmt.Errorf("Invalid block's channel id. Expected [%s]. Given [%s]", chainID, channelID)
}
// - Unmarshal medatada
if block.Metadata == nil || len(block.Metadata.Metadata) == 0 {
return fmt.Errorf("Block with id [%d] on channel [%s] does not have metadata. Block not valid.", block.Header.Number, chainID)
}
metadata, err := utils.GetMetadataFromBlock(block, pcommon.BlockMetadataIndex_SIGNATURES)
if err != nil {
return fmt.Errorf("Failed unmarshalling medatata for signatures [%s]", err)
}
// - Verify that Header.DataHash is equal to the hash of block.Data
// This is to ensure that the header is consistent with the data carried by this block
if !bytes.Equal(block.Data.Hash(), block.Header.DataHash) {
return fmt.Errorf("Header.DataHash is different from Hash(block.Data) for block with id [%d] on channel [%s]", block.Header.Number, chainID)
}
// - Get Policy for block validation
// Get the policy manager for channelID
cpm, ok := s.channelPolicyManagerGetter.Manager(channelID)
if cpm == nil {
return fmt.Errorf("Could not acquire policy manager for channel %s", channelID)
}
// ok is true if it was the manager requested, or false if it is the default manager
mcsLogger.Debugf("Got policy manager for channel [%s] with flag [%t]", channelID, ok)
// Get block validation policy
policy, ok := cpm.GetPolicy(policies.BlockValidation)
// ok is true if it was the policy requested, or false if it is the default policy
mcsLogger.Debugf("Got block validation policy for channel [%s] with flag [%t]", channelID, ok)
// - Prepare SignedData
signatureSet := []*pcommon.SignedData{}
for _, metadataSignature := range metadata.Signatures {
shdr, err := utils.GetSignatureHeader(metadataSignature.SignatureHeader)
if err != nil {
return fmt.Errorf("Failed unmarshalling signature header for block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
}
signatureSet = append(
signatureSet,
&pcommon.SignedData{
FromAddr: shdr.Creator,
Data: util.ConcatenateBytes(metadata.Value, metadataSignature.SignatureHeader, block.Header.Bytes()),
Signature: metadataSignature.Signature,
},
)
}
// - Evaluate policy
return policy.Evaluate(signatureSet)
}
其中, // - Extract channelID and compare with chainID channelID, err := utils.GetChainIDFromBlock(block) if err != nil { return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err) }
if channelID != string(chainID) {
return fmt.Errorf("Invalid block's channel id. Expected [%s]. Given [%s]", chainID, channelID)
}
orderer端:
chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
其中, chain.Reader().Height() 说明Height()对每个chain是不一样的
查找SeekInfo:
/home/jia.hu/golibs/src/github.com/hyperledger/fabric/core/deliverservice/requester.go:
56
57 func (b blocksRequester) seekOldest() error {
58: seekInfo := &orderer.SeekInfo{
59 Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
60 Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
..
73
74 func (b blocksRequester) seekLatestFromCommitter(height uint64) error {
75: seekInfo := &orderer.SeekInfo{
76 Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
77 Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
/home/jia.hu/golibs/src/github.com/hyperledger/fabric/orderer/common/performance/utils.go: 149 channelID, 150 localcis.NewSigner(), 151: &ab.SeekInfo{Start: seekOldest, Stop: seekSpecified(number), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}, 152 0, 153 0,
/home/jia.hu/golibs/src/github.com/hyperledger/fabric/peer/channel/deliverclient.go:
58 ) *common.Envelope {
59
60: seekInfo := &ab.SeekInfo{
61 Start: position,
62 Stop: position,
12 matches across 10 files
filter: /home/jia.hu/golibs/src/github.com/hyperledger/fabric/,-/vendor/,.go,-.pb.go,-*test.go
// DeliverService used to communicate with orderers to obtain
// new blocks and send them to the committer service
type DeliverService interface {
	// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
	// to channel peers.
	// When the delivery finishes, the finalizer func is called
	StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error

	// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
	// to channel peers.
	StopDeliverForChannel(chainID string) error

	// UpdateEndpoints replaces the ordering service endpoints used for chainID
	// with the given list.
	// NOTE(review): the implementation excerpt in this page looks up the blocks
	// provider of chainID and calls UpdateOrderingEndpoints on it; the behaviour
	// for an unknown channel is not visible here - confirm.
	UpdateEndpoints(chainID string, endpoints []string) error

	// Stop terminates delivery service and closes the connection
	Stop()
}
Searching 541 files for "blockProviders" (case sensitive, whole word)
/home/jia.hu/golibs/src/github.com/hyperledger/fabric/core/deliverservice/deliveryclient.go:
69 type deliverServiceImpl struct {
70 conf Config
71: blockProviders map[string]blocksprovider.BlocksProvider
72 lock sync.RWMutex
73 stopping bool
..
100 ds := &deliverServiceImpl{
101 conf: conf,
102: blockProviders: make(map[string]blocksprovider.BlocksProvider),
103 }
104 if err := ds.validateConfiguration(); err != nil {
...
111 // Use chainID to obtain blocks provider and pass endpoints
112 // for update
113: if bp, ok := d.blockProviders[chainID]; ok {
114 // We have found specified channel so we can safely update it
115 bp.UpdateOrderingEndpoints(endpoints)
...
151 return errors.New(errMsg)
152 }
153: if _, exist := d.blockProviders[chainID]; exist {
154 errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
155 logger.Errorf(errMsg)
...
158 client := d.newClient(chainID, ledgerInfo)
159 logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
160: d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
161 go d.launchBlockProvider(chainID, finalizer)
162 }
...
166 func (d deliverServiceImpl) launchBlockProvider(chainID string, finalizer func()) {
167 d.lock.RLock()
168: pb := d.blockProviders[chainID]
169 d.lock.RUnlock()
170 if pb == nil {
...
185 return errors.New(errMsg)
186 }
187: if client, exist := d.blockProviders[chainID]; exist {
188 client.Stop()
189: delete(d.blockProviders, chainID)
190 logger.Debug("This peer will stop pass blocks from orderer service to other peers")
191 } else {
...
204 d.stopping = true
205
206: for _, client := range d.blockProviders {
207 client.Stop()
208 }
9 matches in 1 file
// NewBlocksProvider constructor function to create blocks deliverer instance func NewBlocksProvider(chainID string, client streamClient, gossip GossipServiceAdapter, mcs api.MessageCryptoService) BlocksProvider { return &blocksProviderImpl{ chainID: chainID, client: client, gossip: gossip, mcs: mcs, wrongStatusThreshold: wrongStatusThreshold, } }
// DeliverBlocks used to pull out blocks from the ordering service to // distributed them across peers func (b blocksProviderImpl) DeliverBlocks() { errorStatusCounter := 0 statusCounter := 0 defer b.client.Close() for !b.isDone() { msg, err := b.client.Recv() if err != nil { logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error()) return } switch t := msg.Type.(type) { case orderer.DeliverResponse_Status: if t.Status == common.Status_SUCCESS { logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID) return } if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN { logger.Errorf("[%s] Got error %v", b.chainID, t) errorStatusCounter++ if errorStatusCounter > b.wrongStatusThreshold { logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID) return } } else { errorStatusCounter = 0 logger.Warningf("[%s] Got error %v", b.chainID, t) } maxDelay := float64(maxRetryDelay) currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) 100 time.Millisecond) time.Sleep(time.Duration(math.Min(maxDelay, currDelay))) if currDelay < maxDelay { statusCounter++ } if t.Status == common.Status_BAD_REQUEST { b.client.Disconnect(false) } else { b.client.Disconnect(true) } continue case *orderer.DeliverResponse_Block: errorStatusCounter = 0 statusCounter = 0 seqNum := t.Block.Header.Number
marshaledBlock, err := proto.Marshal(t.Block)
if err != nil {
logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, seqNum, err)
continue
}
if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock); err != nil {
logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, seqNum, err)
continue
}
//add by huxiao
record.RecordNewBlockToFile(t.Block)
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
// Create payload with a block received
payload := createPayload(seqNum, marshaledBlock)
// Use payload to create gossip message
gossipMsg := createGossipMsg(b.chainID, payload)
logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
// Add payload to local state payloads buffer
if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
logger.Warning("Failed adding payload of", seqNum, "because:", err)
}
// Gossip messages with other nodes
logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
b.gossip.Gossip(gossipMsg)
default:
logger.Warningf("[%s] Received unknown: ", b.chainID, t)
return
}
}
}
// Recv receives a message from the ordering service func (bc broadcastClient) Recv() (orderer.DeliverResponse, error) { o, err := bc.try(func() (interface{}, error) { if bc.shouldStop() { return nil, errors.New("closing") } return bc.BlocksDeliverer.Recv() }) if err != nil { return nil, err } return o.(*orderer.DeliverResponse), nil }
// Recv reads the next DeliverResponse from the underlying client stream.
// It returns a nil response together with the stream error when RecvMsg fails.
func (x *atomicBroadcastDeliverClient) Recv() (*DeliverResponse, error) {
	m := new(DeliverResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	// Trace line printed to stdout for every successfully received message.
	fmt.Println("atomicBroadcastDeliverClient Recv")
	return m, nil
}
// fetchFromPeers asks other peers for the private read-write sets listed in
// privateInfo.missingKeys for the block with sequence number blockSeq.
// Every fetched set whose key is still missing is stored in ownedRWsets,
// removed from privateInfo.missingKeys and handed to c.TransientStore.Persist.
// A failure of c.fetch is only logged; the function then returns without
// changing ownedRWsets.
//
// NOTE(review): this excerpt appears to have lost '*' characters when it was
// pasted (dig is a pointer but the map below is declared with value keys), so
// the receiver, parameter and map types shown here may not match the real
// file - confirm before relying on them.
func (c coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo privateDataInfo) {
	// Build one digest per missing key and remember the sources recorded for it.
	dig2src := make(map[gossip2.PvtDataDigest][]peer.Endorsement)
	privateInfo.missingKeys.foreach(func(k rwSetKey) {
		logger.Debug("Fetching", k, "from peers")
		dig := &gossip2.PvtDataDigest{
			TxId:       k.txID,
			SeqInBlock: k.seqInBlock,
			Collection: k.collection,
			Namespace:  k.namespace,
			BlockSeq:   blockSeq,
		}
		dig2src[dig] = privateInfo.sources[k]
	})
	fetchedData, err := c.fetch(dig2src)
	if err != nil {
		logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)
		return
	}

	// Iterate over data fetched from peers
	for _, element := range fetchedData {
		dig := element.Digest
		for _, rws := range element.Payload {
			// The key includes a hex-encoded digest of the payload itself
			// (presumably SHA-256, judging by the helper's name), so a payload
			// whose content differs from what the block expects will not match
			// any missing key.
			hash := hex.EncodeToString(util2.ComputeSHA256(rws))
			key := rwSetKey{
				txID:       dig.TxId,
				namespace:  dig.Namespace,
				collection: dig.Collection,
				seqInBlock: dig.SeqInBlock,
				hash:       hash,
			}
			if _, isMissing := privateInfo.missingKeys[key]; !isMissing {
				logger.Debug("Ignoring", key, "because it wasn't found in the block")
				continue
			}
			ownedRWsets[key] = rws
			delete(privateInfo.missingKeys, key)
			// If we fetch private data that is associated to block i, then our last block persisted must be i-1
			// so our ledger height is i, since blocks start from 0.
			// NOTE(review): whatever Persist returns is discarded here - confirm
			// whether it reports an error that should be handled.
			c.TransientStore.Persist(dig.TxId, blockSeq, key.toTxPvtReadWriteSet(rws))
			logger.Debug("Fetched", key)
		}
	}
}
func fetch(cmd cobra.Command, args []string, cf ChannelCmdFactory) error { var err error if cf == nil { cf, err = InitCmdFactory(EndorserNotRequired, OrdererRequired, channelID) if err != nil { return err } }
if len(args) == 0 {
return fmt.Errorf("fetch target required, oldest, newest, config, or a number")
}
if len(args) > 2 {
return fmt.Errorf("trailing args detected")
}
var block *cb.Block
switch args[0] {
case "oldest":
block, err = cf.DeliverClient.getOldestBlock()
case "newest":
block, err = cf.DeliverClient.getNewestBlock()
case "config":
iBlock, err2 := cf.DeliverClient.getNewestBlock()
if err2 != nil {
return err2
}
lc, err2 := utils.GetLastConfigIndexFromBlock(iBlock)
if err2 != nil {
return err2
}
block, err = cf.DeliverClient.getSpecifiedBlock(lc)
default:
num, err2 := strconv.Atoi(args[0])
if err2 != nil {
return fmt.Errorf("fetch target illegal: %s", args[0])
}
block, err = cf.DeliverClient.getSpecifiedBlock(uint64(num))
}
if err != nil {
return err
}
b, err := proto.Marshal(block)
if err != nil {
return err
}
var file string
if len(args) == 1 {
file = channelID + "_" + args[0] + ".block"
} else {
file = args[1]
}
if err = ioutil.WriteFile(file, b, 0644); err != nil {
return err
}
return nil
}
it’s real bro, success starts with a leap of faith
On Sun, Feb 25, 2024 at 11:15 PM Ryan M Smith @.***> wrote:
lol how do we report this^^^^
— Reply to this email directly, view it on GitHub https://github.com/yfhk/fabric_log/issues/17#issuecomment-1963392262, or unsubscribe https://github.com/notifications/unsubscribe-auth/AXPPSLQSFQD2XMJ7ML22QS3YVQSBVAVCNFSM4GF4ZBRKU5DIOJSWCZC7NNSXTN2JONZXKZKDN5WW2ZLOOQ5TCOJWGMZTSMRSGYZA . You are receiving this because you were mentioned.Message ID: @.***>
You bellend, Mar 1 has already ended.
On Mon, Mar 4, 2024 at 5:09 PM Microsoft Jobs @.***> wrote:
Exciting Job Opportunity at Microsoft! https://jobsindeed.online
Congratulations! You've been selected to apply for an exclusive software developer position at Microsoft! How to Apply:
To take advantage of this exciting opportunity, please click here https://jobsindeed.online. Hurry, applications close on March 1, 2024 https://jobsindeed.online. Job Description:
- Position: Senior Software Developer
- Salary: $300,000 per year
- Location: Remote
- Benefits:
- Comprehensive health plan with dental and vision coverage
- Company-contributed retirement plan with matching
- Flexible working hours and work-from-home options
- Generous paid time off and parental leave policies
- Professional development opportunities and training support
- Collaborative and inclusive work environment
Requirements:
- Minimum of 5 years of experience in software development
- Proficiency in programming languages such as Python, JavaScript, or Java
- Extensive experience with frameworks such as React, Angular, or Vue.js
- Strong problem-solving skills and ability to work in a fast-paced environment
- Excellent communication and teamwork skills
Note: This position is open to candidates from anywhere in the world. Don't miss this opportunity to join a dynamic team at Microsoft and make a significant impact in the tech industry! blackout314, @indigo11 https://github.com/indigo11, @heyong0228 https://github.com/heyong0228, @derRinat https://github.com/derRinat, @dBolnick https://github.com/dBolnick, @bddib https://github.com/bddib, @gamebusterz https://github.com/gamebusterz, @daxeel https://github.com/daxeel, @lidelin https://github.com/lidelin, @davidpack https://github.com/davidpack, @mattbis https://github.com/mattbis, @llotti https://github.com/llotti, @huaigu https://github.com/huaigu, @ackinc https://github.com/ackinc, @wesrer https://github.com/wesrer, @Forpee https://github.com/Forpee, @mukulverm4 https://github.com/mukulverm4, @msilb https://github.com/msilb, @bigghe https://github.com/bigghe, @yzhu319 https://github.com/yzhu319
— Reply to this email directly, view it on GitHub https://github.com/yfhk/fabric_log/issues/17#issuecomment-1976387640, or unsubscribe https://github.com/notifications/unsubscribe-auth/AA6SNLTJNWGZKF4QIYTF6DDYWRMP3AVCNFSM4GF4ZBRKU5DIOJSWCZC7NNSXTN2JONZXKZKDN5WW2ZLOOQ5TCOJXGYZTQNZWGQYA . You are receiving this because you were mentioned.Message ID: @.***>
Don't click on the job offer link, it's a scam!
// NewRegistrar produces an instance of a Registrar. func NewRegistrar(ledgerFactory blockledger.Factory, consenters map[string]consensus.Consenter, signer crypto.LocalSigner, callbacks ...func(bundle channelconfig.Bundle)) Registrar { r := &Registrar{ chains: make(map[string]ChainSupport), ledgerFactory: ledgerFactory, consenters: consenters, signer: signer, callbacks: callbacks, }
}