GoogleCloudPlatform / flink-on-k8s-operator

[DEPRECATED] Kubernetes operator for managing the lifecycle of Apache Flink and Beam applications.
Apache License 2.0

Savepoints flow changes #404

Open shashken opened 3 years ago

shashken commented 3 years ago

Hey guys, following the comments from @functicons and @elanv on my PR #392, and elanv's intermediate fix in #401 for savepoints (where more than one was being triggered), I would like to have a discussion here before moving on to solving this issue. I have a few things I want to bring up:

  1. We need to figure out how to make sure that no more than one SP is triggered at a given time. Right now I have implemented a TriggerTime mechanism that does not allow more than X triggers in a given period. Maybe we want a different approach: check the jobmanager for an SP/CP in progress and refuse to trigger an SP while one is active. I would like your opinion on what to do and where to implement this, as you both know the operator better than I do @functicons @elanv

  2. I think it's very important to add an optional flag that makes the operator refuse to submit a job without a savepoint. I'll explain: the configuration used to update a job (restore from an old SP, or trigger a new one and restart) is the same as the configuration used to submit a job when the CRD does not exist yet. In some cases, starting a job with no savepoint (no state) is devastating and will cause corrupted results. With the flag, the job will simply finish in an error if no savepoint is retrieved. This case will become pretty common when using the operator with a CD solution that makes the install and upgrade configuration basically the same.

  3. I want there to be a way to trigger a job restart and SP with a CRD update. (Right now I found that when I change an external configmap, I also have to change parallelism for the operator to notice the change and trigger SP + restart.) Any suggestions here?

Those might not be best solved in a single PR, but the biggest gap I felt when using the operator was this part regarding savepoints; everything else works very well for me :)
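As a rough illustration of the "ask the jobmanager first" idea from item 1, here is a minimal Go sketch. It assumes the guard reads the checkpoint statistics that Flink's REST API exposes per job and that in-flight savepoints are counted under `counts.in_progress` as well; the function name `snapshotInProgress` and the sample body are hypothetical and not taken from the operator.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpointStats mirrors only the part of a checkpoint statistics response
// that this sketch needs. The field names are an assumption based on Flink's
// REST API, not on the operator's code.
type checkpointStats struct {
	Counts struct {
		InProgress int `json:"in_progress"`
	} `json:"counts"`
}

// snapshotInProgress reports whether the given response body says that at
// least one checkpoint or savepoint is still running.
func snapshotInProgress(body []byte) (bool, error) {
	var stats checkpointStats
	if err := json.Unmarshal(body, &stats); err != nil {
		return false, fmt.Errorf("decoding checkpoint stats: %w", err)
	}
	return stats.Counts.InProgress > 0, nil
}

func main() {
	// Hypothetical response body; a real one carries many more fields.
	body := []byte(`{"counts": {"in_progress": 1, "completed": 12, "failed": 0}}`)

	busy, err := snapshotInProgress(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if busy {
		fmt.Println("skip savepoint trigger: a checkpoint/savepoint is in progress")
	} else {
		fmt.Println("ok to trigger savepoint")
	}
}
```

Compared with a time-based throttle, a guard like this depends on the actual state of the job rather than on a guess about how long a savepoint takes, at the cost of one extra request before each trigger.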

elanv commented 3 years ago
  1. In fact, a routine that prevents savepoints from overlapping and triggering simultaneously has already been implemented. For this, the status of the savepoint is checked through the jobmanager and recorded in status.savepoint. If a new savepoint is triggered even though the previous savepoint has not completed, it is because the operator regards the previous in-progress savepoint as failed due to the timeout set by the operator itself. You can refer to the code below. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/54a7d091745e474fd967c89f4ab50c19ab95ca57/controllers/flinkcluster_updater.go#L496-L501 I think it's better to remove both the operator's own timeout routine above and PR #392's nextOkTriggerTime routine, and instead point users to Flink's own checkpoint timeout configuration.

  2. If the job is restarted or updated, it will be restored from the savepoint recorded in status.job.savepointLocation. And if the FlinkCluster spec is updated, the operator compares status.job.lastSavepointTime against SavepointAgeForJobUpdateSec before updating and triggers a new savepoint if the last one is too old. I think it would be nice to make SavepointAgeForJobUpdateSec user configurable. Could you please explain more about what the corrupted results could be?

  3. I think it will be possible by updating spec.job.parallelism or spec.flinkProperties. If it is about a feature to resume a stopped job, I think it is better to implement it by attaching an annotation, as with savepoint or cancel, rather than by updating the CR.
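The age comparison described in item 2 can be sketched in Go as follows. This is only an illustration under stated assumptions: the timestamp is taken to be an RFC 3339 string, and `savepointFresh`, `maxAgeSec` and `referenceNow` are hypothetical names, not the operator's.

```go
package main

import (
	"fmt"
	"time"
)

// referenceNow is a fixed clock so that the example is reproducible.
var referenceNow = time.Date(2021, time.March, 1, 12, 0, 0, 0, time.UTC)

// savepointFresh reports whether a savepoint taken at lastSavepointTime
// (assumed to be RFC 3339) is at most maxAgeSec seconds old at `now`.
// An empty or unparsable timestamp is treated as "not fresh", so the caller
// would trigger a new savepoint before updating.
func savepointFresh(lastSavepointTime string, maxAgeSec int, now time.Time) bool {
	if lastSavepointTime == "" {
		return false
	}
	taken, err := time.Parse(time.RFC3339, lastSavepointTime)
	if err != nil {
		return false
	}
	return now.Sub(taken) <= time.Duration(maxAgeSec)*time.Second
}

func main() {
	// Two minutes old against a five-minute limit.
	fmt.Println(savepointFresh("2021-03-01T11:58:00Z", 300, referenceNow))
	// One hour old against the same limit.
	fmt.Println(savepointFresh("2021-03-01T11:00:00Z", 300, referenceNow))
}
```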

shashken commented 3 years ago

Thanks for the quick response @elanv !

  1. Agreed on the Flink checkpoint timeout config. Do you believe that removing the timeout feature will fix the bug I saw before, where multiple savepoints were being triggered? I also saw it when the savepoint was triggered from the restartJob flow. Do you think it's a bad idea to verify before each savepoint trigger that no checkpoint/savepoint operation is in progress? (That can slow the job.)

  2. The SavepointAgeForJobUpdateSec flag actually fits perfectly, but I also want a validation, when it's given, that a job will not be submitted when the savepoint is too old or there isn't one. I'm more worried about a situation where a job is updated and somehow the CRD got deleted beforehand, either by human error or a bug, and the job is then started with no savepoint (and no state). Does that make sense?

  3. Definitely worth a try, but that's a little "tricky". I'm not talking about the annotation for job cancel etc.; I'm looking for a place to make a change that tells the operator a new revision of the job must be created (some sha256 value changing, for example), like a podAnnotation or podLabels. The code flow I looked at was the reconciler calling shouldUpdateJob, which calls isUpdateTriggered from utils, which does:

    return status.CurrentRevision != status.NextRevision
elanv commented 3 years ago
> 1. Agreed on the Flink checkpoint timeout config. Do you believe that removing the timeout feature will fix the bug I saw before, where multiple savepoints were being triggered? I also saw it when the savepoint was triggered from the restartJob flow.

Even for a restart triggered by an update, the savepoint timeout is still applied.

> Do you think it's a bad idea to verify before each savepoint trigger that no checkpoint/savepoint operation is in progress? (That can slow the job.)

I think a routine that checks with the jobmanager is necessary anyway to record the progress of the savepoint, and it is already implemented in the observer.

> 2. The `SavepointAgeForJobUpdateSec` flag actually fits perfectly, but I also want a validation, when it's given, that a job will not be submitted when the savepoint is too old or there isn't one. I'm more worried about a situation where a job is updated and somehow the CRD got deleted beforehand, either by human error or a bug, and the job is then started with no savepoint (and no state). Does that make sense?

Currently, to avoid such problems, the update process cannot start if there is no savepoint (status.job.savepointLocation) or if the savepoint is not up to date. (For a restart for job recovery, this routine has not yet been applied, so I think it is something to do in the future.) https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/54a7d091745e474fd967c89f4ab50c19ab95ca57/controllers/flinkcluster_updater.go#L616-L624 However, if the job is already stopped (finished, failed, canceled, etc.), a savepoint cannot be triggered anyway, and updating the spec is regarded as the user's intention to restart the job.

I think the FlinkCluster spec needs to be improved if we need a feature that requires the user to provide a savepoint to restore from on creation and update. There are two fields that the Flink operator refers to when restoring a job: status.job.savepointLocation and spec.job.fromSavepoint. spec.job.fromSavepoint always takes precedence. Therefore, once you specify spec.job.fromSavepoint, you need to either change it to a new savepoint every time you update the spec, or empty it to restore the job from status.job.savepointLocation.
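The precedence between the two fields, combined with the "refuse to start without a savepoint" flag proposed earlier in this thread, could look roughly like the Go sketch below. The `requireSavepoint` parameter and the function name `resolveRestorePath` are hypothetical; only the ordering (spec.job.fromSavepoint before status.job.savepointLocation) comes from the discussion.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoSavepoint is returned when a savepoint is required but none is known.
var errNoSavepoint = errors.New("no savepoint to restore from")

// resolveRestorePath picks the savepoint to restore from.
// specFromSavepoint stands in for spec.job.fromSavepoint and always wins;
// statusSavepointLocation stands in for status.job.savepointLocation.
// requireSavepoint is a hypothetical opt-in flag: when set, starting with
// empty state is rejected instead of silently allowed.
func resolveRestorePath(specFromSavepoint, statusSavepointLocation string, requireSavepoint bool) (string, error) {
	if specFromSavepoint != "" {
		return specFromSavepoint, nil
	}
	if statusSavepointLocation != "" {
		return statusSavepointLocation, nil
	}
	if requireSavepoint {
		return "", errNoSavepoint
	}
	return "", nil // no savepoint: the job starts with empty state
}

func main() {
	// The paths below are made-up examples.
	path, err := resolveRestorePath("", "gs://bucket/savepoints/sp-2", true)
	fmt.Println(path, err)

	_, err = resolveRestorePath("", "", true)
	fmt.Println(err)
}
```

The sketch also shows why a stale spec.job.fromSavepoint is a trap: as long as it is non-empty, the newer location recorded in the status is never consulted.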

From a UX point of view, I think it's good to remove fromSavepoint from the spec and use annotations to handle it. You may find it helpful to refer to the existing discussion on this in the links below. And let's discuss more. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/245#issuecomment-642050210 https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/245

> 3. Definitely worth a try, but that's a little "tricky". I'm not talking about the annotation for job cancel etc.; I'm looking for a place to make a change that tells the operator a new revision of the job must be created (some sha256 value changing, for example),
> like a podAnnotation or podLabels.
> The code flow I looked at was the reconciler calling `shouldUpdateJob`, which calls `isUpdateTriggered` from utils, which does:
> `return status.CurrentRevision != status.NextRevision`

I haven't quite understood this part yet. Could you explain more?

shashken commented 3 years ago

@elanv

  1. Going to create a test branch and play with it, thanks for the guidance.
  2. That sounds like a very helpful feature. I want to make sure there are two checks when a job is submitted/updated: -- The savepoint restored from is not older than SavepointAgeForJobRestoreSec -- The job did not start without a savepoint recovery (the --from savepoint flag was given) Those conditions will make sure that a job that must always be restored from an SP is never restored without one (no matter whether it's a first install or an upgrade).
  3. Managed to do so with the approach you suggested: I added a sha256 as a new key in the flink-conf.yaml and it triggered a job SP and restart, so that covers it. I can keep that solution.
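The workaround in item 3 can be sketched in Go like this: hash the external configuration and store the digest under an extra key in the Flink properties, so that any change to the external config also changes the spec and produces a new revision. The property key `user.external-config.sha256` and the helper `configChecksum` are made up for illustration; the thread does not say which key was used.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// configChecksum returns a deterministic SHA-256 digest of the given
// key/value pairs. Keys are sorted first because Go map iteration order is
// random, and a NUL byte separates fields so that ("ab","c") and ("a","bc")
// hash differently.
func configChecksum(data map[string]string) string {
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(data[k]))
		h.Write([]byte{0})
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	// Hypothetical contents of an external ConfigMap.
	external := map[string]string{"app.properties": "window.size=60\n"}

	// Stand-in for spec.flinkProperties; the checksum key is an assumption.
	flinkProperties := map[string]string{"taskmanager.numberOfTaskSlots": "2"}
	flinkProperties["user.external-config.sha256"] = configChecksum(external)

	fmt.Println(flinkProperties["user.external-config.sha256"])
}
```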
elanv commented 3 years ago

I'm trying to improve the savepoint-related routines in #420. Since savepoints and the related routines were scattered, I reorganized those parts overall so that they can be improved and extended in the future.

The update process also seems to need to be improved.

@functicons Currently, the cancelRequested approach is applied to the update feature. That approach triggers the savepoint first while the job is running, checks that the savepoint completed successfully, and then cancels the job. In this case, the job keeps processing the stream while the savepoint is in progress and until it is canceled, so records processed after the savepoint are processed again on restore and exactly-once semantics are not met externally. Is there any reason to adopt this approach? If there are no issues, I would instead like to apply the "cancel (stop) with savepoint" API, which stops the job immediately after the savepoint completes. It could be applied to both update and cancel.
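For reference, a stop-with-savepoint call could be built roughly as in the Go sketch below. It assumes Flink's REST endpoint `POST /jobs/:jobid/stop` with a JSON body carrying `targetDirectory` and `drain`, as available in recent Flink versions; the base URL, job ID and target directory are placeholders, and the helper name is hypothetical. The request is only constructed here, not sent.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// stopRequest is the JSON body assumed for Flink's stop-with-savepoint
// endpoint. drain=false keeps the job resumable from the savepoint.
type stopRequest struct {
	TargetDirectory string `json:"targetDirectory,omitempty"`
	Drain           bool   `json:"drain"`
}

// newStopWithSavepointRequest builds (but does not send) the HTTP request
// that asks the jobmanager to take a savepoint and then stop the job.
func newStopWithSavepointRequest(baseURL, jobID, targetDir string) (*http.Request, error) {
	body, err := json.Marshal(stopRequest{TargetDirectory: targetDir, Drain: false})
	if err != nil {
		return nil, fmt.Errorf("encoding stop request: %w", err)
	}
	url := fmt.Sprintf("%s/jobs/%s/stop", baseURL, jobID)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("building stop request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Placeholder values for illustration only.
	req, err := newStopWithSavepointRequest("http://jobmanager:8081", "0123abcd", "gs://bucket/savepoints")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
}
```

The operation is asynchronous on the Flink side, so a real client would still have to poll for completion before treating the job as stopped.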

@shashken Apart from the update, we still need a means to set the maximum age of a savepoint to restore from when a job fails, so I've included it in the PR above. Have you been working on a similar feature? I wasn't aware of it, but if so, I apologize. Also, the existing auto savepoint routine could cause problem 1, so I improved the initial auto savepoint process to be based on the job start time. I have a question about takeSavepointOnUpgrade. Could you please explain more about what this field is for? Is it to force the update when the savepoint fails?