LamaAni / KubernetesJobOperator

An Airflow operator that executes a task in a Kubernetes cluster, given a Kubernetes YAML configuration or an image reference.

FEATURE: Refresh aws token #54

Open duferdev opened 3 years ago

duferdev commented 3 years ago

Feature description

So basically we are facing this issue in the k8s Python client:

https://github.com/kubernetes-client/python/issues/741

That issue has been open for more than a year. As far as I understand, once the client is instantiated there is no way to refresh its credentials, since it caches them. Because of that, when the credentials expire, client calls respond with 401. Meanwhile, people have been developing workarounds in their own projects; the most popular seems to be creating a new client with fresh credentials every time the client needs to be called.

So basically that's what I'm proposing. Because this workaround could be ugly for people using the operator who are not suffering from this issue, we should consider making it a feature toggle via operator params or something like that.
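
For illustration only, the generic workaround discussed in that k8s client issue looks roughly like the sketch below. This is plain kubernetes Python client code, not operator code, and the helper name and config path are made up:

from kubernetes import client, config


def fresh_core_v1_api(kube_config_path):
    # Hypothetical helper: reload the kube config (which re-runs the exec /
    # aws-cli credential plugin) and build a brand new ApiClient, instead of
    # reusing a cached client whose token may have expired.
    configuration = client.Configuration()
    config.load_kube_config(config_file=kube_config_path,
                            client_configuration=configuration)
    return client.CoreV1Api(api_client=client.ApiClient(configuration))


# Usage: build a new client right before every call instead of keeping one around.
api = fresh_core_v1_api("/usr/local/airflow/kube_config.yaml")
pods = api.list_namespaced_pod(namespace="fargate")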

This is my naive proposal given my limited knowledge here, but any other proposal would be appreciated.

In our case this means we can't run any job that lasts more than 15 minutes, so our pipelines crash and the pods are not deleted and become stale (increasing our infra costs, since we are using AWS Fargate).

Describe alternatives you've considered

Try to catch 401 responses and refresh credentials there somehow.

Additional context

We are running everything on AWS. We have MWAA (Airflow 2.0) with DAGs running tasks like this:

 KubernetesLegacyJobOperator(
        task_id="task1",
        namespace="fargate",
        config_file=kube_config_yaml,
        get_logs=True,
        startup_timeout_seconds=300,
        body_filepath="/usr/local/airflow/dags/config/task1.yaml",
        dag=pipeline,
        is_delete_operator_pod=True,
        delete_policy="Always",
        execution_timeout=timedelta(hours=1)
    )

The YAML file looks like this:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  annotations:
    kubernetes_job_operator.main_container: "spark-kubernetes-driver"
  name: "foo"
  namespace: fargate
spec:
  arguments:
  ...
  deps:
    files:
    - "local:///etc/spark/conf/default-logstash-fields.properties"
  driver:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    env:
    ....
    initContainers:
    - command:
      - sh
      - "-c"
      - "echo hi"
      image: busybox
      name: "volume-mount-hack"
      volumeMounts:
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 4086M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
    - command:
      - "/fluent-bit/bin/fluent-bit"
      - "-c"
      - "/tmp/fluent-bit/fluent-bit-custom.conf"
      image: "fluent/fluent-bit:1.7"
      name: "fluent-bit"
      resources:
        limits:
          cpu: 50m
          memory: 60Mi
        requests:
          cpu: 5m
          memory: 10Mi
      volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
    volumeMounts:
    - mountPath: "/tmp/spark-logs"
      name: "spark-logs"
      readOnly: false
    - mountPath: "/tmp/fluent-bit"
      name: "fluent-bit"
      readOnly: false
    - mountPath: /tmp/committer
      name: "staging-vol"
      readOnly: false
  dynamicAllocation:
    enabled: true
    initialExecutors: 4
    maxExecutors: 4
    minExecutors: 2
  executor:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    deleteOnTermination: true
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 6134M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
    - command:
      - "/fluent-bit/bin/fluent-bit"
      - "-c"
      - "/tmp/fluent-bit/fluent-bit-custom.conf"
      image: "fluent/fluent-bit:1.7"
      name: "fluent-bit"
      resources:
        limits:
          cpu: 50m
          memory: 60Mi
        requests:
          cpu: 5m
          memory: 10Mi
      volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
    volumeMounts:
    - mountPath: "/tmp/spark-logs"
      name: "spark-logs"
      readOnly: false
    - mountPath: "/tmp/fluent-bit"
      name: "fluent-bit"
      readOnly: false
    - mountPath: /tmp/committer
      name: "staging-vol"
      readOnly: false
  hadoopConf:
    fs.s3.maxRetries: '10'
    fs.s3a.aws.credentials.provider: ...
    fs.s3a.block.size: 64M
    fs.s3a.buffer.dir: /tmp/committer/buffer
    fs.s3a.committer.magic.enabled: 'false'
    fs.s3a.committer.name: partitioned
    fs.s3a.committer.staging.abort.pending.uploads: 'false'
    "fs.s3a.committer.staging.conflict-mode": replace
    fs.s3a.committer.staging.tmp.path: "file:///tmp/committer/staging"
    fs.s3a.connection.ssl.enabled: 'false'
    fs.s3a.experimental.fadvise: random
    fs.s3a.fast.upload.buffer: disk
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.multipart.purge: 'false'
    fs.s3a.retry.throttle.interval: 10000ms
  image: "spark:3.1.1-v1.0.7"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: "s3a://foo.jar"
  mainClass: com.foo.FooApp
  mode: cluster
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      ...
  restartPolicy:
    type: Never
  sparkConf:
    ...
  sparkConfigMap: "spark-conf-map-foo"
  sparkVersion: '3.1.1'
  template:
    metadata:
      labels:
        app: "foo-pod"
  type: Scala
  volumes:
  - emptyDir: {}
    name: "spark-logs"
  - configMap:
      name: "fluent-bit-conf-map"
    name: "fluent-bit"
  - name: "staging-vol"
    persistentVolumeClaim:
      claimName: "data-staging-share"

As you can see, we are running Spark applications.

Here is the kube conf file:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: ***
  name: ****
contexts:
- context:
    cluster: ****
    user: ****
  name: aws
current-context: aws
kind: Config
preferences: {}
users:
- name: ****
  user:
    exec:
      apiVersion: ****
      args:
      - --region
      - ***
      - eks
      - get-token
      - --cluster-name
      - ****
      command: /usr/local/airflow/.local/bin/aws

Here is how we get the token, and the reason why it lasts for just 15 minutes:

https://awscli.amazonaws.com/v2/documentation/api/latest/reference/eks/get-token.html
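
For context, the exec entry in the kube config above just shells out to the AWS CLI, which returns a short-lived bearer token. A minimal sketch of inspecting that token from Python (cluster name and region are placeholders):

import json
import subprocess

# Run the same exec command the kube config uses (cluster name/region are placeholders).
out = subprocess.run(
    ["aws", "eks", "get-token", "--cluster-name", "my-cluster", "--region", "eu-west-1"],
    capture_output=True, text=True, check=True,
)

cred = json.loads(out.stdout)
# The result is an ExecCredential object; the bearer token expires after roughly 15 minutes.
print(cred["status"]["expirationTimestamp"])
print(cred["status"]["token"][:20], "...")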

And here is an example of what a failed client call log looks like:

[2021-09-15 10:31:15,682] {{client.py:483}} ERROR - Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 334, in query_loop
    collection_formats={},

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 420, in request
    body=body)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 270, in DELETE
    body=body)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)

kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '49ea69bb-f188-4620-aa0e-e4e57ba77e95', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 15 Sep 2021 10:31:15 GMT', 'Content-Length': '129'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'Unauthorized', 'reason': 'Unauthorized', 'code': 401}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/zthreading/tasks.py", line 173, in _run_as_thread
    rslt = self.action(*args, **kwargs)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _exdcute_query
    self.query_loop(client)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 375, in query_loop
    raise err

airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.DeleteNamespaceResource, Unauthorized: Unauthorized
LamaAni commented 3 years ago

Hi Victor,

Hmm, since in this operator the client is defined internally, we can create a method there to refresh the token without affecting the operator in general.

You can find the client @ kubernetes_job_operator/client.

Sadly I have little time at the moment to address this, but if you find a solution we can integrate it.

LamaAni commented 3 years ago

Client file location: https://github.com/LamaAni/KubernetesJobOperator/blob/master/airflow_kubernetes_job_operator/kube_api/client.py
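
To make the "refresh method on the internal client" idea concrete, here is a very rough, hypothetical sketch. The method name and the config_file / api_client attributes are invented for illustration; the real KubeApiRestClient internals may differ:

import kubernetes


class KubeApiRestClient:  # hypothetical excerpt, not the real class body
    def refresh_api_client(self):
        # Reload the kube config so the exec plugin (aws eks get-token) runs
        # again, then replace the cached ApiClient with one holding the new token.
        configuration = kubernetes.client.Configuration()
        kubernetes.config.load_kube_config(
            config_file=self.config_file,  # assumed attribute holding the config path
            client_configuration=configuration,
        )
        self.api_client = kubernetes.client.ApiClient(configuration)  # assumed attribute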

duferdev commented 3 years ago

Hi Zav,

Thanks for your quick response!

Yup, I've already seen that the original client is wrapped in the operator's KubeApiRestClient, but according to comments like https://github.com/kubernetes-client/python/issues/741#issuecomment-518052241, just refreshing the token is not an option, so you have to create a new client. I didn't look that far, but we would probably also need to recreate the configuration if authentication happens there. Maybe the fastest solution would be to create both of them on every call; that's why I'm saying this could end up causing performance issues in some use cases.

Anyway, knowing that you're not fully available right now, I will try to write some code here. If some other contributor wants to join me, that would be a great help.

Wish me luck :)

duferdev commented 3 years ago

I've got something that works for us, but it's completely custom for our tricky use case. Apart from that, I was forced to base the fix on your 2.0 version because for some reason the newer ones are not working. Maybe my colleague @carlosliarte (I think you already know him) can give you more details on that.

Given I'm in a hurry, I've decided to fork your repo and evolve our custom version from your 2.0 until we both have more time to look at all these details carefully (I wasn't able to run any tests here to check whether our changes break some other supported usage). The idea is to merge those changes into your latest version, make that version work in every case (including ours), and contribute back to this repo. Probably by that time the underlying issue in the official k8s client will be solved, and we will just need to upgrade the client version, do some QA and publish a new release.

For the record, these are the (ugly) changes that worked for us:

https://github.com/duferdev/KubernetesJobOperator/pull/1

I will leave them in our pre-production environment for a while to check that they're stable.

Feel free to close this issue if you want

LamaAni commented 3 years ago

Hi Tnx,

I'll take a look at that and see if and how I can integrate this idea into the operator. I would prefer it to be an option. The recreation of the token can be done, or the recreation of the client in the case of disconnect. It would take a while though, so apologies for that. I have started a new project recently and the load is high.

LamaAni commented 3 years ago

Hi

I see you propagated the config file. Can you share your config? I have not read the amazon documentation yet.

duferdev commented 3 years ago

Hi Zav,

I would prefer it to be an option. The recreation of the token can be done, or the recreation of the client in the case of disconnect.

Yep, absolutely. Another improvement would be to catch 401 responses in order to reload the config and retry the request once per Unauthorized error.
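
A rough sketch of that retry-once-on-401 idea (illustrative only, not operator code; make_client is any callable that builds a client with freshly loaded credentials):

from kubernetes.client.exceptions import ApiException


def call_with_auth_retry(make_client, do_call):
    # Hypothetical helper: run the request once, and on a 401 rebuild the
    # client (reloading config/credentials) and retry exactly once.
    api = make_client()
    try:
        return do_call(api)
    except ApiException as err:
        if err.status != 401:
            raise
        api = make_client()  # fresh credentials via a reloaded kube config
        return do_call(api)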

It would take a while though, so apologies for that. I have started a new project recently and the load is high.

Don't worry, it seems we have it under control right now. Also, this is OSS, right? :) We understand your situation. You are doing enough, and we thank you for that.

Can you share your config? I have not read the amazon documentation yet.

Do you mean the kube conf file? I already posted it in the issue description; that is the config format. The masked parameters are personal tokens, usernames and so on. I don't see how those could be helpful. Anyway, if I'm wrong or you mean something else, just tell me.

Thanks!

LamaAni commented 3 years ago

Hi

I think I still need to take a look at the underlying process. The PR you opened over there showing the changes needed to make the config work was helpful.

I may have time for this in two to three weeks.

Otherwise, if you find a solution I would love a PR.

Best

LamaAni commented 3 years ago

Also,

Could you try a simpler command and verify that the issue repeats? E.g.

...
kind: Pod
...
spec:
  containers:
    - ...
      command:
        - sh
        - "-c"
        - |
          num=1
          while true; do
              echo sleep 10
              sleep 10
              num=$((num + 1))
              if [ "$num" -eq 100 ]; then break; fi
          done

And just sleep for a very long time in your pod? Will that create the same error?

duferdev commented 3 years ago

Yes, it surely will if the execution lasts longer than 15 minutes, because of AWS EKS security constraints.

Check: https://aws.github.io/aws-eks-best-practices/security/docs/iam/

The token has a time to live (TTL) of 15 minutes after which a new token will need to be generated. This is handled automatically when you use a client like kubectl, however, if you're using the Kubernetes dashboard, you will need to generate a new token and re-authenticate each time the token expires.

Right now, it seems that the only way we can refresh those credentials is by reloading the whole config and creating a new client with it. The open issue linked in the description explains why in detail.

But again, this is something that happens only in some corner cases where you are using the k8s Python client against an AWS EKS cluster. If you want to reproduce this locally, you will need to spin up a k8s cluster that emulates AWS behaviour, expiring k8s API tokens after 15 minutes.

LamaAni commented 3 years ago

But that would not matter. We can recreate the client internally in the wrapper and download/update new creds. I just need to understand how that happens and how to catch it.

If you don't mind trying that with the sleep command I sent, it would produce an example that we can put in the examples until I am available to solve the issue for good.

LamaAni commented 3 years ago

Would love a PR on that last one.

LamaAni commented 1 year ago

Hi, I still have not found time to fix this. Also, I have no access to the AWS cloud as of now.