confluentinc / kafka-rest

Confluent REST Proxy for Kafka
https://docs.confluent.io/current/kafka-rest/docs/index.html

GET /consumers/.../records is neither idempotent nor safe #582

Open · TrueWill opened this issue 5 years ago

TrueWill commented 5 years ago

https://docs.confluent.io/current/kafka-rest/api.html#get--consumers-(string-group_name)-instances-(string-instance)-records has the potential for data loss. Fetching records advances the consumer's position on the server, so this GET has side effects (it is not safe in the HTTP sense), and a retry returns the _next_ batch rather than the one that was lost (it is not idempotent). If a GET response is lost in transit, the consumer cannot retry the request without the possibility of losing messages.

The only workaround I've found is to immediately create a new consumer instance any time a fetch times out; the new instance reads from the last committed offset, so the uncommitted batch is re-delivered. A sketch of that recovery path follows.
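
A minimal sketch of that workaround, reusing the proxy address, consumer group, instance name, and topic from the proof-of-concept below; the 10-second timeout and the batch.json destination are illustrative choices, not part of the API:

#!/bin/sh
host="http://localhost:8082"
contenttype="application/vnd.kafka.v2+json"
messagecontenttype="application/vnd.kafka.json.v2+json"
group="group1000"
instance="instance1000"

# Attempt the fetch with a hard client-side timeout (-f fails on HTTP errors).
if ! curl -sf --max-time 10 -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/$instance/records" > batch.json
then
    # Response lost or timed out: the server-side position is now unknown,
    # so discard the instance entirely...
    curl -s -X DELETE -H "Accept: $contenttype" \
        "$host/consumers/$group/instances/$instance"

    # ...create a fresh instance in the same group (it resumes from the last
    # committed offset, so any uncommitted messages are re-delivered)...
    curl -s -X POST -H "Content-Type: $contenttype" \
        --data '{"name":"'"$instance"'","format":"json","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
        "$host/consumers/$group"

    # ...then re-subscribe and fetch again.
    curl -s -X POST -H "Content-Type: $contenttype" \
        --data '{"topics":["topic-x1"]}' \
        "$host/consumers/$group/instances/$instance/subscription"
    curl -sf -H "Accept: $messagecontenttype" \
        "$host/consumers/$group/instances/$instance/records" > batch.json
fi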

Proof-of-concept (run against https://github.com/confluentinc/cp-docker-images/tree/5.3.0-post/examples/cp-all-in-one with the appropriate topic pre-created):

#!/bin/sh
# Demonstrates the lost-response scenario against a local REST Proxy.
host="http://localhost:8082"
contenttype="application/vnd.kafka.v2+json"              # v2 API metadata requests
messagecontenttype="application/vnd.kafka.json.v2+json"  # JSON-embedded message format
group="group1000"

echo 'Pre-created compacted topic with 1 partition, 1 replica, and 0 messages.'

echo "\nProducing 4 messages (followed by sleep)..."
curl -i -X POST -H "Content-Type: $messagecontenttype" \
    --data '{"records":[{"key": "A","value":"Value A"},{"key": "B","value":"Value B"},{"key": "C","value":"Value C"},{"key": "D","value":"Value D"}]}' \
    "$host/topics/topic-x1"

sleep 4

echo "\n\nCreating initial consumer (auto.commit.enable is false, auto.offset.reset is earliest)..."
curl -i -X POST -H "Content-Type: $contenttype" \
    -H "Accept: $contenttype" \
    --data '{"name":"instance1000","format":"json","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
    "$host/consumers/$group"

echo "\n\nSubscribing to topic..."
curl -i -X POST -H "Content-Type: $contenttype" \
    --data '{"topics":["topic-x1"]}' \
    "$host/consumers/$group/instances/instance1000/subscription"

echo "Consuming data..."
curl -i -X GET -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/instance1000/records"

echo "\n\n** OOPS - GET RESPONSE LOST! **"

echo "\nProducing 1 more message (followed by sleep)..."
curl -i -X POST -H "Content-Type: $messagecontenttype" \
    --data '{"records":[{"key": "E","value":"Value E"}]}' \
    "$host/topics/topic-x1"

sleep 4

echo "\n\nRetrying (_exact_ same GET request)..."
echo "Consuming data..."
curl -i -X GET -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/instance1000/records"

echo "\n\nCommitting offset 4 (the only one we know about)..."
curl -i -X POST -H "Content-Type: $contenttype" \
    --data '{"offsets":[{"topic":"topic-x1","partition":0,"offset":4}]}' \
    "$host/consumers/$group/instances/instance1000/offsets"

echo "\n\nUnsubscribing..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance1000/subscription"

echo "Closing initial consumer..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance1000"

echo "********\n"

echo 'Creating new consumer in same group (it will ignore auto.offset.reset since committed offsets exist)...'
curl -i -X POST -H "Content-Type: $contenttype" \
    -H "Accept: $contenttype" \
    --data '{"name":"instance10001","format":"json","auto.offset.reset":"earliest","auto.commit.enable":"false"}' \
    "$host/consumers/$group"

echo "\n\nSubscribing to topic..."
curl -i -X POST -H "Content-Type: $contenttype" \
    --data '{"topics":["topic-x1"]}' \
    "$host/consumers/$group/instances/instance10001/subscription"

echo "Consuming data..."
curl -i -X GET -H "Accept: $messagecontenttype" \
    "$host/consumers/$group/instances/instance10001/records"

echo "\n\n** NOTE - NO RECORDS FOUND! **"

echo "\n\nUnsubscribing..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance10001/subscription"

echo "Closing consumer..."
curl -i -X DELETE -H "Accept: $contenttype" \
    "$host/consumers/$group/instances/instance10001"

echo "Done."
TrueWill commented 5 years ago

One possible way to handle this is discussed in https://www.databasesandlife.com/idempotency/ under Alternative approaches. The client would include a random request ID with each call. The server would keep a cache keyed on client instance IDs + request ID (items could expire relatively quickly). When retrying, the client would send the same request ID; the server would get a cache hit and would respond with the contents of the cache (instead of talking to Kafka).