I use a "processor", like:
package functions;

import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEvent;
import io.quarkus.funqy.knative.events.CloudEventBuilder;

import java.util.UUID;

public class Function {

    @Funq
    public CloudEvent<Output> function(CloudEvent<Input> input) {
        // work with the received Cloud event:
        // do some sort of processing with it...
        System.out.println(" We got: " + input);

        // create some other business object:
        Output output = new Output("HEllo");

        // return the object to the broker,
        // wrapped as a cloud event:
        CloudEvent<Output> response = CloudEventBuilder.create()
                .source("/service/processors/1/random")
                .type("io.openshift.smart.event.processor")
                .id(UUID.randomUUID().toString())
                .build(output);
        System.out.println("We sent back: " + response);
        return response;
    }
}
and a receiver (the canonical event-display).
Here are the manifests:
apiVersion: v1
kind: Service
metadata:
  name: processor
spec:
  selector:
    app: processor
  ports:
    - port: 80
      protocol: TCP
      targetPort: processor
      name: http
---
apiVersion: v1
kind: Pod
metadata:
  name: processor
  labels:
    app: processor
spec:
  containers:
    - name: processor
      image: docker.io/matzew/processor-demo:latest
      imagePullPolicy: Always
      ports:
        - containerPort: 8080
          protocol: TCP
          name: processor
---
apiVersion: v1
kind: Service
metadata:
  name: receiver
spec:
  selector:
    app: receiver
  ports:
    - port: 80
      protocol: TCP
      targetPort: receiver
      name: http
---
apiVersion: v1
kind: Pod
metadata:
  name: receiver
  labels:
    app: receiver
spec:
  containers:
    - name: receiver
      image: quay.io/openshift-knative/knative-eventing-sources-event-display
      imagePullPolicy: Always
      ports:
        - containerPort: 8080
          protocol: TCP
          name: receiver
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: processor
spec:
  broker: my-kafkachannel-advanced-broker
  filter:
    attributes:
      smartevent: super-duper-event-extension
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: processor
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: receiver
spec:
  broker: my-kafkachannel-advanced-broker
  filter:
    attributes:
      type: io.openshift.smart.event.processor
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: receiver
for the broker I use this config: https://github.com/openshift-knative/eventing-hyperfoil-benchmark/blob/main/testcases/broker/eventing-broker/broker-kc-advanced-config/100-broker.yaml
/assign
for the broker I use this config: https://github.com/openshift-knative/eventing-hyperfoil-benchmark/blob/main/testcases/broker/eventing-broker/broker-kc-advanced-config/100-broker.yaml
When I use that I get this:
❯ k get brokers -A
NAMESPACE NAME URL AGE READY REASON
default my-kafkachannel-advanced-broker 2m30s False unable to build topic config from configmap: error validating topic config from configmap invalid configuration - numPartitions: 0 - replicationFactor: 0 - bootstrapServers: [] - ConfigMap data: map[channelTemplateSpec:apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
spec:
numPartitions: 10
replicationFactor: 3
delivery:
retry: 12
backoffPolicy: exponential
backoffDelay: PT1S] - ConfigMap data: map[channelTemplateSpec:apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
spec:
numPartitions: 10
replicationFactor: 3
delivery:
retry: 12
backoffPolicy: exponential
backoffDelay: PT1S]
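For context: that validation error comes from the Kafka broker class parsing the referenced ConfigMap. Assuming the upstream kafka-broker-config format, it looks for the keys bootstrap.servers, default.topic.partitions and default.topic.replication.factor, so it reports them as empty when all it finds is a channelTemplateSpec. A minimal sketch of the ConfigMap shape it seems to expect (the bootstrap address below is a placeholder):

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # placeholder values; bootstrap.servers must point at your own Kafka cluster
  default.topic.partitions: "10"
  default.topic.replication.factor: "3"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"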
cc @pierDipi
I don't get it....
When I add the broker class annotation, this time I don't see any reaction in any of the controllers...
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
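To illustrate where that annotation goes, here is a minimal Broker sketch; the broker name and namespace match the ones used above, but the referenced ConfigMap name is hypothetical and stands in for the channel-template config from the linked 100-broker.yaml:

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-kafkachannel-advanced-broker
  namespace: default
  annotations:
    # pin this Broker to the channel-based implementation instead of the cluster default
    eventing.knative.dev/broker.class: MTChannelBasedBroker
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafkachannel-advanced-config   # hypothetical name for the ConfigMap holding the channelTemplateSpec
    namespace: default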
@aliok did you install it via hack/run.sh? If so, that does set up the Kafka broker class as the default:
k get cm config-br-defaults -nknative-eventing -o yaml

apiVersion: v1
data:
  default-br-config: |
    clusterDefault:
      brokerClass: Kafka
      apiVersion: v1
      kind: ConfigMap
      name: kafka-broker-config
      namespace: knative-eventing
kind: ConfigMap
...
The referenced broker config does not have any annotation, so it assumes that the default broker class is the MTChannelBasedBroker.
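As an alternative to annotating each Broker, the cluster default itself could be pointed at the MTChannelBasedBroker via config-br-defaults. A sketch, assuming the standard config-br-defaults schema (the referenced ConfigMap name is hypothetical):

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-br-defaults
  namespace: knative-eventing
data:
  default-br-config: |
    clusterDefault:
      brokerClass: MTChannelBasedBroker
      apiVersion: v1
      kind: ConfigMap
      name: config-br-default-channel   # hypothetical; whichever ConfigMap holds the channel template
      namespace: knative-eventing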
On main, the receiver still does NOT receive a message. But I am seeing different logs:
{"@timestamp":"2022-04-25T12:31:47.433Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-1 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null)], epoch=0}} to node my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.433Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Built incremental fetch (sessionId=152424680, epoch=328) for node 1. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 3 partition(s)","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.433Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-7, knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-4, knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-1)) to broker my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.433Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1057) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=152424680, sessionEpoch=328, topics=[], forgottenTopicsData=[], rackId='')","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1055): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1809965895, responses=[])","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Node 2 sent an incremental fetch response with throttleTimeMs = 0 for session 1809965895 with 0 response partition(s), 4 implied partition(s)","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-9 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)], epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-3 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)], epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-6 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)], epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)], epoch=0}} to node my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Built incremental fetch (sessionId=1809965895, epoch=328) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-9, knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-3, knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-6, knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-0)) to broker my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.446Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1058) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1809965895, sessionEpoch=328, topics=[], forgottenTopicsData=[], rackId='')","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.763Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Sending Heartbeat request with generation 1 and member id consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2-9b1f68d8-0ed4-47b0-b6e6-bbb6d7747979 to coordinator my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2147483646 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"kafka-coordinator-heartbeat-thread | kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.764Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1059) and timeout 30000 to node 2147483646: HeartbeatRequestData(groupId='kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37', generationId=1, memberId='consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2-9b1f68d8-0ed4-47b0-b6e6-bbb6d7747979', groupInstanceId=null)","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"kafka-coordinator-heartbeat-thread | kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.772Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1059): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.773Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Received successful Heartbeat response","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.935Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1057): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=152424680, responses=[])","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.936Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Node 1 sent an incremental fetch response with throttleTimeMs = 0 for session 152424680 with 0 response partition(s), 3 implied partition(s)","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.937Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, correlationId=1056): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1806667037, responses=[])","logger_name":"org.apache.kafka.clients.NetworkClient","thread_name":"kafka-coordinator-heartbeat-thread | kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.937Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 1806667037 with 0 response partition(s), 3 implied partition(s)","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"kafka-coordinator-heartbeat-thread | kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.945Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-8 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null)], epoch=0}} to node my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
{"@timestamp":"2022-04-25T12:31:47.945Z","@version":"1","message":"[Consumer clientId=consumer-kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37-2, groupId=kafka.default.kafkachannel-advanced-broker-kne-trigger.2da06c26-0581-4caa-a97f-028b7f893e37] Added READ_UNCOMMITTED fetch request for partition knative-messaging-kafka.default.kafkachannel-advanced-broker-kne-trigger-7 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null)], epoch=0}} to node my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.Fetcher","thread_name":"vert.x-kafka-consumer-thread-1","level":"DEBUG","level_value":10000}
...
@aliok did you install it via hack/run.sh? If so, that does set up the Kafka broker class as the default:
Yes, I know that. I tried 3 things:
Something is off.
But I am now trying with OpenShift midstream 1.2
FWIW, the hack/run.sh does not install the IMC/MTChannelBasedBroker things. I did ko apply -f them from eventing upstream main.
Multiple Content-Length values found:
I was able to reproduce the issue.
I added logger.log("AAAAAAAAAAAAAAAAAA", new RuntimeException()) here and saw this trace:

java.lang.RuntimeException: null
at dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.logError(WebClientCloudEventSender.java:167)
at dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.lambda$send$2(WebClientCloudEventSender.java:125)
at io.vertx.core.impl.future.FutureImpl$2.onFailure(FutureImpl.java:117)
at io.vertx.core.impl.future.FutureImpl$ListenerArray.onFailure(FutureImpl.java:268)
at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onFailure(PromiseImpl.java:54)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:43)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
at io.vertx.ext.web.client.impl.HttpContext.handleFailure(HttpContext.java:396)
at io.vertx.ext.web.client.impl.HttpContext.execute(HttpContext.java:390)
at io.vertx.ext.web.client.impl.HttpContext.next(HttpContext.java:365)
at io.vertx.ext.web.client.impl.HttpContext.fire(HttpContext.java:332)
at io.vertx.ext.web.client.impl.HttpContext.fail(HttpContext.java:313)
at io.vertx.ext.web.client.impl.HttpContext.lambda$handleSendRequest$14(HttpContext.java:599)
at io.vertx.core.impl.future.FutureImpl$3.onFailure(FutureImpl.java:153)
at io.vertx.core.impl.future.FutureBase.lambda$emitFailure$1(FutureBase.java:69)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:81)
at io.vertx.core.impl.ContextImpl.execute(ContextImpl.java:260)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:22)
at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
at io.vertx.core.http.impl.HttpClientRequestBase.fail(HttpClientRequestBase.java:165)
at io.vertx.core.http.impl.HttpClientRequestBase.handleException(HttpClientRequestBase.java:160)
at io.vertx.core.http.impl.HttpClientRequestImpl.handleException(HttpClientRequestImpl.java:77)
at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.handleException(Http1xClientConnection.java:616)
at io.vertx.core.http.impl.Http1xClientConnection.handleException(Http1xClientConnection.java:1130)
at io.vertx.core.net.impl.VertxHandler.exceptionCaught(VertxHandler.java:136)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireExceptionCaught(CombinedChannelDuplexHandler.java:424)
at io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:92)
at io.netty.channel.CombinedChannelDuplexHandler$1.fireExceptionCaught(CombinedChannelDuplexHandler.java:145)
at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143)
at io.netty.channel.CombinedChannelDuplexHandler.exceptionCaught(CombinedChannelDuplexHandler.java:231)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at io.vertx.core.net.impl.ConnectionBase.fail(ConnectionBase.java:120)
at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:678)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:156)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
We're not touching anything related to the content length. We need to ask the Vert.x people about this.
@aliok a few questions:
failed to send event to subscriber target=http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/processor: why is it sending there? The service of that receiver does return a CloudEvent. Did we log the actual value of the CloudEvent that failed (e.g. is it the initial one, or the one returned)?
FWIW, the reply channel works (e.g. with):
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: process-sub
  namespace: default
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    name: process-channel
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: processor
  reply:
    ref:
      apiVersion: messaging.knative.dev/v1beta1
      kind: KafkaChannel
      name: receive-channel
Using the channel in the MT Broker, I get this dispatcher log: