redpanda-data / redpanda

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
https://redpanda.com

Consume records using chunked transfer encoding when using the REST API #4709

Status: Open. huntc opened this issue 2 years ago.

huntc commented 2 years ago

Who is this for and what problem do they have today?

A shortcoming of the pandaproxy-rest API (and of the Kafka REST API it mirrors) is the apparent inability to consume a stream of records after subscribing to a topic. Instead, the GET /consumers/{}/instances/{}/records endpoint has to be polled repeatedly to consume new records.

In addition, a separate request is required to create the consumer instance, and that instance must later be deleted once the client no longer uses it. Committing offsets requires yet another HTTP request.
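For comparison with the proposal, here is a minimal Python sketch of one consume cycle with the existing request/response API, following the Kafka REST v2 endpoint layout. The `send` transport, the `poll_once` helper, and the request-body fields (`format`, `auto.offset.reset`, `topics`, `offsets`) are assumptions for illustration and should be checked against the pandaproxy documentation.

```python
from typing import Callable, List, Optional, Tuple

# Hypothetical transport: (method, path, JSON body or None) -> (status, decoded JSON or None).
Transport = Callable[[str, str, Optional[dict]], Tuple[int, object]]


def poll_once(send: Transport, group: str, topics: List[str],
              offsets: Optional[List[dict]] = None) -> object:
    """Run one consume cycle; every numbered step is a separate HTTP request.

    Error handling (non-2xx statuses) is omitted to keep the sketch short.
    """
    # 1. Create a consumer instance in the group.
    _, created = send("POST", f"/consumers/{group}",
                      {"format": "binary", "auto.offset.reset": "earliest"})
    base = f"/consumers/{group}/instances/{created['instance_id']}"
    try:
        # 2. Subscribe the instance to the topics.
        send("POST", f"{base}/subscription", {"topics": topics})
        # 3. Optionally commit offsets, which needs its own request.
        if offsets:
            send("POST", f"{base}/offsets", {"offsets": offsets})
        # 4. Fetch records; a real client repeats this GET in a polling loop.
        _, records = send("GET", f"{base}/records", None)
        return records
    finally:
        # 5. Delete the instance once it is no longer needed.
        send("DELETE", base, None)
```

Injecting `send` keeps the sketch independent of any HTTP library and makes the five round trips per cycle easy to count.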

What are the success criteria?

The ability to create an ephemeral consumer instance, commit offsets, and subscribe to any number of topics in a single request. The response to that request is sent using chunked transfer encoding, so the client can consume records as they become available.

Why is solving this problem impactful?

HTTP is arguably the most interoperable protocol we have and is supported by virtually every programming language. Making HTTP a first-class way of accessing Redpanda could allow more clients to connect without needing a native Kafka API client library.

Additional notes

I have prototyped an enhancement to the POST /consumers/{} API: when the optional offsets and subscriptions fields are supplied in the request body, records are returned in the response using chunked transfer encoding. The change is backward-compatible with today's API.
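The request body for the prototype can be assembled programmatically. This is a minimal Python sketch; the field names are taken from the sample requests in this issue, while `build_streaming_body` and its tuple-based `offsets` parameter are hypothetical.

```python
import json
from typing import Iterable, Optional, Tuple


def build_streaming_body(topics: Iterable[str],
                         offsets: Optional[Iterable[Tuple[str, int, int]]] = None) -> str:
    """Build the JSON body for the prototyped POST /consumers/{group} request.

    `offsets` holds (topic, partition, offset) tuples to commit before subscribing.
    """
    body = {}
    offsets = list(offsets or [])
    if offsets:
        body["offsets"] = [
            {"topic": t, "partition": p, "offset": o} for t, p, o in offsets
        ]
    body["subscriptions"] = [{"topic": t} for t in topics]
    # Compact separators reproduce the bodies used in the curl examples byte for byte.
    return json.dumps(body, separators=(",", ":"))
```

Calling it with only a topic produces the body of the first curl example below; adding one offset tuple produces the body of the second.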

Here is a sample curl command that will consume all events on the end-device-events topic:

curl -v \
  -d '{"subscriptions":[{"topic":"end-device-events"}]}' \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:9880/consumers/control-center

The above request may return many events. Here is a sample:

*   Trying 127.0.0.1:9880...
* Connected to localhost (127.0.0.1) port 9880 (#0)
> POST /consumers/control-center HTTP/1.1
> Host: localhost:9880
> User-Agent: curl/7.79.1
> Accept: application/json
> Content-Type: application/json
> Content-Length: 129
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Server: akka-http/10.2.9
< Date: Thu, 12 May 2022 06:38:53 GMT
< Transfer-Encoding: chunked
< Content-Type: application/json
< 
{"key":100473,"offset":81106162417742,"partition":0,"topic":"default:end-device-events","value":"cyrDFiwpxFSe6IEYczhLDoLyX9TQkST7FDQUCcKjd7RAh3ER0f/2BwePUh/kbNSPEO5uBKFefYELR7syO9aQqOjUemNG2+ag6bkN/W3JDeCmmxmnUQ=="}
{"key":100473,"offset":81106162417743,"partition":0,"topic":"default:end-device-events","value":"BmI3IAIoTLAWUxxt153wE87w017hSaPiI3E3jpUl/35aqD8LNV+ueZtOR9Pcw4hFDIMJS6ZKoTlFq3AbYuYzBG2OGwcbZuc5Ptea9u422z5iyibb"}
{"key":100473,"offset":81106162417744,"partition":0,"topic":"default:end-device-events","value":"D4onyCIDM6Pif7Rpzb8xNGChtd8w58CwPcPEkJopwx5QYEAFzAm59iG4vzE5geWXYWkoIixHkwAOwincD4+VgoOhiW/7DWX85qv0zH6xf9Od34Yu"}
<Request pauses here waiting for more records to become available>
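In the sample above each record arrives as one JSON object per line. A client reading the raw stream cannot assume that one HTTP chunk holds exactly one record, so this minimal Python sketch buffers bytes and splits on newlines. The one-record-per-line framing and the base64 encoding of `value` are assumptions inferred from the sample output, not a documented contract; `iter_records` is a hypothetical name.

```python
import base64
import json
from typing import Iterable, Iterator


def iter_records(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one record per newline-terminated JSON line.

    Chunk boundaries need not line up with record boundaries, so a partial
    line stays buffered until its terminating newline arrives.
    """
    buf = b""
    for chunk in chunks:
        buf += chunk
        # Emit every complete line currently in the buffer.
        while b"\n" in buf:
            line, buf = buf.split(b"\n", 1)
            if line.strip():
                record = json.loads(line)
                # Assumption: "value" carries base64-encoded bytes, as in the sample.
                record["value"] = base64.b64decode(record["value"])
                yield record
    # Anything left in buf has no newline yet, so it is an incomplete record.
```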

Here is another request where a specific offset is to be committed prior to subscribing:

curl -v \
  -d '{"offsets":[{"topic":"end-device-events","partition":0,"offset":81106162417743}],"subscriptions":[{"topic":"end-device-events"}]}' \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:9880/consumers/control-center
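The same request can be made without curl. This minimal Python sketch uses only the standard library and assumes the prototype's endpoint and headers exactly as in the curl commands above; `stream_records` is a hypothetical helper. Because `http.client` removes the chunked framing itself, the code only has to read the response line by line.

```python
import http.client
import json
from typing import Iterator


def stream_records(host: str, port: int, group: str, body: str) -> Iterator[dict]:
    """POST the prototype request and yield records as their lines arrive.

    Iterating the response blocks while the server waits for new records.
    """
    # Default timeout (normally none), since the server keeps the stream open.
    conn = http.client.HTTPConnection(host, port)
    try:
        conn.request(
            "POST",
            f"/consumers/{group}",
            body=body.encode(),
            headers={"Accept": "application/json",
                     "Content-Type": "application/json"},
        )
        resp = conn.getresponse()
        if resp.status != 200:
            raise RuntimeError(f"unexpected status {resp.status}")
        for line in resp:  # one newline-terminated record per iteration
            if line.strip():
                yield json.loads(line)
    finally:
        conn.close()
```

For example, `stream_records("localhost", 9880, "control-center", body)` with `body` set to the JSON string from the curl command yields records until the client stops iterating.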

Records are returned using chunked transfer encoding, so they can be consumed as a stream. The server never closes its half of the connection.
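Chunked transfer encoding (HTTP/1.1, RFC 9112 section 7.1) frames each piece of the body as a hexadecimal length, CRLF, the bytes, and CRLF; a zero-length chunk ends the body. Since the prototype never closes its half of the connection, that terminating chunk is never sent. Below is a minimal Python sketch of the framing with hypothetical helper names; sending one record per chunk is an assumption, since HTTP does not tie chunks to records.

```python
from typing import List, Tuple


def encode_chunk(data: bytes) -> bytes:
    """Frame one payload as an HTTP/1.1 chunk: hex size, CRLF, data, CRLF."""
    if not data:
        raise ValueError("a zero-length chunk would terminate the body")
    return b"%x\r\n" % len(data) + data + b"\r\n"


def decode_chunks(buf: bytes) -> Tuple[List[bytes], bytes, bool]:
    """Parse as many complete chunks as `buf` holds.

    Returns (payloads, rest, done): `rest` is an incomplete trailing chunk to
    keep for the next read, and `done` is True once the zero-length chunk is
    seen (any trailer section after it is ignored).
    """
    payloads, pos = [], 0
    while True:
        eol = buf.find(b"\r\n", pos)
        if eol < 0:
            return payloads, buf[pos:], False
        # The size line may carry ";name=value" chunk extensions; drop them.
        size = int(buf[pos:eol].split(b";", 1)[0], 16)
        if size == 0:
            return payloads, b"", True
        start = eol + 2
        end = start + size
        if len(buf) < end + 2:  # data or its trailing CRLF not fully received
            return payloads, buf[pos:], False
        payloads.append(buf[start:end])
        pos = end + 2
```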

JIRA Link: CORE-903

emaxerrno commented 2 years ago

Cc @vsaraswat

vsaraswat commented 2 years ago

@mattschumpert