diff --git a/config/crd/bases/anesthesia.k3s.fr_anesthesianodes.yaml b/config/crd/bases/anesthesia.k3s.fr_anesthesianodes.yaml
index f55f4b4..75bd850 100644
--- a/config/crd/bases/anesthesia.k3s.fr_anesthesianodes.yaml
+++ b/config/crd/bases/anesthesia.k3s.fr_anesthesianodes.yaml
@@ -79,8 +79,9 @@ spec:
                 - unknown
                 - starting
                 - running
+                - draining
                 - shutting-down
-                - "off"
+                - halted
                 type: string
             type: object
         type: object
diff --git a/controllers/anesthesianode_controller.go b/controllers/anesthesianode_controller.go
index f06f5dc..55fa228 100644
--- a/controllers/anesthesianode_controller.go
+++ b/controllers/anesthesianode_controller.go
@@ -52,6 +52,7 @@ var (
     defaultCidrV4 = "/24"
     defaultCidrV6 = "/56"
     wolImage      = "registry.k3s.fr/k8s/docker-wakeonlan-image/main:latest"
+    shutdownImage = "registry.k3s.fr/k8s/docker-ubuntu-systemd/main:latest"
     namespace     = "default"
 )

@@ -104,6 +105,13 @@ func jobLabels(node *anesthesiav1alpha1.AnesthesiaNode) map[string]string {
     }
 }

+func podLabels(node *anesthesiav1alpha1.AnesthesiaNode) map[string]string {
+    return map[string]string{
+        "anesthesia.k3s.fr/node": node.Name,
+        "anesthesia.k3s.fr/type": "shutdown",
+    }
+}
+
 func (r *AnesthesiaNodeReconciler) WolJob(runnerNode *corev1.Node, wakeupNode *anesthesiav1alpha1.AnesthesiaNode) *batch.Job {
     randid := uuid.New().String()[0:6]

@@ -133,11 +141,66 @@ func (r *AnesthesiaNodeReconciler) WolJob(runnerNode *corev1.Node, wakeupNode *a

 }

+func shutdownPod(shutdownNode *anesthesiav1alpha1.AnesthesiaNode) *corev1.Pod {
+    randid := uuid.New().String()[0:6]
+
+    hostPathDirectory := corev1.HostPathDirectory
+
+    pod := &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "shutdown-" + shutdownNode.Name + "-" + randid,
+            Namespace: namespace,
+            Labels:    podLabels(shutdownNode),
+        },
+        Spec: corev1.PodSpec{
+            NodeName:      shutdownNode.Name,
+            RestartPolicy: "Never",
+            HostPID:       true,
+            Containers: []corev1.Container{
+                {
+                    Image:   shutdownImage,
+                    Name:    "shutdown",
+                    Command: []string{"sh", "-c", "systemctl poweroff"},
+                    Env: []corev1.EnvVar{
+                        {
+                            Name:  "SYSTEMD_IGNORE_CHROOT",
+                            Value: "1",
+                        },
+                    },
+                    VolumeMounts: []corev1.VolumeMount{
+                        {
+                            MountPath: "/run/systemd",
+                            Name:      "runsystemd",
+                        },
+                    },
+                },
+            },
+            Volumes: []corev1.Volume{
+                {
+                    Name: "runsystemd",
+                    VolumeSource: corev1.VolumeSource{
+                        HostPath: &corev1.HostPathVolumeSource{
+                            Path: "/run/systemd",
+                            Type: &hostPathDirectory,
+                        },
+                    },
+                },
+            },
+        },
+    }
+    return pod
+}
+
 // Ensures that there is a WOL pod running on a targetNode (that is on the same subnet)
+// XXX to rebase ensureRessource
 func (r *AnesthesiaNodeReconciler) ensureWolJob(job *batch.Job) error {
     err := r.Create(context.TODO(), job)
     return err
 }
+
+func (r *AnesthesiaNodeReconciler) ensurePod(job *corev1.Pod) error {
+    err := r.Create(context.TODO(), job)
+    return err
+}

 // Returns true if both IPs are on the same /cidr subnet
 func areIpsOnSameNet(ip1 string, ip2 string, cidr string) bool {
@@ -206,6 +269,7 @@ func (r *AnesthesiaNodeReconciler) findNodesOnSameL2(n *corev1.Node) ([]*corev1.
             sameL2Nodes = append(sameL2Nodes, &nn)
         }
     }
+    fmt.Printf("Same L2 as %v: %v\n\n\n", n, sameL2Nodes)
     return sameL2Nodes, nil
 }

@@ -231,6 +295,7 @@ func (r *AnesthesiaNodeReconciler) listWakeupJobs(wakeupNode *anesthesiav1alpha1
 }

 func (r *AnesthesiaNodeReconciler) areRunningWakeupJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) (bool, error) {
+    // XXX this could return the running wakeup jobs so that the function is not called twice
     nodeJobs, err := r.listWakeupJobs(wakeupNode)
     if err != nil {
         return false, err
@@ -267,6 +332,52 @@ func (r *AnesthesiaNodeReconciler) removeOldJobs(wakeupNode *anesthesiav1alpha1.
     return nil
 }

+func (r *AnesthesiaNodeReconciler) listShutdownPods(shutdownNode *anesthesiav1alpha1.AnesthesiaNode) (*corev1.PodList, error) {
+    nodePodLabels := podLabels(shutdownNode)
+    var podList corev1.PodList
+
+    err := r.List(context.TODO(), &podList, client.InNamespace(namespace), client.MatchingLabels(nodePodLabels))
+    return &podList, err
+
+}
+
+func (r *AnesthesiaNodeReconciler) removePod(pod corev1.Pod) error {
+    fmt.Printf("Will remove pod %v\n", pod.Name)
+    err := r.Delete(context.TODO(), &pod)
+    return err
+
+}
+
+func (r *AnesthesiaNodeReconciler) removeOldShutdownPods(shutdownNode *anesthesiav1alpha1.AnesthesiaNode) error {
+    shutdownPods, err := r.listShutdownPods(shutdownNode)
+    if err != nil {
+        fmt.Printf("Could not list shutdown pods: %v\n", err)
+        return err
+    }
+
+    for _, pod := range shutdownPods.Items {
+        switch pod.Status.Phase {
+        case corev1.PodPending, corev1.PodRunning:
+            // Nothing to do
+        case corev1.PodFailed:
+            fmt.Printf("Pod %v failed!\n", pod.Name)
+            fallthrough
+        case corev1.PodUnknown:
+            // Shouldn't be used by k8s, but it still shows up in k3s
+            fallthrough
+        case corev1.PodSucceeded:
+            // This pod shall be removed
+            err := r.removePod(pod)
+            if err != nil {
+                return err // Should we continue with the others instead?
+            }
+        }
+
+    }
+    return nil
+
+}
+
 //+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/finalizers,verbs=update
@@ -288,6 +399,7 @@ func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
     if req.Name != "think02" {
         return ctrl.Result{}, nil
     }
+    fmt.Printf("\n\n\nHALLO, I was launched !!!\n\n")

     if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
         log.Error(err, "Could not get AnesthesiaNode")
@@ -302,14 +414,14 @@ func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque

     renewTime, err := r.lastHeartbeat(v1node)
     if err != nil {
-        log.Error(err, "")
+        log.Error(err, "Couldn't get last heartbeat")
         return ctrl.Result{}, nil
     }

     nodeAlive := isNodeAlive(renewTime)

     var newActualState string
-    requeue := 2 * time.Second
+    requeue := 0 * time.Second
     if node.Spec.State == "on" && nodeAlive { // FIXME
         newActualState = "running"
         err = r.removeOldJobs(&node)
@@ -334,14 +446,16 @@ func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
         l2Nodes, err := r.findNodesOnSameL2(v1node)
         if err != nil {
             log.Error(err, "Could not find nodes on same L2")
-            return ctrl.Result{}, err
+            requeue = 1 * time.Minute
+            return ctrl.Result{RequeueAfter: requeue}, nil
         }

         // XXX here we should filter for running nodes
         // i.e. we should have at least 1 running node before suspending it
         if len(l2Nodes) == 0 {
             log.Info("No nodes on the same L2 as our node")
-            return ctrl.Result{RequeueAfter: time.Minute}, nil
+            requeue = 1 * time.Minute
+            return ctrl.Result{RequeueAfter: requeue}, nil
         }

         candidateNode := getCandidateNodeFromList(l2Nodes)
@@ -350,18 +464,33 @@
         err = r.ensureWolJob(woljob)
         if err != nil {
             log.Error(err, "Could not ensure WOL Job")
-            return ctrl.Result{RequeueAfter: 2 * time.Minute}, err
+            return ctrl.Result{RequeueAfter: 2 * time.Minute}, err // XXX Maybe we need to fail here?
         }
         requeue = 2 * time.Minute

     } else if node.Spec.State == "off" && nodeAlive {
+        // We have to cordon + drain the node, and then shut it down
+        // XXX we need another if here with cordon -> shut down
         newActualState = "shutting-down"
         // XXX make sure that we will still have a node alive for WOL
-        // TODO
+
+        // Best-effort cleanup of old shutdown pods
+        if err := r.removeOldShutdownPods(&node); err != nil {
+            log.Error(err, "Could not remove old shutdown pods")
+        }
+        pod := shutdownPod(&node)
+        err = r.ensurePod(pod)
+        if err != nil {
+            log.Error(err, "Could not create shutdown Pod")
+            return ctrl.Result{RequeueAfter: 2 * time.Minute}, err // XXX maybe we need to fail here
+        }
+        log.Info("Created shutdown pod")
+
+        requeue = 2 * time.Minute

     } else if node.Spec.State == "off" && !nodeAlive {
         newActualState = "halted"
-        // TODO
+        log.Info("Nothing new to do here!")

     } else {
         err := fmt.Errorf("Impossible state %v", node.Spec.State)
@@ -369,10 +498,13 @@
     }

     if node.Status.ActualState != newActualState {
-        node.Status.ActualState = newActualState
+        log.Info(fmt.Sprintf("Will update state %v->%v", node.Status.ActualState, newActualState))

-        log.Info("Will update state")
-        r.Status().Update(ctx, &node)
+        node.Status.ActualState = newActualState
+        err := r.Status().Update(ctx, &node)
+        if err != nil {
+            log.Error(err, "Could not update state")
+        }
     }

     // TODO(user): your logic here
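For reference, the Pod object that shutdownPod() builds corresponds roughly to the manifest below. This is only an illustrative sketch derived from the Go code in the diff: the <node> and <randid> placeholders stand for the target node name and the random 6-character suffix, and the image and namespace are the hard-coded values from the patch.

apiVersion: v1
kind: Pod
metadata:
  name: shutdown-<node>-<randid>
  namespace: default
  labels:
    anesthesia.k3s.fr/node: <node>
    anesthesia.k3s.fr/type: shutdown
spec:
  nodeName: <node>
  restartPolicy: Never
  hostPID: true
  containers:
  - name: shutdown
    image: registry.k3s.fr/k8s/docker-ubuntu-systemd/main:latest
    command: ["sh", "-c", "systemctl poweroff"]
    env:
    - name: SYSTEMD_IGNORE_CHROOT
      value: "1"
    volumeMounts:
    - name: runsystemd
      mountPath: /run/systemd
  volumes:
  - name: runsystemd
    hostPath:
      path: /run/systemd
      type: Directory

Mounting the host's /run/systemd (which contains systemd's private control socket) and setting SYSTEMD_IGNORE_CHROOT=1 is presumably what allows systemctl inside the container to address the node's own systemd instead of refusing to run in what it detects as a chroot, so the poweroff is executed on the host.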