kubeflow / spark-operator

Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
Apache License 2.0
2.76k stars 1.37k forks source link

[BUG] Env vars are randomly dropped #2082

Open gagarinfan opened 3 months ago

gagarinfan commented 3 months ago

Description

Hi!

After upgrading my Spark Operator Helm chart from version 1.1.26 (hosted previously in the https://googlecloudplatform.github.io/spark-on-k8s-operator repository) to version 1.4.3 (from https://kubeflow.github.io/spark-operator), we are experiencing inconsistent behaviour with environment variables in our Spark jobs. Specifically, the environment variable KAFKA_SERVERS is randomly not recognised by the Spark job, as observed in the application logs. This issue started occurring post-upgrade.

I noticed similar issues reported in the following tickets, which appear to persist in the latest version (Helm chart: 1.4.3):

https://github.com/kubeflow/spark-operator/issues/2031 https://github.com/kubeflow/spark-operator/issues/2017 Unfortunately, the proposed solutions in these tickets did not resolve the issue for us.

Reproduction Code [Required]

Install latest (as of now: 1.4.3) version of spark-operator with values:

spark-operator:
  replicaCount: 2
  fullnameOverride: spark-operator
  sparkJobNamespaces:
    - spark
  podMonitor:
    enabled: true
  webhook:
    enable: true
  serviceAccounts:
    spark:
      name: spark-jobs
  ingressUrlFormat: "{{$appName}}.example.com"

Run spark-application with env vars, for example:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: some-spark-job
spec:
  driver:
    cores: 4
    env:
      - name: KAFKA_SERVERS
        value: kaffka-kafka.kafka:9092
    envFrom:
      - secretRef:
          name: some-secret
    memory: 8000m
    serviceAccount: spark-jobs
  executor:
    cores: 4
    env:
      - name: KAFKA_SERVERS
        value: kaffka-kafka.kafka:9092
    envFrom:
      - secretRef:
          name: some-secret
    instances: 2
    memory: 8000m
    serviceAccount: spark-jobs
  image: repo/image:tag
  mainApplicationFile: local:///opt/app/connector.py
  mode: cluster
  restartPolicy:
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
    type: OnFailure
  sparkVersion: 3.3.2
  type: Python

Expected behavior

Env var is pulled properly

Actual behavior

App reports:


2024-07-03 07:15:01.601Traceback (most recent call last):2024-07-03 07:15:01.601  File "/opt/app/connector.py", line 3, in <module>2024-07-03 07:15:01.601    from configs.spark import kafka_stream_config, SparkAppConfig2024-07-03 07:15:01.601  File "/opt/app/configs/spark.py", line 8, in <module>2024-07-03 07:15:01.601    "kafka.bootstrap.servers": os.environ['KAFKA_SERVERS'],2024-07-03 07:15:01.601  File "/usr/lib/python3.9/os.py", line 679, in __getitem__2024-07-03 07:15:01.601    raise KeyError(key) from None | 2024-07-03 07:15:01.601 | Traceback (most recent call last): |   |   |   | 2024-07-03 07:15:01.601 | File "/opt/app/connector.py", line 3, in <module> |   |   |   | 2024-07-03 07:15:01.601 | from configs.spark import kafka_stream_config, SparkAppConfig |   |   |   | 2024-07-03 07:15:01.601 | File "/opt/app/configs/spark.py", line 8, in <module> |   |   |   | 2024-07-03 07:15:01.601 | "kafka.bootstrap.servers": os.environ['KAFKA_SERVERS'], |   |   |   | 2024-07-03 07:15:01.601 | File "/usr/lib/python3.9/os.py", line 679, in __getitem__ |   |   |   | 2024-07-03 07:15:01.601 | raise KeyError(key) from None |  
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
2024-07-03 07:15:01.601 | Traceback (most recent call last): |  
  |   | 2024-07-03 07:15:01.601 | File "/opt/app/connector.py", line 3, in <module> |  
  |   | 2024-07-03 07:15:01.601 | from configs.spark import kafka_stream_config, SparkAppConfig |  
  |   | 2024-07-03 07:15:01.601 | File "/opt/app/configs/spark.py", line 8, in <module> |  
  |   | 2024-07-03 07:15:01.601 | "kafka.bootstrap.servers": os.environ['KAFKA_SERVERS'], |  
  |   | 2024-07-03 07:15:01.601 | File "/usr/lib/python3.9/os.py", line 679, in __getitem__ |  
  |   | 2024-07-03 07:15:01.601 | raise KeyError(key) from None |  

2024-07-03 07:15:01.601KeyError: 'KAFKA_SERVERS' |   |   | 2024-07-03 07:15:01.601 | KeyError: 'KAFKA_SERVERS'
  |   | 2024-07-03 07:15:01.601 | KeyError: 'KAFKA_SERVERS'

SparkApplication event:

Executor [<redacted-pod-name> %!s(int32=143) Error] failed with ExitCode: %!d(MISSING), Reason: %!s(MISSING)

Environment & Versions

ChenYi015 commented 3 months ago

@gagarinfan The environment variables are patched by webhook, and now the default failure policy of webhook is Ignore, that means when there is an error calling the webhook, the error will be ignored and the pod will continue to be created.

gagarinfan commented 3 months ago

Hi @ChenYi015

thanks for suggestion. I increased log level to spot potential webhook related error that might have been logged, but I didn't find any.

Thanks to log level increase I can now see when webhook is patching pods and looks like it wasn't even triggered for one of the executor pod.

I manually changed Webhook configuration to Fail. Let's see

gagarinfan commented 2 months ago

Hi. Unfortunately webhook is recreated every-time spark-operator pod is restarted and I can't see any option to overwrite its failure policy in the chart. Also, as I mention in my previous post, increasing log level didn't bring any useful information

gagarinfan commented 2 months ago

Small update from my site. We detected two issues. One with the spark-operator-webhook-certs secret that was cleaned by ArgoCD. After upgrading to the latest version, ArgoCD sees a diff in secret values and tries to "heal it". Turned out that it simply erases it. We found it out by seeing

TLS handshake error from 192.168.13.102:43682: remote error: tls: bad certificate

error in operator's log when there was an attempt to call the webhook with a mutation (injecting env vars).

Another issue we found is that when running more than 1 spark-operator replica we still got TLS handshake error even though secret is not empty.

Making ArgoCD ignore the secret's value + running on single replica resolve the issue, but we should be able to run the operator in HA. Any help will be appreciated!

yonibc commented 2 months ago

hi @gagarinfan I'm experiencing the same issue, deploying spark-operator via ArgoCD as well. There is a workaround I used in order ignore seeing OutOfSync status. You can update your Application with the following:

  ignoreDifferences:
  - group: "'*'"
    kind: Secret
    name: spark-operator-webhook-certs
    jsonPointers:
      - /data

Do you have any idea why after a pod restart the secret is updated, why is it the default behaviour?

gagarinfan commented 2 months ago

hi @yonibc

Thanks for the hint with ignoreDifferences. Regarding your question; I think that it's caused by the fact that in helm chart the secret values are empty:

{{- if .Values.webhook.enable -}}
apiVersion: v1
kind: Secret
metadata:
  name: {{ include "spark-operator.webhookSecretName" . }}
  labels:
    {{- include "spark-operator.labels" . | nindent 4 }}
data:
  ca-key.pem: ""
  ca-cert.pem: ""
  server-key.pem: ""
  server-cert.pem: ""
{{- end }}

so that is why Argo wants to overwrite it as values are filled in by the operator. See https://github.com/kubeflow/spark-operator/commit/5ce3dbacff76bba364055b9b786110f1a4ab3174. I think @ChenYi015 can say more 🙏

@yonibc do you also observe certificate issues when running > 1 pod for spark-operator?

ChenYi015 commented 2 months ago

@gagarinfan @yonibc Thanks for reporting the certificate issue. I had fixed this problem in the new 2.0 version operator, but it hadn't been released. In the new version, webhook secret will be populated once by operator, and will not be cleared or overwrited during the install/upgrade/rollback process. It also works fine in the HA mode. And I will try to fix this issue in this 1.6.x version recently.

Bsides, in the new version, the default failure policy of webhook has changed to Fail and will be configurable in values.yaml.

yonibc commented 2 months ago

hey @gagarinfan @ChenYi015

@gagarinfan - according to your question - yes, I have the issue. It seems that secret is generated empty, and every time a new operator pod comes up, the certificates are re-generated and it causes issues with MutatingWebhookConfiguration, resulting in env vars, affinity, etc not to be passed to the pods created from SparkApplication.

As @ChenYi015 mentioned, in the new 2.0 version the secret will be populated once.

My questions:

  1. How will the secret be populated? I mean, the expected behavior is generated one-time during the installation of the operator, and not be overridden after.
  2. When will be able to see this new version released? :) It would help A lot
  3. Is the temporary solution for now will be to leave the operator running with one replica?

Thank you!

ChenYi015 commented 2 months ago

hey @gagarinfan @ChenYi015

@gagarinfan - according to your question - yes, I have the issue. It seems that secret is generated empty, and every time a new operator pod comes up, the certificates are re-generated and it causes issues with MutatingWebhookConfiguration, resulting in env vars, affinity, etc not to be passed to the pods created from SparkApplication.

As @ChenYi015 mentioned, in the new 2.0 version the secret will be populated once.

My questions:

  1. How will the secret be populated? I mean, the expected behavior is generated one-time during the installation of the operator, and not be overridden after.
  2. When will be able to see this new version released? :) It would help A lot
  3. Is the temporary solution for now will be to leave the operator running with one replica?

Thank you!

@yonibc 1: In the new version, once a new operator pod starts up (install/upgrade/rollback), it will check whether the webhook secret exists, if not, it will create a new one. Otherwise, it will check whether the certificates are populated, if not, a new certificate will be generated and the secret will be populated accordingly, otherwise it will read certifcates from the secret and save it to local. When in HA mode, there is a retry mechanism to make sure only one replica can create/update the secret successfully, and other replicas will retry and finally sync the certificates to local.

2: I am modifying the relase action workflow, hopefully it can be released in the next week.

3: Yes, I think so.

Reamer commented 1 month ago

I have just stumbled across this issue. Thank you @yonibc for your short ArgoCD snippet. For me, your code block had too many quotes around the wildcard. The following snippet works for me.

  ignoreDifferences:
  - group: "*"
    kind: Secret
    name: spark-operator-webhook-certs
    jsonPointers:
      - /data

I also do not use the Spark operator in HA mode.