ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License
7.84k stars 409 forks source link

Replace original context in pub/sub GoChannel (SendToSubscriber) #464

Open bi0dread opened 3 months ago

bi0dread commented 3 months ago

Hi you have replaced original msg context with msgToSend.SetContext(ctx) and we lost the original context! so to speak :

when we call msg.Copy() we don't set original context on new msg

func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
    s.sending.Lock()
    defer s.sending.Unlock()

    ctx, cancelCtx := context.WithCancel(s.ctx)
    defer cancelCtx()

SendToSubscriber:
    for {
        // copy the message to prevent ack/nack propagation to other consumers
        // also allows to make retries on a fresh copy of the original message
        msgToSend := msg.Copy()
        msgToSend.SetContext(ctx)

        s.logger.Trace("Sending msg to subscriber", logFields)

        if s.closed {
            s.logger.Info("Pub/Sub closed, discarding msg", logFields)
            return
        }

        select {
        case s.outputChannel <- msgToSend:
            s.logger.Trace("Sent message to subscriber", logFields)
        case <-s.closing:
            s.logger.Trace("Closing, message discarded", logFields)
            return
        }

        select {
        case <-msgToSend.Acked():
            s.logger.Trace("Message acked", logFields)
            return
        case <-msgToSend.Nacked():
            s.logger.Trace("Nack received, resending message", logFields)
            continue SendToSubscriber
        case <-s.closing:
            s.logger.Trace("Closing, message discarded", logFields)
            return
        }
    }
}
m110 commented 3 months ago

Hey @bi0dread. This is to maintain the same behavior as other Pubs/Subs. Other backends transfer the message over the network, so they can't keep the context. I guess we could make it an option in the config to keep the context, though, as a special case.

yashb042 commented 3 months ago

@m110 This issue is what I am also facing. At least in the local pub-sub case, the context must be propagated (even though it doesn't match the behaviour of other pub-subs.

Let me know if you want me to add this feature and push.

m110 commented 3 months ago

Hey @yashb042. I think it makes sense, just please keep it as a config option with proper disclaimer that it doesn't match the generic behavior. :)

yashb042 commented 2 months ago

Okay, picking this up

yashb042 commented 2 months ago

https://github.com/ThreeDotsLabs/watermill/pull/487/files

Adding test cases soon Please let me know if the variable addition is okay. Have not added extra variables in Message struct, because the Message struct is used by every publisher-subscriber

yashb042 commented 2 months ago

Added test cases as well, please review.

Couldn't test the func - TestMessageCtx in test_pubsub.go It's mentioned that "ExactlyOnceDelivery test is not supported yet"

Please let me know if anything to be done there