Open Dyex719 opened 1 week ago
I just ran a quick example as per the Flink Operator QuickStart. A simple FlinkDeployment looks like this (you can get it from here):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
I also dumped the resulting configuration here:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: "2024-05-07T08:59:32Z"
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 2
  name: basic-example
  namespace: default
  resourceVersion: "30397"
  uid: fa8d217e-9859-4ccd-ba39-f4670aa6f3f5
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  flinkVersion: v1_17
  image: flink:1.17
  job:
    args: []
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 2048m
  serviceAccount: flink
  taskManager:
    resource:
      cpu: 1
      memory: 2048m
status:
  clusterInfo:
    flink-revision: c0027e5 @ 2023-11-09T13:24:38+01:00
    flink-version: 1.17.2
    total-cpu: "2.0"
    total-memory: "4294967296"
  jobManagerDeploymentStatus: READY
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: fb11661e5eebb5c2aea39ab0405f9b85
    jobName: State machine job
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    startTime: "1715073594831"
    state: RUNNING
    updateTime: "1715073620122"
  lifecycleState: STABLE
  observedGeneration: 2
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    reconciliationTimestamp: 1715072375768
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=basic-example
    replicas: 1
I guess your use case might be:
1. You apply a FlinkDeployment to Karmada.
2. Karmada propagates the FlinkDeployment to one of the available clusters.

You want a feature that lets Karmada reschedule a FlinkDeployment in a failed state to another available cluster. That's what you mean by failover, right?
Before starting to discuss the solutions, I might need to ask several questions to understand your use case.
Thanks for the quick response @RainbowMango!
Yes, the general use case is as you've described: we apply a FlinkDeployment to Karmada, and Karmada propagates the FlinkDeployment to 2 out of 3 clusters in an effort to have application redundancy. Our users have the option of deploying in an active-active configuration (where both apps are actively running), or active-passive, in which case one application acts as a hot backup. We want to be able to:
1. Handle total cluster failures (when a member's control plane is down, impacting Karmada's connection to the member cluster). This seems to work based off our current testing, but will require some tuning to decrease the total failover time.
2. Handle partial cluster failures, which result in pods scheduled by the FlinkDeployment's operator going down and then being unable to be rescheduled due to lack of resources.

Point 2 is a little tricky, as we would like to differentiate between application-level errors that result in pod failures (which Karmada shouldn't reschedule) and pod scheduling errors. Ideally the FlinkDeployment's status could reflect these differences so that Karmada can interpret the FlinkDeployment's health status accurately. We will need to discuss this further.
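A minimal sketch of the 2-out-of-3 placement described above, assuming three member clusters registered as member1, member2, and member3 and a policy named flink-propagation (all hypothetical names). How spread constraints behave for a CRD such as FlinkDeployment may depend on the Karmada version, so treat this as a starting point rather than a verified setup:

```yaml
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: flink-propagation        # hypothetical policy name
  namespace: default
spec:
  resourceSelectors:
    - apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      name: basic-example
  placement:
    clusterAffinity:
      clusterNames:              # hypothetical member cluster names
        - member1
        - member2
        - member3
    spreadConstraints:
      # Select exactly two of the three candidate clusters.
      - spreadByField: cluster
        minGroups: 2
        maxGroups: 2
```

With this shape, the third cluster stays available as a failover target when one of the two selected clusters becomes unhealthy.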
> Our users have an option of deploying in an active-active configuration (where both apps are actively running), or active-passive in which case one application acts as a hot backup.
For the active-active configuration, I guess you mean deploying 2 FlinkDeployments on 2 clusters, which results in two Flink clusters. I'm not familiar with Flink; does that mean the two Flink clusters are collectively consuming a single data stream?
> This seems to work based off our current testing, but will require some tuning to decrease the total failover time.
Cluster-level failover is based on taints; you can set tolerations to control how long a deployment should wait before it is failed over after a cluster failure.
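For illustration, the toleration window could be set on the PropagationPolicy roughly like this. The policy name, the cluster names, and the 30-second value are assumptions, not tested recommendations:

```yaml
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: flink-propagation        # hypothetical policy name
  namespace: default
spec:
  resourceSelectors:
    - apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      name: basic-example
  placement:
    clusterAffinity:
      clusterNames:              # hypothetical member cluster names
        - member1
        - member2
        - member3
    clusterTolerations:
      # Tolerate a not-ready or unreachable cluster for 30 seconds
      # (assumed value) before the workload is evicted and rescheduled.
      - key: cluster.karmada.io/not-ready
        operator: Exists
        effect: NoExecute
        tolerationSeconds: 30
      - key: cluster.karmada.io/unreachable
        operator: Exists
        effect: NoExecute
        tolerationSeconds: 30
```

A shorter tolerationSeconds reduces failover time but makes the workload more sensitive to brief connectivity blips between Karmada and the member cluster.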
In addition, there are some flags in karmada-controller-manager that can also be used to tune how precisely cluster failures are detected, such as:
> Ideally the FlinkDeployment's status could reflect these differences so that Karmada can interpret the FlinkDeployment's health status accurately - will need to discuss further.
Yes, it would be great if we could observe the status (failover or not) from the FlinkDeployment's status. Can we do that? If so, we can try to use Application Failover to implement the failover.
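As a rough sketch of what Application Failover could look like for this resource: the `failover.application` section below tells Karmada to migrate the FlinkDeployment once it has been reported unhealthy for longer than the toleration window. The policy name, cluster names, and the 120-second value are assumptions, and depending on the Karmada version the Failover feature gate may need to be enabled:

```yaml
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: flink-propagation        # hypothetical policy name
  namespace: default
spec:
  resourceSelectors:
    - apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      name: basic-example
  placement:
    clusterAffinity:
      clusterNames:              # hypothetical member cluster names
        - member1
        - member2
        - member3
  failover:
    application:
      decisionConditions:
        # How long the application may stay unhealthy before Karmada
        # migrates it to another cluster (assumed value).
        tolerationSeconds: 120
      # Leave the failed instance in place so it can be inspected;
      # other purge modes remove it from the original cluster.
      purgeMode: Never
```

This only helps if Karmada can tell healthy from unhealthy for the FlinkDeployment, which is why the status question above matters.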
> For the active-active configuration, I guess you mean deploying 2 FlinkDeployments on 2 clusters, which results in two Flink clusters. I'm not familiar with Flink; does that mean the two Flink clusters are collectively consuming a single data stream?
Yes, we would deploy 2 identical FlinkDeployments, each with an identical data stream source feeding the applications the same data. In an active-passive configuration, the setup would be similar, except that only one of the data sources would provide data, and only to the active application.
> Cluster-level failover is based on taints; you can set tolerations to control how long a deployment should wait before it is failed over after a cluster failure. In addition, there are some flags in karmada-controller-manager that can also be used to tune how precisely cluster failures are detected, such as:
Thanks! Yes we are actively tuning these parameters. :)
> Yes, it would be great if we could observe the status (failover or not) from the FlinkDeployment's status. Can we do that?
We used a custom health interpreter to determine the FlinkDeployment's health, which seems to be working. We will confirm whether this fully suits our use cases after we finish testing.
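For readers who have not written one, a custom health interpreter of this kind can be sketched as a ResourceInterpreterCustomization with a Lua script. This is not the interpreter we actually run; the resource name and the health rule (JobManager READY and job RUNNING, using the status fields visible in the dump earlier in this thread) are assumptions for illustration:

```yaml
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
  name: flinkdeployment-health   # hypothetical name
spec:
  target:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
  customizations:
    healthInterpretation:
      luaScript: |
        -- Assumed rule: healthy only when the JobManager deployment is
        -- READY and the Flink job itself reports RUNNING.
        function InterpretHealth(observedObj)
          if observedObj.status == nil or observedObj.status.jobStatus == nil then
            return false
          end
          return observedObj.status.jobManagerDeploymentStatus == 'READY'
            and observedObj.status.jobStatus.state == 'RUNNING'
        end
```

A rule this simple does not distinguish application-level errors from pod scheduling errors, so a real interpreter would likely need to inspect more of the status.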
What would you like to be added: During our testing we noticed that the descheduler filters out all resource types other than Deployments. Because of this, we would like to propose adding descheduler support for CRDs (custom resources).
Why is this needed: We aim to use Karmada specifically for its failover and descheduling capabilities for the FlinkDeployment CRD. Looking at previous issues, there seems to be interest in this type of support, and we believe a generic solution could benefit the community. We created this ticket to start a conversation and get some feedback and opinions on this type of support.