/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	errors "github.com/pkg/errors"

	batch "k8s.io/api/batch/v1"
	coordination "k8s.io/api/coordination/v1"
	core "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	anesthesiav1alpha1 "gitlab.k3s.fr/k8s/kube-anesthesia-operator/api/v1alpha1"
)

// AnesthesiaNodeReconciler reconciles an AnesthesiaNode object
type AnesthesiaNodeReconciler struct {
	client.Client
	Scheme    *runtime.Scheme
	apiReader client.Reader
}

var (
	nodeLeaseNs        = "kube-node-lease"
	deadLeaseTimedelta = 1 * time.Minute
	defaultCidrV4      = "/24"
	defaultCidrV6      = "/56"
)
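
// Note: by default the kubelet renews its Lease in the kube-node-lease
// namespace roughly every 10 seconds, so a renew time older than
// deadLeaseTimedelta is a strong hint that the node is down or unreachable.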

// getV1Node fetches the core/v1 Node that carries the same name as the
// AnesthesiaNode (Nodes are cluster-scoped, hence the empty namespace).
func (r *AnesthesiaNodeReconciler) getV1Node(an *anesthesiav1alpha1.AnesthesiaNode) (*core.Node, error) {
	var node core.Node
	nodeName := client.ObjectKey{
		Name:      an.ObjectMeta.Name,
		Namespace: "",
	}
	if err := r.Get(context.TODO(), nodeName, &node); err != nil {
		return nil, errors.Wrapf(err, "Could not find v1 Node named %v", nodeName.Name)
	}
	return &node, nil
}

// lastHeartbeat returns the last time the node renewed its Lease in the
// kube-node-lease namespace, which the kubelet uses as a heartbeat.
func (r *AnesthesiaNodeReconciler) lastHeartbeat(n *core.Node) (*time.Time, error) {
	var lease coordination.Lease
	leaseName := client.ObjectKey{
		Name:      n.Name,
		Namespace: nodeLeaseNs,
	}
	if err := r.Get(context.TODO(), leaseName, &lease); err != nil {
		return nil, errors.Wrapf(err, "Could not find Lease %v", leaseName)
	}

	renewTime := lease.Spec.RenewTime
	if renewTime == nil {
		return nil, fmt.Errorf("Lease %v has no renew time", leaseName)
	}

	return &renewTime.Time, nil
}

// isNodeAlive reports whether the node's Lease was renewed recently enough
// for the node to be considered up.
func isNodeAlive(lastLeaseTime *time.Time) bool {
	delta := time.Since(*lastLeaseTime)

	if delta > deadLeaseTimedelta {
		return false
	}
	return true
}

// ensureWolJob ensures that a Wake-on-LAN Job is running on targetNode (a
// node on the same subnet as the node to wake up). Not implemented yet.
func (r *AnesthesiaNodeReconciler) ensureWolJob(an *anesthesiav1alpha1.AnesthesiaNode, targetNode string) error {
	job := batch.Job{}
	_ = job
	return nil
}
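
// A minimal sketch of what the Job could eventually look like, assuming the
// wake-up is done by broadcasting a magic packet from targetNode. The image
// name, command and MAC-address source below are hypothetical and not part
// of this repository:
//
//	job := batch.Job{
//		Spec: batch.JobSpec{
//			Template: core.PodTemplateSpec{
//				Spec: core.PodSpec{
//					NodeName:      targetNode,
//					HostNetwork:   true,
//					RestartPolicy: core.RestartPolicyNever,
//					Containers: []core.Container{{
//						Name:    "wol",
//						Image:   "example.invalid/wol-sender:latest",
//						Command: []string{"wol", "<mac-of-node-to-wake>"},
//					}},
//				},
//			},
//		},
//	}
//
// followed by r.Create(ctx, &job), plus an r.Get beforehand to keep the
// call idempotent.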

// areIpsOnSameNet returns true if both IPs are on the same subnet for the
// given /cidr prefix length.
func areIpsOnSameNet(ip1 string, ip2 string, cidr string) bool {
	_, ip1net, err := net.ParseCIDR(ip1 + cidr)
	if err != nil {
		return false
	}

	_, ip2net, err := net.ParseCIDR(ip2 + cidr)
	if err != nil {
		return false
	}

	fmt.Printf("I will compare %v vs %v\n", ip1, ip2)
	fmt.Printf("i.e. %v vs %v\n", ip1net, ip2net)

	return ip1net.IP.Equal(ip2net.IP)
}
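
// For example, areIpsOnSameNet("192.168.1.10", "192.168.1.200", "/24")
// parses both addresses as the 192.168.1.0/24 network and returns true,
// whereas with "/28" the two addresses fall into different subnets
// (192.168.1.0/28 vs 192.168.1.192/28) and the function returns false.
// (Illustrative addresses, not taken from this cluster.)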

// areNodesOnSameL2 returns true if the two nodes have at least one pair of
// addresses that land on the same subnet (and therefore, presumably, on the
// same L2 segment).
func areNodesOnSameL2(n1 *core.Node, n2 *core.Node) bool {
	// This is O(n^2) but I don't care one bit
	for _, a1 := range n1.Status.Addresses {
		if a1.Type == core.NodeHostName {
			continue
		}

		isIpv6_a1 := strings.Contains(a1.Address, ":")

		for _, a2 := range n2.Status.Addresses {
			isIpv6_a2 := strings.Contains(a2.Address, ":")

			if isIpv6_a1 != isIpv6_a2 {
				continue
			}

			defaultCidr := defaultCidrV4
			if isIpv6_a1 {
				defaultCidr = defaultCidrV6
			}

			if areIpsOnSameNet(a1.Address, a2.Address, defaultCidr) {
				return true
			}
		}
	}
	return false
}
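
// For instance, a node advertising 10.0.0.5 and fd00::5 compared against a
// node advertising 10.0.0.9: the mixed IPv4/IPv6 pairs are skipped, and the
// two IPv4 addresses match under the default /24, so the nodes are treated
// as sharing an L2 segment. (Illustrative addresses.)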

// findNodesOnSameL2 returns all the nodes of the cluster that are on the same L2 as node.
// +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list
func (r *AnesthesiaNodeReconciler) findNodesOnSameL2(n *core.Node) ([]core.Node, error) {
	var allNodes core.NodeList
	var sameL2Nodes []core.Node

	err := r.List(context.TODO(), &allNodes)
	if err != nil {
		return nil, err
	}

	// Iterate on all the nodes
	for _, nn := range allNodes.Items {
		if nn.Name == n.Name {
			continue
		}
		if areNodesOnSameL2(n, &nn) {
			sameL2Nodes = append(sameL2Nodes, nn)
		}
	}
	return sameL2Nodes, nil
}

//+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=anesthesia.k3s.fr,resources=anesthesianodes/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the AnesthesiaNode object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile
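//
// Current behaviour, as a rough summary of the state machine below:
//
//	spec.state  node lease fresh?  status.actualState
//	"on"        yes                "running"
//	"on"        no                 "starting" (locate an L2 neighbour for WOL)
//	"off"       yes                "shutting-down" (shutdown not implemented yet)
//	"off"       no                 "halted"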
func (r *AnesthesiaNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)

	var node anesthesiav1alpha1.AnesthesiaNode

	// Only reconcile these two specific nodes for now.
	if req.Name != "think03" && req.Name != "think02" {
		return ctrl.Result{}, nil
	}

	if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
		log.Error(err, "Could not get AnesthesiaNode")
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	v1node, err := r.getV1Node(&node)
	if err != nil {
		log.Error(err, "Could not get the v1 Node")
		return ctrl.Result{}, nil
	}

	renewTime, err := r.lastHeartbeat(v1node)
	if err != nil {
		log.Error(err, "Could not get the node's last heartbeat")
		return ctrl.Result{}, nil
	}

	nodeAlive := isNodeAlive(renewTime)

	var newActualState string
	if node.Spec.State == "on" && nodeAlive {
		newActualState = "running"

	} else if node.Spec.State == "on" && !nodeAlive {
		newActualState = "starting"

		nodes, err := r.findNodesOnSameL2(v1node)
		if err != nil {
			log.Error(err, "Could not find nodes on same L2")
		}

		// XXX here we should filter for running nodes
		if len(nodes) == 0 {
			log.Info("No nodes on the same L2 as our node")
			return ctrl.Result{RequeueAfter: time.Minute}, nil
		}
		var allNames []string
		for _, n := range nodes {
			allNames = append(allNames, n.Name)
		}

		fmt.Printf("Nodes on the same L2 as %v: %v\n", v1node.Name, allNames)

		//r.ensureWolJob(&node, "TODO")
		// TODO

	} else if node.Spec.State == "off" && nodeAlive {
		newActualState = "shutting-down"
		// XXX make sure that we will still have a node alive for WOL
		// TODO

	} else if node.Spec.State == "off" && !nodeAlive {
		newActualState = "halted"
		// TODO

	} else {
		err := fmt.Errorf("Impossible state %v", node.Spec.State)
		return ctrl.Result{}, err
	}

	if node.Status.ActualState != newActualState {
		node.Status.ActualState = newActualState

		log.Info("Will update state")
		if err := r.Status().Update(ctx, &node); err != nil {
			log.Error(err, "Could not update AnesthesiaNode status")
		}
	}

	log.Info(fmt.Sprintf("The last renew time of %v was %v", req.Name, renewTime))

	// TODO(user): your logic here

	return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AnesthesiaNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	fmt.Println("IHA, I was setup !")

	return ctrl.NewControllerManagedBy(mgr).
		For(&anesthesiav1alpha1.AnesthesiaNode{}).
		Complete(r)
}