kube-anesthesia/controllers/anesthesianode_controller.go
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"net"
"strings"
"time"
errors "github.com/pkg/errors"
batch "k8s.io/api/batch/v1"
coordination "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/google/uuid"
anesthesiav1alpha1 "gitlab.k3s.fr/k8s/kube-anesthesia-operator/api/v1alpha1"
)
// AnesthesiaNodeReconciler reconciles an AnesthesiaNode object
type AnesthesiaNodeReconciler struct {
client.Client
Scheme *runtime.Scheme
apiReader client.Reader
}
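// Package-level defaults: where node Leases live, how stale a Lease may be before
// the node is considered down, the prefix lengths used to guess L2 adjacency, the
// images used for the wake-on-LAN Job and the shutdown Pod, and the namespace the
// operator creates its Jobs and Pods in.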
var (
nodeLeaseNs = "kube-node-lease"
deadLeaseTimedelta = 1 * time.Minute
defaultCidrV4 = "/24"
defaultCidrV6 = "/56"
wolImage = "registry.k3s.fr/k8s/docker-wakeonlan-image/main:latest"
shutdownImage = "registry.k3s.fr/k8s/docker-ubuntu-systemd/main:latest"
namespace = "default"
)
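// getV1Node fetches the corev1.Node that has the same name as the AnesthesiaNode.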
func (r *AnesthesiaNodeReconciler) getV1Node(an *anesthesiav1alpha1.AnesthesiaNode) (*corev1.Node, error) {
var node corev1.Node
nodeName := client.ObjectKey{
Name: an.ObjectMeta.Name,
Namespace: "",
}
if err := r.Get(context.TODO(), nodeName, &node); err != nil {
return nil, errors.Wrapf(err, "Could not find v1 Node named %v", nodeName.Name)
}
return &node, nil
}
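// lastHeartbeat returns the RenewTime of the node's Lease in the kube-node-lease
// namespace, i.e. the last time the kubelet reported in.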
func (r *AnesthesiaNodeReconciler) lastHeartbeat(n *corev1.Node) (*time.Time, error) {
var lease coordination.Lease
leaseName := client.ObjectKey{
Name: n.Name,
Namespace: nodeLeaseNs,
}
if err := r.Get(context.TODO(), leaseName, &lease); err != nil {
return nil, errors.Wrapf(err, "Could not find Lease %v", leaseName)
}
	renewTime := lease.Spec.RenewTime
	if renewTime == nil {
		return nil, fmt.Errorf("lease %v has no renewTime yet", leaseName)
	}
	return &renewTime.Time, nil
}
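// isNodeAlive reports whether the node's Lease was renewed within deadLeaseTimedelta.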
func isNodeAlive(lastLeaseTime *time.Time) bool {
	return time.Since(*lastLeaseTime) <= deadLeaseTimedelta
}
func s(s string) *string {
return &s
}
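// jobLabels returns the labels put on wakeup Jobs so they can be listed per node.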
func jobLabels(node *anesthesiav1alpha1.AnesthesiaNode) map[string]string {
return map[string]string{
"anesthesia.k3s.fr/node": node.Name,
"anesthesia.k3s.fr/type": "wakeup",
}
}
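// podLabels returns the labels put on shutdown Pods so they can be listed per node.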
func podLabels(node *anesthesiav1alpha1.AnesthesiaNode) map[string]string {
return map[string]string{
"anesthesia.k3s.fr/node": node.Name,
"anesthesia.k3s.fr/type": "shutdown",
}
}
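// WolJob builds a Job, pinned to runnerNode, whose pod repeatedly sends
// wake-on-LAN packets to wakeupNode's MAC address.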
func (r *AnesthesiaNodeReconciler) WolJob(runnerNode *corev1.Node, wakeupNode *anesthesiav1alpha1.AnesthesiaNode) *batch.Job {
randid := uuid.New().String()[0:6]
job := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "wakeup-" + wakeupNode.Name + "-" + randid,
Namespace: namespace,
Labels: jobLabels(wakeupNode),
},
Spec: batch.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
NodeName: runnerNode.Name,
RestartPolicy: "OnFailure",
Containers: []corev1.Container{
{
Image: wolImage,
Name: "wakeup",
Command: []string{"sh", "-c", "for i in `seq 20`; do wol wake " + wakeupNode.Spec.Wakeup.Wol.MacAddr + "; sleep 2; done"},
},
},
},
},
},
}
return job
}
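// shutdownPod builds a Pod, pinned to shutdownNode, that powers the host off by
// talking to the host's systemd through the bind-mounted /run/systemd directory
// (HostPID + SYSTEMD_IGNORE_CHROOT=1).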
func shutdownPod(shutdownNode *anesthesiav1alpha1.AnesthesiaNode) *corev1.Pod {
randid := uuid.New().String()[0:6]
hostPathDirectory := corev1.HostPathDirectory
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "shutdown-" + shutdownNode.Name + "-" + randid,
Namespace: namespace,
Labels: podLabels(shutdownNode),
},
Spec: corev1.PodSpec{
NodeName: shutdownNode.Name,
RestartPolicy: "Never",
HostPID: true,
Containers: []corev1.Container{
{
Image: shutdownImage,
Name: "shutdown",
Command: []string{"sh", "-c", "systemctl poweroff"},
Env: []corev1.EnvVar{
{
Name: "SYSTEMD_IGNORE_CHROOT",
Value: "1",
},
},
VolumeMounts: []corev1.VolumeMount{
{
MountPath: "/run/systemd",
Name: "runsystemd",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "runsystemd",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/run/systemd",
Type: &hostPathDirectory,
},
},
},
},
},
}
return pod
}
// Ensures that the wake-on-LAN Job runs on a target node (one that is on the same
// subnet as the node to wake). For now this only creates the Job; it does not check
// whether an equivalent Job already exists.
// XXX to rebase ensureRessource
func (r *AnesthesiaNodeReconciler) ensureWolJob(job *batch.Job) error {
	return r.Create(context.TODO(), job)
}
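// ensurePod creates the given Pod; like ensureWolJob, it does not check for an
// existing one yet.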
func (r *AnesthesiaNodeReconciler) ensurePod(pod *corev1.Pod) error {
	return r.Create(context.TODO(), pod)
}
// Returns true if both IPs are on the same /cidr subnet
func areIpsOnSameNet(ip1 string, ip2 string, cidr string) bool {
_, ip1net, err := net.ParseCIDR(ip1 + cidr)
if err != nil {
return false
}
_, ip2net, err := net.ParseCIDR(ip2 + cidr)
if err != nil {
return false
}
fmt.Printf("I will compare %v vs %v\n", ip1, ip2)
fmt.Printf("i.e. %v vs %v\n", ip1net, ip2net)
return ip1net.IP.Equal(ip2net.IP)
}
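// areNodesOnSameL2 heuristically decides whether two nodes share an L2 segment by
// comparing their addresses under an assumed /24 (IPv4) or /56 (IPv6) prefix.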
func areNodesOnSameL2(n1 *corev1.Node, n2 *corev1.Node) bool {
// This is O(n^2) but I don't care one bit
for _, a1 := range n1.Status.Addresses {
if a1.Type == corev1.NodeHostName {
continue
}
isIpv6_a1 := strings.Contains(a1.Address, ":")
for _, a2 := range n2.Status.Addresses {
isIpv6_a2 := strings.Contains(a2.Address, ":")
if isIpv6_a1 != isIpv6_a2 {
continue
}
defaultCidr := defaultCidrV4
if isIpv6_a1 {
defaultCidr = defaultCidrV6
}
if areIpsOnSameNet(a1.Address, a2.Address, defaultCidr) {
return true
}
}
}
return false
}
// Returns all the nodes of the cluster that are on the same L2 network as the given node.
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list
func (r *AnesthesiaNodeReconciler) findNodesOnSameL2(n *corev1.Node) ([]*corev1.Node, error) {
var allNodes corev1.NodeList
var sameL2Nodes []*corev1.Node
err := r.List(context.TODO(), &allNodes)
if err != nil {
return nil, err
}
	// Iterate over all the nodes, indexing into the slice so that the pointers we
	// keep refer to distinct nodes (taking &nn of the range variable would alias
	// every appended entry).
	for i := range allNodes.Items {
		nn := &allNodes.Items[i]
		if nn.Name == n.Name {
			continue
		}
		if areNodesOnSameL2(n, nn) {
			sameL2Nodes = append(sameL2Nodes, nn)
		}
	}
	fmt.Printf("Nodes on the same L2 as %v: %v\n\n\n", n.Name, sameL2Nodes)
return sameL2Nodes, nil
}
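// getCandidateNodeFromList picks the node that will run the wake-on-LAN Job.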
func getCandidateNodeFromList(nodes []*corev1.Node) *corev1.Node {
	// XXX FIXME this is PoC-level: we should check that the candidate is actually
	// running instead of blindly taking the first node.
	if len(nodes) == 0 {
		return nil
	}
	return nodes[0]
}
// XXX kubebuilder
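// listWakeupJobs lists the wakeup Jobs previously created for this AnesthesiaNode,
// matched by their labels.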
func (r *AnesthesiaNodeReconciler) listWakeupJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) (*batch.JobList, error) {
nodeJobLabels := jobLabels(wakeupNode)
var nodeJobs batch.JobList
err := r.List(context.TODO(), &nodeJobs, client.InNamespace(namespace), client.MatchingLabels(nodeJobLabels))
return &nodeJobs, err
}
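// areRunningWakeupJobs reports whether any wakeup Job for this node still has
// active pods.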
func (r *AnesthesiaNodeReconciler) areRunningWakeupJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) (bool, error) {
// XXX this could return the running wakeupjobs so that the function is not called twice
nodeJobs, err := r.listWakeupJobs(wakeupNode)
if err != nil {
return false, err
}
for _, job := range nodeJobs.Items {
if job.Status.Active > 0 {
return true, nil
}
}
return false, nil
}
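// removeOldJobs deletes wakeup Jobs that have already succeeded.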
func (r *AnesthesiaNodeReconciler) removeOldJobs(wakeupNode *anesthesiav1alpha1.AnesthesiaNode) error {
nodeJobs, err := r.listWakeupJobs(wakeupNode)
if err != nil {
fmt.Printf("Could not list wakeupJobs: %v\n", err)
return err
}
	for i := range nodeJobs.Items {
		job := &nodeJobs.Items[i]
		if job.Status.Succeeded >= 1 {
			fmt.Printf("Will remove completed wakeup job %v\n", job.Name)
			err = r.Delete(context.TODO(), job)
			if err != nil {
				fmt.Printf("Could not delete job %v: %v\n", job.Name, err)
				return err
			}
		}
	}
}
return nil
}
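// listShutdownPods lists the shutdown Pods previously created for this
// AnesthesiaNode, matched by their labels.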
func (r *AnesthesiaNodeReconciler) listShutdownPods(shutdownNode *anesthesiav1alpha1.AnesthesiaNode) (*corev1.PodList, error) {
nodePodLabels := podLabels(shutdownNode)
var podList corev1.PodList
err := r.List(context.TODO(), &podList, client.InNamespace(namespace), client.MatchingLabels(nodePodLabels))
return &podList, err
}
func (r *AnesthesiaNodeReconciler) removePod(pod corev1.Pod) error {
fmt.Printf("Will remove pod %v\n", pod.Name)
err := r.Delete(context.TODO(), &pod)
return err
}
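// removeOldShutdownPods deletes shutdown Pods that have finished (succeeded,
// failed, or unknown), keeping pending and running ones.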
func (r *AnesthesiaNodeReconciler) removeOldShutdownPods(shutdownNode *anesthesiav1alpha1.AnesthesiaNode) error {
shutdownPods, err := r.listShutdownPods(shutdownNode)
if err != nil {
fmt.Printf("Could not list shutdown pods: %v\n", err)
return err
}
for _, pod := range shutdownPods.Items {
switch pod.Status.Phase {
case corev1.PodPending, corev1.PodRunning:
// Nothing to do
case corev1.PodFailed:
fmt.Printf("Pod %v failed !\n", pod.Name)
fallthrough
case corev1.PodUnknown:
// Shouldn't be used by k8s, but arises in k3s
fallthrough
case corev1.PodSucceeded:
// This pod shall be removed
err := r.removePod(pod)
if err != nil {
return err // Should we continue with others instead ?
}
}
}
return nil
}
//+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/finalizers,verbs=update
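// NOTE: the markers below are an assumption, added to match the resources this
// reconciler reads and writes (pods, batch Jobs, node Leases); review the verbs
// before relying on them.
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;delete
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete
//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list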
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the AnesthesiaNode object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile
func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
var node anesthesiav1alpha1.AnesthesiaNode
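	// NOTE: reconciliation is currently restricted to the node named "think02".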
if req.Name != "think02" {
return ctrl.Result{}, nil
}
fmt.Printf("\n\n\nHALLO, I was launched !!!\n\n")
if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
log.Error(err, "Could not get AnesthesiaNode")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
v1node, err := r.getV1Node(&node)
if err != nil {
		log.Error(err, "Could not get the corresponding v1 Node")
return ctrl.Result{}, nil
}
renewTime, err := r.lastHeartbeat(v1node)
if err != nil {
log.Error(err, "Couldn't get last heartbeat")
return ctrl.Result{}, nil
}
nodeAlive := isNodeAlive(renewTime)
var newActualState string
requeue := 0 * time.Second
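	// State machine: spec.state ("on"/"off") combined with whether the node's Lease
	// is still being renewed decides the actual state:
	//   on  + alive -> "running"       (clean up finished wakeup Jobs)
	//   on  + dead  -> "starting"      (create a wake-on-LAN Job on a neighbour node)
	//   off + alive -> "shutting-down" (create a shutdown Pod on the node itself)
	//   off + dead  -> "halted"        (nothing to do)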
if node.Spec.State == "on" && nodeAlive { // FIXME
newActualState = "running"
err = r.removeOldJobs(&node)
if err != nil {
log.Error(err, "Couldn't remove old jobs")
return ctrl.Result{}, err
}
} else if node.Spec.State == "on" && !nodeAlive { // FIXME
newActualState = "starting"
runningJobs, err := r.areRunningWakeupJobs(&node)
if err != nil {
log.Error(err, "Could not get running wakeup jobs")
return ctrl.Result{}, err
}
if runningJobs {
log.Info("Wakeup job still running")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
l2Nodes, err := r.findNodesOnSameL2(v1node)
if err != nil {
log.Error(err, "Could not find nodes on same L2")
requeue = 1 * time.Minute
return ctrl.Result{RequeueAfter: requeue}, nil
}
// XXX here we should filter for running nodes
// i.e. we should have at least 1 running node before suspending it
if len(l2Nodes) == 0 {
log.Info("No nodes on the same L2 as our node")
requeue = 1 * time.Minute
return ctrl.Result{RequeueAfter: requeue}, nil
}
candidateNode := getCandidateNodeFromList(l2Nodes)
log.Info("Candidate node: " + candidateNode.Name)
woljob := r.WolJob(candidateNode, &node)
err = r.ensureWolJob(woljob)
if err != nil {
log.Error(err, "Could not ensure WOL Job")
return ctrl.Result{RequeueAfter: 2 * time.Minute}, err // XXX Maybe we need to fail here ?
}
requeue = 2 * time.Minute
} else if node.Spec.State == "off" && nodeAlive {
// We have to cordon + drain the node, and then shut it down
// XXX we need another if here with cordon -> shut down
newActualState = "shutting-down"
		// XXX make sure that we will still have a node alive for WOL
		if err := r.removeOldShutdownPods(&node); err != nil {
			// A finished pod that we fail to clean up should not block the shutdown itself.
			log.Error(err, "Could not clean up old shutdown pods")
		}
		pod := shutdownPod(&node)
		err = r.ensurePod(pod)
if err != nil {
log.Error(err, "Could not create shutdown Pod")
return ctrl.Result{RequeueAfter: 2 * time.Minute}, err // XXX maybe we need to fail here
}
log.Info("Created shutdown pod")
requeue = 2 * time.Minute
} else if node.Spec.State == "off" && !nodeAlive {
newActualState = "halted"
		log.Info("Nothing new to do here!")
} else {
		err := fmt.Errorf("unhandled spec state %q", node.Spec.State)
return ctrl.Result{}, err
}
if node.Status.ActualState != newActualState {
log.Info(fmt.Sprintf("Will update state %v->%v", node.Status.ActualState, newActualState))
node.Status.ActualState = newActualState
err := r.Status().Update(ctx, &node)
if err != nil {
log.Error(err, "Could not update state")
}
}
// TODO(user): your logic here
return ctrl.Result{RequeueAfter: requeue}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *AnesthesiaNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	fmt.Println("IHA, I was set up!")
return ctrl.NewControllerManagedBy(mgr).
For(&anesthesiav1alpha1.AnesthesiaNode{}).
Complete(r)
}