strimzi / strimzi-kafka-bridge

An HTTP bridge for Apache Kafka®
Apache License 2.0

content-type not evaluated on every send request #803

Closed: lowerorbit closed this issue 1 year ago

lowerorbit commented 1 year ago

strimzi-kafka-bridge version: 0.25

It seems that the content-type header is not evaluated on every request. Switching content-type from application/vnd.kafka.binary.v2+json to application/vnd.kafka.json.v2+json does not change the created message content.

A request posted to the bridge via curl -X POST -H 'Content-Type: application/vnd.kafka.binary.v2+json' -d @test.json <bridge-url>/topics/<topicname> where test.json contains

{"records":[
  {"headers":[
    {"key":"test-header","value":"MS4wCg=="}
  ],
  "key": "dGVzdGtleQ==",
  "value": "c29tZXJhbmRvbXN0cmluZw=="
  }
]
}

does indeed result in a message containing the base64-decoded key and value. Submitting the same request again with Content-Type: application/vnd.kafka.json.v2+json leads to the same result. I would expect the message to contain the base64-encoded strings unchanged.
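To make the expected difference concrete, here is a minimal Python sketch of how the key and value from test.json would map to the bytes written to Kafka under each embedded format. It assumes that the binary format base64-decodes each string and that the json format serializes the JSON value as-is, so a JSON string keeps its surrounding quotes. `to_kafka_bytes` is a hypothetical helper for illustration, not the bridge's code.

```python
import base64
import json

# Key and value from test.json (headers omitted in this sketch).
record = {"key": "dGVzdGtleQ==", "value": "c29tZXJhbmRvbXN0cmluZw=="}


def to_kafka_bytes(field_value, embedded_format):
    """Hypothetical mapping of one key/value field to the bytes sent to Kafka.

    Assumption: "binary" base64-decodes the string; "json" serializes the JSON
    value unchanged, so a JSON string is written with its quotes.
    """
    if embedded_format == "binary":
        return base64.b64decode(field_value)
    if embedded_format == "json":
        return json.dumps(field_value).encode("utf-8")
    raise ValueError(f"unknown embedded format: {embedded_format}")


for fmt in ("binary", "json"):
    print(fmt, to_kafka_bytes(record["key"], fmt), to_kafka_bytes(record["value"], fmt))
```

Under these assumptions, the binary request should produce `testkey` / `somerandomstring`, while the json request should produce the quoted base64 strings.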

ppatierno commented 1 year ago

Hi @lowerorbit, when you create a consumer, you specify the embedded format (json or binary). Any consume request that asks for a different embedded format should fail with an error. Can you please provide detailed steps to reproduce what you are doing, from the consumer creation (and how you create it) to the consuming operations? Thanks!
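For reference, the consumer flow described in this comment can be sketched as three bridge requests. The sketch builds them with Python's `urllib.request` without sending them. The bridge URL, group id and consumer name are hypothetical placeholders. The endpoint paths and the `format` field follow the bridge's v2 consumer API as we understand it, so double-check them against the API reference for your bridge version.

```python
import json
import urllib.request

# Hypothetical values for illustration only.
BRIDGE = "http://localhost:8080"
GROUP = "my-group"
NAME = "my-consumer"
V2_JSON = "application/vnd.kafka.v2+json"


def create_consumer(embedded_format):
    # The embedded format ("json" or "binary") is fixed at consumer creation.
    body = {"name": NAME, "format": embedded_format}
    return urllib.request.Request(
        f"{BRIDGE}/consumers/{GROUP}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )


def subscribe(topic):
    return urllib.request.Request(
        f"{BRIDGE}/consumers/{GROUP}/instances/{NAME}/subscription",
        data=json.dumps({"topics": [topic]}).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )


def poll(embedded_format):
    # The Accept header has to match the format chosen in create_consumer();
    # asking for a different one is expected to be rejected.
    return urllib.request.Request(
        f"{BRIDGE}/consumers/{GROUP}/instances/{NAME}/records",
        headers={"Accept": f"application/vnd.kafka.{embedded_format}.v2+json"},
        method="GET",
    )


steps = [create_consumer("binary"), subscribe("BRIDGE-TEST1"), poll("binary")]
for req in steps:
    # Pass each request to urllib.request.urlopen() to actually send it.
    print(req.get_method(), req.full_url)
```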

lowerorbit commented 1 year ago

Hi @ppatierno, I'm only using the producer API; maybe I didn't make that clear enough. In addition, the behaviour differs between an Istio and a non-Istio setup (see below).

The setup is as follows:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: bridge-dev
spec:
  replicas: 1
  bootstrapServers: <bootstrap URL>:9094
  tls:
    trustedCertificates:
    - secretName: <cert secret>
      certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: bridge-user
      certificate: user.crt
      key: user.key
  consumer:
    config:
      auto.offset.reset: earliest
      enable.auto.commit: true
  producer:
    config:
      acks: 1
      delivery.timeout.ms: 300000
  http:
    port: 8080
  resources:
    requests:
      cpu: 0.3
      memory: 600Mi
    limits:
      cpu: 1.0
      memory: 1200Mi
  jvmOptions:
    "-XX":
      "UseG1GC": true
      "MaxGCPauseMillis": 20
      "InitiatingHeapOccupancyPercent": 35
      "ExplicitGCInvokesConcurrent": true
  logging:
    type: inline
    loggers:
      logger.bridge.level: "INFO"
      logger.send.name: "http.openapi.operation.send"
      logger.send.level: "DEBUG"

Tests done:

Using the test.json file from above, which contains a base64-encoded key/value pair, I sent two POST requests to the producer endpoint /topics/BRIDGE-TEST1:

  1. curl -k -X POST -H 'authorization: Bearer '$JWT -H 'Content-Type: application/vnd.kafka.json.v2+json' -d @test.json https://<istio-ingress>/topics/BRIDGE-TEST1
  2. curl -k -X POST -H 'authorization: Bearer '$JWT -H 'Content-Type: application/vnd.kafka.binary.v2+json' -d @test.json https://<istio-ingress>/topics/BRIDGE-TEST1

Consuming the topic via kcat produces this result:

"c29tZXJhbmRvbXN0cmluZw=="
% Reached end of topic BRIDGE-TEST1 [0] at offset 1127
"c29tZXJhbmRvbXN0cmluZw=="
% Reached end of topic BRIDGE-TEST1 [0] at offset 1128

The kafka-bridge pod output:

2023-05-30 09:51:31 INFO  send:59 - [152151490] SEND Response: statusCode = 200, message = OK
2023-05-30 09:51:31 DEBUG send:62 - [152151490] SEND Response: headers =
2023-05-30 09:55:56 INFO  NetworkClient:937 - [AdminClient clientId=adminclient-1] Node 1 disconnected.
2023-05-30 10:00:56 INFO  NetworkClient:937 - [AdminClient clientId=adminclient-1] Node 0 disconnected.
2023-05-30 10:05:29 INFO  send:50 - [1151090466] SEND Request: from 127.0.0.6:43579, method = POST, path = /topics/BRIDGE-TEST1
2023-05-30 10:05:29 DEBUG send:54 - [1151090466] SEND Request: headers = host=<istio-gateway>
user-agent=curl/7.86.0
accept=*/*
authorization=*****
content-type=application/vnd.kafka.json.v2+json
content-length=142
x-forwarded-for=100.64.0.6
x-forwarded-proto=https
x-envoy-external-address=100.64.0.6
x-request-id=75a759f6-8c09-4786-bd54-fef39b672a1c
x-envoy-attempt-count=1
x-forwarded-client-cert=By=spiffe://cluster.local/ns/kafka-bridge-dev/sa/bridge-dev-bridge;Hash=354178fba108c4ddc8e61b20fa5a6bd307e8b17868444d41cf41e73fd55474f8;Subject="";URI=spiffe://cluster.local/ns/istio-system/sa/istio-ingressgateway-service-account
x-b3-traceid=4906a0b618bebdde55e548fa4a1b6098
x-b3-spanid=22ed776300a8e1a7
x-b3-parentspanid=55e548fa4a1b6098
x-b3-sampled=0

2023-05-30 10:05:29 INFO  send:59 - [1151090466] SEND Response: statusCode = 200, message = OK
2023-05-30 10:05:29 DEBUG send:62 - [1151090466] SEND Response: headers =
2023-05-30 10:05:29 INFO  Metadata:402 - [Producer clientId=producer-1] Resetting the last seen epoch of partition BRIDGE-TEST1-0 to 563 since the associated topicId changed from null to bVIyEn7SS7K2L3ilNELd5g
2023-05-30 10:05:56 INFO  NetworkClient:937 - [AdminClient clientId=adminclient-1] Node 2 disconnected.
2023-05-30 10:10:56 INFO  NetworkClient:937 - [AdminClient clientId=adminclient-1] Node 3 disconnected.
2023-05-30 10:12:43 INFO  send:50 - [421096329] SEND Request: from 127.0.0.6:43579, method = POST, path = /topics/BRIDGE-TEST1
2023-05-30 10:12:43 DEBUG send:54 - [421096329] SEND Request: headers = host=<istio-gateway>
user-agent=curl/7.86.0
accept=*/*
authorization=******
content-type=application/vnd.kafka.binary.v2+json
content-length=142
x-forwarded-for=100.64.0.6
x-forwarded-proto=https
x-envoy-external-address=100.64.0.6
x-request-id=c471810d-7544-4122-ac2d-a1dc6010b8de
x-envoy-attempt-count=1
x-forwarded-client-cert=By=spiffe://cluster.local/ns/kafka-bridge-dev/sa/bridge-dev-bridge;Hash=354178fba108c4ddc8e61b20fa5a6bd307e8b17868444d41cf41e73fd55474f8;Subject="";URI=spiffe://cluster.local/ns/istio-system/sa/istio-ingressgateway-service-account
x-b3-traceid=896462a71733de1b7c1f697d19a06d62
x-b3-spanid=97a4daaf8b93aeb4
x-b3-parentspanid=7c1f697d19a06d62
x-b3-sampled=0

2023-05-30 10:12:43 INFO  send:59 - [421096329] SEND Response: statusCode = 200, message = OK
2023-05-30 10:12:43 DEBUG send:62 - [421096329] SEND Response: headers =
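The two "SEND Request" lines in the log above report the same client address and port. A small Python sketch can extract that field and compare it across requests. Treating an identical address:port as a reused TCP connection is an inference. It is consistent with traffic arriving from the Istio sidecar on 127.0.0.6, the source address Istio typically uses for inbound traffic to the application.

```python
import re

# The two "SEND Request" lines copied from the bridge log above.
log_lines = [
    "2023-05-30 10:05:29 INFO  send:50 - [1151090466] SEND Request: from 127.0.0.6:43579, method = POST, path = /topics/BRIDGE-TEST1",
    "2023-05-30 10:12:43 INFO  send:50 - [421096329] SEND Request: from 127.0.0.6:43579, method = POST, path = /topics/BRIDGE-TEST1",
]

# Capture the request id and the client address:port from each line.
pattern = re.compile(r"\[(\d+)\] SEND Request: from ([\d.]+:\d+),")

sources = {}
for line in log_lines:
    match = pattern.search(line)
    if match:
        request_id, source = match.groups()
        sources[request_id] = source

print(sources)
# One distinct address:port for both requests suggests a single reused connection.
print("same source:", len(set(sources.values())) == 1)
```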

Based on the content type of the request, I would expect the second request to produce a message containing the decoded values of the record's key and value. Restarting the kafka-bridge pod and issuing request 2 first results in a decoded key/value pair for both requests.

Cross-checking with a non-Istio setup on my workstation works as expected, so this might be Istio-specific behaviour. But why? As visible in the pod's output, Istio does not modify the content-type header.

BR

ppatierno commented 1 year ago

I have a clue... Istio keeps the producer connection alive after the message is sent, which doesn't happen with a plain cURL command. I was able to reproduce the same behaviour by keeping, or not keeping, the connection alive when sending the two messages one after the other.
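The keep-alive effect can be illustrated with Python's standard library alone. The sketch below uses a local stand-in HTTP/1.1 server, not the bridge, that records the client port of each POST. Sending the json request and then the binary request on one `http.client.HTTPConnection` reuses a single TCP connection. Opening a fresh connection per request, as separate curl invocations do, yields two different ports. The handler and `post_twice` are hypothetical helpers for illustration.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

seen_ports = []  # client port of every POST the stand-in server receives


class Handler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # allows persistent (keep-alive) connections

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)  # drain the body so the connection can be reused
        seen_ports.append(self.client_address[1])
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, *args):
        pass  # keep the output quiet


def post_twice(reuse_connection, port):
    """Send a json request, then a binary one, optionally on one connection."""
    content_types = [
        "application/vnd.kafka.json.v2+json",
        "application/vnd.kafka.binary.v2+json",
    ]
    conn = http.client.HTTPConnection("127.0.0.1", port)
    for ct in content_types:
        if not reuse_connection:
            conn.close()  # force a brand-new TCP connection for this request
            conn = http.client.HTTPConnection("127.0.0.1", port)
        conn.request("POST", "/topics/BRIDGE-TEST1", body=b"{}", headers={"Content-Type": ct})
        conn.getresponse().read()
    conn.close()


server = ThreadingHTTPServer(("127.0.0.1", 0), Handler)
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

post_twice(reuse_connection=True, port=port)
post_twice(reuse_connection=False, port=port)
server.shutdown()
server.server_close()

print("keep-alive ports:", seen_ports[:2])      # the same port twice
print("new-connection ports:", seen_ports[2:])  # two different ports
```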

When the HTTP client keeps the connection alive, the bridge treats it as the same producer as before. Internally it reuses the same Kafka producer, so it always applies the same embedded format to the value. Every request therefore goes through the same internal message converter: values are either always base64-decoded or never decoded, depending on which request comes first and therefore how the producer was created. This is why you see the behaviour invert when you swap the order. A new producer is created only when a request arrives on a new HTTP connection.
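The explanation above can be captured in a toy Python model. This is not the bridge's real code. Its assumptions are that one converter is created per HTTP connection, that the converter is chosen from the Content-Type of the first request on that connection, and that binary base64-decodes while json serializes the JSON value as-is. `BridgeModel` and `make_converter` are hypothetical names.

```python
import base64
import json

JSON = "application/vnd.kafka.json.v2+json"
BINARY = "application/vnd.kafka.binary.v2+json"


def make_converter(content_type):
    # Hypothetical converter selection based on the request's Content-Type.
    if content_type == BINARY:
        return base64.b64decode
    if content_type == JSON:
        return lambda v: json.dumps(v).encode("utf-8")
    raise ValueError(f"unsupported content type: {content_type}")


class BridgeModel:
    """Toy model: one producer (and converter) per HTTP connection."""

    def __init__(self):
        self.converters = {}  # connection id -> converter

    def send(self, connection_id, content_type, value):
        if connection_id not in self.converters:
            # The first request on a connection decides the converter.
            self.converters[connection_id] = make_converter(content_type)
        # Later requests on the same connection reuse it and ignore their own
        # Content-Type, matching the behaviour reported in this issue.
        return self.converters[connection_id](value)


value = "c29tZXJhbmRvbXN0cmluZw=="
bridge = BridgeModel()
# One keep-alive connection (as through the Istio sidecar): json, then binary.
print(bridge.send("conn-1", JSON, value), bridge.send("conn-1", BINARY, value))
# Separate connections (plain curl): each request gets its own converter.
print(bridge.send("conn-2", JSON, value), bridge.send("conn-3", BINARY, value))
```

On the shared connection both messages come out as the quoted base64 string, as in the kcat output. Sending the binary request first on a fresh connection instead makes both messages decoded.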