GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0

Improve update and savepoint handling #420

Open elanv opened 3 years ago

elanv commented 3 years ago

Purpose of this PR

Currently, savepoint-related routines are scattered across several places, which makes it difficult to enhance this operator. This PR organizes them so that savepoint-related routines can be improved and extended in the future. It also improves the update, cancel, and recovery features that depend on the savepoint routines.

Changes

Details

functicons commented 3 years ago

Thanks for the PRs! Let me know when it is ready for review.

shashken commented 3 years ago

@elanv Thank you for doing this! You are awesome :) I opened issue #427; I think it's the root cause of the problem I mitigated by introducing some of the fields you are removing here. I would appreciate your feedback there too.

Also, I see that in the takeSavepointOnUpgrade flow I added (nice rename BTW), the SP is taken synchronously. And I saw that while it happens, the operator fails to submit new job clusters (it just waits for the SP to finish and then submits them). Do you think this is something we can change as well, or does it prove too difficult ATM?

Last thing: do you think it's possible to leverage the new SavepointMaxAgeToRestoreSeconds (or a new field) to make the job submitter fail if no SP is provided for it? (To reduce the chance of a deployment error where a job that must start from a SP does not.)

elanv commented 3 years ago

I plan to write additional unit test code while doing more thorough testing. A diagram of the new job states applied in the new commit is also attached to the PR description. @shashken Thanks for the comments. I will organize my thoughts and answer later.

elanv commented 3 years ago

@shashken Sorry for the late response.

The savepoint issue is specific to the auto savepoint feature. When auto savepoint is enabled, the operator triggers a savepoint as soon as the job starts, but the savepoint fails if some tasks of the Flink job have not started yet. In that case, the savepoint status is updated to failed, and the next iteration of the reconciliation loop starts immediately, because the status change triggers it. In the following iterations, the previously failing routine is likely to be repeated, because there is not enough time between iterations. Basically, the savepoint should not be triggered as soon as the job starts, so I have fixed this in the PR to trigger the first savepoint only after the provided time interval has passed.

I still do not understand in which case takeSavepointOnUpgrade is needed. When a Flink job is updated, it is expected that the job resumes from its last state. But when takeSavepointOnUpgrade is false, I'm not sure where the job should be restored from. Could you explain more about the field?

MaxStateAgeToRestoreSeconds is for auto-restarting from failure or updating a job from a stopped state. With this field, the Flink operator will restart the Flink job only when the recorded savepoint meets the age condition. Does it fit your intention too?

shashken commented 3 years ago

@elanv I am making a PR for the concurrent reconcile calls. I will mention you there to take a look once it's ready.

Regarding the takeSavepointOnUpgrade flag, we need to take care of 3 situations:

  1. When a job fails or gets updated, what is the acceptable age for the state, to prevent human errors/bugs? (MaxStateAgeToRestoreSeconds covers it perfectly ✔️ )
  2. When a job is updated, when do we trigger a new SP? This is not covered by MaxStateAgeToRestoreSeconds (for example, say we trigger a SP every 24h and it is acceptable to restore from a savepoint up to 24h old; if we update the job when the SP is 23h old, we would probably still like to trigger a new one). Maybe another argument can cover this case?
  3. When a job is upgraded, the default should be to take a fresh SP if needed (as mentioned in the point above), but sometimes the user might not want to wait for a SP to be created, so an option to disable SP creation on upgrade would be nice (the first 2 cases are more important IMO).
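One way to sketch the decision in cases 2 and 3 above: `takeSavepointOnUpdate` is the field discussed in this PR, while `maxSavepointAgeOnUpdateSeconds` is a hypothetical second threshold invented for this example (the "another argument" suggested in case 2).

```go
package main

import "fmt"

// takeSavepointBeforeUpdate sketches the update-time decision outlined
// above: skip the savepoint entirely when the user opted out (case 3),
// otherwise take a fresh one unless the latest savepoint is newer than
// a hypothetical maxSavepointAgeOnUpdateSeconds threshold (case 2).
func takeSavepointBeforeUpdate(takeSavepointOnUpdate bool, savepointAgeSeconds, maxSavepointAgeOnUpdateSeconds int) bool {
	if !takeSavepointOnUpdate {
		return false // case 3: user disabled update-time savepoints
	}
	// case 2: only skip when the recorded savepoint is recent enough
	return savepointAgeSeconds > maxSavepointAgeOnUpdateSeconds
}

func main() {
	// 23h-old savepoint, 1h freshness threshold: take a fresh one.
	fmt.Println(takeSavepointBeforeUpdate(true, 23*3600, 3600))
	// 10-minute-old savepoint: recent enough to reuse.
	fmt.Println(takeSavepointBeforeUpdate(true, 600, 3600))
	// User opted out entirely.
	fmt.Println(takeSavepointBeforeUpdate(false, 23*3600, 3600))
}
```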

elanv commented 3 years ago

@functicons Could you review this PR? I think this work is almost complete. Now it would be nice to add documentation and tests while proceeding with the review.

elanv commented 3 years ago

@shashken I have assumed that the job must be resumed from its last state when an update is triggered. So I thought takeSavepointOnUpdate == true means a savepoint must always be taken before the job update. On that assumption, it seems the second issue can be resolved with takeSavepointOnUpdate == false and autoSavepointSeconds == 3600. If it is not appropriate to take savepoints that often, IMO it seems efficient to take the savepoint at update time with takeSavepointOnUpdate == true. If it is necessary to support a use case like the one in the issue, the update strategy needs to be elaborated further.

And in the last commit, I applied maxStateAgeToRestoreSeconds to the takeSavepointOnUpdate == false case as well, to prevent updating a running job with a too-old savepoint.
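The rule in this last comment might look roughly like the following sketch. The names come from this thread, not the actual code, and the savepoint age is assumed to be already computed elsewhere.

```go
package main

import "fmt"

// allowUpdate sketches the gate described above: when
// takeSavepointOnUpdate is false, the update may proceed only if the
// recorded savepoint still satisfies maxStateAgeToRestoreSeconds;
// when it is true, a fresh savepoint is taken before the update, so
// the age of the old savepoint is not a concern.
func allowUpdate(takeSavepointOnUpdate bool, savepointAgeSeconds, maxStateAgeToRestoreSeconds int) bool {
	if takeSavepointOnUpdate {
		return true // a fresh savepoint will be taken before updating
	}
	return savepointAgeSeconds <= maxStateAgeToRestoreSeconds
}

func main() {
	// Recent savepoint: update allowed without a new savepoint.
	fmt.Println(allowUpdate(false, 600, 3600))
	// Savepoint too old: block the update rather than lose state.
	fmt.Println(allowUpdate(false, 7200, 3600))
	// A fresh savepoint is taken first, so age does not matter.
	fmt.Println(allowUpdate(true, 7200, 3600))
}
```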