cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0

returning protocol.ResultACK in receiver doesn't acknowledge the event #834

Open preslavmihaylov opened 1 year ago

preslavmihaylov commented 1 year ago

Given this example within a consumer using google pubsub:

    err = cloudEventsClient.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) protocol.Result {
        log.Infow("received event from topic", "event.id", event.Context.GetID())

        var data map[string]interface{}
        if err := event.DataAs(&data); err != nil {
            log.Errorw("error decoding data", "err", err)
        }

        log.Infow("decoded event", "event.id", event.Context.GetID(), "event.data", data)

        // this will lead to the event being consumed by this receiver infinitely
        return protocol.ResultACK

        // this, otoh, will consume the event once
        // return nil
    })

Returning protocol.ResultACK does not actually acknowledge the message, and it is retried indefinitely. Returning nil, on the other hand, acknowledges the message, but this behavior feels wonky.

Is this expected?

embano1 commented 1 year ago

Before I dig deeper, can you please provide the exact module (e.g. v2) and version/commit you're using for the above code? Is there any error printed that would help me debug the code path? For example, do you see cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)?

preslavmihaylov commented 1 year ago

Example receiver:

package main

import (
    "context"
    "fmt"
    "log"

    cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/event"
    "github.com/cloudevents/sdk-go/v2/protocol"
)

func receive(ctx context.Context, event event.Event) error {
    fmt.Printf("------------------ Received MSG ------------------\n")
    return protocol.ResultACK
}

func main() {
    ctx := context.Background()
    t, err := cepubsub.New(context.Background(),
        cepubsub.WithProjectID("helloworld"),
        cepubsub.AllowCreateTopic(true),
        cepubsub.AllowCreateSubscription(true),
        cepubsub.WithSubscriptionAndTopicID("helloworld", "helloworld"))
    if err != nil {
        log.Fatalf("failed to create pubsub protocol, %s", err.Error())
    }
    c, err := cloudevents.NewClient(t)

    if err != nil {
        log.Fatalf("failed to create client, %s", err.Error())
    }

    log.Println("Created client, listening...")

    if err := c.StartReceiver(ctx, receive); err != nil {
        log.Fatalf("failed to start pubsub receiver, %s", err.Error())
    }
}

Example sender:

package main

import (
    "context"
    "log"
    "os"

    cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
)

// Example is a basic data struct.
type Example struct {
    Sequence int    `json:"id"`
    Message  string `json:"message"`
}

func main() {
    t, err := cepubsub.New(context.Background(),
        cepubsub.WithProjectID("helloworld"),
        cepubsub.AllowCreateTopic(true),
        cepubsub.AllowCreateSubscription(true),
        cepubsub.WithTopicID("helloworld"))
    if err != nil {
        log.Printf("failed to create pubsub transport, %s", err.Error())
        os.Exit(1)
    }
    c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
    if err != nil {
        log.Printf("failed to create client, %s", err.Error())
        os.Exit(1)
    }

    event := cloudevents.NewEvent()
    event.SetType("com.cloudevents.sample.sent")
    event.SetSource("github.com/cloudevents/sdk-go/samples/pubsub/sender/")
    _ = event.SetData("application/json", &Example{
        Sequence: 0,
        Message:  "HELLO",
    })

    if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
        log.Printf("failed to send: %v", result)
        os.Exit(1)
    } else {
        log.Printf("sent, accepted: %t", cloudevents.IsACK(result))
    }

    os.Exit(0)
}

Run both while local gpubsub is running. Here's output from receiver:

$ go run *.go
2023/02/11 13:59:06 Created client, listening...
{"level":"info","ts":1676116746.776515,"logger":"fallback","caller":"v2@v2.13.0/protocol.go:195","msg":"starting subscriber for Topic \"helloworld\", Subscription \"helloworld\""}
{"level":"info","ts":1676116746.777045,"logger":"fallback","caller":"v2@v2.13.0/protocol.go:198","msg":"conn is&{true true helloworld 0x1400011e340 helloworld <nil> helloworld <nil> {0 0} <nil> <nil> <nil> false }"}
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
------------------ Received MSG ------------------
...

embano1 commented 1 year ago

So you send exactly one event and your receiver is stuck in this receive loop, right? Can you please provide detailed information on the module tag/commit used for all the imports?

embano1 commented 1 year ago

Returning nil, on the other hand, acknowledges the message, but this behavior feels wonky.

Btw: I wonder why this feels wonky though? The handler is not the transport. Hence the underlying transport typically handles ACK/NACK.

preslavmihaylov commented 1 year ago

The point is that returning protocol.ResultACK doesn't actually acknowledge the message and it is retransmitted forever (as shown in final output).

here are the versions of imports I'm using, based on go.mod:

github.com/cloudevents/sdk-go/protocol/pubsub/v2 v2.13.0
github.com/cloudevents/sdk-go/v2 v2.13.0

full go.mod is here

embano1 commented 1 year ago

The point is that returning protocol.ResultACK doesn't actually acknowledge the message

Yeah, sorry (restated my question as I got confused with the other issue you have :) )

preslavmihaylov commented 1 year ago

Btw: I wonder why this feels wonky though? The handler is not the transport. Hence the underlying transport typically handles ACK/NACK.

The thing I was referring to as wonky is the fact that protocol.ResultACK results in retry, not that nil results in acknowledgement.

embano1 commented 1 year ago

The thing I was referring to as wonky is the fact that protocol.ResultACK results in retry, not that nil results in acknowledgement.

IIRC it is because protocol.ResultACK is also an error type and the code may just check for error and retry. That's why I suggest returning nil if there's no error (and let the transport handle ACK).
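The mechanism suggested here can be illustrated with a small, self-contained sketch. The `Result` type and the `resultACK` value are simplified stand-ins defined only for this example, not the SDK's actual definitions; the only assumption is that the ACK sentinel is a non-nil value that satisfies `error`.

```go
package main

import (
	"errors"
	"fmt"
)

// Result is a hypothetical stand-in for an error-based result type.
type Result error

// resultACK is a hypothetical sentinel meaning "acknowledged".
// It is a non-nil error value, which is the crux of the problem.
var resultACK Result = errors.New("ack")

// naiveShouldRetry mimics a transport that only checks for a non-nil error.
func naiveShouldRetry(res Result) bool {
	return res != nil
}

func main() {
	fmt.Println(naiveShouldRetry(nil))       // false
	fmt.Println(naiveShouldRetry(resultACK)) // true
}
```

Under that assumption, a plain nil check cannot tell "acknowledged" apart from "failed", which would explain why returning nil behaves as expected while the ACK sentinel triggers redelivery.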

preslavmihaylov commented 1 year ago

yeah, I did change it to returning nil to work around the issue.

It just took me a while to figure out why protocol.ResultACK wasn't working, especially given this func in the library, which I assumed should be used to separate a normal error from the ResultACK error:

// IsACK true means the recipient acknowledged the event.
func IsACK(target Result) bool {
    // special case, nil target also means ACK.
    if target == nil {
        return true
    }

    return ResultIs(target, ResultACK)
}

embano1 commented 1 year ago

yeah, I did change it to returning nil to work around the issue.

It just took me a while to figure out why protocol.ResultACK wasn't working, especially given this func in the library, which I assumed should be used to separate a normal error from the ResultACK error:

// IsACK true means the recipient acknowledged the event.
func IsACK(target Result) bool {
  // special case, nil target also means ACK.
  if target == nil {
      return true
  }

  return ResultIs(target, ResultACK)
}

IIRC, this one is supposed to be used by the underlying protocol (senders/receivers) to make decisions about retries - not by the handler, though.

arxeiss commented 1 year ago

Still valid. Went the same path as @preslavmihaylov

I was debugging the code to see where the issue happens.

  1. The Invoker returns error, which is handled by the defer function: https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/v2/client/invoker.go#L57-L60
  2. This calls the Message Finish method, which checks err != nil and then calls Nack. https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/protocol/pubsub/v2/message.go#L125-L132
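The two steps above can be modeled roughly as follows. This is a sketch with hypothetical stand-in types (`fakeMessage`, `errACK`, `invoke`), not the SDK's code; it only assumes what the steps describe: a deferred call passes the handler's result to `Finish`, and `Finish` nacks on any non-nil error.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeMessage is a hypothetical stand-in for a transport message.
// It only records whether it ended up acked or nacked.
type fakeMessage struct {
	state string
}

// Finish nacks on any non-nil error, as described in step 2.
func (m *fakeMessage) Finish(err error) error {
	if err != nil {
		m.state = "nacked"
		return err
	}
	m.state = "acked"
	return nil
}

// errACK stands in for an ACK sentinel that also satisfies the error interface.
var errACK = errors.New("ack")

// invoke runs the handler and hands its result to Finish in a deferred call,
// roughly as step 1 describes.
func invoke(m *fakeMessage, handler func() error) (err error) {
	defer func() {
		_ = m.Finish(err)
	}()
	return handler()
}

func main() {
	for _, result := range []error{nil, errACK} {
		m := &fakeMessage{}
		_ = invoke(m, func() error { return result })
		fmt.Printf("handler returned %v -> message %s\n", result, m.state)
	}
}
```

In this model the ACK sentinel takes the same path as a real failure, because nothing between the handler and `Finish` inspects what kind of error it is.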

embano1 commented 1 year ago

@arxeiss as discussed above what are you returning in your receiver handler?

arxeiss commented 1 year ago

I'm using protocol.ResultACK just as the owner of the ticket.

Yes, I read your message:

IIRC, this one is supposed to be used by the underlying protocol (senders/receivers) to make decisions about retries - not by the handler, though.

However, when I check the documentation of StartReceiver, I see that the return type can be protocol.Result. If we should return nil for ACK, then the function declaration shouldn't support the protocol.Result return type, which suggests that we can return protocol.ResultACK.

https://github.com/cloudevents/sdk-go/blob/310da9018ea36b7c6faa3c06aa95c8d8f7a834db/v2/client/client.go#L33-L50

embano1 commented 1 year ago

The return type can be protocol.Result. If we should return nil for ACK, then the function declaration shouldn't support the protocol.Result return type, which suggests that we can return protocol.ResultACK.

That signature is not incorrect, but a bit misleading. If you look at what protocol.Result is, it's an (interface type) definition of type error, i.e., Go's standard convention for error handling returning an error in functions (which can be nil).

As described above, protocol.Result(N)ACK is a concrete but low-level type you shouldn't be using. Instead, if you want to use a protocol.Result in your code (instead of nil), use the protocol-specific type e.g. http.Result (where applicable).

However, and coming back to this issue, it seems the pubsub implementation has a bug here as it treats any result, incl. ACK, as error. Other protocol implementations have a different error handling strategy here and I'm not familiar with PubSub so I'll have to defer a fix to someone familiar with PubSub.

Alternatively, the workaround is to return nil instead of a Result.

arxeiss commented 1 year ago

Yes, you are right. If you dig deeper, you notice that it fulfills the error interface. But if you only check the function's doc block, and IntelliSense in the IDE suggests protocol.ResultACK, you might not feel the need to check.

So in my opinion, there are 2 things to change

  1. Fix the PubSub error handling so that protocol.ResultACK is not treated as an error.
  2. Fix the documentation for StartReceiver and replace protocol.Result with error. Then it is clear that the developer should return nil and not protocol.Result.

Do you agree?

embano1 commented 1 year ago

Yes, you are right. If you dig deeper, you notice that it fulfills the error interface. But if you only check the function's doc block, and IntelliSense in the IDE suggests protocol.ResultACK, you might not feel the need to check.

Totally agree that this is misleading in our docs.

Fix the PubSub error handling so that protocol.ResultACK is not treated as an error.

Yes.

Fix the documentation for StartReceiver and replace protocol.Result with error. Then it is clear that the developer should return nil and not protocol.Result.

Yes, with a minor adjustment to include an example such as http.Result as a valid transport response/error in the doc string. We should just be more clear here.

chapurlatn commented 2 months ago

Hello, I've encountered the same issue here. Pub/Sub does not properly support protocol.Result. Here is a PR that fixes the issue with related tests: https://github.com/cloudevents/sdk-go/pull/1064

Note the 4 cases proposed:

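| Function result                     | Acked/Nacked | Finish result       |
|-------------------------------------|--------------|---------------------|
| nil                                 | ACKED        | nil                 |
| any error except cloudevents.Result | NACKED       | the original error  |
| cloudevents.Result (param acked)    | ACKED        | nil                 |
| cloudevents.Result (param nacked)   | NACKED       | the protocol.Result |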