knative-extensions / eventing-kafka-broker

Alternate Kafka Broker implementation.
Apache License 2.0

KafkaSource: Add message header encoding options / discard invalid headers automatically #2732

Open chenmay0921 opened 2 years ago

chenmay0921 commented 2 years ago

Problem: We are using KafkaSource to consume events from Azure Event Hubs. We can see that the message is received, converted to a CloudEvent, and sent to the sink Knative Service, but during sending the record is discarded with the error 'a header value contains a prohibited character' (shown below).

I checked the source code; it seems Knative only calls replaceBadCharacters for the header key, while the value is merely re-encoded as UTF-8. Our header values contain strange bytes such as �\fNotification, which io.vertx.core.http.impl.HttpUtils.validateValueChar rejects as an error.
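To illustrate why this fails: HTTP field values allow HTAB, SP, visible ASCII, and obs-text, but prohibit other control characters such as '\f' (0x0C), which is the check that io.vertx.core.http.impl.HttpUtils.validateValueChar enforces. A minimal sketch of that rule (not the project's actual code, and simplified relative to Vert.x's implementation):

```java
// Sketch of HTTP header-value validation per RFC 7230 field-value rules.
// '\f' (0x0C) in a Kafka header value like "\fNotification" is rejected,
// which is why the dispatcher discards the whole record.
public class HeaderValueCheck {
    /** Returns true when every character is legal in an HTTP header value. */
    static boolean isValidHeaderValue(String value) {
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            // Allowed: HTAB, SP..'~' (visible ASCII), obs-text (>= 0x80);
            // other control characters (including '\f') are prohibited.
            boolean ok = c == '\t' || (c >= 0x20 && c != 0x7F);
            if (!ok) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidHeaderValue("Notification"));   // true
        System.out.println(isValidHeaderValue("\fNotification")); // false
    }
}
```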

Since Kafka message headers vary from scenario to scenario, could we either provide an option to drop certain headers, offer a way to skip header validation, or, instead of discarding the record, pass it on to the sink to handle?
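The "drop invalid headers" option could be sketched roughly as below. This is hypothetical illustration only, not the project's API: the names sanitizeHeaders and isValidHeaderValue are invented here, and a real implementation would work on the dispatcher's Kafka header type rather than a plain Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: filter out Kafka record headers whose values would
// fail HTTP header validation, so only the offending headers are dropped
// instead of the whole record being discarded.
public class HeaderFilter {
    static boolean isValidHeaderValue(String value) {
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (!(c == '\t' || (c >= 0x20 && c != 0x7F))) {
                return false;
            }
        }
        return true;
    }

    /** Keeps only headers whose values are legal HTTP header values. */
    static Map<String, String> sanitizeHeaders(Map<String, String> headers) {
        Map<String, String> out = new LinkedHashMap<>();
        headers.forEach((k, v) -> {
            if (isValidHeaderValue(v)) {
                out.put(k, v);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> in = new LinkedHashMap<>();
        in.put("aeg-event-type", "\fNotification"); // invalid: dropped
        in.put("aeg-delivery-count", "0");          // valid: kept
        System.out.println(sanitizeHeaders(in));    // {aeg-delivery-count=0}
    }
}
```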

{"@timestamp":"2022-10-17T16:50:39.12Z","@version":"1","message":"Record matched filtering topic=eh-06 partition=2 headers=[KafkaHeaderImpl{'aeg-output-event-id': �$4db321d6-5e5f-4310-a98b-6fbefb54998c}, KafkaHeaderImpl{'aeg-subscription-name': �0KNATIVE-POC-1358}, KafkaHeaderImpl{'aeg-delivery-count': �\u00010}, KafkaHeaderImpl{'aeg-data-version': �\u00011}, KafkaHeaderImpl{'aeg-metadata-version': �\u00011}, KafkaHeaderImpl{'aeg-event-type': �\fNotification}] offset=14 key=null event=CloudEvent{id='partition:2/offset:14', source=/apis/v1/namespaces/kafka-test/kafkasources/kafka-source#eh-06, type='dev.knative.kafka.event', subject='partition:2#14', time=2022-10-17T16:50:38.473Z, data=BytesCloudEventData{value=[91, 123, 34, 105, 100, 34, 58, 34, 101, 100, 54, 53, 50, 57, 101, 57, 45, 99, 98, 99, 56, 45, 52, 97, 51, 49, 45, 97, 53, 101, 48, 45, 49, 54, 50, 52, 53, 50, 56, 100, 55, 102, 97, 50, 34, 44, 34, 115, 117, 98, 106, 101, 99, 116, 34, 58, 34, 116, 101, 115, 116, 34, 44, 34, 100, 97, 116, 97, 34, 58, 123, 34, 36, 109, 101, 116, 97, 34, 58, 123, 34, 116, 114, 97, 99, 101, 112, 97, 114, 101, 110, 116, 34, 58, 34, 48, 48, 45, 56, 57, 48, 54, 48, 54, 100, 101, 49, 50, 101, 53, 102, 100, 52, 54, 97, 101, 50, 52, 51, 98, 49, 99, 99, 97, 102, 52, 102, 101, 57, 54, 45, 102, 101, 51, 53, 54, 99, 51, 50, 56, 53, 100, 102, 54, 50, 52, 57, 45, 48, 48, 124, 91, 85, 83, 93, 34, 125, 44, 34, 116, 101, 115, 116, 34, 58, 34, 116, 101, 115, 116, 34, 125, 44, 34, 101, 118, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 72, 111, 115, 116, 101, 100, 65, 112, 112, 115, 68, 101, 109, 111, 46, 67, 111, 110, 115, 117, 109, 101, 114, 34, 44, 34, 100, 97, 116, 97, 86, 101, 114, 115, 105, 111, 110, 34, 58, 34, 49, 34, 44, 34, 109, 101, 116, 97, 100, 97, 116, 97, 86, 101, 114, 115, 105, 111, 110, 34, 58, 34, 49, 34, 44, 34, 101, 118, 101, 110, 116, 84, 105, 109, 101, 34, 58, 34, 50, 48, 50, 50, 45, 49, 48, 45, 49, 55, 84, 49, 54, 58, 53, 48, 58, 51, 54, 46, 49, 54, 51, 90, 34, 44, 34, 116, 
111, 112, 105, 99, 34, 58, 34, 47, 115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110, 115, 47, 101, 50, 56, 50, 53, 48, 102, 51, 45, 50, 51, 48, 55, 45, 52, 50, 57, 102, 45, 56, 53, 100, 54, 45, 101, 52, 97, 99, 53, 100, 57, 54, 55, 50, 102, 53, 47, 114, 101, 115, 111, 117, 114, 99, 101, 71, 114, 111, 117, 112, 115, 47, 80, 90, 73, 45, 71, 88, 85, 83, 45, 80, 45, 82, 71, 80, 45, 67, 67, 67, 66, 72, 45, 80, 48, 48, 50, 47, 112, 114, 111, 118, 105, 100, 101, 114, 115, 47, 77, 105, 99, 114, 111, 115, 111, 102, 116, 46, 69, 118, 101, 110, 116, 71, 114, 105, 100, 47, 100, 111, 109, 97, 105, 110, 115, 47, 103, 105, 102, 45, 101, 118, 101, 110, 116, 115, 45, 101, 103, 45, 115, 97, 110, 100, 98, 111, 120, 47, 116, 111, 112, 105, 99, 115, 47, 72, 111, 115, 116, 101, 100, 65, 112, 112, 115, 83, 104, 111, 119, 99, 97, 115, 101, 34, 125, 93]}, extensions={kafkaheaderaegmetadataversion=�\u00011, kafkaheaderaegoutputeventid=�$4db321d6-5e5f-4310-a98b-6fbefb54998c, kafkaheaderaegdataversion=�\u00011, kafkaheaderaegeventtype=�\fNotification, kafkaheaderaegdeliverycount=�\u00010, kafkaheaderaegsubscriptionname=�0KNATIVE-POC-SUBSCRIBE-TO-HOSTEDAPPSSHOWCASE-1358}}","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl","thread_name":"vert.x-worker-thread-8","level":"DEBUG","level_value":10000,"topic":"eh-06","partition":2,"headers":[{},{},{},{},{},{}],"offset":14,"key":null,"event":{"data":{},"id":"partition:2/offset:14","source":"/apis/v1/namespaces/kafka-test/kafkasources/kafka-source#eh-06","type":"dev.knative.kafka.event","subject":"partition:2#14","time":1666025438.473000000,"dataContentType":null,"dataSchema":null,"specVersion":"V1","extensionNames":["kafkaheaderaegmetadataversion","kafkaheaderaegoutputeventid","kafkaheaderaegdataversion","kafkaheaderaegeventtype","kafkaheaderaegdeliverycount","kafkaheaderaegsubscriptionname"],"attributeNames":["subject","specversion","id","source","time","type"]}} 
{"@timestamp":"2022-10-17T16:50:39.121Z","@version":"1","message":"Sending event id=partition:2/offset:14 subscriberURI=http://event-display.kafka-test.svc.cluster.local retry=0","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender","thread_name":"vert.x-worker-thread-8","level":"DEBUG","level_value":10000,"id":"partition:2/offset:14","subscriberURI":"http://event-display.kafka-test.svc.cluster.local","retry":0} {"@timestamp":"2022-10-17T16:50:39.122Z","@version":"1","message":"**Exception occurred, discarding the record topic=eh-06 partition=2 headers=[KafkaHeaderImpl{'aeg-output-event-id': �$4db321d6-5e5f-4310-a98b-6fbefb54998c}, KafkaHeaderImpl{'aeg-subscription-name': �0KNATIVE-POC-SUBSCRIBE-TO-HOSTEDAPPSSHOWCASE-1358}, KafkaHeaderImpl{'aeg-delivery-count': �\u00010}, KafkaHeaderImpl{'aeg-data-version': �\u00011}, KafkaHeaderImpl{'aeg-metadata-version': �\u00011}, KafkaHeaderImpl{'aeg-event-type': �\fNotification}] offset=14** event=CloudEvent{id='partition:2/offset:14', source=/apis/v1/namespaces/kafka-test/kafkasources/kafka-source#eh-06, type='dev.knative.kafka.event', subject='partition:2#14', time=2022-10-17T16:50:38.473Z, data=BytesCloudEventData{value=[91, 123, 34, 105, 100, 34, 58, 34, 101, 100, 54, 53, 50, 57, 101, 57, 45, 99, 98, 99, 56, 45, 52, 97, 51, 49, 45, 97, 53, 101, 48, 45, 49, 54, 50, 52, 53, 50, 56, 100, 55, 102, 97, 50, 34, 44, 34, 115, 117, 98, 106, 101, 99, 116, 34, 58, 34, 116, 101, 115, 116, 34, 44, 34, 100, 97, 116, 97, 34, 58, 123, 34, 36, 109, 101, 116, 97, 34, 58, 123, 34, 116, 114, 97, 99, 101, 112, 97, 114, 101, 110, 116, 34, 58, 34, 48, 48, 45, 56, 57, 48, 54, 48, 54, 100, 101, 49, 50, 101, 53, 102, 100, 52, 54, 97, 101, 50, 52, 51, 98, 49, 99, 99, 97, 102, 52, 102, 101, 57, 54, 45, 102, 101, 51, 53, 54, 99, 51, 50, 56, 53, 100, 102, 54, 50, 52, 57, 45, 48, 48, 124, 91, 85, 83, 93, 34, 125, 44, 34, 116, 101, 115, 116, 34, 58, 34, 116, 101, 115, 116, 34, 125, 44, 34, 101, 118, 101, 110, 
116, 84, 121, 112, 101, 34, 58, 34, 72, 111, 115, 116, 101, 100, 65, 112, 112, 115, 68, 101, 109, 111, 46, 67, 111, 110, 115, 117, 109, 101, 114, 34, 44, 34, 100, 97, 116, 97, 86, 101, 114, 115, 105, 111, 110, 34, 58, 34, 49, 34, 44, 34, 109, 101, 116, 97, 100, 97, 116, 97, 86, 101, 114, 115, 105, 111, 110, 34, 58, 34, 49, 34, 44, 34, 101, 118, 101, 110, 116, 84, 105, 109, 101, 34, 58, 34, 50, 48, 50, 50, 45, 49, 48, 45, 49, 55, 84, 49, 54, 58, 53, 48, 58, 51, 54, 46, 49, 54, 51, 90, 34, 44, 34, 116, 111, 112, 105, 99, 34, 58, 34, 47, 115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110, 115, 47, 101, 50, 56, 50, 53, 48, 102, 51, 45, 50, 51, 48, 55, 45, 52, 50, 57, 102, 45, 56, 53, 100, 54, 45, 101, 52, 97, 99, 53, 100, 57, 54, 55, 50, 102, 53, 47, 114, 101, 115, 111, 117, 114, 99, 101, 71, 114, 111, 117, 112, 115, 47, 80, 90, 73, 45, 71, 88, 85, 83, 45, 80, 45, 82, 71, 80, 45, 67, 67, 67, 66, 72, 45, 80, 48, 48, 50, 47, 112, 114, 111, 118, 105, 100, 101, 114, 115, 47, 77, 105, 99, 114, 111, 115, 111, 102, 116, 46, 69, 118, 101, 110, 116, 71, 114, 105, 100, 47, 100, 111, 109, 97, 105, 110, 115, 47, 103, 105, 102, 45, 101, 118, 101, 110, 116, 115, 45, 101, 103, 45, 115, 97, 110, 100, 98, 111, 120, 47, 116, 111, 112, 105, 99, 115, 47, 72, 111, 115, 116, 101, 100, 65, 112, 112, 115, 83, 104, 111, 119, 99, 97, 115, 101, 34, 125, 93]}, extensions={kafkaheaderaegmetadataversion=�\u00011, kafkaheaderaegoutputeventid=�$4db321d6-5e5f-4310-a98b-6fbefb54998c, kafkaheaderaegdataversion=�\u00011, kafkaheaderaegeventtype=�\fNotification, kafkaheaderaegdeliverycount=�\u00010, kafkaheaderaegsubscriptionname=�0KNATIVE-POC-SUBSCRIBE-TO-HOSTEDAPPSSHOWCASE-1358}}","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl","thread_name":"vert.x-worker-thread-8","level":"**ERROR","level_value":40000,"stack_trace":"java.lang.IllegalArgumentException: a header value contains a prohibited character '\\f': �\fNotification\n\tat 
io.vertx.core.http.impl.HttpUtils.validateValueChar(HttpUtils.java:813)\n\tat** io.vertx.core.http.impl.HttpUtils.validateHeaderValue(HttpUtils.java:788)\n\tat io.vertx.core.http.impl.HttpUtils.validateHeader(HttpUtils.java:770)\n\tat io.vertx.core.http.impl.headers.HeadersMultiMap.add0(HeadersMultiMap.java:616)\n\tat io.vertx.core.http.impl.headers.HeadersMultiMap.add(HeadersMultiMap.java:117)\n\tat io.vertx.core.http.impl.headers.HeadersMultiMap.add(HeadersMultiMap.java:44)\n\tat io.cloudevents.http.vertx.impl.VertxWebClientRequestMessageWriterImpl.withContextAttribute(VertxWebClientRequestMessageWriterImpl.java:56)\n\tat io.cloudevents.http.vertx.impl.VertxWebClientRequestMessageWriterImpl.withContextAttribute(VertxWebClientRequestMessageWriterImpl.java:32)\n\tat io.cloudevents.core.impl.BaseCloudEvent.readExtensions(BaseCloudEvent.java:71)\n\tat io.cloudevents.core.v1.CloudEventV1.readContext(CloudEventV1.java:196)\n\tat io.cloudevents.core.impl.BaseCloudEvent.read(BaseCloudEvent.java:58)\n\tat io.cloudevents.rw.CloudEventReader.read(CloudEventReader.java:43)\n\tat io.cloudevents.core.message.MessageWriter.writeBinary(MessageWriter.java:71)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.send(WebClientCloudEventSender.java:165)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.send(WebClientCloudEventSender.java:108)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender.send(WebClientCloudEventSender.java:88)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl.lambda$composeSenderAndSinkHandler$10(RecordDispatcherImpl.java:379)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl.onFilterMatching(RecordDispatcherImpl.java:241)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl.onRecordReceived(RecordDispatcherImpl.java:233)\n\tat 
dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl.dispatch(RecordDispatcherImpl.java:181)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherMutatorChain.dispatch(RecordDispatcherMutatorChain.java:57)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle.dispatch(OrderedConsumerVerticle.java:205)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle.lambda$recordsHandler$4(OrderedConsumerVerticle.java:196)\n\tat dev.knative.eventing.kafka.broker.core.OrderedAsyncExecutor.consume(OrderedAsyncExecutor.java:103)\n\tat dev.knative.eventing.kafka.broker.core.OrderedAsyncExecutor.offer(OrderedAsyncExecutor.java:92)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle.recordsHandler(OrderedConsumerVerticle.java:196)\n\tat io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91)\n\tat io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)\n\tat io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)\n\tat io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)\n\tat io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$poll$16(KafkaConsumerImpl.java:685)\n\tat io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$null$41(KafkaReadStreamImpl.java:878)\n\tat io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)\n\tat io.vertx.core.impl.WorkerContext.lambda$run$1(WorkerContext.java:83)\n\tat io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n","topic":"eh-06","partition":2,"headers":[{},{},{},{},{},{}],"offset":14,"event":{"data":{},"id":"partition:2/offset:14","source":"/apis/v1/namespaces/kafka-test/kafkasources/kafka-source#eh-06","type":"dev.knative.kafka.event","subject":"partition:2#14","time":1666025438.473000000,"dataContentType":null,"dataSchema":null,"specVersion":"V1","extensionNames":["kafkaheaderaegmetadataversion","kafkaheaderaegoutputeventid","kafkaheaderaegdataversion","kafkaheaderaegeventtype","kafkaheaderaegdeliverycount","kafkaheaderaegsubscriptionname"],"attributeNames":["subject","specversion","id","source","time","type"]}} {"@timestamp":"2022-10-17T16:50:39.226Z","@version":"1","message":"[Consumer clientId=consumer-knative-group-1, groupId=knative-group] Resuming partitions [eh-06-2, eh-06-0]","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"vert.x-kafka-consumer-thread-0","level":"DEBUG","level_value":10000}

Persona: Event Consumer

Exit Criteria: Able to drop certain headers or to bypass header validation.



pierDipi commented 2 years ago

It makes sense to me. Another option is to allow sending header values base64-encoded, which guarantees that the value is a valid HTTP header value.
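The base64 option works because the encoded output uses only [A-Za-z0-9+/=], all of which are legal in an HTTP header value, so no record would ever be discarded by validation. A minimal sketch using the standard java.util.Base64 API (encodeHeaderValue is an illustrative name, not project API):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of the base64 option: encode the raw Kafka header bytes so the
// resulting string is always a legal HTTP header value.
public class HeaderBase64 {
    static String encodeHeaderValue(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) {
        // The problematic value from the logs above: leading '\f' (0x0C).
        byte[] raw = "\fNotification".getBytes(StandardCharsets.UTF_8);
        System.out.println(encodeHeaderValue(raw)); // DE5vdGlmaWNhdGlvbg==
    }
}
```

The sink would then need to base64-decode the header to recover the original bytes, which is why this would have to be opt-in behavior.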

cc @matzew @lionelvillard @aavarghese

chenmay0921 commented 2 years ago

@pierDipi thanks for your reply. I just wonder: will this feature be added in the next version? Or what is the ETA?

pierDipi commented 2 years ago

There is no ETA, contributions are welcome.

We should list the options and decide what fits best.

@matzew @lionelvillard @aavarghese any opinion?

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.

pierDipi commented 1 year ago

/remove-lifecycle stale

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.

pierDipi commented 1 year ago

/triage accepted

pierDipi commented 1 year ago

/remove-lifecycle stale