flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0
5.18k stars, 550 forks

Timeout for Flyte file sensors #5035

Open shekhar-kotekar opened 3 months ago

shekhar-kotekar commented 3 months ago

Motivation: Why do you think this is important?

When we run a workflow that uses a file sensor, the sensor waits for the file indefinitely. This becomes a problem in two scenarios:

1) If the file never arrives, the workflow never expires, and we have to stop the execution manually.
2) We schedule the workflow to run at a fixed interval (say, every 2 hours). In the first execution, the file sensor waits for the file to arrive. If the file has not arrived within 2 hours, Flyte starts another scheduled execution, so both executions wait for the same file, which can lead to the workflow being executed twice. This can cause unpredictable, unwanted results such as wasted cluster resources, duplicated data, and errors.

Goal: What should the final outcome look like, ideally?

Ideally, we should be able to specify a timeout for a file sensor, as shown below:

sensor = FileSensor(name="s3_file_sensor", timeout="2h")

We should be able to specify the timeout in minutes, hours, days, etc., for example "5m" for 5 minutes or "3h" for 3 hours.
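The shorthand proposed above could be turned into a Python `timedelta` along these lines. This is a minimal sketch, assuming the supported suffixes are `s`, `m`, `h` and `d`; `parse_timeout` is a hypothetical helper, not an existing flytekit function.

```python
import re
from datetime import timedelta

# Hypothetical suffix table: "s", "m", "h" and "d" are assumptions based on the
# examples in this issue, not an existing flytekit option.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
_PATTERN = re.compile(r"\s*(\d+)\s*([smhd])\s*")


def parse_timeout(value: str) -> timedelta:
    """Convert shorthand such as "5m" or "3h" into a timedelta."""
    match = _PATTERN.fullmatch(value)
    if match is None:
        raise ValueError(f"invalid timeout {value!r}; expected e.g. '5m', '3h' or '2d'")
    amount = int(match.group(1))
    if amount == 0:
        # A zero timeout would expire immediately, which is almost certainly a mistake.
        raise ValueError("timeout must be greater than zero")
    return timedelta(**{_UNITS[match.group(2)]: amount})


if __name__ == "__main__":
    print(parse_timeout("5m"))  # 0:05:00
    print(parse_timeout("3h"))  # 3:00:00
    print(parse_timeout("2d"))  # 2 days, 0:00:00
```

Rejecting unknown suffixes with a `ValueError` surfaces a typo such as "5x" when the workflow is defined, rather than letting it silently become "no timeout".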

After the timeout, the workflow should fail so that users can take action, such as restarting it.
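The requested behavior, polling for the file and failing once the timeout elapses, can be sketched as a loop with a deadline. This is an illustration under assumptions, not flytekit's sensor implementation: `wait_for_file` is a hypothetical helper, and `poke` stands in for whatever existence check a real sensor performs.

```python
import time
from datetime import timedelta
from typing import Callable


def wait_for_file(
    poke: Callable[[], bool],
    timeout: timedelta,
    poll_interval: timedelta = timedelta(seconds=5),
) -> None:
    """Call poke() until it returns True; raise TimeoutError once timeout elapses.

    Hypothetical sketch: poke stands in for a sensor's check (for example,
    "does the object exist?") and is not a flytekit API.
    """
    # time.monotonic() is unaffected by wall-clock changes, so the deadline is stable.
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        if poke():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Raising makes the task, and therefore the workflow, fail visibly.
            raise TimeoutError(f"file did not arrive within {timeout}")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval.total_seconds(), remaining))


if __name__ == "__main__":
    arrivals = iter([False, False, True])  # simulate the file appearing on the 3rd check
    wait_for_file(lambda: next(arrivals), timedelta(seconds=2), timedelta(milliseconds=100))
    print("file arrived")
```

Capping each sleep at the remaining time keeps the sensor from overshooting the timeout by up to a full poll interval. With a timeout shorter than the schedule interval, an execution gives up before the next scheduled one starts, which avoids the overlap described in scenario 2.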

Describe alternatives you've considered

For now, we have set the workflow's cron schedule to run every 12 hours.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

Have you read the Code of Conduct?

dosubot[bot] commented 3 months ago

Hey there, @shekhar-kotekar! Good to see you back in the fray. How's everything going on your end?

Thank you for your detailed request. I understand the need for a timeout feature for the file sensor in Flyte to prevent infinite waiting and potential workflow duplication.

Flyte already supports timeouts for tasks, including tasks that wait on a file sensor. The timeout is set through the TaskMetadata message defined in FlyteIDL (the Flyte Interface Definition Language), which has a timeout field of type google.protobuf.Duration. This field specifies the overall timeout of a task, including any user-triggered retries. Here is the relevant part of the TypeScript class generated from TaskMetadata:

import { Duration, Message } from "@bufbuild/protobuf";

/**
 * Task Metadata
 *
 * @generated from message flyteidl.core.TaskMetadata
 */
export class TaskMetadata extends Message<TaskMetadata> {
  // ...
  /**
   * The overall timeout of a task including user-triggered retries.
   *
   * @generated from field: google.protobuf.Duration timeout = 4;
   */
  timeout?: Duration;
  // ...
}

You can find this in the flyteidl/gen/pb-es/flyteidl/core/tasks_pb.ts file.
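A google.protobuf.Duration stores a length of time as whole `seconds` plus `nanos`, and both fields must carry the same sign. The stdlib-only sketch below shows how a Python `timedelta`, such as a 2-hour sensor timeout, maps onto those two fields; `to_duration_fields` is a hypothetical helper for illustration, and it does not check Duration's range limit of roughly ±10,000 years.

```python
from datetime import timedelta
from typing import Tuple


def to_duration_fields(value: timedelta) -> Tuple[int, int]:
    """Split a timedelta into the (seconds, nanos) pair stored by google.protobuf.Duration."""
    # timedelta has microsecond resolution; integer arithmetic avoids float rounding.
    total_us = (value.days * 86_400 + value.seconds) * 1_000_000 + value.microseconds
    # Duration requires seconds and nanos to share a sign, so split the
    # magnitude and reapply the sign to both fields.
    sign = -1 if total_us < 0 else 1
    seconds, micros = divmod(abs(total_us), 1_000_000)
    return sign * seconds, sign * micros * 1_000


if __name__ == "__main__":
    print(to_duration_fields(timedelta(hours=2)))  # (7200, 0)
    print(to_duration_fields(timedelta(seconds=1200)))  # (1200, 0)
    print(to_duration_fields(timedelta(seconds=1, microseconds=500_000)))  # (1, 500000000)
```

With the protobuf package installed, `Duration.FromTimedelta` from `google.protobuf.duration_pb2` performs this conversion directly.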

To use this, set the desired timeout in the task's metadata. If a file sensor, or any other operation within the task, does not complete within that duration, the task times out, which prevents indefinite waiting and the duplicate executions described above.

In addition, a compiled sensor task in Flyte's test data already carries a timeout: the timeout field under metadata specifies how long the sensor may wait before timing out if it has not been triggered, and there it is set to 1200 seconds. You can find this in the flytepropeller/pkg/compiler/test/testdata/snacks-core/compiled/075_my-objectstore-sensor_1_task.yaml file.

When a timeout occurs, Flyte transitions the node to the NodePhaseTimedOut phase, a distinct value of the NodePhase enumeration. The workflow or task that timed out is therefore marked as not having completed successfully within the allotted time. You can find this in the flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go file.
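To make the timed-out phase concrete, here is a deliberately simplified Python model of how a running node with a timeout might be evaluated. It is not FlytePropeller's Go implementation: `Phase` is a hypothetical three-value subset of NodePhase, `evaluate_node` is an illustrative helper, and checking completion before the deadline is an assumption of this sketch.

```python
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional


class Phase(Enum):
    # Hypothetical subset for illustration; the real NodePhase enum has more values.
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    TIMED_OUT = "TimedOut"


def evaluate_node(
    started_at: datetime,
    now: datetime,
    timeout: Optional[timedelta],
    finished: bool,
) -> Phase:
    """Return the next phase for a running node; timeout=None means no limit."""
    if finished:
        return Phase.SUCCEEDED
    if timeout is not None and now - started_at >= timeout:
        # A distinct terminal phase, so a timeout is never mistaken for success.
        return Phase.TIMED_OUT
    return Phase.RUNNING


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1)
    print(evaluate_node(t0, t0 + timedelta(minutes=30), timedelta(hours=2), False))
    print(evaluate_node(t0, t0 + timedelta(hours=2), timedelta(hours=2), False))
```

Without a timeout (`None`), a sensor that never sees its file stays in the running phase forever, which is exactly the behavior this issue reports.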

However, I understand that you are looking for a more direct way to specify a timeout for a file sensor in its configuration. The FileSensor does not appear to expose such a parameter today, so your suggestion to add a timeout parameter to its configuration is a valid one and could be a valuable addition to Flyte.

I hope this information is helpful. If you have any further questions or need more clarification, please feel free to ask.

kumare3 commented 4 days ago

cc @pingsutw

Future-Outlier commented 3 days ago

I will start from here.
