apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Add support to KPO to manage secrets passing Connection-derived credentials #28086

Open potiuk opened 1 year ago

potiuk commented 1 year ago

Description

Add an option to automatically create, map, and manage Kubernetes Secrets derived from Airflow Connections for the Kubernetes Pod Operator.

Use case/motivation

One of the important use cases for the Kubernetes Pod Operator is being able to pass credentials from Airflow Connections to the Pod running the workload.

Some users pass the credentials via environment variables, but this is inherently insecure: environment variables passed to the Pod can be displayed and accessed through various mechanisms, and certain scenarios (such as a failure to create the Pod) may reveal them in logs that are not protected by the secret masker.

This is possible even now, but it requires a custom Kubernetes Pod Operator that grabs the credentials, stores them in a Kubernetes Secret, mounts the secret to the Pod via the pod template, and deletes the secret after the Pod completes.
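
A minimal sketch of that custom-operator approach, purely for illustration: the class name is made up, the import paths assume a recent `cncf.kubernetes` provider, and the worker is assumed to run in-cluster with `namespace` passed explicitly.

```python
from kubernetes import client, config

from airflow.hooks.base import BaseHook
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret


class ConnectionSecretPodOperator(KubernetesPodOperator):  # hypothetical name
    """Copy an Airflow Connection into a short-lived Kubernetes Secret for the Pod."""

    def __init__(self, *, conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id

    def execute(self, context):
        conn = BaseHook.get_connection(self.conn_id)
        secret_name = f"{self.task_id}-{self.conn_id}".lower().replace("_", "-")
        config.load_incluster_config()  # assumes the worker runs inside the cluster
        core_v1 = client.CoreV1Api()
        core_v1.create_namespaced_secret(
            namespace=self.namespace,  # assumes namespace was passed to the operator
            body=client.V1Secret(
                metadata=client.V1ObjectMeta(name=secret_name),
                string_data={"login": conn.login or "", "password": conn.password or ""},
            ),
        )
        # Expose all keys of the Secret to the Pod as environment variables.
        self.secrets.append(Secret("env", None, secret_name))
        try:
            return super().execute(context)
        finally:
            # Best-effort cleanup: the Secret leaks if the worker dies before this line.
            core_v1.delete_namespaced_secret(secret_name, self.namespace)
```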

This has some drawbacks: it is impossible to attach the secret's lifecycle to the Pod's lifecycle, so managing such secrets (especially deleting them in failure scenarios) has to be done externally (for example by a cleanup script).

There is a "complex-ish" possibility of managing those secrets in the operator itself. The best idea is to assign a fixed name to the secret (either connection- or dag/task-based), delete it when the Pod completes, and handle potential race scenarios where deletion of the secret by one completing Pod would race with creation of the secret by another Pod that is starting.

It would be great to implement such a feature for KPO.

Related issues

No response


potiuk commented 1 year ago

cc: @jedcunningham @eladkal @XD-DENG

XD-DENG commented 1 year ago

The best idea is to assign a fixed name to the secret (either connection- or dag/task-based)

Thinking of a possible case:

What if TWO different Airflow instances are creating Pods (via KPO) in the same Kubernetes namespace? It is then possible that the two Airflow instances have connections with the same connection name but different values.

This would cause a conflict, and jobs from either Airflow instance could get the wrong information.

Does this concern sound valid to you? @potiuk

potiuk commented 1 year ago

What if TWO different Airflow instances are creating Pods (via KPO) in the same Kubernetes namespace? It is then possible that the two Airflow instances have connections with the same connection name but different values.

I don't think it's a best practice to have different values for the same connection name across instances, but yeah, it is a possibility.

However, that gives a very good idea for solving the problem of races.

1) The name of the secret should be (for Celery):

${worker_name}-${dag}-${task}

Where ${worker_name} should be "instance_name + worker_id". This should completely solve the race condition. Any left-over secrets would only result from:

a) failure of the KPO to delete it in the first place
b) reconfiguring your Celery setup and removing some workers

Other than that, this would be an eventually-self-healing solution.
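
A tiny sketch of that naming scheme, treating instance_name and worker_id as values that are somehow available at runtime (how to obtain them is an open question), with only minimal sanitising for Kubernetes name rules:

```python
def celery_connection_secret_name(instance_name: str, worker_id: str, dag_id: str, task_id: str) -> str:
    # ${worker_name}-${dag}-${task}, with worker_name = instance_name + worker_id
    worker_name = f"{instance_name}-{worker_id}"
    return f"{worker_name}-{dag_id}-{task_id}".lower().replace("_", "-")
```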

It's still a bit of a problem with the KubernetesExecutor - I think there it would be great to add something to the KubernetesExecutor life-cycle. For example, I can imagine a garbage-collection thread that looks for secrets following the naming convention (where pod_id is part of the secret name) whose Pods are no longer running.

I think that would be a pretty complete solution.
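
A hypothetical sketch of such a garbage-collection pass, using the plain Kubernetes Python client; the label keys are placeholders, not labels the executor sets today:

```python
from kubernetes import client, config


def gc_orphaned_connection_secrets(namespace: str) -> None:
    """Delete connection Secrets whose owning Pod is gone (sketch only)."""
    config.load_incluster_config()
    core_v1 = client.CoreV1Api()
    running_pods = {pod.metadata.name for pod in core_v1.list_namespaced_pod(namespace).items}
    secrets = core_v1.list_namespaced_secret(
        namespace, label_selector="airflow-connection-secret=true"  # assumed label
    )
    for secret in secrets.items:
        owner_pod = secret.metadata.labels.get("airflow-pod-id", "")  # assumed label
        if owner_pod not in running_pods:
            core_v1.delete_namespaced_secret(secret.metadata.name, namespace)
```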

raphaelauv commented 1 year ago

What about reversing the approach:

create an Airflow Secrets Store CSI Driver

so Kubernetes could call Airflow at runtime to retrieve the Pod's secrets.

eladkal commented 1 year ago

What if TWO different Airflow instances are creating Pods (via KPO) in the same Kubernetes namespace?

The approach I took when creating K8s Secrets as part of a custom KPO operator is that each KPO task creates its own secrets, i.e. any secret created in K8s has a unique name; even a rerun of the task will create a new secret. That is one of the ways to handle this.

RoyNoymanW commented 1 year ago

I would like to raise a PR for this issue.

potiuk commented 1 year ago

Feel free @RoyNoymanW

SamWheating commented 1 year ago

Hey there @RoyNoymanW, are you still working on this? If not, would it be alright if I gave it a try?

We have a lot of workflows where this would be extremely useful, as we currently have to define the same secrets as both kube-native secrets and airflow connections.

potiuk commented 1 year ago

Feel free @SamWheating

SamWheating commented 1 year ago

Started looking into this today, and I have a few design decisions that I wanted to get some early feedback on:

Basic implementation

I'm thinking that I'd follow the approach suggested above of just creating secrets in the task namespace at execution time, and then deleting them in the cleanup and on_kill methods. Are there any reasons you can think of why this might be a bad approach?

Providing connection-backed secrets to the KubernetesPodOperator

I have two potential approaches here:

1) Add a KubernetesConnectionSecret class which stores all of the information required for mounting and provisioning a connection-backed secret, and then add a connection_secrets: List[KubernetesConnectionSecret] arg to the KubernetesPodOperator.

2) Add a connection_id argument to the existing Secret class, and if this is defined then we can consider this to be a connection-backed secret and handle it appropriately.

I think I am leaning towards (2) since it keeps things simple within the operator interface, but I'd totally understand if people don't want to overload the Secret class. Thoughts?
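
Neither option exists in the provider today; as a purely illustrative sketch, option (1) could boil down to a small container object along these lines (all names hypothetical):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KubernetesConnectionSecret:
    """Hypothetical container for option (1); not part of the provider."""

    conn_id: str                         # Airflow connection to copy into the Secret
    deploy_type: str = "env"             # how to expose it in the Pod: "env" or "volume"
    deploy_target: Optional[str] = None  # env var name/prefix or mount path
    key: Optional[str] = None            # optionally expose a single connection field
```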

Naming connection-backed secrets in kubernetes

I believe Kubernetes Secret names are limited to 253 characters, so creating something truly unique like {DAG_ID}-{TASK_ID}-{EXECUTION_DATE}-{TRY_NUMBER} would almost certainly run into length issues.

I think it might just be easiest to call it something like {DAG_ID[:64]}-{TASK_ID[:64]}-{RANDOM_16_CHAR_SHA}. This would avoid collisions, but it would also remove identifying information from the kube resources, preventing things like re-use between tries and potentially creating a mess for kube cluster admins.
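
A rough sketch of that naming, with a 16-character random hex suffix from the standard library standing in for the RANDOM_16_CHAR_SHA:

```python
import secrets


def kube_secret_name(dag_id: str, task_id: str) -> str:
    # Truncated IDs plus 16 random hex characters keeps the name well under
    # the 253-character limit on Kubernetes Secret names.
    suffix = secrets.token_hex(8)  # 16 hex characters
    return f"{dag_id[:64]}-{task_id[:64]}-{suffix}".lower().replace("_", "-")
```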

Anyways, I will start working on an early implementation, but please let me know what you think of these proposals as there's a lot of room for interpretation here 😄

cc @potiuk @eladkal

jedcunningham commented 1 year ago

I lean toward 1. More explicit.

I'm not overly concerned with the name problem, so your suggestion of dag_id-task_id-random works fine. We can put all the identifying info in labels or annotations anyways.

potiuk commented 1 year ago

Agree with @jedcunningham

shalberd commented 1 year ago

Hi, just a positive note from me here: it is great that there is work on this, as this issue is also important when, for example, using S3 connections in Airflow tasks. I saw something like that in IBM's Elyra.

SamWheating commented 10 months ago

Gonna unassign myself due to limited time; anyone else should feel free to pick this one up.

ares-b commented 2 months ago

Where ${worker_name} should be "instance_name + worker_id". This should completely solve the race condition. Any left-over secrets would only result from:

@potiuk Hi, I can try to implement this if you can explain how I can grab the instance_name and worker_id.