Open skoved opened 3 weeks ago
/assign
This feature might be similar to, or could be combined with, https://github.com/knative/eventing/issues/7704
If we allow defining lightweight transformations on the JSON representation of an event using JSONPath, I think it would solve both:
ceOverrides:
  jsonTransform:
    - from: .data.status.failed
      to: jobfailedstatus
    - from: .data.status.conditions[...].type
      to: completed
Thanks a bunch for accepting this request. I just wanna check if my understanding is correct. These cloudevent extensions added by jsonTransform would be able to be used with the ApiServerSource filter (#7791), right?
Yes, the idea is that when we get an event we would apply the ceOverrides
(including jsonTransform) and then pass through the defined filters
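A minimal sketch of that ordering, using only the standard library. The `event`, `transform`, `filter`, `process`, `copyFailed`, and `hasFailed` names are hypothetical stand-ins for illustration; none of them are Knative or CloudEvents SDK APIs.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a CloudEvent: its extension
// attributes plus the already-decoded JSON data.
type event struct {
	Extensions map[string]any
	Data       map[string]any
}

type transform func(*event)
type filter func(event) bool

// process applies every transform first and only then evaluates the
// filters, so filters can see extensions that the transforms added.
func process(e *event, transforms []transform, filters []filter) bool {
	for _, t := range transforms {
		t(e)
	}
	for _, f := range filters {
		if !f(*e) {
			return false
		}
	}
	return true
}

// copyFailed copies data.status.failed into the "jobfailedstatus" extension.
func copyFailed(e *event) {
	if status, ok := e.Data["status"].(map[string]any); ok {
		if failed, ok := status["failed"]; ok {
			e.Extensions["jobfailedstatus"] = failed
		}
	}
}

// hasFailed passes only events whose "jobfailedstatus" extension is > 0.
func hasFailed(e event) bool {
	n, ok := e.Extensions["jobfailedstatus"].(int)
	return ok && n > 0
}

func main() {
	e := &event{
		Extensions: map[string]any{},
		Data:       map[string]any{"status": map[string]any{"failed": 1}},
	}
	fmt.Println(process(e, []transform{copyFailed}, []filter{hasFailed}))
}
```

If the filter ran before the transform, `jobfailedstatus` would not exist yet and the event would be dropped, which is why the order matters here.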
@muskan2622 since you have expressed interest in contributing, feel free to ask any questions. A few parts need to be changed, as it's a relatively large feature, but I'm happy to help/chat/etc.
Here is a very high-level idea for getting the field out of the event and then setting it as an extension:
import (
	"encoding/json"
	"fmt"
	"regexp"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	cetest "github.com/cloudevents/sdk-go/v2/test"
	"github.com/cloudevents/sdk-go/v2/types"
	"k8s.io/client-go/util/jsonpath"
)

type JSONTransform struct {
	From string // JSON path
	To   string // CE extension
	Type string // CE Extension type
}

func ExampleJsonTransform() {
	exampleTransform := JSONTransform{
		From: ".data.status.failed",
		To:   "failed",
		Type: "integer", // One of https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type-system
	}

	c := cetest.FullEvent()
	err := c.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
		"status": map[string]interface{}{
			"failed": 1,
		},
	})
	if err != nil {
		panic(err)
	}

	// Core implementation
	// 1. Parse From
	// 2. Find From in JSON-serialized event
	// 3. Set extension based on Type

	expr, err := relaxedJSONPathExpression(exampleTransform.From)
	if err != nil {
		panic(err)
	}

	jp := jsonpath.New("Parser")
	if err := jp.Parse(expr); err != nil {
		panic(err)
	}

	// Serialize the event and decode it into a generic map so that the
	// JSON path is evaluated against the event's JSON representation.
	b, err := c.MarshalJSON()
	if err != nil {
		panic(err)
	}

	var data map[string]interface{}
	err = json.Unmarshal(b, &data)
	if err != nil {
		panic(err)
	}

	results, err := jp.FindResults(data)
	if err != nil {
		panic(err)
	}

	// Use || so that results[0] is never indexed when results is empty.
	if len(results) != 1 || len(results[0]) != 1 {
		panic("expected 1 result")
	}

	// TODO: properly handle results and JSONTransform.Type with "results[0][0].Elem().CanInt()", "results[0][0].Elem().CanFloat()" etc
	// encoding/json decodes JSON numbers into float64, hence Float() here.
	c.SetExtension(exampleTransform.To, int64(results[0][0].Elem().Float()))

	// Check that the event still serializes with the new extension set.
	b, err = c.MarshalJSON()
	if err != nil {
		panic(err)
	}

	f, err := types.ToInteger(c.Extensions()[exampleTransform.To])
	if err != nil {
		panic(err)
	}

	fmt.Println(f)

	// Output: 1
}

var jsonRegexp = regexp.MustCompile(`^\{\.?([^{}]+)}$|^\.?([^{}]+)$`)

// relaxedJSONPathExpression attempts to be flexible with JSONPath expressions, it accepts:
//   - metadata.name (no leading '.' or curly braces '{...}')
//   - {metadata.name} (no leading '.')
//   - .metadata.name (no curly braces '{...}')
//   - {.metadata.name} (complete expression)
//
// And transforms them all into a valid jsonpath expression:
//
//	{.metadata.name}
//
// Copied from https://github.com/kubernetes/kubectl/blob/a70106d6a8b4fc24633f7020b9fdc416648e7f22/pkg/cmd/get/customcolumn.go#L38-L67
// Copyright 2014 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
func relaxedJSONPathExpression(pathExpression string) (string, error) {
	if len(pathExpression) == 0 {
		return pathExpression, nil
	}
	submatches := jsonRegexp.FindStringSubmatch(pathExpression)
	if submatches == nil {
		return "", fmt.Errorf("unexpected path string, expected a 'name1.name2' or '.name1.name2' or '{name1.name2}' or '{.name1.name2}'")
	}
	if len(submatches) != 3 {
		return "", fmt.Errorf("unexpected submatch list: %v", submatches)
	}
	var fieldSpec string
	if len(submatches[1]) != 0 {
		fieldSpec = submatches[1]
	} else {
		fieldSpec = submatches[2]
	}
	return fmt.Sprintf("{.%s}", fieldSpec), nil
}
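One possible way to address the TODO in ExampleJsonTransform is to convert the decoded JSON value according to JSONTransform.Type before setting the extension. The sketch below is only an illustration under stated assumptions: `coerceExtension` is a hypothetical helper (not part of the CloudEvents SDK), it works on values as produced by `encoding/json` rather than on `reflect.Value`, and it covers just three of the CloudEvents types.

```go
package main

import (
	"fmt"
	"math"
)

// coerceExtension is a hypothetical helper. It converts a value decoded by
// encoding/json into a Go value matching the requested CloudEvents type
// name. Only "integer", "boolean" and "string" are handled in this sketch.
func coerceExtension(v any, ceType string) (any, error) {
	switch ceType {
	case "integer":
		// encoding/json decodes every JSON number into float64.
		f, ok := v.(float64)
		if !ok {
			return nil, fmt.Errorf("expected a number, got %T", v)
		}
		// CloudEvents integers are 32-bit signed whole numbers.
		if f != math.Trunc(f) || f < math.MinInt32 || f > math.MaxInt32 {
			return nil, fmt.Errorf("%v is not a 32-bit integer", f)
		}
		return int32(f), nil
	case "boolean":
		b, ok := v.(bool)
		if !ok {
			return nil, fmt.Errorf("expected a boolean, got %T", v)
		}
		return b, nil
	case "string":
		s, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("expected a string, got %T", v)
		}
		return s, nil
	default:
		return nil, fmt.Errorf("unsupported extension type %q", ceType)
	}
}

func main() {
	v, err := coerceExtension(float64(1), "integer")
	fmt.Println(v, err)

	_, err = coerceExtension(1.5, "integer")
	fmt.Println(err)
}
```

Returning an error for a mismatched or unsupported type, rather than silently truncating as the `int64(...Float())` cast does, would let the source decide whether to skip the extension or drop the event.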
yes sure @pierDipi .
Problem
Our use case involves performing different actions based on the state of different resources on our k8s cluster. Example: send a notification for Jobs that have failed (i.e., where status.failed > 0).
We're using ApiServerSource to receive cloudevents about different types of resources on the cluster. We would like the ability to subscribe to cloudevents for resources that are in a specific state. I think this could be accomplished by adding a field to ApiServerSource that contains criteria for when a specific spec.ceOverride.extensions should be applied. This would allow us, when combined with spec.mode: Resource, to use either new trigger filters or maybe even filters on ApiServerSource itself to have our sinks only receive cloudevents where an action is required.
Persona: Which persona is this feature for?
Exit Criteria
Events meeting the specified criteria have the appropriate custom attribute applied.