Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
operatorerrors "k8s.io/kubeadm/operator/errors"
"k8s.io/kubeadm/operator/operations"
)
// OperationReconciler reconciles a Operation object
type OperationReconciler struct {
client.Client
ManagerContainerName string
ManagerNamespace string
AgentImage string
MetricsRBAC bool
Log logr.Logger
recorder record.EventRecorder
}
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;
// +kubebuilder:rbac:groups=apps,resources=daemonsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.kubeadm.x-k8s.io,resources=operations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.kubeadm.x-k8s.io,resources=operations/status,verbs=get;update;patch
// SetupWithManager configures the controller for calling the reconciler
func (r *OperationReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&operatorv1.Operation{}).
Owns(&operatorv1.RuntimeTaskGroup{}). // force reconcile operation every time one of the owned TaskGroups change
Complete(r)
//TODO: watch DS for operation Daemonsets
r.recorder = mgr.GetEventRecorderFor("operation-controller")
return err
}
// Reconcile an operation
func (r *OperationReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, rerr error) {
ctx := context.Background()
log := r.Log.WithValues("operation", req.NamespacedName)
// Fetch the Operation instance
operation := &operatorv1.Operation{}
if err := r.Client.Get(ctx, req.NamespacedName, operation); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Ignore the Operation if it is already completed or failed
if operation.Status.CompletionTime != nil {
// Reconcile the daemon set that deploys controller agents on nodes, so we are sure it is deleted after completion
err := r.reconcileDaemonSet(operation, log)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// Initialize the patch helper
patchHelper, err := patch.NewHelper(operation, r)
if err != nil {
return ctrl.Result{}, err
}
// Always attempt to Patch the Operation object and status after each reconciliation.
defer func() {
if err := patchHelper.Patch(ctx, operation); err != nil {
log.Error(err, "failed to patch Operation")
if rerr == nil {
rerr = err
}
}
}()
// Reconcile the Operation
if err := r.reconcileOperation(operation, log); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
func (r *OperationReconciler) reconcileDaemonSet(operation *operatorv1.Operation, log logr.Logger) error {
daemonSet, err := getDaemonSet(r.Client, operation, r.ManagerNamespace)
if err != nil {
return err
}
if daemonSet != nil {
// if operation completed
if daemonSetShouldBeRunning(operation) {
return nil
}
log.WithValues("daemonset-name", daemonSetName(operation.Name)).Info("deleting DaemonSet")
if err := deleteDaemonSet(r.Client, daemonSet); err != nil {
return err
}
return nil
}
if !daemonSetShouldBeRunning(operation) {
return nil
}
// if operation running
log.WithValues("daemonset-name", daemonSetName(operation.Name)).Info("creating DaemonSet")
image := r.AgentImage
if image == "" {
image, err = getImage(r.Client, r.ManagerNamespace, r.ManagerContainerName)
if err != nil {
return err
}
}
if err := createDaemonSet(r.Client, operation, r.ManagerNamespace, image, r.MetricsRBAC); err != nil {
return err
}
return nil
}
func daemonSetShouldBeRunning(operation *operatorv1.Operation) bool {
return operation.Status.CompletionTime == nil &&
operation.Status.ErrorReason == nil &&
operation.Status.ErrorMessage == nil
}
func (r *OperationReconciler) reconcileOperation(operation *operatorv1.Operation, log logr.Logger) (err error) {
// Reconcile paused settings
r.reconcilePause(operation)
// Reconcile labels so the operation and the operation object can be searched by a well-known set of labels
r.reconcileLabels(operation)
// Reconcile the daemon set that deploys controller agents on nodes
err = r.reconcileDaemonSet(operation, log)
if err != nil {
return
}
// Handle deleted Operation
if !operation.DeletionTimestamp.IsZero() {
err = r.reconcileDelete(operation)
if err != nil {
return
}
}
// Handle non-deleted Operation
// gets controlled taskGroups items (desired vs actual)
taskGroups, err := r.reconcileTaskGroups(operation, log)
if err != nil {
return err
}
err = r.reconcileNormal(operation, taskGroups, log)
if err != nil {
return
}
// Always reconcile Phase at the end
r.reconcilePhase(operation)
return
}
func (r *OperationReconciler) reconcilePause(operation *operatorv1.Operation) {
// record paused state change, if any
recordPausedChange(r.recorder, operation, operation.Status.Paused, operation.Spec.Paused)
// update status with paused setting
operation.Status.Paused = operation.Spec.Paused
}
func (r *OperationReconciler) reconcileLabels(operation *operatorv1.Operation) {
if operation.Labels == nil {
operation.Labels = map[string]string{}
}
if _, ok := operation.Labels[operatorv1.OperationNameLabel]; !ok {
operation.Labels[operatorv1.OperationNameLabel] = operation.Name
}
if _, ok := operation.Labels[operatorv1.OperationUIDLabel]; !ok {
operation.Labels[operatorv1.OperationUIDLabel] = string(uuid.NewUUID())
}
}
func (r *OperationReconciler) reconcileTaskGroups(operation *operatorv1.Operation, log logr.Logger) (*taskGroupReconcileList, error) {
// gets all the desired TaskGroup objects for the current operation
// Nb. this is the domain knowledge encoded into operation implementations
desired, err := operations.TaskGroupList(operation)
if err != nil {
return nil, errors.Wrap(err, "failed to get desired TaskGroup list")
}
// gets the current TaskGroup objects related to this Operation
actual, err := listTaskGroupsByLabels(r.Client, operation.Labels)
if err != nil {
return nil, errors.Wrap(err, "failed to list TaskGroup")
}
r.Log.Info("reconciling", "desired-TaskGroups", len(desired.Items), "TaskGroups", len(actual.Items))
// match current and desired TaskGroup, so the controller can determine what is necessary to do next
taskGroups := reconcileTaskGroups(desired, actual)
// update replica counters
operation.Status.Groups = int32(len(taskGroups.all))
operation.Status.RunningGroups = int32(len(taskGroups.running))
operation.Status.SucceededGroups = int32(len(taskGroups.completed))
operation.Status.FailedGroups = int32(len(taskGroups.failed))
operation.Status.InvalidGroups = int32(len(taskGroups.invalid))
return taskGroups, nil
}
func (r *OperationReconciler) reconcileNormal(operation *operatorv1.Operation, taskGroups *taskGroupReconcileList, log logr.Logger) error {
// If the Operation doesn't have finalizer, add it.
//if !util.Contains(operation.Finalizers, operatorv1.OperationFinalizer) {
// operation.Finalizers = append(operation.Finalizers, operatorv1.OperationFinalizer)
//}
// if there are TaskGroup not yet completed (pending or running), cleanup error messages (required e.g. after recovery)
// NB. It is necessary to give priority to running vs errors so the operation controller keeps alive/restarts
// the DaemonsSet for processing tasks
if taskGroups.activeTaskGroups() > 0 {
operation.Status.ResetError()
} else {
// if there are invalid combinations (e.g. a TaskGroup without a desired TaskGroup)
// set the error and stop creating new TaskGroups
if len(taskGroups.invalid) > 0 {
// TODO: improve error message
operation.Status.SetError(
operatorerrors.NewOperationReconciliationError("something invalid"),
)
return nil
}
// if there are failed TaskGroup
// set the error and stop creating new TaskGroups
if len(taskGroups.failed) > 0 {
// TODO: improve error message
operation.Status.SetError(
operatorerrors.NewOperationReplicaError("something failed"),
)
return nil
}
}
// TODO: manage adopt tasks/tasks to be orphaned
// if nil, set the Operation start time
if operation.Status.StartTime == nil {
operation.Status.SetStartTime()
//TODO: add a signature so we can detect if someone/something changes the operations while it is processed
return nil
}
// if the completed TaskGroup have reached the number of expected TaskGroup, the Operation is completed
// NB. we are doing this before checking pause because if everything is completed, does not make sense to pause
if len(taskGroups.completed) == len(taskGroups.all) {
// NB. we are setting this condition explicitly in order to avoid that the Operation accidentally
// restarts to create TaskGroup
operation.Status.SetCompletionTime()
}
// if the TaskGroup is paused, return
if operation.Status.Paused {
return nil
}
// otherwise, proceed creating TaskGroup
// if there are still TaskGroup to be created
if len(taskGroups.tobeCreated) > 0 {
// if there no TaskGroup not yet completed (pending or running)
if taskGroups.activeTaskGroups() == 0 {
// create the next TaskGroup in the ordered sequence
nextTaskGroup := taskGroups.tobeCreated[0].planned
log.WithValues("task-group", nextTaskGroup.Name).Info("creating task")
err := r.Client.Create(context.Background(), nextTaskGroup)
if err != nil {
return errors.Wrap(err, "Failed to create TaskGroup")
}
}
}
return nil
}
func (r *OperationReconciler) reconcileDelete(operation *operatorv1.Operation) error {
// Operation is deleted so remove the finalizer.
//operation.Finalizers = util.Filter(operation.Finalizers, operatorv1.OperationFinalizer)
return nil
}
func (r *OperationReconciler) reconcilePhase(operation *operatorv1.Operation) {
// Set the phase to "deleting" if the deletion timestamp is set.
if !operation.DeletionTimestamp.IsZero() {
operation.Status.SetTypedPhase(operatorv1.OperationPhaseDeleted)
return
}
// Set the phase to "failed" if any of Status.ErrorReason or Status.ErrorMessage is not-nil.
if operation.Status.ErrorReason != nil || operation.Status.ErrorMessage != nil {
operation.Status.SetTypedPhase(operatorv1.OperationPhaseFailed)
return
}
// Set the phase to "succeeded" if completion date is set.
if operation.Status.CompletionTime != nil {
operation.Status.SetTypedPhase(operatorv1.OperationPhaseSucceeded)
return
}
// Set the phase to "paused" if paused set.
if operation.Status.Paused {
operation.Status.SetTypedPhase(operatorv1.OperationPhasePaused)
return
}
// Set the phase to "running" if start date is set.
if operation.Status.StartTime != nil {
operation.Status.SetTypedPhase(operatorv1.OperationPhaseRunning)
return
}
// Set the phase to "pending" if nil.
operation.Status.SetTypedPhase(operatorv1.OperationPhasePending)
}
ew file mode 100644
ndex 0000000..0c4f50e
++ b/controllers/operation_controller_test.go
manage adopt tasks/tasks to be orphaned
https://github.com/pacoxu/kubeadm-operator/blob/dfef3ab2a20e2af53bea4db15830249bfbc8f071/controllers/operation_controller.go#L282
ebad023b717e208adf9ad1de8c5db522e2e5c26e