fission / keda-connectors

Generic connectors for Keda which can be used as worker images as part of scaleTargetRef.
Apache License 2.0

mqtrigger keda kafka net/http failure when triggering on non ascii message headers #93

Closed fishman closed 2 years ago

fishman commented 2 years ago

Fission/Kubernetes version

client: fission/core: BuildDate: "2021-11-23T09:29:28Z" GitCommit: 5349a591 Version: v1.15.0
server: fission/core: BuildDate: "2021-12-29T03:57:30Z" GitCommit: 327275d1 Version: v1.15.1

Client Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.12", GitCommit:"4bf2e32bb2b9fdeea19ff7cdc1fb51fb295ec407", GitTreeState:"clean", BuildDate:"2021-10-27T17:12:26Z", GoVersion:"go1.15.15", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.9", GitCommit:"a5e4de7e277a707bd28d448bd75de58b4f1cdc22", GitTreeState:"clean", BuildDate:"2021-11-16T01:09:55Z", GoVersion:"go1.15.14", Compiler:"gc", Platform:"linux/amd64"}

Kubernetes platform (e.g. Google Kubernetes Engine)

Azure Kubernetes Service

Describe the bug

When invoking a function, the KEDA Kafka connector fails with the following errors:

{"level":"error","ts":1641440497.5027132,"caller":"common@v0.0.0-20201028072024-da094250dc08/util.go:75","msg":"sending function invocation request failed","error":"Post \"http://router.fission/fission-function/hello\": net/http: invalid header field value \"\\xa1\\x011\" for key Aeg-Metadata-Version","http_endpoint":"http://router.fission/fission-function/hello","source":"kafkatest","stacktrace":"github.com/fission/keda-connectors/common.HandleHTTPRequest\n\t/go/pkg/mod/github.com/fission/keda-connectors/common@v0.0.0-20201028072024-da094250dc08/util.go:75\nmain.(*kafkaConnector).ConsumeClaim\n\t/app/main.go:224\ngithub.com/Shopify/sarama.(*consumerGroupSession).consume\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:615\ngithub.com/Shopify/sarama.newConsumerGroupSession.func2\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:544"}
{"level":"error","ts":1641440497.502975,"caller":"common@v0.0.0-20201028072024-da094250dc08/util.go:75","msg":"sending function invocation request failed","error":"Post \"http://router.fission/fission-function/hello\": net/http: invalid header field value \"\\xa1\\fNotification\" for key Aeg-Event-Type","http_endpoint":"http://router.fission/fission-function/hello","source":"kafkatest","stacktrace":"github.com/fission/keda-connectors/common.HandleHTTPRequest\n\t/go/pkg/mod/github.com/fission/keda-connectors/common@v0.0.0-20201028072024-da094250dc08/util.go:75\nmain.(*kafkaConnector).ConsumeClaim\n\t/app/main.go:224\ngithub.com/Shopify/sarama.(*consumerGroupSession).consume\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:615\ngithub.com/Shopify/sarama.newConsumerGroupSession.func2\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:544"}

To Reproduce

Trigger the Kafka KEDA connector from an Event Hub that is fed by an Event Grid subscription.

The Event Grid subscription writes to the Event Hub whenever a blob is created.

Expected result

The function should execute successfully.

Actual result

The function fails with a net/http error because the Kafka message headers contain non-ASCII characters.

fishman commented 2 years ago

@sanketsudake I attached a file with the headers. Is there a way to disable header forwarding or should we strip unsendable characters? I don't see any documentation on what those extra characters are from Azure.

developmentdrop-1-headers-4.csv
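
To illustrate the "strip unsendable characters" option, here is a minimal sketch that skips any Kafka header whose value contains control bytes before building the HTTP headers. It assumes the values net/http rejects are those containing control characters such as the \x01 and \f bytes visible in the log above. The names hasControlBytes and forwardableHeaders are hypothetical and are not part of keda-connectors.

```go
package main

import (
	"fmt"
	"net/http"
)

// hasControlBytes reports whether v contains an ASCII control character
// other than horizontal tab. This is an assumed approximation of the check
// that makes net/http reject a header value.
func hasControlBytes(v string) bool {
	for i := 0; i < len(v); i++ {
		b := v[i]
		if (b < 0x20 && b != '\t') || b == 0x7f {
			return true
		}
	}
	return false
}

// forwardableHeaders (hypothetical helper) copies Kafka record headers into
// an http.Header and silently skips values that contain control bytes.
func forwardableHeaders(kafkaHeaders map[string][]byte) http.Header {
	out := http.Header{}
	for key, value := range kafkaHeaders {
		if hasControlBytes(string(value)) {
			continue // would be rejected when the request is sent
		}
		out.Add(key, string(value))
	}
	return out
}

func main() {
	headers := forwardableHeaders(map[string][]byte{
		"aeg-delivery-count": []byte("0"),
		"aeg-event-type":     []byte("\xa1\fNotification"),
	})
	fmt.Println(headers)
}
```

Skipping a header loses its information, so this is only a stopgap; decoding the value instead keeps the data.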

sanketsudake commented 2 years ago

Notes for debugging, from a quick analysis.

It fails here. https://github.com/fission/keda-connectors/blob/e0333454c0dd52e341a6e5503024e2b6aa1555bf/common/util.go#L75

Errors seen

{
    "level": "error",
    "ts": 1641440497.5027132,
    "caller": "common@v0.0.0-20201028072024-da094250dc08/util.go:75",
    "msg": "sending function invocation request failed",
    "error": "Post \"http://router.fission/fission-function/hello\": net/http: invalid header field value \"\\xa1\\x011\" for key Aeg-Metadata-Version",
    "http_endpoint": "http://router.fission/fission-function/hello",
    "source": "kafkatest",
    "stacktrace": "github.com/fission/keda-connectors/common.HandleHTTPRequest\n\t/go/pkg/mod/github.com/fission/keda-connectors/common@v0.0.0-20201028072024-da094250dc08/util.go:75\nmain.(*kafkaConnector).ConsumeClaim\n\t/app/main.go:224\ngithub.com/Shopify/sarama.(*consumerGroupSession).consume\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:615\ngithub.com/Shopify/sarama.newConsumerGroupSession.func2\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:544"
} 
{
    "level": "error",
    "ts": 1641440497.502975,
    "caller": "common@v0.0.0-20201028072024-da094250dc08/util.go:75",
    "msg": "sending function invocation request failed",
    "error": "Post \"http://router.fission/fission-function/hello\": net/http: invalid header field value \"\\xa1\\fNotification\" for key Aeg-Event-Type",
    "http_endpoint": "http://router.fission/fission-function/hello",
    "source": "kafkatest",
    "stacktrace": "github.com/fission/keda-connectors/common.HandleHTTPRequest\n\t/go/pkg/mod/github.com/fission/keda-connectors/common@v0.0.0-20201028072024-da094250dc08/util.go:75\nmain.(*kafkaConnector).ConsumeClaim\n\t/app/main.go:224\ngithub.com/Shopify/sarama.(*consumerGroupSession).consume\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:615\ngithub.com/Shopify/sarama.newConsumerGroupSession.func2\n\t/go/pkg/mod/github.com/!shopify/sarama@v1.23.1/consumer_group.go:544"
}

We need to check whether anything special is required for encoding/decoding messages received from Kafka.

I see we also manipulate headers at this place.

https://github.com/fission/keda-connectors/blob/e0333454c0dd52e341a6e5503024e2b6aa1555bf/kafka-http-connector/main.go#L220

Data retrieved from the CSV above

aeg-data-version | 
aeg-delivery-count | 0
aeg-event-type | Notification
aeg-metadata-version | 1
aeg-output-event-id | $9796e4a9-b63a-461d-c214-10db8576c923
aeg-subscription-name | "DEVELOPMENT-TEST-SUBSCRIPTION

fishman commented 2 years ago

@sanketsudake I found out what it is. Azure encodes its Kafka headers with AMQP message encoding. Unfortunately, these headers are generated by Azure and there is no way to change them. The CSV I posted was created by Conduktor, so it might not contain the actual bytes that are stored in Kafka.

https://github.com/Azure/azure-event-hubs-for-kafka/issues/56

Theoretically, amqp.message.Unmarshal should be able to unmarshal them, but that would require adding the qpid-proton library as a dependency of the container image, as well as a setting to specify the encoding.

package main

import (
    "encoding/csv"
    "fmt"
    "log"
    "os"

    "github.com/apache/qpid-proton/go/pkg/amqp"
)

func main() {
    f, err := os.Open("data.csv")
    if err != nil {
        log.Fatal(err)
    }

    // remember to close the file at the end of the program
    defer f.Close()

    // read csv values using csv.Reader
    csvReader := csv.NewReader(f)
    csvReader.Comma = ';'
    data, err := csvReader.ReadAll()
    if err != nil {
        log.Fatal(err)
    }
    // body receives the unmarshalled AMQP message body on each iteration
    var body interface{}
    for _, line := range data {
        test := amqp.NewMessageWith(line)
        test.Unmarshal(&body)
        fmt.Println(body)
    }
}
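
For comparison, the two values in the error log look like plain AMQP 1.0 strings: type code 0xa1 (str8-utf8), a one-byte length, then the UTF-8 bytes. Below is a dependency-free sketch that decodes only that case plus the four-byte-length variant 0xb1. It assumes every header is a single string value, which may not hold for all Event Hubs headers. The function name decodeAMQPString is hypothetical, and this is not necessarily how the connector fix was implemented.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeAMQPString decodes one AMQP 1.0 UTF-8 string value:
//   0xa1 (str8-utf8):  1-byte length followed by the string bytes
//   0xb1 (str32-utf8): 4-byte big-endian length followed by the string bytes
// Any other type code is reported as an error.
func decodeAMQPString(raw []byte) (string, error) {
	if len(raw) == 0 {
		return "", errors.New("empty value")
	}
	var size, offset int
	switch raw[0] {
	case 0xa1:
		if len(raw) < 2 {
			return "", errors.New("truncated str8-utf8 value")
		}
		size, offset = int(raw[1]), 2
	case 0xb1:
		if len(raw) < 5 {
			return "", errors.New("truncated str32-utf8 value")
		}
		size, offset = int(binary.BigEndian.Uint32(raw[1:5])), 5
	default:
		return "", fmt.Errorf("unsupported AMQP type code 0x%02x", raw[0])
	}
	if size != len(raw)-offset {
		return "", fmt.Errorf("length mismatch: header says %d, got %d bytes", size, len(raw)-offset)
	}
	return string(raw[offset:]), nil
}

func main() {
	// The two header values quoted in the error log.
	for _, raw := range [][]byte{
		[]byte("\xa1\x011"),
		[]byte("\xa1\fNotification"),
	} {
		value, err := decodeAMQPString(raw)
		fmt.Printf("%q %v\n", value, err)
	}
}
```

A caller could fall back to the raw bytes when decoding returns an error, so headers that are not AMQP-encoded pass through unchanged.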

sanketsudake commented 2 years ago

Thanks for your PR @fishman 🎉

The fix is available in the latest and v0.9.1 images. https://hub.docker.com/repository/docker/fission/keda-kafka-http-connector