ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License
7.75k stars 406 forks source link

The publish signature does not allow for a context.Context to be passed #445

Open advdv opened 5 months ago

advdv commented 5 months ago

I was evaluating watermill today and I was surprised to learn that the Publish signature does not take a context.Context. Without it, how does one cancel a publish that may be stuck on a slow/failing network request?

Let's say I my app takes a http request with a timeout of 3 seconds. In handling this request I want to publish a message to a Redis stream using a Watermill publisher. Unfortunately the Redis instance has some issues and the network is blocked and it takes 4 seconds for Redis to answer. How can I make sure the publish get's canceled?

The issue is also apparent in the redisstream source code:

// Publish publishes message to redis stream
//
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
    if p.closed {
        return errors.New("publisher closed")
    }

    logFields := make(watermill.LogFields, 3)
    logFields["topic"] = topic

    for _, msg := range msgs {
        logFields["message_uuid"] = msg.UUID
        p.logger.Trace("Sending message to redis stream", logFields)

        values, err := p.config.Marshaller.Marshal(topic, msg)
        if err != nil {
            return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
        }

        maxlen, ok := p.config.Maxlens[topic]
        if !ok {
            maxlen = p.config.DefaultMaxlen
        }

                // HERE: The library needs to use "context.Background" which means this XADD can hang
                // forever on network issues.
        id, err := p.client.XAdd(context.Background(), &redis.XAddArgs{
            Stream: topic,
            Values: values,
            MaxLen: maxlen,
            Approx: true,
        }).Result()
        if err != nil {
            return errors.Wrapf(err, "cannot xadd message %s", msg.UUID)
        }

        logFields["xadd_id"] = id
        p.logger.Trace("Message sent to redis stream", logFields)
    }

    return nil
}
m110 commented 4 months ago

Hey @advdv. The publish doesn't take context because it's attached to the message. See the message.SetContext() method.

You are right about redisstream, though. We probably should review all Pub/Subs. For example, watermill-amqp does it correctly: https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/publisher.go#L191

advdv commented 4 months ago

Right, thank you for the response. Too bad I didn't notice this, I remember looking at the NATS and Redis implementation to check how is done. Guess I picked the wrong ones. I think maybe the Kafka producer has the same issue, but it is more complicated since it uses the Sarama sync producer which has no context support also. (I think, not a user, but I found: https://github.com/IBM/sarama/issues/1849).

I guess some of these libraries are old and pre-date the wide-spread use of context.Context? Not sure if it's actually feasible to do anything about this in the short term.