openfaas / connector-sdk

SDK for connecting events to functions
MIT License

Add support for invoking with context. #25

Closed bmcustodio closed 5 years ago

bmcustodio commented 5 years ago

This PR adds support for passing a context along with invocations without breaking the existing API. The main motivation is to carry a "correlation ID" with each invocation, so that a response arriving at a response subscriber can be associated with the original trigger (e.g. a message arriving at a queue, which may need to be requeued if one or more responses result in an error). Other use cases include setting a deadline for the actual HTTP call to the gateway.

bmcustodio commented 5 years ago

@alexellis we didn't discuss this beforehand, but I think the change is small and contained enough for us to go through it here in case that's needed. 🙂

ewilde commented 5 years ago

LGTM. @bmcstdio, can you provide some evidence of how you tested this change, along with the steps you followed?

bmcustodio commented 5 years ago

Definitely! The need for this change arises in the context of a connector for AWS SQS that we are developing (slightly different from the existing one), in which we need to delete a message from the queue, or leave it there, depending on whether the response is successful. To test this, I put this piece of code in the connector's main loop:

topic := "(...)"
log.Tracef("processing message with id %q and topic %q", *m.MessageId, topic)
ctx := context.WithValue(context.Background(), messageIdContextKey, *m.MessageId)
p.c.InvokeWithContext(ctx, topic, pointers.NewBytes([]byte(*m.Body)))

Then I created this ResponseReceiver:

type ResponseReceiver struct {}

func (ResponseReceiver) Response(res types.InvokerResponse) {
    h := res.Context.Value(messageIdContextKey).(string)
    if res.Error != nil {
        log.WithField("message_id", h).Infof("tester got error: %s", res.Error.Error())
    } else {
        log.WithField("message_id", h).Infof("tester got result: [%d] %s => %s (%d) bytes", res.Status, res.Topic, res.Function, len(*res.Body))
    }
}

Then I deployed figlet:

$ faas store deploy figlet --annotation topic="(...)"

Finally, I sent a message to the AWS SQS queue being used by the connector and watched the logs:

(...)
2019/08/07 10:37:39 Invoke function: figlet
2019/08/07 10:37:40 connector-sdk got result: [200] https://sqs.eu-west-1.amazonaws.com/foo/bar => figlet (108) bytes
[200] https://sqs.eu-west-1.amazonaws.com/foo/bar => figlet
  __
 / _| ___   ___
| |_ / _ \ / _ \
|  _| (_) | (_) |
|_|  \___/ \___/

INFO[0002] tester got result: [200] https://sqs.eu-west-1.amazonaws.com/foo/bar => figlet (108) bytes  message_id=8ad24e18-3dc1-467a-b493-323a2ae760cf

We can see that the value of message_id is indeed being propagated and accessed by the Response method.

alexellis commented 5 years ago

Hi @bmcstdio,

I'm not going to get to this until Friday, but I want to have time to digest it fully before commenting.

> @alexellis we didn't discuss this beforehand, but I think the change is small and contained enough for us to go through it here in case that's needed.

Please can you raise an issue so that we can understand the use-case and solution? It sounds like you may have a good idea of what this looks like, so can you go ahead and write it into a brief proposal? https://github.com/openfaas/faas/blob/master/CONTRIBUTING.md#i-have-a-great-idea

Thanks for contributing 💪

Alex

alexellis commented 5 years ago

(The code change looks minimal, but I'd still like that issue if that's OK with you?)

Alex

alexellis commented 5 years ago

@bmcstdio please see the comments added prior to merge and the new issue raised. We would appreciate your input on the issue.

The following new release can be vendored or imported with Go modules: https://github.com/openfaas-incubator/connector-sdk/releases/tag/0.4.3