confluentinc / kafka-rest

Confluent REST Proxy for Kafka
https://docs.confluent.io/current/kafka-rest/docs/index.html

Consumer call needs to happen twice to receive recently produced messages #432

Open aaronjwhiteside opened 6 years ago

aaronjwhiteside commented 6 years ago

The following is the "on the wire" interaction between our REST client and the Kafka REST Proxy.

A summary of the operations performed is:

  1. Start the various docker images in the right order (wait for them to finish starting up)
  2. Produce a message to the topic "pubsub"
  3. Create consumer
  4. Subscribe consumer to the topic "pubsub"
  5. Try to consume, but receive nothing. (Note: increasing the timeout value, or leaving it at the default, has no effect other than lengthening the wait before we try to consume again.)
  6. Immediately try and consume again, this time we actually receive the message.

We have also tried creating and subscribing the consumer before producing the message and then consuming, but it makes no difference. In every case we have tried, the first consume call returns nothing and the message arrives only on the second attempt.
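
As a client-side workaround only (it does not address the underlying behavior), the consume call can be wrapped in a small retry loop. The sketch below is a minimal illustration under stated assumptions: `poll_until_records`, `max_attempts`, and `delay_s` are hypothetical names, and `fetch` stands for whatever function issues the GET .../records request and returns the parsed JSON list.

```python
import time
from typing import Callable, List


def poll_until_records(fetch: Callable[[], List[dict]],
                       max_attempts: int = 5,
                       delay_s: float = 0.0) -> List[dict]:
    """Call fetch() until it returns a non-empty list or attempts run out.

    fetch is a hypothetical callable wrapping the GET .../records request.
    """
    for attempt in range(max_attempts):
        records = fetch()
        if records:
            return records
        # An empty list is what the first GET /records returns in this report.
        if delay_s and attempt < max_attempts - 1:
            time.sleep(delay_s)
    # Every attempt came back empty.
    return []
```

With the behavior described here, a `fetch` that returns an empty list once and the message afterwards would be called exactly twice.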

I can provide the output from all three docker containers if required.

2018-05-09 20:41:33.957 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "POST /topics/pubsub HTTP/1.1[\r][\n]"
2018-05-09 20:41:33.957 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "Accept: application/vnd.kafka.v2+json[\r][\n]"
2018-05-09 20:41:33.957 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "User-Agent: KafkaRestProxyClient/1.0-SNAPSHOT Jersey/2.26 Apache-HttpClient/4.5.5 Java/1.8.0_141[\r][\n]"
2018-05-09 20:41:33.957 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "Content-Type: application/vnd.kafka.binary.v2+json[\r][\n]"
2018-05-09 20:41:33.957 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "Transfer-Encoding: chunked[\r][\n]"
2018-05-09 20:41:33.957 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "Host: localhost:54987[\r][\n]"
2018-05-09 20:41:33.958 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]"
2018-05-09 20:41:33.958 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "Accept-Encoding: gzip,deflate[\r][\n]"
2018-05-09 20:41:33.958 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "[\r][\n]"
2018-05-09 20:41:33.958 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "2a[\r][\n]"
2018-05-09 20:41:33.958 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "{"records":[{"value":"aGVsbG8gd29ybGQ="}]}[\r][\n]"
2018-05-09 20:41:33.959 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "0[\r][\n]"
2018-05-09 20:41:33.959 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 >> "[\r][\n]"
2018-05-09 20:41:34.929 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]"
2018-05-09 20:41:34.929 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "Date: Thu, 10 May 2018 03:41:34 GMT[\r][\n]"
2018-05-09 20:41:34.929 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "Content-Type: application/vnd.kafka.v2+json[\r][\n]"
2018-05-09 20:41:34.929 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "Content-Length: 115[\r][\n]"
2018-05-09 20:41:34.930 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "Server: Jetty(9.2.24.v20180105)[\r][\n]"
2018-05-09 20:41:34.930 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "[\r][\n]"
2018-05-09 20:41:34.930 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-0 << "{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}"

2018-05-09 20:41:34.986 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "POST /consumers/default HTTP/1.1[\r][\n]"
2018-05-09 20:41:34.986 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "Accept: application/vnd.kafka.v2+json[\r][\n]"
2018-05-09 20:41:34.986 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "User-Agent: KafkaRestProxyClient/1.0-SNAPSHOT Jersey/2.26 Apache-HttpClient/4.5.5 Java/1.8.0_141[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "Content-Type: application/vnd.kafka.v2+json[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "Transfer-Encoding: chunked[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "Host: localhost:54987[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "Connection: Keep-Alive[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "Accept-Encoding: gzip,deflate[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "47[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "{"name":"my-consumer","format":"binary","auto.offset.reset":"earliest"}[\r][\n]"
2018-05-09 20:41:34.987 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "0[\r][\n]"
2018-05-09 20:41:34.988 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 >> "[\r][\n]"
2018-05-09 20:41:35.125 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "HTTP/1.1 200 OK[\r][\n]"
2018-05-09 20:41:35.125 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "Date: Thu, 10 May 2018 03:41:35 GMT[\r][\n]"
2018-05-09 20:41:35.126 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "Content-Type: application/vnd.kafka.v2+json[\r][\n]"
2018-05-09 20:41:35.126 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "Content-Length: 105[\r][\n]"
2018-05-09 20:41:35.126 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "Server: Jetty(9.2.24.v20180105)[\r][\n]"
2018-05-09 20:41:35.126 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "[\r][\n]"
2018-05-09 20:41:35.126 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-1 << "{"instance_id":"my-consumer","base_uri":"http://kafka-rest:8080/consumers/default/instances/my-consumer"}"

2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "POST /consumers/default/instances/my-consumer/subscription HTTP/1.1[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Accept: application/vnd.kafka.v2+json[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "User-Agent: KafkaRestProxyClient/1.0-SNAPSHOT Jersey/2.26 Apache-HttpClient/4.5.5 Java/1.8.0_141[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Content-Type: application/vnd.kafka.binary.v2+json[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Transfer-Encoding: chunked[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Host: localhost:54987[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Connection: Keep-Alive[\r][\n]"
2018-05-09 20:41:35.134 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Accept-Encoding: gzip,deflate[\r][\n]"
2018-05-09 20:41:35.135 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "[\r][\n]"
2018-05-09 20:41:35.135 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "15[\r][\n]"
2018-05-09 20:41:35.135 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "{"topics":["pubsub"]}[\r][\n]"
2018-05-09 20:41:35.135 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "0[\r][\n]"
2018-05-09 20:41:35.135 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "[\r][\n]"
2018-05-09 20:41:35.153 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "HTTP/1.1 204 No Content[\r][\n]"
2018-05-09 20:41:35.153 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Date: Thu, 10 May 2018 03:41:35 GMT[\r][\n]"
2018-05-09 20:41:35.153 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Server: Jetty(9.2.24.v20180105)[\r][\n]"
2018-05-09 20:41:35.153 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "[\r][\n]"

2018-05-09 20:41:35.158 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "GET /consumers/default/instances/my-consumer/records?timeout=1 HTTP/1.1[\r][\n]"
2018-05-09 20:41:35.158 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Accept: application/vnd.kafka.binary.v2+json[\r][\n]"
2018-05-09 20:41:35.158 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "User-Agent: KafkaRestProxyClient/1.0-SNAPSHOT Jersey/2.26 Apache-HttpClient/4.5.5 Java/1.8.0_141[\r][\n]"
2018-05-09 20:41:35.158 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Host: localhost:54987[\r][\n]"
2018-05-09 20:41:35.158 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Connection: Keep-Alive[\r][\n]"
2018-05-09 20:41:35.159 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Accept-Encoding: gzip,deflate[\r][\n]"
2018-05-09 20:41:35.159 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "[\r][\n]"
2018-05-09 20:41:38.723 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "HTTP/1.1 200 OK[\r][\n]"
2018-05-09 20:41:38.723 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Date: Thu, 10 May 2018 03:41:35 GMT[\r][\n]"
2018-05-09 20:41:38.723 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Content-Type: application/vnd.kafka.binary.v2+json[\r][\n]"
2018-05-09 20:41:38.723 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Content-Length: 2[\r][\n]"
2018-05-09 20:41:38.723 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Server: Jetty(9.2.24.v20180105)[\r][\n]"
2018-05-09 20:41:38.724 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "[\r][\n]"
2018-05-09 20:41:38.724 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "[]"

2018-05-09 20:41:38.735 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "GET /consumers/default/instances/my-consumer/records?timeout=1000 HTTP/1.1[\r][\n]"
2018-05-09 20:41:38.736 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Accept: application/vnd.kafka.binary.v2+json[\r][\n]"
2018-05-09 20:41:38.736 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "User-Agent: KafkaRestProxyClient/1.0-SNAPSHOT Jersey/2.26 Apache-HttpClient/4.5.5 Java/1.8.0_141[\r][\n]"
2018-05-09 20:41:38.736 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Host: localhost:54987[\r][\n]"
2018-05-09 20:41:38.736 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Connection: Keep-Alive[\r][\n]"
2018-05-09 20:41:38.736 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "Accept-Encoding: gzip,deflate[\r][\n]"
2018-05-09 20:41:38.736 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 >> "[\r][\n]"
2018-05-09 20:41:39.747 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "HTTP/1.1 200 OK[\r][\n]"
2018-05-09 20:41:39.748 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Date: Thu, 10 May 2018 03:41:39 GMT[\r][\n]"
2018-05-09 20:41:39.748 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Content-Type: application/vnd.kafka.binary.v2+json[\r][\n]"
2018-05-09 20:41:39.748 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Content-Length: 83[\r][\n]"
2018-05-09 20:41:39.748 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "Server: Jetty(9.2.24.v20180105)[\r][\n]"
2018-05-09 20:41:39.748 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "[\r][\n]"
2018-05-09 20:41:39.749 DEBUG 21787 --- [           main] o.a.h.wire                               : http-outgoing-2 << "[{"topic":"pubsub","key":null,"value":"aGVsbG8gd29ybGQ=","partition":0,"offset":0}]"
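
For readers following the log, the two response bodies above can be parsed as below. This is a small sketch, not part of the original client: `decode_binary_records` is a hypothetical helper, and it assumes the binary format's base64-encoded `value` field as seen on the wire.

```python
import base64
import json
from typing import List, Tuple


def decode_binary_records(body: str) -> List[Tuple[str, int, int, bytes]]:
    """Parse a binary-format /records response and base64-decode each value."""
    decoded = []
    for record in json.loads(body):
        value = record["value"]
        # A null value is mapped to empty bytes in this sketch.
        payload = base64.b64decode(value) if value is not None else b""
        decoded.append(
            (record["topic"], record["partition"], record["offset"], payload)
        )
    return decoded


# Bodies copied from the first and second GET /records responses in the log.
first = decode_binary_records("[]")
second = decode_binary_records(
    '[{"topic":"pubsub","key":null,"value":"aGVsbG8gd29ybGQ=",'
    '"partition":0,"offset":0}]'
)
print(first)   # []
print(second)  # [('pubsub', 0, 0, b'hello world')]
```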

We are using the official Docker images, started by the docker-maven-plugin as part of a functional test. The behavior was observed with both versions 4.1.0 and 4.0.0.

            <plugin>
                <groupId>io.fabric8</groupId>
                <artifactId>docker-maven-plugin</artifactId>

                <configuration>
                    <!--suppress MavenModelInspection -->
                    <skip>${skipITs}</skip>
                    <allContainers>true</allContainers>
                    <removeVolumes>true</removeVolumes>
                    <!--<startParallel>true</startParallel>-->

                    <images>
                        <image>
                            <alias>zookeeper</alias>
                            <name>confluentinc/cp-zookeeper:${confluent-docker-image.version}</name>
                            <run>
                                <env>
                                    <ZOOKEEPER_CLIENT_PORT>2181</ZOOKEEPER_CLIENT_PORT>
                                    <ZOOKEEPER_TICK_TIME>1000</ZOOKEEPER_TICK_TIME>
                                </env>
                                <wait>
                                    <log>binding to port</log>
                                    <time>60000</time>
                                </wait>
                            </run>
                        </image>

                        <image>
                            <alias>kafka</alias>
                            <name>confluentinc/cp-kafka:${confluent-docker-image.version}</name>
                            <run>
                                <dependsOn>
                                    <container>zookeeper</container>
                                </dependsOn>
                                <links>
                                    <link>zookeeper</link>
                                </links>
                                <hostname>kafka</hostname>

                                <env>
                                    <KAFKA_BROKER_ID>1</KAFKA_BROKER_ID>
                                    <KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS>10</KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS>
                                    <KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR>1</KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR>
                                    <KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>

                                    <KAFKA_LISTENERS>PLAINTEXT://0.0.0.0:9092</KAFKA_LISTENERS>
                                    <KAFKA_ADVERTISED_LISTENERS>PLAINTEXT://kafka:9092</KAFKA_ADVERTISED_LISTENERS>
                                </env>

                                <wait>
                                    <log>.* Controller 1 connected to .* \(kafka\.controller\.RequestSendThread\)</log>
                                    <time>60000</time>
                                </wait>
                            </run>
                        </image>

                        <image>
                            <alias>kafka-rest</alias>
                            <name>confluentinc/cp-kafka-rest:${confluent-docker-image.version}</name>
                            <run>
                                <dependsOn>
                                    <container>zookeeper</container>
                                    <container>kafka</container>
                                </dependsOn>
                                <links>
                                    <link>zookeeper</link>
                                    <link>kafka</link>
                                </links>
                                <hostname>kafka-rest</hostname>

                                <env>
                                    <KAFKA_REST_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_REST_ZOOKEEPER_CONNECT>
                                    <KAFKA_REST_LISTENERS>http://0.0.0.0:8080</KAFKA_REST_LISTENERS>
                                    <KAFKA_REST_HOST_NAME>kafka-rest</KAFKA_REST_HOST_NAME>
                                </env>

                                <ports>
                                    <port>${kafkarest.port}:8080</port>
                                </ports>

                                <wait>
                                    <log>Server started, listening for requests</log>
                                    <time>60000</time>
                                </wait>
                            </run>
                        </image>
                    </images>
                </configuration>

                <executions>
                    <!-- Start all the containers -->
                    <execution>
                        <id>start-containers</id>
                        <phase>pre-integration-test</phase>
                        <goals>
                            <goal>start</goal>
                        </goals>
                    </execution>

                    <!-- Stop everything -->
                    <execution>
                        <id>stop-containers</id>
                        <phase>post-integration-test</phase>
                        <goals>
                            <goal>stop</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

ybyzek commented 6 years ago

I have also encountered the same issue in the demo https://docs.confluent.io/current/tutorials/cp-demo/docs/index.html#schema-registry-and-rest-proxy

ConfluentCasey commented 6 years ago

It's not an issue with Docker containers. I reproduced this with Kafka version 1.1.0-cp1 running directly on my laptop.

Here are my steps:

  1. Restart rest-proxy
  2. Create new consumer, and consumer instance with "auto.offset.reset": "earliest"
  3. Subscribe to the Consumer instance
  4. GET /records, receive nothing (with max_bytes specified, as that is part of my use case).
     a. The first GET /records can occur immediately after the subscription or 10 minutes later, with no change in behavior.
     b. Occasionally I have noticed that two empty GET /records responses are returned.
  5. The log says that after the GET /records, the offsets for the partitions are being reset to 0.
  6. The next GET returns data.

aaron-comyn commented 6 years ago

We're experiencing the same issue: first-time call to "consume" returns no records, a second call gets the records as expected.

This has only been tested with "auto.offset.reset" set to "earliest". The container logs for the REST proxy seem to show that the consumer gets connected to a broker on the first call, and then returns data on successive calls.

[2018-05-31 09:22:28,317] INFO [Consumer clientId=consumer-23, groupId=testing_consumation8] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)

Setup: a fresh install in a Kubernetes cluster (based on https://github.com/Yolean/kubernetes-kafka), Kafka v1.1, REST Proxy v3.2.2 or v3.3.

scotthez commented 5 years ago

I am also experiencing this issue. Any suggestions as to what the fix is?

kaushiksrinivas commented 4 years ago

Upgrade Kafka to the Confluent 5.3.0 release. The issue is not observed with the Kafka REST Proxy in that release.

ybyzek commented 4 years ago

Based on some limited testing, it seems that this issue may occur if the consumer instance & subscription are created after producing to the topic. On the other hand, if the consumer instance & subscription are created before producing to the topic, then a single consume command appears to work.
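
The consumer-first ordering described above can be sketched as a sequence of prepared requests. This is an illustration only: `consumer_first_requests` and the base URL are hypothetical, the content types mirror the wire log in the original report, and whether this ordering avoids the empty first response is not settled in this thread. Nothing is sent; each request would still need to be passed to `urllib.request.urlopen` in order.

```python
import json
from typing import List
from urllib.request import Request

V2 = "application/vnd.kafka.v2+json"
BINARY_V2 = "application/vnd.kafka.binary.v2+json"


def consumer_first_requests(base: str, group: str, instance: str,
                            topic: str, value_b64: str) -> List[Request]:
    """Build (but do not send) requests in consumer-first order."""

    def post(path: str, body: dict, content_type: str) -> Request:
        return Request(
            base + path,
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": content_type, "Accept": V2},
            method="POST",
        )

    instance_path = f"/consumers/{group}/instances/{instance}"
    return [
        # 1. Create the consumer instance.
        post(f"/consumers/{group}",
             {"name": instance, "format": "binary",
              "auto.offset.reset": "earliest"}, V2),
        # 2. Subscribe it to the topic (content type as in the wire log).
        post(instance_path + "/subscription", {"topics": [topic]}, BINARY_V2),
        # 3. Only then produce the message.
        post(f"/topics/{topic}", {"records": [{"value": value_b64}]}, BINARY_V2),
        # 4. Consume.
        Request(base + instance_path + "/records",
                headers={"Accept": BINARY_V2}, method="GET"),
    ]
```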

rs199483 commented 3 years ago

We are on version 6.0.1 and this behavior is still happening, exactly as ybyzek described above. (Sep 20, 2020)

dennisgermany commented 2 years ago

We just did some testing on our own and found out that: