ipni / index-provider

📢 Index Provider
Other
35 stars 16 forks source link

WithTopic and WithTopicName Option #255

Open sssion opened 2 years ago

sssion commented 2 years ago

// WithTopic sets the pubsub topic on which new advertisements are announced. // To use the default pubsub configuration with a specific topic name, use WithTopicName. If both // options are specified, WithTopic takes presence. // // Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. // See: WithPublisherKind. func WithTopic(t pubsub.Topic) Option { return func(o options) error { o.pubTopic = t return nil } }

// WithTopicName sets toe topic name on which pubsub announcements are published. // To override the default pubsub configuration, use WithTopic. // // Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. // See: WithPublisherKind. func WithTopicName(t string) Option { return func(o *options) error { o.pubTopicName = t return nil } }


The topic and the topic name are both used by the legs dtsync publisher, but the topic does not override the topic name.

engine.go 183
    case DataTransferPublisher:
        dtOpts := []dtsync.Option{
            dtsync.Topic(e.pubTopic),
            dtsync.WithExtraData(e.pubExtraGossipData),
            dtsync.AllowPeer(e.syncPolicy.Allowed),
        }

        if e.pubDT != nil {
            return dtsync.NewPublisherFromExisting(e.pubDT, e.h, e.pubTopicName, e.lsys, dtOpts...)
        }
        ds := dsn.Wrap(e.ds, datastore.NewKey("/legs/dtsync/pub"))
        return dtsync.NewPublisher(e.h, ds, e.lsys, e.pubTopicName, dtOpts...)

dtsync/publisher.go
// NewPublisher creates a new legs publisher.
//
// The topic string is used in two places: it is always handed to the head
// publisher, and it is used to create a pubsub topic when no topic was
// supplied through options (cfg.topic is nil). When a topic is supplied
// through options, this function does not compare its name with the topic
// string.
// NOTE(review): a topic supplied through options whose name differs from the
// topic string therefore leaves pubsub and the head publisher on different
// topic names — confirm whether this is intended.
//
// On failure it returns a nil publisher and the error; a pubsub context
// created here is cancelled before returning.
func NewPublisher(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, options ...Option) (*publisher, error) {
    cfg := config{}
    err := cfg.apply(options)
    if err != nil {
        return nil, err
    }

    // cancel stays nil when the topic comes from options; it is only set when
    // this function creates the pubsub topic itself.
    var cancel context.CancelFunc
    t := cfg.topic
    if t == nil {
        var ctx context.Context
        ctx, cancel = context.WithCancel(context.Background())
        t, err = gpubsub.MakePubsub(ctx, host, topic)
        if err != nil {
            cancel()
            return nil, err
        }
    }

    // The second return value of makeDataTransfer is not needed here.
    dtManager, _, dtClose, err := makeDataTransfer(host, ds, lsys, cfg.allowPeer)
    if err != nil {
        // Release the pubsub context only if it was created above.
        if cancel != nil {
            cancel()
        }
        return nil, err
    }

    // The head publisher is keyed by the topic string argument, not by t.
    headPublisher := head.NewPublisher()
    startHeadPublisher(host, topic, headPublisher)

    p := &publisher{
        cancelPubSub:  cancel,
        dtManager:     dtManager,
        dtClose:       dtClose,
        headPublisher: headPublisher,
        host:          host,
        topic:         t,
    }

    // extraData is left at its zero value when no extra data was configured.
    if len(cfg.extraData) != 0 {
        p.extraData = cfg.extraData
    }
    return p, nil
}

// startHeadPublisher runs headPublisher.Serve for the given host and topic
// name in a new goroutine and returns immediately. The start and the end of
// serving are logged; the result of Serve is not reported to the caller.
func startHeadPublisher(host host.Host, topic string, headPublisher *head.Publisher) {
    go func() {
        log.Infow("Starting head publisher for topic", "topic", topic, "host", host.ID())
        err := headPublisher.Serve(host, topic)
        // Only http.ErrServerClosed is skipped here; every other result of
        // Serve is logged as an error.
        if err != http.ErrServerClosed {
            log.Errorw("Head publisher stopped serving on topic on host", "topic", topic, "host", host.ID(), "err", err)
        }
        log.Infow("Stopped head publisher", "host", host.ID(), "topic", topic)
    }()
}

The topic name is mandatory and is always used as the head publisher's topic. If the Topic option is nil, a new Topic is created with the topic name.
If both the topic name and the Topic are set but their names differ, head.QueryRootCid fails with the error: Get "http://unused.invalid/head": protocol not supported.
sssion commented 2 years ago

engine_test.go 139
    subject, err := New(
        WithHost(pubHost),
        WithPublisherKind(DataTransferPublisher),
        WithTopic(pubT),
        WithTopicName(topic),
        WithSubTopic(pubTSub),
        WithExtraGossipData(extraGossipData),
    )
If WithTopicName is removed or the topic string is changed, the unit test fails.