yfhk / fabric_log


genesis #13

Open yfhk opened 5 years ago

yfhk commented 5 years ago

// Create implements the corresponding method from interface ledger.PeerLedgerProvider.
// This function sets an under-construction flag before doing anything related to ledger creation and,
// upon a successful ledger creation with the committed genesis block, removes the flag and adds an entry
// into the created-ledgers list (atomically). If a crash happens in between, the
// 'recoverUnderConstructionLedger' function is invoked before declaring the provider to be usable.
func (provider *Provider) Create(genesisBlock *common.Block) (ledger.PeerLedger, error) {
ledgerID, err := utils.GetChainIDFromBlock(genesisBlock)
if err != nil {
    return nil, err
}
exists, err := provider.idStore.ledgerIDExists(ledgerID)
if err != nil {
    return nil, err
}
if exists {
    return nil, ErrLedgerIDExists
}
if err = provider.idStore.setUnderConstructionFlag(ledgerID); err != nil {
    return nil, err
}
lgr, err := provider.openInternal(ledgerID)
if err != nil {
    logger.Errorf("Error in opening a new empty ledger. Unsetting under construction flag. Err: %s", err)
    panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID)
    panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag")
    return nil, err
}
if err := lgr.CommitWithPvtData(&ledger.BlockAndPvtData{
    Block: genesisBlock,
}); err != nil {
    lgr.Close()
    return nil, err
}
panicOnErr(provider.idStore.createLedgerID(ledgerID, genesisBlock), "Error while marking ledger as created")
return lgr, nil
}
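
For context, the recovery pass the comment refers to has to decide, after a crash, whether the half-created ledger should be completed or rolled back. Below is a minimal self-contained sketch of that idea; the flagStore interface and its methods are hypothetical stand-ins for the provider's idStore, not Fabric's actual API:

    package recovery

    import "fmt"

    // flagStore is a hypothetical stand-in for the provider's idStore.
    type flagStore interface {
        underConstructionFlag() (string, error) // "" means no flag is set
        ledgerHeight(ledgerID string) (uint64, error)
        deleteLedgerArtifacts(ledgerID string) error
        markLedgerCreated(ledgerID string) error
    }

    // recoverUnderConstructionLedger sketches the crash-recovery pass: if a
    // flag was left behind, either finish the creation (genesis block already
    // committed, height 1) or roll it back (nothing committed, height 0).
    func recoverUnderConstructionLedger(s flagStore) error {
        ledgerID, err := s.underConstructionFlag()
        if err != nil || ledgerID == "" {
            return err // no crash mid-creation: nothing to recover
        }
        height, err := s.ledgerHeight(ledgerID)
        if err != nil {
            return err
        }
        switch height {
        case 0:
            return s.deleteLedgerArtifacts(ledgerID) // roll back the empty ledger
        case 1:
            return s.markLedgerCreated(ledgerID) // genesis committed: finish up
        default:
            return fmt.Errorf("unexpected height %d for ledger [%s]", height, ledgerID)
        }
    }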

yfhk commented 5 years ago

// CreateChainFromBlock creates a new chain from config block
func CreateChainFromBlock(cb *common.Block) error {
cid, err := utils.GetChainIDFromBlock(cb)
if err != nil {
    return err
}

var l ledger.PeerLedger
if l, err = ledgermgmt.CreateLedger(cb); err != nil {
    return fmt.Errorf("Cannot create ledger from genesis block, due to %s", err)
}

return createChain(cid, l, cb)

}
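
As a usage sketch: in upstream Fabric this function is driven by the CSCC JoinChain handler when the peer receives a channel's genesis block. The joinChannel wrapper below is hypothetical and only illustrates the call, assuming CreateChainFromBlock lives in the core/peer package as it does upstream:

    package join

    import (
        "fmt"

        "github.com/hyperledger/fabric/core/peer"
        "github.com/hyperledger/fabric/protos/common"
    )

    // joinChannel hands a freshly received genesis block to the peer package,
    // which creates the ledger and the chain object as shown above.
    func joinChannel(genesisBlock *common.Block) error {
        if err := peer.CreateChainFromBlock(genesisBlock); err != nil {
            return fmt.Errorf("failed to create chain from genesis block: %s", err)
        }
        return nil
    }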

yfhk commented 5 years ago

// createChain creates a new chain object and inserts it into the chains
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
chanConf, err := retrievePersistedChannelConfig(ledger)
if err != nil {
    return err
}

var bundle *channelconfig.Bundle

if chanConf != nil {
    bundle, err = channelconfig.NewBundle(cid, chanConf)
    if err != nil {
        return err
    }
} else {
    // Config was only stored in the statedb starting with v1.1 binaries
    // so if the config is not found there, extract it manually from the config block
    envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
    if err != nil {
        return err
    }

    bundle, err = channelconfig.NewBundleFromEnvelope(envelopeConfig)
    if err != nil {
        return err
    }
}

capabilitiesSupportedOrPanic(bundle)

channelconfig.LogSanityChecks(bundle)

gossipEventer := service.GetGossipService().NewConfigEventer()

gossipCallbackWrapper := func(bundle *channelconfig.Bundle) {
    ac, ok := bundle.ApplicationConfig()
    if !ok {
        // TODO, handle a missing ApplicationConfig more gracefully
        ac = nil
    }
    gossipEventer.ProcessConfigUpdate(&gossipSupport{
        Validator:   bundle.ConfigtxValidator(),
        Application: ac,
        Channel:     bundle.ChannelConfig(),
    })
    service.GetGossipService().SuspectPeers(func(identity api.PeerPubKeyType) bool {
        // TODO: this is a place-holder that would somehow make the CIS layer suspect
        // that a given certificate is revoked, or its intermediate CA is revoked.
        // In the meantime, before we have such an ability, we return true in order
        // to suspect ALL identities in order to validate all of them.
        return true
    })
}

cisCallback := func(bundle *channelconfig.Bundle) {
    // TODO remove once all references to cismgmt are gone from peer code
    cismgmt.XXXSetCISManager(cid, bundle.CISManager())
}

ac, ok := bundle.ApplicationConfig()
if !ok {
    ac = nil
}
cs := &chainSupport{
    Application: ac, // TODO, refactor as this is accessible through Manager
    ledger:      ledger,
    fileLedger:  fileledger.NewFileLedger(fileLedgerBlockStore{ledger}),
}

peerSingletonCallback := func(bundle *channelconfig.Bundle) {
    ac, ok := bundle.ApplicationConfig()
    if !ok {
        ac = nil
    }
    cs.Application = ac
    cs.Resources = bundle
}

/*  resConf := &common.Config{ChannelGroup: &common.ConfigGroup{}}
    if ac != nil && ac.Capabilities().ResourcesTree() {
        iResConf, err := retrievePersistedResourceConfig(ledger)

        if err != nil {
            return err
        }

        if iResConf != nil {
            resConf = iResConf
        }
    }
*/ /*
    rBundle, err := resourcesconfig.NewBundle(cid, resConf, bundle)
    if err != nil {
        return err
    }
*/
cs.cbundleSource = channelconfig.NewBundleSource(
    bundle,
    gossipCallbackWrapper,
    cisCallback,
    peerSingletonCallback,
)

vcs := struct {
    *chainSupport
    *semaphore.Weighted
    Support
}{cs, validationWorkersSemaphore, GetSupport()}
validator := txvalidator.NewTxValidator(vcs)
c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
    chainID, err := utils.GetChainIDFromBlock(block)
    if err != nil {
        return err
    }
    return SetCurrConfigBlock(block, chainID)
})

ordererAddresses := bundle.ChannelConfig().OrdererAddresses()

// If orderer addresses are specified in the local configuration, prefer them;
// otherwise use the addresses from the channel config
if cfg.Current.Main.OrdererAddrs != nil && len(cfg.Current.Main.OrdererAddrs) > 0 {
    ordererAddresses = cfg.Current.Main.OrdererAddrs
}

if len(ordererAddresses) == 0 {
    return errors.New("No ordering service endpoint provided in configuration block")
}

// TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer?
store, err := transientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
if err != nil {
    return errors.Wrapf(err, "Failed opening transient store for %s", bundle.ConfigtxValidator().ChainID())
}
simpleCollectionStore := privdata.NewSimpleCollectionStore(&collectionSupport{
    PeerLedger: ledger,
})
service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
    Validator: validator,
    Committer: c,
    Store:     store,
    Cs:        simpleCollectionStore,
})
// note: this was fmt.Errorf with its result discarded; a log call was presumably intended
logger.Debugf("assign chains.list now... %s", cid)
chains.Lock()
defer chains.Unlock()
chains.list[cid] = &chain{
    cs:        cs,
    cb:        cb,
    committer: c,
}

return nil

}
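
The three callbacks handed to NewBundleSource (gossipCallbackWrapper, cisCallback, peerSingletonCallback) are invoked again on every subsequent config update, which is how the chain's gossip, CIS, and chainSupport views stay in sync. A minimal sketch of that fan-out pattern; the types here are illustrative, not Fabric's actual BundleSource:

    package bundlesource

    import "sync/atomic"

    // bundle is a placeholder for *channelconfig.Bundle.
    type bundle struct{ sequence uint64 }

    // source stores the current bundle and replays every registered callback
    // whenever a new bundle is installed, in registration order.
    type source struct {
        current   atomic.Value
        callbacks []func(*bundle)
    }

    // newSource installs the initial bundle, invoking all callbacks once,
    // mirroring how createChain wires its three callbacks above.
    func newSource(b *bundle, callbacks ...func(*bundle)) *source {
        s := &source{callbacks: callbacks}
        s.update(b)
        return s
    }

    func (s *source) update(b *bundle) {
        s.current.Store(b)
        for _, cb := range s.callbacks {
            cb(b) // e.g. the gossip, CIS, and peer-singleton callbacks
        }
    }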

yfhk commented 5 years ago

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string, support Support) {
g.lock.Lock()
defer g.lock.Unlock()
// Initialize new state provider for given committer
logger.Debug("Creating state provider for chainID", chainID)
servicesAdapter := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs}

// Embed transient store and committer APIs to fulfill
// DataStore interface to capture ability of retrieving
// private data
storeSupport := &DataStoreSupport{
    TransientStore: support.Store,
    Committer:      support.Committer,
}
// Initialize private data fetcher
dataRetriever := privdata2.NewDataRetriever(storeSupport)
fetcher := privdata2.NewPuller(support.Cs, g.gossipSvc, dataRetriever, chainID)

coordinator := privdata2.NewCoordinator(privdata2.Support{
    CollectionStore: support.Cs,
    Validator:       support.Validator,
    TransientStore:  support.Store,
    Committer:       support.Committer,
    Fetcher:         fetcher,
}, g.createSelfSignedData())

g.privateHandlers[chainID] = privateHandler{
    support:     support,
    coordinator: coordinator,
    distributor: privdata2.NewDistributor(chainID, g),
}
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)
if g.deliveryService[chainID] == nil {
    var err error
    g.deliveryService[chainID], err = g.deliveryFactory.Service(g, endpoints, g.mcs)
    if err != nil {
        logger.Warningf("Cannot create delivery client, due to %+v", errors.WithStack(err))
    }
}

// Delivery service might be nil only if it was not able to get connected
// to the ordering service
if g.deliveryService[chainID] != nil {
    // Parameters:
    //              - peer.gossip.useLeaderElection
    //              - peer.gossip.orgLeader
    //
    // are mutual exclusive, setting both to true is not defined, hence
    // peer will panic and terminate
    leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
    isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")

    if leaderElection && isStaticOrgLeader {
        logger.Panic("Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution")
    }

    if leaderElection {
        logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
        g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, support.Committer))
    } else if isStaticOrgLeader {
        logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
        g.deliveryService[chainID].StartDeliverForChannel(chainID, support.Committer, func() {})
    } else {
        logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
    }
} else {
    logger.Warning("Delivery client is down won't be able to pull blocks for chain", chainID)
}

}
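
Since peer.gossip.useLeaderElection and peer.gossip.orgLeader are mutually exclusive, the check can be factored out. A small sketch of that validation, assuming only that the two keys are read via viper as in the snippet above; leaderMode is a hypothetical helper, not part of Fabric:

    package gossipleader

    import (
        "errors"

        "github.com/spf13/viper"
    )

    // leaderMode reports whether the peer runs dynamic leader election, acts
    // as a static org leader, or neither; enabling both is a configuration
    // error, matching the panic in InitializeChannel above.
    func leaderMode() (dynamic, static bool, err error) {
        dynamic = viper.GetBool("peer.gossip.useLeaderElection")
        static = viper.GetBool("peer.gossip.orgLeader")
        if dynamic && static {
            return false, false, errors.New(
                "peer.gossip.useLeaderElection and peer.gossip.orgLeader are mutually exclusive")
        }
        return dynamic, static, nil
    }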

yfhk commented 5 years ago

Searching 571 files for "NewServer" (case sensitive, whole word)

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/core/comm/server.go:
   68          grpc.ConnectionTimeout(serverConfig.ConnectionTimeout))
   69
   70:     grpcServer.server = grpc.NewServer(serverOpts...)
   71
   72      return grpcServer, nil

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/core/deliverservice/mocks/orderer.go:
   32
   33      func NewOrderer(port int, t *testing.T) *Orderer {
   34:         srv := grpc.NewServer()
   35          lsnr, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
   36          if err != nil {

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/gossip/comm/comm_impl.go:
  634          return dialOpts
  635      }
  636:     s = grpc.NewServer(serverOpts...)
  637      return s, ll, secureDialOpts
  638  }

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/orderer/common/server/main.go:
   92
   93      manager := initializeMultichannelRegistrar(conf, signer, tlsCallback)
   94:     server := NewServer(manager, signer, &conf.Debug, conf.General.Authentication.TimeWindow, false)
   95
   96      switch cmd {

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/orderer/common/server/server.go:
   68  }
   69
   70: // NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
   71: func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer {
   72      s := &server{
   73          dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, timeWindow, mutualTLS),

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/orderer/consensus/kafka/chain.go:
  121  // Chain. Implements the consensus.Chain interface. Called by
  122  // consensus.NewManagerImpl() which is invoked when the ordering process is
  123: // launched, before the call to NewServer(). Launches a goroutine so as not to
  124  // block the consensus.Manager.
  125  func (chain *chainImpl) Start() {

7 matches across 6 files

yfhk commented 5 years ago

type chainImpl struct {
consenter commonConsenter
consensus.ConsenterSupport

channel                     channel
lastOffsetPersisted         int64
lastOriginalOffsetProcessed int64
lastResubmittedConfigOffset int64
lastCutBlockNumber          uint64

producer        sarama.SyncProducer
parentConsumer  sarama.Consumer
channelConsumer sarama.PartitionConsumer

// notification that there are in-flight messages that need to be waited for
doneReprocessingMsgInFlight chan struct{}

// When the partition consumer errors, close the channel. Otherwise, make
// this an open, unbuffered channel.
errorChan chan struct{}
// When a Halt() request comes, close the channel. Unlike errorChan, this
// channel never re-opens when closed. Its closing triggers the exit of the
// processMessagesToBlock loop.
haltChan chan struct{}
// notification that the chain has stopped processing messages into blocks
doneProcessingMessagesToBlocks chan struct{}
// Close when the retriable steps in Start have completed.
startChan chan struct{}
// timer controls the batch timeout of cutting pending messages into block
timer <-chan time.Time

}
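
The struct comments describe two different channel disciplines: errorChan is closed to signal an error and replaced with a fresh open channel on recovery, while haltChan is closed exactly once and never re-opened. A stand-alone sketch of that pattern, independent of the Kafka consenter:

    package chanlifecycle

    import "sync"

    // lifecycle demonstrates the two channel disciplines used by chainImpl:
    // a re-openable error signal and a one-shot halt signal.
    type lifecycle struct {
        mu        sync.Mutex
        errorChan chan struct{} // closed on error, re-created on recovery
        haltChan  chan struct{} // closed once; never re-created
        haltOnce  sync.Once
    }

    func newLifecycle() *lifecycle {
        return &lifecycle{
            errorChan: make(chan struct{}),
            haltChan:  make(chan struct{}),
        }
    }

    // signalError closes errorChan so readers unblock; the select makes the
    // close idempotent when the channel is already closed.
    func (l *lifecycle) signalError() {
        l.mu.Lock()
        defer l.mu.Unlock()
        select {
        case <-l.errorChan: // already closed
        default:
            close(l.errorChan)
        }
    }

    // recoverFromError re-opens the error signal by swapping in a fresh channel.
    func (l *lifecycle) recoverFromError() {
        l.mu.Lock()
        defer l.mu.Unlock()
        l.errorChan = make(chan struct{})
    }

    // halt closes haltChan exactly once; unlike errorChan it never re-opens.
    func (l *lifecycle) halt() {
        l.haltOnce.Do(func() { close(l.haltChan) })
    }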

yfhk commented 5 years ago

// ProcessNormalMsg handles normal messages, rejecting them if they are not bound for the system channel ID
// with ErrChannelDoesNotExist.
func (s *SystemChannel) ProcessNormalMsg(msg *cb.Envelope) (configSeq uint64, err error) {
channelID, err := utils.ChannelID(msg)
if err != nil {
    return 0, err
}

// For the StandardChannel message processing, we would not check the channel ID,
// because the message processor is looked up by channel ID.
// However, the system channel message processor is the catch all for messages
// which do not correspond to an extant channel, so we must check it here.
if channelID != s.support.ChainID() {
    return 0, ErrChannelDoesNotExist
}

return s.StandardChannel.ProcessNormalMsg(msg)

}
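
The reason for the explicit check: the registrar looks processors up by channel ID and falls back to the system channel for IDs it does not know, so the system channel processor is the only one that can see mismatched IDs. A toy routing sketch of that fallback, with hypothetical types rather than Fabric's registrar:

    package routing

    // processor stands in for a per-channel message processor.
    type processor struct{ channelID string }

    // registry mimics the orderer's lookup: known channel IDs get their own
    // processor, everything else falls through to the system channel's.
    type registry struct {
        byID   map[string]*processor
        system *processor
    }

    func (r *registry) processorFor(channelID string) *processor {
        if p, ok := r.byID[channelID]; ok {
            return p
        }
        // Unknown channel: the system channel processor receives the message,
        // so it must verify the channel ID itself (as ProcessNormalMsg does).
        return r.system
    }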

yfhk commented 5 years ago

// NewHandlerImpl creates an implementation of the Handler interface
func NewHandlerImpl(sm SupportManager, timeWindow time.Duration, mutualTLS bool) Handler {
// function to extract the TLS cert hash from a channel header
bindingInspector := comm.NewBindingInspector(mutualTLS)

return &deliverHandler{
    sm:               sm,
    timeWindow:       timeWindow,
    bindingInspector: bindingInspector,
}

}
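
The binding inspector ties the TLS session to the message: when mutual TLS is enabled, the hash of the client's TLS certificate must appear in the envelope's channel header. A hedged sketch of that comparison; the inspect function and its parameters are illustrative, not the signature of comm.NewBindingInspector:

    package binding

    import (
        "bytes"
        "errors"
    )

    // inspect sketches the channel-binding check: with mutual TLS enabled,
    // the hash of the client's TLS certificate (taken from the connection)
    // must match the TLS cert hash claimed in the channel header.
    func inspect(mutualTLS bool, connCertHash, headerCertHash []byte) error {
        if !mutualTLS {
            return nil // no binding enforced without client certificates
        }
        if len(headerCertHash) == 0 {
            return errors.New("client didn't include its TLS cert hash in the channel header")
        }
        if !bytes.Equal(connCertHash, headerCertHash) {
            return errors.New("claimed TLS cert hash doesn't match the actual hash")
        }
        return nil
    }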