uw-labs / proximo

Interoperable GRPC based publish/subscribe
GNU Lesser General Public License v3.0
41 stars 16 forks source link

Consider callback based server/client implementation #20

Open thesyncim opened 6 years ago

thesyncim commented 6 years ago

Callbacks are strictly more powerful and don’t require unnecessary goroutines. Also is easy to write correct code

Server implementation(untested)


type ServerV2 struct {
    Producer Producer

    Consumer Consumer
}
type MessageSender func(*Message) error

type Consumer interface {
    Init(topic, consumer string, sendMessage MessageSender) error
    ReceiveConfirmation(*Confirmation) error
    Close() error
}

type ConfirmationSender func(*Confirmation) error

type Producer interface {
    Init(topic string, publisher string, sendConfirmation ConfirmationSender) error
    ReceiveMessage(*Message) error
    Close() error
}

func (s *ServerV2) Consume(stream MessageSource_ConsumeServer) error {

    _, cancel := context.WithCancel(stream.Context())
    defer cancel()

    c := &consumer{Consumer: s.Consumer}
    for {
        msg, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            if strings.HasSuffix(err.Error(), "context canceled") {
                return nil
            }

            return err
        }
        //todo handle returned errors
        go c.handleRequest(msg, stream)
    }
}

type consumer struct {
    started  *atomicBool
    Consumer Consumer
}

func (c *consumer) handleRequest(msg *ConsumerRequest, stream MessageSource_ConsumeServer) error {
    switch {
    case msg.GetStartRequest() != nil:

        if c.started.Get() {
            return ErrStartedTwice
        }
        sr := msg.GetStartRequest()

        sendMessage:= func(m *Message) error {
            err := stream.Send(m)
            if err != nil {
                return err
            }
            return nil
        }
        if err := c.Consumer.Init(sr.GetTopic(), sr.GetConsumer(), sendMessage); err != nil {
            return err
        }
        c.started.Set(true)

    case msg.GetConfirmation() != nil:

        if !c.started.Get() {
            return ErrInvalidConfirm
        }

        return c.Consumer.ReceiveConfirmation(msg.GetConfirmation())
    default:
        return ErrInvalidRequest
    }
    panic("impossible")
}

type publisher struct {
    started  *atomicBool
    Producer Producer
}

func (p *publisher) handleRequest(msg *PublisherRequest, stream MessageSink_PublishServer) error {
    switch {
    case msg.GetStartRequest() != nil:
        if p.started.Get() {
            return ErrStartedTwice
        }

        sr := msg.GetStartRequest()
        sendConfirmation := func(c *Confirmation) error {
            return stream.Send(c)
        }
        p.Producer.Init(sr.Topic, "", sendConfirmation)
        p.started.Set(true)
    case msg.GetMsg() != nil:
        if !p.started.Get() {
            return ErrNotConnected
        }
        return p.Producer.ReceiveMessage(msg.GetMsg())

    }
    return nil
}

func (s *ServerV2) Publish(stream MessageSink_PublishServer) error {

    _, cancel := context.WithCancel(stream.Context())
    defer cancel()
    p := &publisher{Producer: s.Producer}
    for {
        msg, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                return err
            }
            if strings.HasSuffix(err.Error(), "context canceled") {
                return err
            }
            return err
        }
        //todo handle returned errors
        go p.handleRequest(msg, stream)
    }
}

type atomicBool struct{ flag int32 }

func (b *atomicBool) Set(value bool) {
    var i int32 = 0
    if value {
        i = 1
    }
    atomic.StoreInt32(&(b.flag), int32(i))
}

func (b *atomicBool) Get() bool {
    if atomic.LoadInt32(&(b.flag)) != 0 {
        return true
    }
    return false
}

and the backend(untested):


var _ = proximo.Consumer(&NatsHandlerConsumer{})
var _ = proximo.Producer(&NatsHandlerProducer{})

type NatsHandlerConsumer struct {
    Url string
    sub *nats.Subscription
}

func (n *NatsHandlerConsumer) Init(topic, consumer string, sendMessage proximo.MessageSender) error {
    conn, err := nats.Connect(n.Url)
    if err != nil {
        return err
    }
    //defer conn.Close()

    ch := make(chan *nats.Msg, 64) //TODO: make 64 configurable at startup time
    sub, err := conn.ChanSubscribe(topic, ch)
    if err != nil {
        return err
    }
    n.sub = sub

    for {
        select { // drop
        case m := <-ch:
            sendMessage(&proximo.Message{
                Data: m.Data,
                Id:   proximo.GenerateID(),
            })
        }
    }

    return nil
}

func (n *NatsHandlerConsumer) ReceiveConfirmation(c *proximo.Confirmation) error { return nil }
func (n *NatsHandlerConsumer) Close() error                                      { return n.sub.Unsubscribe() }

type NatsHandlerProducer struct {
    Url              string
    topic            string
    sendConfirmation proximo.ConfirmationSender
    conn             *nats.Conn
}

func (n *NatsHandlerProducer) Init(topic string, publisher string, sendConfirmation proximo.ConfirmationSender) error {
    var err error
    n.topic = topic
    n.conn, err = nats.Connect(n.Url)
    if err != nil {
        return err
    }

    return err
}

func (n *NatsHandlerProducer) ReceiveMessage(msg *proximo.Message) error {
    err := n.conn.Publish(n.topic, msg.GetData())
    n.sendConfirmation(&proximo.Confirmation{msg.GetId()})
    return err
}

func (n *NatsHandlerProducer) Close() error {
    n.conn.Close()
    return nil
}
mjgarton commented 6 years ago

Can you describe the problem that you are trying to solve please?

Or perhaps you have measured something as being slow due to excessive goroutines, in which case do you have benchmarks?

thesyncim commented 6 years ago

Sry if i was not clear. This is just a alternative way to solve the same problem. My main argument here (which is arguable) is an easy way to implement new backends as we can see in the example backend. Performance is not my motivation here. Just a collateral effect.