Yolean / kubernetes-kafka

Kafka cluster as Kubernetes StatefulSet, plain manifests and config
Apache License 2.0

Error Code 50002 while trying to get records #189

Closed andy2046 closed 6 years ago

andy2046 commented 6 years ago

Hi, I got a 50002 error while trying to get records, and I cannot see the consumer group created in Kafka when running bin/kafka-consumer-groups.sh --bootstrap-server PLAINTEXT://bootstrap.kafka:9092 --list

Steps to reproduce the issue are shown below:

➜  curl -i -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
    --data '{"records": [{ "key": "a2V5", "value": "Y29uZmx1ZW50" }, { "value": "bG9ncw==" }]}' \                                        
    http://$RESTIP:8082/topics/kafka-01  
HTTP/1.1 200 OK
Date: Mon, 09 Jul 2018 08:21:01 GMT
Content-Type: application/vnd.kafka.v2+json
Content-Length: 173
Server: Jetty(9.2.24.v20180105)

{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null},{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}%    

➜  curl -i -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
    --data '{"name": "consumer-01", "format": "binary", "auto.offset.reset": "earliest"}' \
    http://$RESTIP:8082/consumers/consumer-group-03 
HTTP/1.1 200 OK
Date: Mon, 09 Jul 2018 08:21:32 GMT
Content-Type: application/vnd.kafka.v2+json
Content-Length: 118
Server: Jetty(9.2.24.v20180105)

{"instance_id":"consumer-01","base_uri":"http://xxx.xxx.xxx.xxx:8082/consumers/consumer-group-03/instances/consumer-01"}%                                                          

 ➜  curl -i -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["kafka-01"]}' \          
    http://$RESTIP:8082/consumers/consumer-group-03/instances/consumer-01/subscription
HTTP/1.1 204 No Content
Date: Mon, 09 Jul 2018 08:22:04 GMT
Server: Jetty(9.2.24.v20180105)

➜  curl -i -X GET -H "Accept: application/vnd.kafka.v2+json" \                                                  
    http://$RESTIP:8082/consumers/consumer-group-03/instances/consumer-01/records
HTTP/1.1 500 Internal Server Error
Date: Mon, 09 Jul 2018 08:22:19 GMT
Content-Type: application/vnd.kafka.v2+json
Content-Length: 650
Server: Jetty(9.2.24.v20180105)

{"error_code":50002,"message":"Kafka error: null io.confluent.rest.exceptions.RestServerErrorException: Kafka error: null
io.confluent.rest.exceptions.RestServerErrorException: Kafka error: null
at io.confluent.kafkarest.Errors.kafkaErrorException(Errors.java:188)
at io.confluent.kafkarest.v2.KafkaConsumerManager$1.onCompletion(KafkaConsumerManager.java:295)
at io.confluent.kafkarest.v2.KafkaConsumerReadTask.finish(KafkaConsumerReadTask.java:167)
at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:153)
at io.confluent.kafkarest.v2.KafkaConsumerWorker.run(KafkaConsumerWorker.java:112)"}   

The yaml files for the deployment are shown below:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: kafka
  labels:
    app: zk
spec:
  serviceName: "zk-headless-svc"
  replicas: 1
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
        replicas: "1"
    spec:
      containers:
      - name: k8s-zk
        imagePullPolicy: IfNotPresent
        image: confluentinc/cp-zookeeper:4.1.1
        resources:
          requests:
            memory: "2Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        env:
        - name: ZOOKEEPER_CLIENT_PORT
          value: "2181"
        - name: ZOOKEEPER_TICK_TIME
          value: "2000"
        volumeMounts:
        - name: data
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      storageClassName: zookeeper
      accessModes: 
        - ReadWriteOnce
      resources:
        requests:
          storage: 50Gi

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "broker"
  replicas: 1
  updateStrategy:
    type: OnDelete
  template:
    metadata:
      labels:
        app: kafka
      annotations:
    spec:
      terminationGracePeriodSeconds: 30
      initContainers:
      - name: init-config
        image: solsson/kafka-initutils@sha256:18bf01c2c756b550103a99b3c14f741acccea106072cd37155c6d24be4edd6e2
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
        volumeMounts:
        - name: configmap
          mountPath: /etc/kafka-configmap
        - name: config
          mountPath: /etc/kafka
      containers:
      - name: broker
        image: solsson/kafka:1.0.1@sha256:1a4689d49d6274ac59b9b740f51b0408e1c90a9b66d16ad114ee9f7193bab111
        env:
        - name: KAFKA_LOG4J_OPTS
          value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
        ports:
        - name: inside
          containerPort: 9092
        - name: outside
          containerPort: 9094
        command:
        - ./bin/kafka-server-start.sh
        - /etc/kafka/server.properties
        resources:
          requests:
            cpu: 800m
            memory: 2048Mi
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
        volumeMounts:
        - name: config
          mountPath: /etc/kafka
        - name: data
          mountPath: /var/lib/kafka/data
      volumes:
      - name: configmap
        configMap:
          name: broker-config
      - name: config
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: kafka-broker
      resources:
        requests:
          storage: 100Gi

---

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rest-proxy
  namespace: kafka-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-rest
  template:
    metadata:
      labels:
        app: kafka-rest
    spec:
      containers:
      - name: kafka-rest
        image: solsson/kafka-cp@sha256:2797da107f477ede2e826c29b2589f99f22d9efa2ba6916b63e07c7045e15044
        env:
        - name: KAFKAREST_LOG4J_OPTS
          value: -Dlog4j.configuration=file:/etc/kafka-rest/log4j.properties
        command:
        - kafka-rest-start
        - /etc/kafka-rest/kafka-rest.properties
        readinessProbe:
          httpGet:
            path: /
            port: 8082
        livenessProbe:
          httpGet:
            path: /
            port: 8082
        ports:
        - containerPort: 8082
        volumeMounts:
        - name: config
          mountPath: /etc/kafka-rest
      volumes:
      - name: config
        configMap:
          name: rest-config

---

kind: ConfigMap
metadata:
  name: rest-config
  namespace: kafka-test
apiVersion: v1
data:
  schema-registry.properties: |-
    port=8081
    listeners=http://0.0.0.0:8081
    kafkastore.bootstrap.servers=PLAINTEXT://kafka-client-svc:9092
    kafkastore.topic=_schemas
    debug=false

    # https://github.com/Landoop/schema-registry-ui#prerequisites
    access.control.allow.methods=GET,POST,PUT,OPTIONS
    access.control.allow.origin=*

  kafka-rest.properties: |-
    #id=kafka-rest-test-server
    listeners=http://0.0.0.0:8082
    bootstrap.servers=PLAINTEXT://kafka-client-svc:9092
    # schema.registry.url=http://avro-schemas.kafka:8082

    # https://github.com/Landoop/kafka-topics-ui#common-issues
    access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
    access.control.allow.origin=*

    zookeeper.connect=zk-client-svc:2181
    consumer.threads=20
    host.name=xxx.xxx.xxx.xxx
    consumer.request.timeout.ms=5000
    consumer.request.max.bytes=300000
    consumer.instance.timeout.ms=600000
    debug=true
    simpleconsumer.pool.timeout.ms=5000
    simpleconsumer.pool.size.max=50

    consumer.iterator.backoff.ms=1000
    consumer.iterator.timeout.ms=100

    client.zk.session.timeout.ms=600000
    client.timeout.ms=60000

  log4j.properties: |-
    log4j.rootLogger=INFO, stdout

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

    log4j.logger.kafka=WARN, stdout
    log4j.logger.org.apache.zookeeper=WARN, stdout
    log4j.logger.org.apache.kafka=WARN, stdout
    log4j.logger.org.I0Itec.zkclient=WARN, stdout
    log4j.additivity.kafka.server=false
    log4j.additivity.kafka.consumer.ZookeeperConsumerConnector=false

    log4j.logger.org.apache.kafka.clients.Metadata=DEBUG, stdout
    log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractCoordinator=INFO, stdout
solsson commented 6 years ago

I think this issue needs more context: what you're trying to do and how it relates to this repository. Regarding the yaml, please fork so I can see a proper diff.

golonzovsky commented 6 years ago

As the produce request worked, we can assume that the cluster is healthy and the connection between the REST proxy and the cluster is fine.

50002 is a generic REST Proxy error code, which might indicate a timeout. Check the REST proxy logs for more details about this error.
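
For example, assuming the rest-proxy Deployment from the manifests above (in the kafka-test namespace), the logs could be tailed with something like:

    # Tail the REST proxy logs around the failing GET /records call;
    # deployment name and namespace are taken from the manifests above.
    kubectl -n kafka-test logs deployment/rest-proxy --tail=100 -f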

The REST proxy might not have enough resources, or the request might be too big. You could try giving it more resources, increasing the query timeout (with the timeout query param), and maybe fetching a smaller chunk (with the max_bytes query param), as in the request below:

curl -i -X GET -H "Accept: application/vnd.kafka.v2+json" \
    "http://$RESTIP:8082/consumers/consumer-group-03/instances/consumer-01/records?max_bytes=3000&timeout=30000"

(note the quotes around the URL; without them the shell would treat the & as a background operator)
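
For the resources part: the rest-proxy Deployment above sets no resources at all. A minimal sketch of a resources block for its kafka-rest container, in the same style as the zk StatefulSet above (the values are illustrative, not a recommendation):

        # added under the kafka-rest container in the rest-proxy Deployment
        resources:
          requests:
            cpu: 500m
            memory: 1Gi
          limits:
            memory: 1Gi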

You might want to increase the timeout defaults / reduce the max request size for the rest-proxy deployment as well; see the docs.
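
A minimal sketch of such an override in kafka-rest.properties, reusing two properties already present in the rest-config ConfigMap above (the values are illustrative, not recommendations):

    # maximum total time (ms) the proxy spends collecting messages for one GET /records request
    consumer.request.timeout.ms=30000
    # maximum number of bytes of message keys and values returned by one GET /records request
    consumer.request.max.bytes=65536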

But I would agree that this question might not be related to this repo; it seems you have the same issue with the Confluent images as well, here and in https://github.com/confluentinc/kafka-rest/issues/445

solsson commented 6 years ago

Thanks a lot @golonzovsky

andy2046 commented 6 years ago

@golonzovsky thanks for the reply. I actually did try limiting max_bytes to 50 and the timeout to 30,000 before I raised this issue, but it's still the same error.

Since this repo is quite active and I also use the image built by @solsson, I hope to get a solution here.

I am looking at the source code to find out why; please update me if you have any suggestions on this issue.

thanks a lot