art4ul opened this issue 4 years ago
Let me preface by saying that the feature you are asking for does make sense; it's just that we need to think about how long we should keep waiting. Is there a timeout? @art4ul, on the other hand, for your problem: ideally you want to prevent side effects in Flyte workflows. So why not lock the dataset that will be loaded by the execution every hour, so that subsequent workflows always work on independent units? This also gives you complete replayability.
Let's take an example: you have a dataset that materializes at some frequency, producing d1 (time x), d2 (time x + delta), and so on. Your workflow runs every hour, at time t.
You could structure your workflow like this:
Task1: lock-dataset(kickoff-time, execution_window="1 hour") -> []dataset_references
    collect_datasets for times (t - execution_window)
    return collected_datasets
Task2: process-dataset(refs []dataset_references)
    process-dataset(refs)
    outputs...
Thus you are always locking the datasets, so no matter when you run, as long as you run with the right kickoff-time you will end up with the right datasets to process.
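To make that shape concrete, here is a minimal flytekit sketch of the two tasks. The bucket name, path layout, and the placeholder collection logic are assumptions made purely for illustration; only `@task` and `@workflow` come from flytekit.

```python
from datetime import datetime, timedelta
from typing import List

from flytekit import task, workflow


@task
def lock_dataset(kickoff_time: datetime, execution_window_hours: int = 1) -> List[str]:
    # "Lock" the inputs up front: resolve the window ending at kickoff_time into a
    # fixed list of dataset references that downstream tasks will reuse verbatim.
    window_start = kickoff_time - timedelta(hours=execution_window_hours)
    # Placeholder for whatever lists your storage; the path layout is assumed.
    return [f"s3://my-bucket/dataset/{window_start.isoformat()}"]


@task
def process_dataset(refs: List[str]) -> str:
    # Operate only on the locked references, so retries and re-runs see identical inputs.
    return f"processed {len(refs)} dataset partitions"


@workflow
def hourly_pipeline(kickoff_time: datetime) -> str:
    refs = lock_dataset(kickoff_time=kickoff_time)
    return process_dataset(refs=refs)
```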
Thank you @kumare3 for reaching out!
> Let me preface by saying that the feature you are asking for does make sense; it's just that we need to think about how long we should keep waiting. Is there a timeout?
I think we should not wait at all; we should prevent/skip the new execution if the current one is not complete.
> @art4ul, on the other hand, for your problem: ideally you want to prevent side effects in Flyte workflows. So why not lock the dataset that will be loaded by the execution every hour, so that subsequent workflows always work on independent units? This also gives you complete replayability.
Definitely, I could solve my issue by locking an input dataset during the processing time. But if I had a guarantee that my job will be scheduled as a single instance (yes, it's a weak guarantee), I could avoid additional development and additional dependencies (Zookeeper or Consul). I agree that using distributed locks gives a stronger guarantee against double processing of the dataset, but in many cases we need a quick and simple solution, and I believe the option "skip if running" would be very helpful in those cases. Sorry, but maybe I didn't catch your idea.
@art4ul I guess I did not explain it well.
> we should prevent/skip the new execution if the current one is not complete.
That is a good suggestion. What we could start off with: in the Launch Plan, when you attach a schedule, we could allow a scheduling policy where one option is to skip new executions if a previous instance of this schedule is still in progress.
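Roughly, that policy decision could look like the sketch below. This is purely illustrative plain Python, not an existing Flyte API; `list_running_executions` and `create_execution` are hypothetical placeholders standing in for the corresponding FlyteAdmin operations.

```python
from datetime import datetime
from typing import Callable, List


def should_launch(launch_plan_name: str,
                  list_running_executions: Callable[[str], List[str]]) -> bool:
    # SKIP policy: launch only if no execution of this launch plan is still in progress.
    return len(list_running_executions(launch_plan_name)) == 0


def on_schedule_fire(launch_plan_name: str,
                     list_running_executions: Callable[[str], List[str]],
                     create_execution: Callable[..., None]) -> None:
    if should_launch(launch_plan_name, list_running_executions):
        create_execution(launch_plan_name, kickoff_time=datetime.utcnow())
    else:
        # Drop this scheduled tick entirely instead of queueing it or aborting the running one.
        print(f"Skipping scheduled run of {launch_plan_name}: previous execution still active")
```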
> But if I have a guarantee that my job will be scheduled as a single instance (yes, it's a weak guarantee) I could avoid ...

I think my example made it hard for you to understand; sorry for the bad example. By LOCK I mean: capture the dataset paths completely as a closure.
So let us take a better example. Assume a directory called `dataset` that gets a new file every hour, so the fully qualified paths for these files could look something like `dataset/t1`, `dataset/t2`, ..., `dataset/tn`.
Now, let us assume we want to run a scheduled job every 2 hours that processes the files from the last 2 hours. One implementation could run at any time and look back for files in the last 2 hours using the file names, timestamps, etc. But that may not work for late-landing datasets, and things like retries will affect the outcomes.
So another option could be to split the work into two tasks:
Task1:
Looks at the directory and captures, as a list, all the files that Task2 should work on. It could be implemented as simply as listing the directory in reverse time order and filtering out all files older than t = current time - 2 hours. This gives a locked set of files, so Task2, on subsequent retries, etc., will not use a different set of files.
Task2:
Works on the input set of files.
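For concreteness, a minimal flytekit sketch of that two-task split, assuming the `dataset` directory is visible to the task and that file modification times are a good enough proxy for landing times (both assumptions for illustration; only `@task` and `@workflow` are flytekit APIs):

```python
import os
from datetime import datetime, timedelta
from typing import List

from flytekit import task, workflow


@task
def capture_files(dataset_dir: str, kickoff_time: datetime, window_hours: int = 2) -> List[str]:
    # Task1: list the directory once and freeze the exact set of files Task2 will
    # work on, so retries and late re-runs reuse the same locked inputs.
    cutoff = kickoff_time - timedelta(hours=window_hours)
    locked = []
    for name in sorted(os.listdir(dataset_dir), reverse=True):
        path = os.path.join(dataset_dir, name)
        if datetime.fromtimestamp(os.path.getmtime(path)) >= cutoff:
            locked.append(path)
    return locked


@task
def process_files(files: List[str]) -> int:
    # Task2: works only on the captured list; it never re-lists the directory.
    return len(files)


@workflow
def two_hourly(dataset_dir: str, kickoff_time: datetime) -> int:
    return process_files(files=capture_files(dataset_dir=dataset_dir, kickoff_time=kickoff_time))
```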
Again, that being said, we should definitely implement skipping. Can you help us implement this? It should be really simple to implement. @katrogan could help you design the solution.
This is somewhat related to the feature covered by https://github.com/flyteorg/flyte/issues/872. @art4ul, would this be enough, or do you want a complete serialization guarantee? Also related to https://github.com/flyteorg/flyte/issues/420.
Let's do a spec first and solve all of them together.
This should not be closed
Cc @EngHabu
Hello 👋, This issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏
Hello 👋, This issue has been inactive for over 9 months and hasn't received any updates since it was marked as stale. We'll be closing this issue for now, but if you believe this issue is still relevant, please feel free to reopen it. Thank you for your contribution and understanding! 🙏
We have a need for this feature as well. Is anyone working on this currently? We would be happy to collaborate.
This seems to be a critical feature. We should have an option to set a concurrency policy, as Kubernetes CronJobs do: https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#concurrency-policy
This is standard, and the current implementation in Flyte makes it unfit for a large class of cron jobs.
I can have workflows which CANNOT run in parallel: a new one should simply be cancelled, made to wait, or replace the running one. There should be an option for that.
Also, an incident occurred where the cluster was down for some time (a few hours), and as soon as the cluster was up it was flooded with workflows (from cron), even though, given the nature of the workflow, running it once after the downtime is sufficient. Now I need to manually clean up all the workflows in the cluster.
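To make the ask concrete, the three CronJob-style policies could be modeled roughly as below. This is an illustrative plain-Python sketch, not a Flyte API; the `launch` and `abort` callables are placeholders.

```python
from enum import Enum
from typing import Callable, List


class ConcurrencyPolicy(Enum):
    ALLOW = "Allow"      # launch regardless of running executions (today's behaviour)
    FORBID = "Forbid"    # skip this tick if a previous execution is still running
    REPLACE = "Replace"  # abort the running execution and launch a new one


def handle_tick(policy: ConcurrencyPolicy,
                running: List[str],
                launch: Callable[[], None],
                abort: Callable[[str], None]) -> None:
    if policy is ConcurrencyPolicy.ALLOW or not running:
        launch()
    elif policy is ConcurrencyPolicy.FORBID:
        # Drop the tick; this also avoids the post-downtime flood described above.
        return
    else:  # REPLACE
        for execution_id in running:
            abort(execution_id)
        launch()
```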
Hi @nikp1172, do you mind checking whether https://github.com/flyteorg/flyte/pull/5659 would be a possible solution for your needs? (Or please leave comments if there are issues you notice.)
Motivation: Why do you think this is important? In my project I use a scheduled launch plan to process a dataset every hour, but sometimes the workflow takes more than 1 hour to process the data. In this case the scheduler creates a new, concurrent execution of the launch plan. Concurrent execution of the job is unacceptable for my use case (I get duplicated output).
Goal: What should the final outcome look like, ideally? I think it would be nice to have an option on the launch plan to allow or disallow concurrent executions.
Describe alternatives you've considered: The only option for me is to implement distributed locks using an external system like Zookeeper.
Flyte component
Is this a blocker for you to adopt Flyte? Yes