multiformats / go-multistream

an implementation of the multistream protocol in go
MIT License
39 stars 27 forks source link

replace AddHandler with Listen #79

Open marten-seemann opened 2 years ago

marten-seemann commented 2 years ago

@Stebalien suggested to replace the AddHandler (which gets passed a callback) with a Listen(... protocol.ID) function, returning a Listener, on which an Accept method could be called to accept streams.

This would be a more idiomatic approach, as it closely mirrors how net.Conns and streams are accepted. It would also move the go routine handling to the application, which is arguably the place where it belongs.

Stebalien commented 2 years ago
marten-seemann commented 2 years ago

This will be more involved than expected: multistream is not only used for stream negotiation, but also to negotiate the security protocol (as long as we don't have Protocol Select) and the stream muxer. That's the reason go-multistream currently acts on io.ReadWriteClosers. It would be super nice to get rid of the type assertions and use generics:

type conn interface { // TODO: find a more descriptive name than conn
    io.ReadWriteCloser
    Reset() error
    Protocol() protocol.ID
}

type Listener[T conn] interface {
     io.Closer
     Accept() (T, error)
}

type MultistreamMuxer[T conn] struct {
     Listen(... protocol.ID) (Listener[T], error)
}

These types will be instantiated (or whatever the right generics-lingo term for this is) with a (wrapped) net.Conn, a network.SecureConn and a network.Stream.

We could then have an Accept function that works like this:

type listener {
    queue chan conn // buffered chan (of length 4 or 8)?
}

func (l *listener) addStream(c conn) { // called by the multistream muxer as soon as the protocol was negotiated
    select {
    case l.queue <- c:
    default:
        log.Warnf("dropping", "protocol", c.Protocol())
        c.Reset()
    }
}

func (l *listener) Accept() (network.Stream, error) {
    select {
    case <-l.closeChan:
        return nil, errors.New("listener closed")
    case str := <-l.queue:
        return str, nil
    }
}