confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html
Other
2.23k stars 1.12k forks source link

Unnecessary leader election in schema-registry #1714

Open haf opened 3 years ago

haf commented 3 years ago

1691 was closed prematurely. The solution is not to use stateful sets, because this deployment has no state it needs to take care of, using stateful sets introduce other problems. Also, the original topic was "during rolling deploys" — the reason for this is that:

the schema registry is super-important to have highly available (I'm not saying failover, but availability)

— simply because there are multiple concurrent writers/users of this service and this service being down causes every single producing service to experience a worse SLA. Needless to say, rolling deploys don't work with stateful sets.

Background

0.0.0.0 means to listen on all available NIC:s. It's needed when you don't know ahead of time what IP your pod will get. Pod, yes, I'm deploying on Kubernetes. Each new pod gets a mostly new IP, but one that is randomly assigned.

It's therefore not possible to change away from SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081

Possible solutions:

If you, like closer suggested, simply add more statefulset replicas, what's the point of leader election anyway? We'll get reads and writes to all of the replicas, not just the leader. It also doesn't make sense why, if this is the "solution" I can't just use IP:s either. All in all, it's very unclear what the property is, that this service is trying to uphold.

The problem

It would seem that Schema Registry uses the listener address as a key to find itself by a unique IP; a faulty assumption (as per the above):

[2020-11-24 10:39:33,351] INFO Finished rebalance with leader election result: Assignment{version=1, error=1, leader='sr-1-13502021-eb25-414e-aa14-621bb61e5009', leaderIdentity=SchemaRegistryIdentity{version=1,host=kafka-schemas,port=8081,scheme=http,leaderEligibility=true}, nodes=[SchemaRegistryIdentity{version=1,host=kafka-schemas,port=8081,scheme=http,leaderEligibility=true}, SchemaRegistryIdentity{version=1,host=kafka-schemas,port=8081,scheme=http,leaderEligibility=true}]} (io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector)
[2020-11-24 10:39:33,368] ERROR Unexpected exception in schema registry group processing thread (io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector)
java.lang.IllegalStateException: The schema registry group contained multiple members advertising the same URL. Verify that each instance has a unique, routable listener by setting the 'listeners' configuration. This error may happen if executing in containers where the default hostname is 'localhost'.
    at io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector.onAssigned(KafkaGroupLeaderElector.java:246)
    at io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryCoordinator.onJoinComplete(SchemaRegistryCoordinator.java:150)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:339)
    at io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryCoordinator.poll(SchemaRegistryCoordinator.java:114)
    at io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector$1.run(KafkaGroupLeaderElector.java:200)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

This is a bug that should be fixed. Beyond failing my restarts, it will also fail rolling deployments, for no good reason.

Workaround

Wait a longish period of time until the consumer group disappears from Kafka and start schema registry again. Due to how deployments in k8s work, you also have to remove the deployment to take down the old version of the pod.

This means that you get even worse availability, because the identity takes a long time to disappear (30-60 seconds) from kafka before you can start up your service again

brinchj commented 3 years ago

Hi Henrik,

I am setting up Schema Registry and found this issue. I am wondering if you tried setting SCHEMA_REGISTRY_HOST_NAME to the pod ip, to override the localhost default value?

I've seen a couple of deployment setups configure it like so, and I am wondering if that works to address the leader election issue you are seeing, as each pod would have a unique name -- although not a DNS domain name, but a direct IP:

          - name: SCHEMA_REGISTRY_HOST_NAME
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
haf commented 3 years ago

Hi @brinchj — Yes that's what I ended up with and rolling this deployment causes problems with availability, as described in this issue.

RafaelSNascimento commented 3 years ago

Hi Henrik,

I am setting up Schema Registry and found this issue. I am wondering if you tried setting SCHEMA_REGISTRY_HOST_NAME to the pod ip, to override the localhost default value?

I've seen a couple of deployment setups configure it like so, and I am wondering if that works to address the leader election issue you are seeing, as each pod would have a unique name -- although not a DNS domain name, but a direct IP:

          - name: SCHEMA_REGISTRY_HOST_NAME
            valueFrom:
              fieldRef:
                fieldPath: status.podIP

I'm using schema registry in EKS with MSK, both in same VPN. This is the best approach for this problem?

renatocron commented 2 years ago

Confirmed that the workaround of using SCHEMA_REGISTRY_HOST_NAME from status.podIP is working, and it's less ugly than dealing with ZK

For reference to future me and others, not forget to include enableServiceLinks: false on the spec

kind: Deployment
metadata:
  name: your-schema-registry
  namespace: kafka
  labels:
    app: your-schema-registry
spec:
  replicas: 2
  selector:
    matchLabels:
      app: your-schema-registry
  template:
    metadata:
      labels:
        app: your-schema-registry
    spec:
      enableServiceLinks: false
      containers:
        - name: your-schema-registry
          image: confluentinc/cp-schema-registry:7.0.1
          env:
            - name: SCHEMA_REGISTRY_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: SCHEMA_REGISTRY_LISTENERS
              value: "http://0.0.0.0:8081"
            - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
              value: "PLAINTEXT://your-kafka-kafka-0.your-kafka-kafka-brokers.kafka.svc:9092,PLAINTEXT://your-kafka-kafka-1.your-kafka-kafka-brokers.kafka.svc:9092,PLAINTEXT://your-kafka-kafka-2.your-kafka-kafka-brokers.kafka.svc:9092"
          ports:
            - containerPort: 8081
              name: schema-registry
          resources:
            requests:
              cpu: 50m
              memory: 340Mi
            limits:
              cpu: 800m
              memory: 1400Mi
---
apiVersion: v1
kind: Service
metadata:
  name: schema-registry
  namespace: kafka
spec:
  ports:
    - port: 8081
      targetPort: 8081
  selector:
    app: your-schema-registry

This the value for hosts is from the strimzi operator,

kind: Kafka
metadata:
  name: your-kafka
spec:
  kafka:
    version: 3.1.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.1"
      auto.create.topics.enable: "false"
      delete.topic.enable: "true"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 70Gi # this is the 'recommended' by confluent but it's excessive, the older ZK k8s images that have a misspell on the purge interval config that make it not run and k8s people think ZK needs lot of storage, 10gb should be enough for many cluster
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

namespace for all services is kafka and you can connect on the other services using:

- name: KAFKA_REGISTRY_URL # just a name I used on my project, you must set it accordingly to your app
  value: "http://schema-registry.kafka.svc:8081"