druid-io / druid-operator

Druid Kubernetes Operator

Auto-scaling issues #246

Open EwanValentine opened 3 years ago

EwanValentine commented 3 years ago

I'm doing some load-testing at the moment. I'm using Gatling to scale requests per second up from 100 to 1000 over the course of around four minutes.

However, I start to run into connection errors, where the router seems to get overwhelmed. I've got an HPA set up for each component and I've tried multiple settings and scaling rules based on average CPU and memory utilisation. The router's running on an r5.xlarge.

Looking at the resource usage, I start getting connection errors long before the HPA kicks in. The CPU didn't exceed 16% and the memory hovered around 20%, yet all the requests started to fail. So it seems like the router isn't utilising the available resources before hitting some limit.

Here's my current router config:

routers:
      extra.jvm.options: |-
        -Xms256M
        -Xmx2G
      hpAutoscaler:
        behavior:
          scaleDown:
            stabilizationWindowSeconds: 60
            policies:
            - type: Percent
              value: 50
              periodSeconds: 15
        maxReplicas: {{ default .Values.routers.maxReplicas 5 }}
        minReplicas: 1
        scaleTargetRef:
          apiVersion: apps/v1
          kind: Deployment
          name: druid-druid-cluster-routers
        metrics:
          - type: Resource
            resource:
              name: cpu
              target:
                type: Utilization
                averageUtilization: 60
          - type: Resource
            resource:
              name: memory 
              target:
                type: Utilization 
                averageUtilization: 40
      kind: {{ default .Values.routers.kind "Deployment" }}
      maxSurge: 2
      maxUnavailable: 0
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: service
                    operator: In
                    values:
                      - {{ .Values.computeTierNodeSelector }}
      livenessProbe:
        initialDelaySeconds: 10
        periodSeconds: 1
        failureThreshold: 10
        httpGet:
          path: /status/health
          port: {{ default .Values.routers.port 8080 }}
      readinessProbe:
        initialDelaySeconds: 10
        periodSeconds: 1
        failureThreshold: 10
        httpGet:
          path: /status/health
          port: {{ default .Values.routers.port 8080 }}
      druid.port: {{ default .Values.routers.port 8080 }}
      env:
        - name: AWS_REGION
          value: {{ default .Values.region "eu-west-1" }}
        - name: AWS_DEFAULT_REGION
          value: {{ default .Values.region "eu-west-1" }}
      resources:
        limits:
          cpu: {{ default .Values.routers.resources.maxCPU 2 }}
          memory: {{ default .Values.routers.resources.maxMemory "2Gi" }}
        requests:
          cpu: {{ default .Values.routers.resources.desiredCPU 2 }}
          memory: {{ default .Values.routers.resources.desiredMemory "2Gi" }}
      nodeType: router
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/query/router
      replicas: {{ default .Values.routers.replicas 1 }}
      runtime.properties: |
          druid.service=druid/router
          druid.log4j2.sourceCategory=druid/router

          druid.router.http.numConnections=4
          druid.router.http.readTimeout=PT5M
          druid.router.http.numMaxThreads=10
          druid.server.http.numThreads=10

          # Service discovery
          druid.router.defaultBrokerServiceName=druid/broker
          druid.router.coordinatorServiceName=druid/coordinator
          druid.router.managementProxy.enabled=true
      {{ with .Values.routers.service -}}
      services:
        -
          metadata:
            name: router-%s-service
          spec:
            ports:
              -
                name: router-port
                port: {{ default .port 8080 }}
            type: {{ default .type "NodePort" }}
      {{- end }}

I've tried tweaking connection and thread limits (from low to really high, like 1000). I've also tried tweaking the -Xms and -Xmx values to, in theory, allow the heap to grow in relation to the K8s memory limits. Given the current values, I'd expect the utilisation to increase with the number of requests and the auto-scaling to kick in, but it doesn't seem to budge! Any suggestions or advice?

I've posted about it in more detail on the Druid forums: https://www.druidforum.org/t/router-scaling-issues/6664

But I haven't had a response there yet.

Thanks in advance!

pjain1 commented 3 years ago

What is the exact error you are getting?

I see that you have set druid.router.http.numConnections, druid.router.http.numMaxThreads and druid.server.http.numThreads to very low numbers. Please read about them here. I would suggest not setting these and letting Druid calculate the defaults from the resources available; those settings might be preventing the router from using the resources it has. If you have already tried this, then you can try the other things explained below.
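
For example, just as a sketch of what I mean, your router runtime.properties with those three settings dropped so Druid derives them itself:

    druid.service=druid/router
    druid.log4j2.sourceCategory=druid/router

    # druid.router.http.numConnections, druid.router.http.numMaxThreads and
    # druid.server.http.numThreads intentionally not set -- Druid picks defaults
    # based on the resources available to the pod

    # Service discovery
    druid.router.defaultBrokerServiceName=druid/broker
    druid.router.coordinatorServiceName=druid/coordinator
    druid.router.managementProxy.enabled=true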

In my experience the router is generally not the bottleneck, since it acts as a simple proxy; downstream nodes like the historicals or brokers usually are. It would be worth investigating whether queries are timing out on the historicals or brokers: check their logs to see if queries are timing out there (you will need to enable request logging if you haven't already).
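
For reference, file-based request logging can be switched on with something along these lines in common.runtime.properties (just a sketch; adjust the directory for your setup):

    # Log every query served to files in the given directory
    druid.request.logging.type=file
    druid.request.logging.dir=/opt/druid/logs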

There are a lot of variables that can cause issues. Check the broker and historical resource utilisation and see whether CPU or memory is getting saturated; if not, check that the number of processing threads is set to a value that utilises the CPUs available. Also check whether there are disk reads that might hang up processing threads waiting on IO and cause things to get backlogged. There is a Druid metric, query/segment/time, on the historicals; check whether it increases during load testing.

What kind of queries are you running? If they are groupBy v2, check if there are enough merge buffers to execute them.
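
The merge buffers I'm referring to are the off-heap processing buffers configured on the brokers and historicals, roughly like this (numbers are purely illustrative):

    # groupBy v2 needs at least one free merge buffer per concurrent groupBy query;
    # each buffer is sizeBytes of direct (off-heap) memory
    druid.processing.numMergeBuffers=4
    druid.processing.buffer.sizeBytes=100000000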

EwanValentine commented 3 years ago

Thanks for the advice!

I did try really high values and really low ones for druid.router.http.*, and a few in between as well. I've just removed the router and I'm testing directly against the brokers now; the results are similar so far. I've made sure request logging is enabled, but I'm only seeing successful queries and I'm struggling to find any errors in them.

I've been keeping an eye on the CPU and memory by running $ kubectl get hpa and watching the percentages there; the values barely seem to increase. I've looked at the -Xmx and -Xms values as well: I've set a relatively small value for -Xms (512M) and a value that uses the memory limit for that pod for -Xmx (8G).

I'm not using groupBys in my queries; essentially they're just doing some basic filtering with a randomised date range.

pjain1 commented 3 years ago

Can you share the broker and historical runtime properties and k8s configs?

EwanValentine commented 3 years ago

apiVersion: druid.apache.org/v1alpha1
kind: Druid
metadata:
  name: {{ include "druid-cluster.name" . }}
  labels:
    {{- include "druid-cluster.labels" . | nindent 4 }}
spec:
  commonConfigMountPath: /opt/druid/conf/druid/cluster/_common
  rollingDeploy: true
  securityContext:
    fsGroup: 0
    runAsUser: 0
    runAsGroup: 0
  image: {{ .Values.image }}
  startScript: /druid.sh
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="WARN">
      <Appenders>
        <Console name="console" target="SYSTEM_OUT">
          <PatternLayout pattern="[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n" />
        </Console>
      </Appenders>
      <Loggers>
        <Root level="info">
          <AppenderRef ref="console"/>
        </Root>
        <Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="INFO">
            <AppenderRef ref="console"/>
        </Logger>
      </Loggers>
    </Configuration>
  jvm.options: |-
    -server
    -XX:+UseG1GC
    -Xloggc:gc-%t-%p.log
    -XX:+UseGCLogFileRotation
    -XX:GCLogFileSize=100M
    -XX:NumberOfGCLogFiles=10
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/opt/druid/var/
    -verbose:gc
    -XX:+PrintGCDetails
    -XX:+PrintGCTimeStamps
    -XX:+PrintGCDateStamps
    -XX:+PrintGCApplicationStoppedTime
    -XX:+PrintGCApplicationConcurrentTime
    -XX:+PrintAdaptiveSizePolicy
    -XX:+PrintReferenceGC
    -XX:+PrintFlagsFinal
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Dorg.jboss.logging.provider=slf4j
    -Dnet.spy.log.LoggerImpl=net.spy.memcached.compat.log.SLF4JLogger
    -Dlog4j.shutdownCallbackRegistry=org.apache.druid.common.config.Log4jShutdown
    -Dlog4j.shutdownHookEnabled=true
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
  common.runtime.properties: |
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator

    # Logging
    druid.request.logging.type=file
    druid.request.logging.dir=/opt/druid/logs

    # Extensions
    druid.extensions.loadList=["druid-basic-security", "postgresql-metadata-storage", "druid-s3-extensions"]
    druid.extensions.directory=/opt/druid/extensions

    # SQL
    druid.sql.enable=true
    druid.sql.planner.useApproximateCountDistinct=false

    # Storage
    {{ if .Values.s3 }}
    druid.storage.type=s3
    druid.storage.bucket={{ .Values.s3.bucket }}
    druid.storage.baseKey={{ .Values.s3.baseKey }}
    druid.s3.accessKey={{ .Values.secrets.s3.accessKey }}
    druid.s3.secretKey={{ .Values.secrets.s3.secretKey }}
    druid.indexer.logs.directory=data/logs/
    druid.storage.sse.type=s3
    {{ else }}
    druid.storage.type=local
    druid.storage.storageDirectory=/druid/deepstorage
    {{ end }}

    druid.extensions.loadList=["postgresql-metadata-storage"]
    druid.metadata.storage.type=postgresql
    druid.metadata.storage.connector.connectURI=<redacted>
    druid.metadata.storage.connector.user=<redacted>
    druid.metadata.storage.connector.password=<redacted>

    # Metrics
    {{ if .Values.prometheus }}
    druid.emitter.http.recipientBaseUrl=http://{{ .Values.prometheus.url }}:{{ .Values.prometheus.port }}/druid
    druid.emitter=http
    {{ end }}

    # Configure whether to use ZooKeeper or K8s/etcd once available
    druid.zk.service.host={{ default .Values.zookeeper.host "tiny-cluster-zk-0.tiny-cluster-zk" }}
    druid.zk.paths.base=/druid
  nodes:
    brokers: 
      {{ with .Values.brokers.service -}}
      services:
        -
          metadata:
            name: broker-%s-service
          spec:
            ports:
              -
                name: broker-port
                port: {{ default .port 8080 }}
            type: {{ default .type "NodePort" }}
      {{- end }}
      extra.jvm.options: |-
        -Xmx1G
        -Xms512M
      kind: Deployment
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: service
                    operator: In
                    values:
                      - {{ .Values.computeTierNodeSelector }}
      replicas: 1
      maxSurge: 2
      maxUnavailable: 0
      druid.port: {{ default .Values.brokers.port 8080 }}
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/query/broker
      nodeType: broker
      env:
        - name: AWS_REGION
          value: {{ default .Values.region "eu-west-1" }}
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      resources:
        limits:
          cpu: 4
          memory: 8Gi
        requests:
          cpu: 4
          memory: 8Gi
      readinessProbe:
        initialDelaySeconds: 5
        periodSeconds: 1
        failureThreshold: 30
        httpGet:
          path: /druid/broker/v1/readiness
          port: 8080
      hpAutoscaler:
        behavior:
          scaleDown:
            stabilizationWindowSeconds: 30
            policies:
            - type: Percent
              value: 100
              periodSeconds: 15
        maxReplicas: 5
        minReplicas: 1
        scaleTargetRef:
          apiVersion: apps/v1
          kind: Deployment
          name: druid-druid-cluster-brokers
        metrics:
          - type: Resource
            resource:
              name: cpu
              target:
                type: Utilization
                averageUtilization: 40
          - type: Resource
            resource:
              name: memory
              target:
                type: Utilization
                averageUtilization: 60
      runtime.properties: |
        druid.service=druid/broker
        druid.log4j2.sourceCategory=druid/broker

        # HTTP Server
        druid.broker.http.defaultQueryTimeout=1500000
        druid.broker.http.readTimeout=PT5M

        # Server settings
        druid.server.http.maxSubqueryRows=10000000

        # Group by
        druid.query.groupBy.maxMergingDictionarySize=100000000
        druid.query.groupBy.maxOnDiskStorage=100000000
        druid.extensions.loadList=["druid-moving-average-query"]

    coordinators:
      druid.port: 8080
      kind: Deployment
      extra.jvm.options: |-
        -Xmx8G
        -Xms512M
      maxSurge: 2
      maxUnavailable: 0
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/master/coordinator-overlord
      nodeType: coordinator
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: {{ default .Values.coordinators.replicas 1 }}
      resources:
        limits:
          cpu: 1
          memory: 4Gi
        requests:
          cpu: 1
          memory: 4Gi
      livenessProbe:
        initialDelaySeconds: 10
        periodSeconds: 2
        failureThreshold: 10
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 10
        periodSeconds: 1
        failureThreshold: 10
        httpGet:
          path: /status/health
          port: 8080
      env:
        - name: AWS_REGION
          value: {{ default .Values.region "eu-west-1" }}
      runtime.properties: |
          druid.service=druid/coordinator
          druid.log4j2.sourceCategory=druid/coordinator
          druid.indexer.runner.type=httpRemote
          druid.indexer.queue.startDelay=PT5S
          druid.coordinator.balancer.strategy=cachingCost
          druid.serverview.type=http
          druid.indexer.storage.type=metadata
          druid.server.http.numThreads=500
          druid.coordinator.asOverlord.enabled=true
          druid.coordinator.asOverlord.overlordService=druid/overlord

    historical:
      druid.port: {{ default .Values.historicals.port 8080 }}
      kind: {{ default .Values.historicals "Deployment" }}
      nodeType: historical
      extra.jvm.options: |-
        -Xmx8G
        -Xms512M
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: service
                    operator: In
                    values:
                      - {{ .Values.storageTierNodeSelector }} 
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/historical
      replicas: 1
      livenessProbe:
        initialDelaySeconds: 30
        periodSeconds: 5
        failureThreshold: 50
        httpGet:
          path: /status/health
          port: {{ default .Values.historicals.port 8080 }}
      readinessProbe:
        httpGet:
          path: /druid/historical/v1/readiness
          port: {{ default .Values.historicals.port 8080 }}
        periodSeconds: 1
        failureThreshold: 10
      resources:
        limits:
          cpu: 4
          memory: 8Gi
        requests:
          cpu: 4
          memory: 8Gi
      env:
        - name: AWS_REGION
          value: {{ .Values.region }}
      hpAutoscaler:
        behavior:
          scaleDown:
            stabilizationWindowSeconds: 30
            policies:
            - type: Percent
              value: 100
              periodSeconds: 10
        maxReplicas: {{ default .Values.historicals.maxReplicas 5 }}
        minReplicas: 1
        scaleTargetRef:
          apiVersion: apps/v1
          kind: Deployment
          name: druid-druid-cluster-historical
        metrics:
          - type: Resource
            resource:
              name: cpu
              target:
                type: Utilization
                averageUtilization: {{ default .Values.historicals.cpuScaleThreshold 50 }}
          - type: Resource
            resource:
              name: memory 
              target:
                type: Utilization 
                averageUtilization: {{ default .Values.historicals.memoryScaleThreshold 60 }}
      runtime.properties: |
        druid.processing.numThreads=4
        # Segment storage
        druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":10737418240}]
        druid.server.maxSize=10737418240

    middlemanagers:
      druid.port: {{ default .Values.middlemanagers.port 8080 }}
      kind: {{ default .Values.middlemanagers.kind "StatefulSet" }}
      nodeType: middleManager
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: service
                    operator: In
                    values:
                      - {{ .Values.computeTierNodeSelector }}
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/middleManager
      env:
        - name: AWS_REGION
          value: {{ default .Values.region "eu-west-1" }}
        - name: AWS_DEFAULT_REGION
          value: {{ default .Values.region "eu-west-1" }}
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 1
      resources:
        limits:
          cpu: 1
          memory: 8Gi
        requests:
          cpu: 500m
          memory: 4Gi
      livenessProbe:
        initialDelaySeconds: 10
        periodSeconds: 1
        failureThreshold: 10
        httpGet:
          path: /status/health
          port: {{ default .Values.middlemanagers.port 8080 }}
      readinessProbe:
        initialDelaySeconds: 10
        periodSeconds: 1
        failureThreshold: 10
        httpGet:
          path: /status/health
          port: {{ default .Values.middlemanagers.port 8080 }}
      runtime.properties: |
        druid.service=druid/middleManager
        druid.worker.capacity={{ default .Values.middlemanagers.capacity 5 }}
        druid.indexer.runner.javaOptsArray=["-server", "-Xms3g", "-Xmx3g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-XX:+ExitOnOutOfMemoryError", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]

        druid.indexer.task.baseTaskDir=var/druid/task

        # HTTP server threads
        druid.server.http.numThreads=50

        # Processing threads and buffers on Peons
        druid.indexer.fork.property.druid.processing.numMergeBuffers=2
        druid.indexer.fork.property.druid.processing.buffer.sizeBytes=32000000
        druid.indexer.fork.property.druid.processing.numThreads=2
      volumeClaimTemplates:
        -
          metadata:
            name: data-volume
          spec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 15Gi
            storageClassName: gp2
      volumeMounts:
        -
          mountPath: /var/druid
          name: data-volume

Thanks for your help! This is the result of several weeks of copying other examples and tweaking various values, so I'm sure there are probably some obvious errors in here.

pjain1 commented 3 years ago

Configs generally look good. I think you should check the historical nodes to see whether constant disk reads are happening while you run your test; that would suggest the memory-to-disk (segment size) ratio is not 1:1 and the processing threads are just waiting on IO. In that case you will also see high CPU iowait times but low CPU utilization. SSDs can also help here if you don't want to keep the ratio at 1:1 but still want decent performance for workloads that scan almost all of the data.
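
To make the 1:1 point concrete (purely an illustration, not tuned numbers): if the heap and off-heap processing buffers take, say, 3G out of an 8Gi historical pod, roughly 5Gi is left for the OS page cache, so for a 1:1 ratio the locally cached segments would be capped around that:

    # Illustration only: keep locally cached segments close to the memory left for the page cache
    druid.segmentCache.locations=[{"path":"/druid/data/segments","maxSize":5000000000}]
    druid.server.maxSize=5000000000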

If this is not the issue, then you might want to increase the number of processing threads on the historicals until CPU utilization hits its limit.
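
As a sketch (assuming you also raise the pod's CPU request/limit to match), on a historical with 8 CPUs that could look like:

    # Roughly one processing thread per core, leaving a core for HTTP threads and GC
    druid.processing.numThreads=7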