From 0fe539c32f68e602978fd0c7844221f8efbee0d7 Mon Sep 17 00:00:00 2001
From: Frank Villaro-Dixon
Date: Tue, 6 Jun 2023 15:51:54 +0200
Subject: [PATCH] smore stuff

Signed-off-by: Frank Villaro-Dixon
---
 controllers/anesthesianode_controller.go | 179 +++++++++++++++++++----
 1 file changed, 149 insertions(+), 30 deletions(-)

diff --git a/controllers/anesthesianode_controller.go b/controllers/anesthesianode_controller.go
index 6268af4..f06f5dc 100644
--- a/controllers/anesthesianode_controller.go
+++ b/controllers/anesthesianode_controller.go
@@ -27,12 +27,15 @@ import (
 
     batch "k8s.io/api/batch/v1"
     coordination "k8s.io/api/coordination/v1"
-    core "k8s.io/api/core/v1"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/log"
 
+    "github.com/google/uuid"
+
     anesthesiav1alpha1 "gitlab.k3s.fr/k8s/kube-anesthesia-operator/api/v1alpha1"
 )
 
@@ -48,10 +51,12 @@ var (
     deadLeaseTimedelta = 1 * time.Minute
     defaultCidrV4      = "/24"
     defaultCidrV6      = "/56"
+    wolImage           = "registry.k3s.fr/k8s/docker-wakeonlan-image/main:latest"
+    namespace          = "default"
 )
 
-func (r *AnesthesiaNodeReconciler) getV1Node(an *anesthesiav1alpha1.AnesthesiaNode) (*core.Node, error) {
-    var node core.Node
+func (r *AnesthesiaNodeReconciler) getV1Node(an *anesthesiav1alpha1.AnesthesiaNode) (*corev1.Node, error) {
+    var node corev1.Node
     nodeName := client.ObjectKey{
         Name:      an.ObjectMeta.Name,
         Namespace: "",
@@ -63,7 +68,7 @@ func (r *AnesthesiaNodeReconciler) getV1Node(an *anesthesiav1alpha1.AnesthesiaNo
 
 }
 
-func (r *AnesthesiaNodeReconciler) lastHeartbeat(n *core.Node) (*time.Time, error) {
+func (r *AnesthesiaNodeReconciler) lastHeartbeat(n *corev1.Node) (*time.Time, error) {
     var lease coordination.Lease
     leaseName := client.ObjectKey{
         Name:      n.Name,
@@ -88,11 +93,50 @@ func isNodeAlive(lastLeaseTime *time.Time) bool {
     return true
 }
 
+func s(s string) *string {
+    return &s
+}
+
+func jobLabels(node *anesthesiav1alpha1.AnesthesiaNode) map[string]string {
+    return map[string]string{
+        "anesthesia.k3s.fr/node": node.Name,
+        "anesthesia.k3s.fr/type": "wakeup",
+    }
+}
+
+func (r *AnesthesiaNodeReconciler) WolJob(runnerNode *corev1.Node, wakeupNode *anesthesiav1alpha1.AnesthesiaNode) *batch.Job {
+    randid := uuid.New().String()[0:6]
+
+    job := &batch.Job{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "wakeup-" + wakeupNode.Name + "-" + randid,
+            Namespace: namespace,
+            Labels:    jobLabels(wakeupNode),
+        },
+        Spec: batch.JobSpec{
+            Template: corev1.PodTemplateSpec{
+                Spec: corev1.PodSpec{
+                    NodeName:      runnerNode.Name,
+                    RestartPolicy: "OnFailure",
+                    Containers: []corev1.Container{
+                        {
+                            Image:   wolImage,
+                            Name:    "wakeup",
+                            Command: []string{"sh", "-c", "for i in `seq 20`; do wol wake " + wakeupNode.Spec.Wakeup.Wol.MacAddr + "; sleep 2; done"},
+                        },
+                    },
+                },
+            },
+        },
+    }
+    return job
+
+}
+
 // Ensures that there is a WOL pod running on a targetNode (that is on the same subnet)
-func (r *AnesthesiaNodeReconciler) ensureWolJob(an *anesthesiav1alpha1.AnesthesiaNode, targetNode string) error {
-    job := batch.Job{}
-    _ = job
-    return nil
+func (r *AnesthesiaNodeReconciler) ensureWolJob(job *batch.Job) error {
+    err := r.Create(context.TODO(), job)
+    return err
 }
 
 // Returns true if both IPs are on the same /cidr subnet
@@ -113,10 +157,10 @@ func areIpsOnSameNet(ip1 string, ip2 string, cidr string) bool {
     return ip1net.IP.Equal(ip2net.IP)
 }
 
-func areNodesOnSameL2(n1 *core.Node, n2 *core.Node) bool {
+func areNodesOnSameL2(n1 *corev1.Node, n2 *corev1.Node) bool {
     // This is O(n^2) but I don't care one bit
     for _, a1 := range n1.Status.Addresses {
-        if a1.Type == core.NodeHostName {
+        if a1.Type == corev1.NodeHostName {
             continue
         }
 
@@ -144,9 +188,9 @@ func areNodesOnSameL2(n1 *core.Node, n2 *core.Node) bool {
 
 // Returns all the nodes of the cluster that are on the same L2 than node.
 // +kubebuilder:rbac:groups=v1,resources=nodes,verbs=get;list
-func (r *AnesthesiaNodeReconciler) findNodesOnSameL2(n *core.Node) ([]core.Node, error) {
-    var allNodes core.NodeList
-    var sameL2Nodes []core.Node
+func (r *AnesthesiaNodeReconciler) findNodesOnSameL2(n *corev1.Node) ([]*corev1.Node, error) {
+    var allNodes corev1.NodeList
+    var sameL2Nodes []*corev1.Node
 
     err := r.List(context.TODO(), &allNodes)
     if err != nil {
@@ -159,12 +203,70 @@ func (r *AnesthesiaNodeReconciler) findNodesOnSameL2(n *core.Node,
             continue
         }
         if areNodesOnSameL2(n, &nn) {
-            sameL2Nodes = append(sameL2Nodes, nn)
+            sameL2Nodes = append(sameL2Nodes, &nn)
         }
     }
     return sameL2Nodes, nil
 }
 
+func getCandidateNodeFromList(nodes []*corev1.Node) *corev1.Node {
+    // XXX FIXME this is PoC-style level
+
+    for _, node := range nodes {
+        // Check that the node is running
+        return node
+    }
+    return nil
+
+}
+
+// XXX kubebuilder
+func (r *AnesthesiaNodeReconciler) listWakeupJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) (*batch.JobList, error) {
+    nodeJobLabels := jobLabels(wakeupNode)
+    var nodeJobs batch.JobList
+
+    err := r.List(context.TODO(), &nodeJobs, client.InNamespace(namespace), client.MatchingLabels(nodeJobLabels))
+    return &nodeJobs, err
+
+}
+
+func (r *AnesthesiaNodeReconciler) areRunningWakeupJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) (bool, error) {
+    nodeJobs, err := r.listWakeupJobs(wakeupNode)
+    if err != nil {
+        return false, err
+    }
+
+    for _, job := range nodeJobs.Items {
+        if job.Status.Active > 0 {
+            return true, nil
+        }
+    }
+    return false, nil
+
+}
+
+func (r *AnesthesiaNodeReconciler) removeOldJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) error {
+
+    nodeJobs, err := r.listWakeupJobs(wakeupNode)
+
+    if err != nil {
+        fmt.Printf("Could not list wakeupJobs: %v\n", err)
+        return err
+    }
+
+    for _, job := range nodeJobs.Items {
+        if job.Status.Succeeded >= 1 {
+            fmt.Printf("Will remove completed job %v\n", job)
+            err = r.Delete(context.TODO(), &job)
+            if err != nil {
+                fmt.Printf("Could not delete job: %v\n", err)
%v\n", err) + return err + } + } + } + return nil +} + //+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/status,verbs=get;update;patch //+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/finalizers,verbs=update @@ -183,7 +285,7 @@ func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque var node anesthesiav1alpha1.AnesthesiaNode - if req.Name != "think03" && req.Name != "think02" { + if req.Name != "think02" { return ctrl.Result{}, nil } @@ -207,31 +309,50 @@ func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque nodeAlive := isNodeAlive(renewTime) var newActualState string - if node.Spec.State == "on" && nodeAlive { + requeue := 2 * time.Second + if node.Spec.State == "on" && nodeAlive { // FIXME newActualState = "running" + err = r.removeOldJobs(&node) + if err != nil { + log.Error(err, "Couldn't remove old jobs") + return ctrl.Result{}, err + } - } else if node.Spec.State == "on" && !nodeAlive { + } else if node.Spec.State == "on" && !nodeAlive { // FIXME newActualState = "starting" - nodes, err := r.findNodesOnSameL2(v1node) + runningJobs, err := r.areRunningWakeupJobs(&node) + if err != nil { + log.Error(err, "Could not get running wakeup jobs") + return ctrl.Result{}, err + } + if runningJobs { + log.Info("Wakeup job still running") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + l2Nodes, err := r.findNodesOnSameL2(v1node) if err != nil { log.Error(err, "Could not find nodes on same L2") + return ctrl.Result{}, err } // XXX here we should filter for running nodes - if len(nodes) == 0 { + // i.e. we should have at least 1 running node before suspending it + if len(l2Nodes) == 0 { log.Info("No nodes on the same L2 as our node") return ctrl.Result{RequeueAfter: time.Minute}, nil } - var allNames []string - for _, n := range nodes { - allNames = append(allNames, n.Name) + + candidateNode := getCandidateNodeFromList(l2Nodes) + log.Info("Candidate node: " + candidateNode.Name) + woljob := r.WolJob(candidateNode, &node) + err = r.ensureWolJob(woljob) + if err != nil { + log.Error(err, "Could not ensure WOL Job") + return ctrl.Result{RequeueAfter: 2 * time.Minute}, err } - - fmt.Printf("Sames nodes on same L2 as %v: %v", v1node.Name, allNames) - - //r.ensureWolJob(&node, "TODO") - // TODO + requeue = 2 * time.Minute } else if node.Spec.State == "off" && nodeAlive { newActualState = "shutting-down" @@ -254,11 +375,9 @@ func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.Status().Update(ctx, &node) } - log.Info(fmt.Sprintf("The last renew time of %v was %v", req.Name, renewTime)) - // TODO(user): your logic here - return ctrl.Result{RequeueAfter: 2 * time.Second}, nil + return ctrl.Result{RequeueAfter: requeue}, nil } // SetupWithManager sets up the controller with the Manager.