confluentinc / kafka-rest

Confluent REST Proxy for Kafka
https://docs.confluent.io/current/kafka-rest/docs/index.html

KREST-10286 Example python code for produce streaming V3 API #1163

Closed msn-tldr closed 1 year ago

msn-tldr commented 1 year ago

This will use the HTTP connection in fully duplex mode.

Following is the output of streaming_produce_v3_main.py, writing 5 records. It demonstrates that the connection is used in full-duplex mode, i.e. the receipt for the 1st record is read even before the 2nd record is written to the connection.

Establishing connection with headers: {'Content-Type': 'application/json'}
Waiting for http-connection to be established.
Sleeping for 1 second, before producing record #1
Connection established, will read responses.
Sleeping for 1 second, before producing record #2
Writing a record #1 with json b'{"value": {"type": "STRING", "data": "value_0"}, "key": {"type": "STRING", "data": "key_0"}}'
Http-stream has status-code 200
Receipt for record #1 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":468,\"timestamp\":\"2023-05-16T11:02:08.322Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Sleeping for 1 second, before producing record #3
Writing a record #2 with json b'{"value": {"type": "STRING", "data": "value_1"}, "key": {"type": "STRING", "data": "key_1"}}'
Receipt for record #2 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":469,\"timestamp\":\"2023-05-16T11:02:09.323Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Sleeping for 1 second, before producing record #4
Writing a record #3 with json b'{"value": {"type": "STRING", "data": "value_2"}, "key": {"type": "STRING", "data": "key_2"}}'
Receipt for record #3 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":470,\"timestamp\":\"2023-05-16T11:02:10.334Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Sleeping for 1 second, before producing record #5
Writing a record #4 with json b'{"value": {"type": "STRING", "data": "value_3"}, "key": {"type": "STRING", "data": "key_3"}}'
Writing a record #5 with json b'{"value": {"type": "STRING", "data": "value_4"}, "key": {"type": "STRING", "data": "key_4"}}'
Receipt for record #4 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":471,\"timestamp\":\"2023-05-16T11:02:11.338Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Receipt for record #5 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":472,\"timestamp\":\"2023-05-16T11:02:12.337Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
No more records to produce, exiting __record_generator
Done producing-records, exiting __produce_records
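
The exact wire framing isn't shown in the log, but a full-duplex streaming body like this one presumably relies on HTTP/1.1 chunked transfer encoding, where each record can be written and flushed as its own chunk. A minimal sketch of that framing (not taken from the example script):

```python
def encode_chunk(payload: bytes) -> bytes:
    """Frame one payload as an HTTP/1.1 chunk:
    hex size line, CRLF, data, CRLF."""
    return f"{len(payload):x}\r\n".encode("ascii") + payload + b"\r\n"

# Each produce record can be written (and flushed) as one chunk...
assert encode_chunk(b"hello") == b"5\r\nhello\r\n"
# ...and a zero-length chunk terminates the request body.
assert encode_chunk(b"") == b"0\r\n\r\n"
```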

By contrast, this example (and its output) demonstrates that an idiomatic HTTP request-response is half-duplex, i.e. all records must be written to the wire before any record-receipts can be read: https://github.com/confluentinc/kafka-rest/pull/1164#issue-1711773677
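
A half-duplex client, by comparison, serializes every record into a single request body up front and only then reads the receipts. A minimal sketch of that batching step (the record shape follows the log above; the newline-delimited framing is an assumption, not confirmed by the PR):

```python
import json

def build_batch_body(n: int) -> bytes:
    """Serialize n produce records into one newline-delimited body,
    the way a half-duplex client would before sending anything."""
    lines = []
    for i in range(n):
        record = {
            "value": {"type": "STRING", "data": f"value_{i}"},
            "key": {"type": "STRING", "data": f"key_{i}"},
        }
        lines.append(json.dumps(record))
    return "\n".join(lines).encode()

body = build_batch_body(5)
# All five records are on the wire before a single receipt can be read.
assert body.count(b"\n") == 4
```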

trnguyencflt commented 1 year ago

@msn-tldr What is the motivation for copying the code from http_parser? Why can't we use the library directly?

Could you also share the original problem (with example code) we encountered that requires us to do low-level socket handling?

msn-tldr commented 1 year ago

@trnguyencflt Thanks for suggesting the http.client stdlib package. I had looked at popular Python HTTP libraries like urllib3 and aiohttp. Both use their own version of HttpResponse with their own custom chunk-parsing logic instead of http.client. This led me to believe Python's stdlib doesn't expose "public" modules for low-level HTTP parsing, hence I looked into other open-source low-level HTTP parsers. But I am pleasantly surprised that it does, and this significantly reduced the LOC of the example.
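
For anyone landing here later: http.client really does expose the low-level response parser. A minimal sketch, feeding a canned chunked response through http.client.HTTPResponse directly (the raw bytes here are made up for illustration, not captured from the proxy):

```python
import io
import http.client

# A canned chunked HTTP/1.1 response, as it would arrive on the socket.
RAW = (b"HTTP/1.1 200 OK\r\n"
       b"Transfer-Encoding: chunked\r\n"
       b"\r\n"
       b"5\r\nhello\r\n"
       b"0\r\n\r\n")

class FakeSocket:
    """Just enough of a socket for HTTPResponse: makefile() -> readable."""
    def __init__(self, data: bytes):
        self._file = io.BytesIO(data)

    def makefile(self, *args, **kwargs):
        return self._file

resp = http.client.HTTPResponse(FakeSocket(RAW))
resp.begin()                    # parse status line and headers
assert resp.status == 200
assert resp.read() == b"hello"  # the chunked body is decoded for us
```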

msn-tldr commented 1 year ago

The CI job failure is due to a missing downstream dependency, which is unrelated to this change:

 [ERROR] Failed to execute goal on project control-center: Could not resolve dependencies for project io.confluent.controlcenter:control-center:jar:7.5.0-99999: Could not find artifact io.confluent:ce-kafka-rest-extensions:jar:7.5.0-99999 in confluent-codeartifact-central (https://confluent-519856050701.dp.confluent.io/maven/maven-public/) -> [Help 1]