flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Cannot use `on_failure` with a task that uses secrets #4999

Open lindseynield opened 7 months ago

lindseynield commented 7 months ago

Describe the bug

When I register a workflow that uses on_failure to call a task with secret requests, registration fails with:

Failed with Unknown Exception <class 'ValueError'> Reason: Please make sure to add secret_requests=[Secret(group=<group>, key=<key>)] in @task. Unable to find secret for key token in group secret in Env Var:_FSEC_SECRET_TOKEN and FilePath: <path>

This happens even though that exact secret_requests is already in the @task. The secret exists and can be requested successfully by a task called in the workflow body, but the request fails when the task is called in on_failure.

Expected behavior

on_failure can call a task with secret injection as long as secret_requests is in the @task

Additional context to reproduce

Using a secret that exists at secret/token

from flytekit import Secret, task, workflow, current_context
from pytest import fail

@task(secret_requests=[Secret(group="secret", key="token")])
def on_failure_task() -> None:
    token = current_context().secrets.get("secret", "token")

@task
def fail_task() -> None:
    fail("This task is supposed to fail")

@workflow(on_failure=on_failure_task())
def fail_workflow() -> None:
    fail_task()

Results in: Failed with Unknown Exception <class 'ValueError'> Reason: Please make sure to add secret_requests=[Secret(group=secret, key=token)] in @task. Unable to find secret for key token in group secret in Env Var:_FSEC_SECRET_TOKEN and FilePath: secret/token

Whereas this is valid:

from flytekit import Secret, task, workflow, current_context
from pytest import fail

@task(secret_requests=[Secret(group="secret", key="token")])
def on_failure_task() -> None:
    token = current_context().secrets.get("secret", "token")

@workflow
def fail_workflow() -> None:
    on_failure_task()

dosubot[bot] commented 7 months ago

Hey @lindseynield! Great to see you here again, diving into those intricate corners of Flyte! 🚀

Thank you for providing a detailed description of the issue you're encountering. It seems like you've stumbled upon a limitation or bug in Flyte v1.10.7 regarding the handling of secrets in failure scenarios.

In the provided code, you've correctly specified the secret requirement for the task using the @task decorator and secret_requests parameter:

@task(secret_requests=[Secret(group="secret", key="token")])
def on_failure_task() -> None:
    token = current_context().secrets.get("secret", "token")

This configuration indicates that the task requires a secret named token from the secret group. However, the behavior you're experiencing suggests that the Flyte system's handling of secrets during task failure scenarios might not be fully operational or has limitations in the version you're using (v1.10.7).

Unfortunately, as of my last update, specific documentation or changelogs directly addressing this issue in the context of on_failure behavior and secret injection in Flyte v1.10.7 were not available. However, it's common for software projects like Flyte to continuously evolve, with bugs being fixed and limitations being addressed in subsequent releases.

To resolve this issue, consider the following steps:

  1. Check Flyte Documentation: Look for any known issues or guidance on handling secrets in failure scenarios in the Flyte documentation. There might be updates or workarounds available.

  2. Update Flyte: If possible, consider updating to a newer version of Flyte. Newer versions may contain fixes or improvements that address the issue you're facing.

Without direct access to the Flyte codebase as of version v1.10.7 and specific details on how on_failure is implemented in relation to secret handling, it's challenging to provide a more precise diagnosis or solution. However, the steps above are the recommended approach to troubleshooting and resolving issues of this nature in complex software systems.

For reference, this is how Flyte v1.10.7 handles secret injection during normal execution. A task declares the secrets it needs through the secret_requests parameter of the @task decorator, where each Secret object names the secret's group and key. During execution, the task retrieves the value with flytekit.current_context().secrets.get(<group>, <key>). The value is therefore never hard-coded or exposed in the task's code. The secret must be configured in the Flyte deployment for this to work, and the platform injects its value into the task's execution environment.
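The lookup order named in the error message (an environment variable first, then a file path) can be illustrated with a minimal plain-Python sketch. This is not flytekit's implementation: find_secret and its base_dir parameter are hypothetical names, and the _FSEC_ prefix and the group/key file layout are assumptions read off the error text in this issue.

```python
import os
from pathlib import Path

# Assumption: prefix copied from the env var name in the error message.
ENV_PREFIX = "_FSEC_"


def find_secret(group: str, key: str, base_dir: Path) -> str:
    """Hypothetical helper: look for a secret in the environment, then on disk."""
    env_var = f"{ENV_PREFIX}{group.upper()}_{key.upper()}"
    if env_var in os.environ:
        return os.environ[env_var]

    # Assumption: file-based secrets live at <base_dir>/<group>/<key>.
    secret_file = base_dir / group / key
    if secret_file.is_file():
        return secret_file.read_text()

    raise ValueError(
        f"Unable to find secret for key {key} in group {group} "
        f"in Env Var:{env_var} and FilePath: {secret_file}"
    )
```

If neither location holds the value, as would be the case on a machine where nothing has been injected, the helper raises a ValueError similar in shape to the one reported above.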

I hope this information helps. If you have any further questions or need additional clarification, please don't hesitate to ask.

novahow commented 6 months ago

@lindseynield Hi, maybe you can try the following:

from flytekit import Secret, task, workflow, current_context
from pytest import fail

@task(secret_requests=[Secret(group="secret", key="token")])
def on_failure_task() -> None:
    token = current_context().secrets.get("secret", "token")

@task
def fail_task() -> None:
    fail("This task is supposed to fail")

@workflow(on_failure=on_failure_task)
def fail_workflow() -> None:
    fail_task()

This simply removes the parentheses after on_failure_task, so the workflow receives the task itself rather than the result of calling it, as described in the docs: https://docs.flyte.org/en/latest/user_guide/development_lifecycle/failure_node.html
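Why the parentheses matter can be shown with a minimal plain-Python sketch. The task and workflow decorators below are hypothetical stand-ins, not flytekit's real ones. They only demonstrate that an argument written as on_failure_task() is evaluated while the module loads, before the workflow decorator ever sees it. One plausible reading of the original error, offered as an assumption rather than a confirmed diagnosis, is that the handler body ran at registration time, where no secret had been injected.

```python
# Plain-Python stand-ins; these are NOT flytekit's decorators.
calls = []


def task(fn):
    """Hypothetical stand-in: records each invocation of the wrapped function."""
    def wrapper(*args, **kwargs):
        calls.append(fn.__name__)
        return fn(*args, **kwargs)
    return wrapper


def workflow(on_failure=None):
    """Hypothetical stand-in: stores whatever was passed as on_failure."""
    def decorator(fn):
        fn.on_failure = on_failure
        return fn
    return decorator


@task
def on_failure_task() -> None:
    return None


# With parentheses: the handler runs right here, at definition time,
# and the workflow only receives its return value (None).
@workflow(on_failure=on_failure_task())
def called_workflow() -> None:
    pass


calls_at_definition = list(calls)
calls.clear()


# Without parentheses: nothing runs yet; the workflow holds the task itself.
@workflow(on_failure=on_failure_task)
def referenced_workflow() -> None:
    pass


calls_after_reference = list(calls)
```

In the first form the workflow ends up with None as its failure handler, while in the second it holds a callable that can be invoked later, when a failure actually occurs.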

kumare3 commented 5 months ago

@lindseynield is this resolved?

kumare3 commented 5 months ago

I think we can close this out?