dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Make resources more reusable #9018

Open flvndh opened 2 years ago

flvndh commented 2 years ago

What's the use case?

Let's put ourselves in the shoes of a Dagster add-on provider, for example, Azure.

Azure, for most of its SDKs, uses the TokenCredential protocol to authenticate clients. Implementations of this protocol are provided by the azure-identity package.

So we would expect this add-on provider to ship a credential resource like this (simplified):

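from azure.core.credentials import TokenCredential
from azure.identity import ClientSecretCredential, ManagedIdentityCredential
from dagster import Field, InitResourceContext, Selector, StringSource, resource

# Selector: exactly one of "managed_identity" or "secret" must be configured.
AZURE_CREDENTIAL_CONFIG = {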
    "credential": Field(Selector({
        "managed_identity": {
            "client_id": Field(StringSource, description="The user-assigned identity's client ID", is_required=False)
        },
        "secret": {
            "tenant_id": Field(StringSource, description="ID of the service principal's tenant"),
            "client_id": Field(StringSource, description="The service principal's client ID"),
            "client_secret": Field(StringSource, description="One of the service principal's client secrets"),
        }
    }))
}

@resource(config_schema=AZURE_CREDENTIAL_CONFIG, description="Credential for Azure SDKs")
def credential(init_context: InitResourceContext) -> TokenCredential:
    config = init_context.resource_config["credential"]

    if "managed_identity" in config:
        # client_id is optional; None selects the system-assigned identity.
        client_id = config["managed_identity"].get("client_id")

        return ManagedIdentityCredential(client_id=client_id)
    else:
        secret = config["secret"]

        return ClientSecretCredential(secret["tenant_id"], secret["client_id"], secret["client_secret"])

That way, Azure resources can declare a dependency on the credential resource:

from azure.mgmt.keyvault import KeyVaultManagementClient

KEY_VAULT_CLIENT_CONFIG = {
    "subscription_id": Field(StringSource, description="The subscription ID"),
}

@resource(config_schema=KEY_VAULT_CLIENT_CONFIG, required_resource_keys={"credential"}, description="Azure Key Vault client")
def key_vault(init_context: InitResourceContext) -> KeyVaultManagementClient:
    subscription_id = init_context.resource_config["subscription_id"]
    credential = init_context.resources.credential

    return KeyVaultManagementClient(credential, subscription_id=subscription_id)

The problem arises when the user wants to use a client with different sets of credentials:

@op(required_resource_keys={"key_vault_a"})
def do_something_with_key_vault_a(context):
    pass

@op(required_resource_keys={"key_vault_b"})
def do_something_with_key_vault_b(context):
    pass

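@job(resource_defs={"key_vault_a": key_vault, "key_vault_b": key_vault, "credential": credential})
def do_something_with_both_key_vault():
    # Invoke both ops so that they are part of the job.
    do_something_with_key_vault_a()
    do_something_with_key_vault_b()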

Because the required resource key credential is hard-coded by the provider, both key_vault_a and key_vault_b resolve to the same credential resource, so we can't supply a different set of credentials to each.

Ideas of implementation

A possible solution would be to give the ability to "map" required resource keys:

key_vault_a = key_vault.with_mapped_resource_keys({"credential": "credential_a"})
key_vault_b = key_vault.with_mapped_resource_keys({"credential": "credential_b"})

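@job(resource_defs={
    "key_vault_a": key_vault_a,
    "key_vault_b": key_vault_b,
    "credential_a": credential,
    "credential_b": credential,
})
def do_something_with_both_key_vault():
    # Invoke both ops so that they are part of the job.
    do_something_with_key_vault_a()
    do_something_with_key_vault_b()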
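
To make the intended semantics of the proposal concrete, here is a minimal, framework-free sketch of how such a key mapping could be resolved. It is not Dagster code: ResourceDef and resolve are hypothetical stand-ins written for this illustration, the credentials are placeholder strings, and with_mapped_resource_keys is only modeled after the method proposed above, which does not exist in Dagster as far as this issue is concerned.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, FrozenSet, Mapping


@dataclass
class ResourceDef:
    """Toy stand-in for a resource definition (hypothetical, not Dagster's class)."""

    build: Callable[[Mapping[str, Any]], Any]  # receives the resolved dependencies
    required_keys: FrozenSet[str] = frozenset()
    # Maps a key declared by the provider to the key used at the job level.
    key_map: Dict[str, str] = field(default_factory=dict)

    def with_mapped_resource_keys(self, mapping: Mapping[str, str]) -> "ResourceDef":
        """Return a copy whose declared keys are redirected to other job-level keys."""
        unknown = set(mapping) - self.required_keys
        if unknown:
            raise KeyError(f"not a required resource key: {sorted(unknown)}")
        return ResourceDef(self.build, self.required_keys, {**self.key_map, **mapping})


def resolve(key: str, resource_defs: Mapping[str, ResourceDef], cache: Dict[str, Any]) -> Any:
    """Build the resource registered under `key`, resolving its dependencies first."""
    if key not in cache:
        definition = resource_defs[key]
        # The resource still sees its declared key ("credential"), but the value
        # comes from whichever job-level key the mapping points to.
        deps = {
            declared: resolve(definition.key_map.get(declared, declared), resource_defs, cache)
            for declared in definition.required_keys
        }
        cache[key] = definition.build(deps)
    return cache[key]


# Placeholder credentials standing in for two differently configured resources.
credential_a = ResourceDef(build=lambda deps: "token-for-tenant-a")
credential_b = ResourceDef(build=lambda deps: "token-for-tenant-b")

# The provider's definition only knows about the key "credential".
key_vault = ResourceDef(
    build=lambda deps: {"credential": deps["credential"]},
    required_keys=frozenset({"credential"}),
)

resource_defs = {
    "key_vault_a": key_vault.with_mapped_resource_keys({"credential": "credential_a"}),
    "key_vault_b": key_vault.with_mapped_resource_keys({"credential": "credential_b"}),
    "credential_a": credential_a,
    "credential_b": credential_b,
}

cache: Dict[str, Any] = {}
vault_a = resolve("key_vault_a", resource_defs, cache)
vault_b = resolve("key_vault_b", resource_defs, cache)
```

The point of the design is that the provider's resource function stays unchanged: it keeps reading its dependency under the name credential, and only the lookup on the job side is redirected.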

sryza commented 2 years ago

This is related: https://github.com/dagster-io/dagster/issues/2112