package kubernetes import ( "encoding/json" "net/http" "net/http/httptest" "reflect" "sync" "testing" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) func TestGetAPIPathsWithNamespaces(t *testing.T) { f := func(role string, namespaces []string, selectors []Selector, expectedPaths []string) { t.Helper() paths := getAPIPathsWithNamespaces(role, namespaces, selectors) if !reflect.DeepEqual(paths, expectedPaths) { t.Fatalf("unexpected paths; got\n%q\nwant\n%q", paths, expectedPaths) } } // role=node f("node", nil, nil, []string{"/api/v1/nodes"}) f("node", []string{"foo", "bar"}, nil, []string{"/api/v1/nodes"}) f("node", nil, []Selector{ { Role: "pod", Label: "foo", Field: "bar", }, }, []string{"/api/v1/nodes"}) f("node", nil, []Selector{ { Role: "node", Label: "foo", Field: "bar", }, }, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}) f("node", []string{"x", "y"}, []Selector{ { Role: "node", Label: "foo", Field: "bar", }, }, []string{"/api/v1/nodes?labelSelector=foo&fieldSelector=bar"}) // role=pod f("pod", nil, nil, []string{"/api/v1/pods"}) f("pod", []string{"foo", "bar"}, nil, []string{ "/api/v1/namespaces/foo/pods", "/api/v1/namespaces/bar/pods", }) f("pod", nil, []Selector{ { Role: "node", Label: "foo", }, }, []string{"/api/v1/pods"}) f("pod", nil, []Selector{ { Role: "pod", Label: "foo", }, { Role: "pod", Label: "x", Field: "y", }, }, []string{"/api/v1/pods?labelSelector=foo%2Cx&fieldSelector=y"}) f("pod", []string{"x", "y"}, []Selector{ { Role: "pod", Label: "foo", }, { Role: "pod", Label: "x", Field: "y", }, }, []string{ "/api/v1/namespaces/x/pods?labelSelector=foo%2Cx&fieldSelector=y", "/api/v1/namespaces/y/pods?labelSelector=foo%2Cx&fieldSelector=y", }) // role=service f("service", nil, nil, []string{"/api/v1/services"}) f("service", []string{"x", "y"}, nil, []string{ "/api/v1/namespaces/x/services", "/api/v1/namespaces/y/services", }) f("service", nil, []Selector{ { Role: "node", Label: "foo", }, { Role: "service", Field: "bar", }, }, []string{"/api/v1/services?fieldSelector=bar"}) f("service", []string{"x", "y"}, []Selector{ { Role: "service", Label: "abc=de", }, }, []string{ "/api/v1/namespaces/x/services?labelSelector=abc%3Dde", "/api/v1/namespaces/y/services?labelSelector=abc%3Dde", }) // role=endpoints f("endpoints", nil, nil, []string{"/api/v1/endpoints"}) f("endpoints", []string{"x", "y"}, nil, []string{ "/api/v1/namespaces/x/endpoints", "/api/v1/namespaces/y/endpoints", }) f("endpoints", []string{"x", "y"}, []Selector{ { Role: "endpoints", Label: "bbb", }, { Role: "node", Label: "aa", }, }, []string{ "/api/v1/namespaces/x/endpoints?labelSelector=bbb", "/api/v1/namespaces/y/endpoints?labelSelector=bbb", }) // role=endpointslice f("endpointslice", nil, nil, []string{"/apis/discovery.k8s.io/v1/endpointslices"}) f("endpointslice", []string{"x", "y"}, []Selector{ { Role: "endpointslice", Field: "field", Label: "label", }, }, []string{ "/apis/discovery.k8s.io/v1/namespaces/x/endpointslices?labelSelector=label&fieldSelector=field", "/apis/discovery.k8s.io/v1/namespaces/y/endpointslices?labelSelector=label&fieldSelector=field", }) // role=ingress f("ingress", nil, nil, []string{"/apis/networking.k8s.io/v1/ingresses"}) f("ingress", []string{"x", "y"}, []Selector{ { Role: "node", Field: "xyay", }, { Role: "ingress", Field: "abc", }, { Role: "ingress", Label: "cde", }, { Role: "ingress", Label: "baaa", }, }, []string{ "/apis/networking.k8s.io/v1/namespaces/x/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", "/apis/networking.k8s.io/v1/namespaces/y/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", }) } func TestParseBookmark(t *testing.T) { data := `{"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "12746"} }` bm, err := parseBookmark([]byte(data)) if err != nil { t.Fatalf("unexpected error: %s", err) } expectedResourceVersion := "12746" if bm.Metadata.ResourceVersion != expectedResourceVersion { t.Fatalf("unexpected resourceVersion; got %q; want %q", bm.Metadata.ResourceVersion, expectedResourceVersion) } } func TestGetScrapeWorkObjects(t *testing.T) { type testCase struct { name string sdc *SDConfig expectedTargetsLen int initAPIObjectsByRole map[string][]byte // will be added for watching api. watchAPIMustAddObjectsByRole map[string][][]byte } cases := []testCase{ { name: "simple 1 pod with update 1", sdc: &SDConfig{ Role: "pod", }, expectedTargetsLen: 2, initAPIObjectsByRole: map[string][]byte{ "pod": []byte(`{ "kind": "PodList", "apiVersion": "v1", "metadata": { "resourceVersion": "72425" }, "items": [ { "apiVersion": "v1", "kind": "Pod", "metadata": { "labels": { "app.kubernetes.io/instance": "stack", "pod-template-hash": "5b9c6cf775" }, "name": "stack-name-1", "namespace": "default" }, "spec": { "containers": [ { "name": "generic-pod" } ] }, "status": { "podIP": "10.10.2.2", "phase": "Running" } }]}`), }, watchAPIMustAddObjectsByRole: map[string][][]byte{ "pod": { []byte(`{ "apiVersion": "v1", "kind": "Pod", "metadata": { "labels": { "app.kubernetes.io/instance": "stack", "pod-template-hash": "5b9c6cf775" }, "name": "stack-next-2", "namespace": "default" }, "spec": { "containers": [ { "name": "generic-pod-2" } ] }, "status": { "podIP": "10.10.2.5", "phase": "Running" } }`), }, }, }, { name: "endpoints with service update", sdc: &SDConfig{ Role: "endpoints", }, expectedTargetsLen: 2, initAPIObjectsByRole: map[string][]byte{ "service": []byte(`{ "kind": "ServiceList", "apiVersion": "v1", "metadata": { "resourceVersion": "72425" }, "items": []}`), "endpoints": []byte(`{ "kind": "EndpointsList", "apiVersion": "v1", "metadata": { "resourceVersion": "72425" }, "items": [ { "apiVersion": "v1", "kind": "Endpoints", "metadata": { "annotations": { "endpoints.kubernetes.io/last-change-trigger-time": "2021-04-27T02:06:55Z" }, "labels": { "app.kubernetes.io/managed-by": "Helm" }, "name": "stack-kube-state-metrics", "namespace": "default" }, "subsets": [ { "addresses": [ { "ip": "10.244.0.5", "nodeName": "kind-control-plane", "targetRef": { "kind": "Pod", "name": "stack-kube-state-metrics-db5879bf8-bg78p", "namespace": "default" } } ], "ports": [ { "name": "http", "port": 8080, "protocol": "TCP" } ] } ] } ]}`), "pod": []byte(`{ "kind": "PodList", "apiVersion": "v1", "metadata": { "resourceVersion": "72425" }, "items": [ { "apiVersion": "v1", "kind": "Pod", "metadata": { "labels": { "app.kubernetes.io/instance": "stack" }, "name": "stack-kube-state-metrics-db5879bf8-bg78p", "namespace": "default" }, "spec": { "containers": [ { "image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8", "name": "kube-state-metrics", "ports": [ { "containerPort": 8080, "protocol": "TCP" } ] }, { "image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8", "name": "kube-state-metrics-2", "ports": [ { "containerPort": 8085, "protocol": "TCP" } ] } ] }, "status": { "phase": "Running", "podIP": "10.244.0.5" } } ]}`), }, watchAPIMustAddObjectsByRole: map[string][][]byte{ "service": { []byte(`{ "apiVersion": "v1", "kind": "Service", "metadata": { "annotations": { "meta.helm.sh/release-name": "stack" }, "labels": { "app.kubernetes.io/managed-by": "Helm", "app.kubernetes.io/name": "kube-state-metrics" }, "name": "stack-kube-state-metrics", "namespace": "default" }, "spec": { "clusterIP": "10.97.109.249", "ports": [ { "name": "http", "port": 8080, "protocol": "TCP", "targetPort": 8080 } ], "selector": { "app.kubernetes.io/instance": "stack", "app.kubernetes.io/name": "kube-state-metrics" }, "type": "ClusterIP" } }`), }, }, }, { name: "get nodes", sdc: &SDConfig{Role: "node"}, expectedTargetsLen: 2, initAPIObjectsByRole: map[string][]byte{ "node": []byte(`{ "kind": "NodeList", "apiVersion": "v1", "metadata": { "selfLink": "/api/v1/nodes", "resourceVersion": "22627" }, "items": [ { "apiVersion": "v1", "kind": "Node", "metadata": { "annotations": { "kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock" }, "labels": { "beta.kubernetes.io/arch": "amd64", "beta.kubernetes.io/os": "linux" }, "name": "kind-control-plane-new" }, "status": { "addresses": [ { "address": "10.10.2.5", "type": "InternalIP" }, { "address": "kind-control-plane", "type": "Hostname" } ] } } ]}`), }, watchAPIMustAddObjectsByRole: map[string][][]byte{ "node": { []byte(`{ "apiVersion": "v1", "kind": "Node", "metadata": { "annotations": { "kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock" }, "labels": { "beta.kubernetes.io/arch": "amd64", "beta.kubernetes.io/os": "linux" }, "name": "kind-control-plane" }, "status": { "addresses": [ { "address": "10.10.2.2", "type": "InternalIP" }, { "address": "kind-control-plane", "type": "Hostname" } ] } }`), }, }, }, { name: "2 service with 2 added", sdc: &SDConfig{Role: "service"}, expectedTargetsLen: 4, initAPIObjectsByRole: map[string][]byte{ "service": []byte(`{ "kind": "ServiceList", "apiVersion": "v1", "metadata": { "selfLink": "/api/v1/services", "resourceVersion": "60485" }, "items": [ { "metadata": { "name": "kube-dns", "namespace": "kube-system", "labels": { "k8s-app": "kube-dns" } }, "spec": { "ports": [ { "name": "dns", "protocol": "UDP", "port": 53, "targetPort": 53 }, { "name": "dns-tcp", "protocol": "TCP", "port": 53, "targetPort": 53 } ], "selector": { "k8s-app": "kube-dns" }, "clusterIP": "10.96.0.10", "type": "ClusterIP", "sessionAffinity": "None" } } ] }`), }, watchAPIMustAddObjectsByRole: map[string][][]byte{ "service": { []byte(`{ "metadata": { "name": "another-service-1", "namespace": "default", "labels": { "k8s-app": "kube-dns" } }, "spec": { "ports": [ { "name": "some-app-1-tcp", "protocol": "TCP", "port": 1053, "targetPort": 1053 } ], "selector": { "k8s-app": "some-app-1" }, "clusterIP": "10.96.0.10", "type": "ClusterIP" } }`), []byte(`{ "metadata": { "name": "another-service-2", "namespace": "default", "labels": { "k8s-app": "kube-dns" } }, "spec": { "ports": [ { "name": "some-app-2-tcp", "protocol": "TCP", "port": 1053, "targetPort": 1053 } ], "selector": { "k8s-app": "some-app-2" }, "clusterIP": "10.96.0.15", "type": "ClusterIP" } }`), }, }, }, { name: "1 ingress with 2 add", expectedTargetsLen: 3, sdc: &SDConfig{ Role: "ingress", }, initAPIObjectsByRole: map[string][]byte{ "ingress": []byte(`{ "kind": "IngressList", "apiVersion": "extensions/v1", "metadata": { "selfLink": "/apis/extensions/v1/ingresses", "resourceVersion": "351452" }, "items": [ { "metadata": { "name": "test-ingress", "namespace": "default" }, "spec": { "backend": { "serviceName": "testsvc", "servicePort": 80 }, "rules": [ { "host": "foobar" } ] }, "status": { "loadBalancer": { "ingress": [ { "ip": "172.17.0.2" } ] } } } ] }`), }, watchAPIMustAddObjectsByRole: map[string][][]byte{ "ingress": { []byte(`{ "metadata": { "name": "test-ingress-1", "namespace": "default" }, "spec": { "backend": { "serviceName": "testsvc", "servicePort": 801 }, "rules": [ { "host": "foobar" } ] }, "status": { "loadBalancer": { "ingress": [ { "ip": "172.17.0.3" } ] } } }`), []byte(`{ "metadata": { "name": "test-ingress-2", "namespace": "default" }, "spec": { "backend": { "serviceName": "testsvc", "servicePort": 802 }, "rules": [ { "host": "foobar" } ] }, "status": { "loadBalancer": { "ingress": [ { "ip": "172.17.0.3" } ] } } }`), }, }, }, { name: "7 endpointslices slice with 1 service update", sdc: &SDConfig{ Role: "endpointslice", }, expectedTargetsLen: 7, initAPIObjectsByRole: map[string][]byte{ "endpointslice": []byte(`{ "kind": "EndpointSliceList", "apiVersion": "discovery.k8s.io/v1", "metadata": { "selfLink": "/apis/discovery.k8s.io/v1/endpointslices", "resourceVersion": "1177" }, "items": [ { "metadata": { "name": "kubernetes", "namespace": "default", "labels": { "kubernetes.io/service-name": "kubernetes" } }, "addressType": "IPv4", "endpoints": [ { "addresses": [ "172.18.0.2" ], "conditions": { "ready": true } } ], "ports": [ { "name": "https", "protocol": "TCP", "port": 6443 } ] }, { "metadata": { "name": "kube-dns", "namespace": "kube-system", "labels": { "kubernetes.io/service-name": "kube-dns" } }, "addressType": "IPv4", "endpoints": [ { "addresses": [ "10.244.0.3" ], "conditions": { "ready": true }, "targetRef": { "kind": "Pod", "namespace": "kube-system", "name": "coredns-66bff467f8-z8czk", "uid": "36a545ff-dbba-4192-a5f6-1dbb0c21c73d", "resourceVersion": "603" }, "topology": { "kubernetes.io/hostname": "kind-control-plane" } }, { "addresses": [ "10.244.0.4" ], "conditions": { "ready": true }, "targetRef": { "kind": "Pod", "namespace": "kube-system", "name": "coredns-66bff467f8-kpbhk", "uid": "db38d8b4-847a-4e82-874c-fe444fba2718", "resourceVersion": "576" }, "topology": { "kubernetes.io/hostname": "kind-control-plane" } } ], "ports": [ { "name": "dns-tcp", "protocol": "TCP", "port": 53 }, { "name": "metrics", "protocol": "TCP", "port": 9153 }, { "name": "dns", "protocol": "UDP", "port": 53 } ] } ] }`), "pod": []byte(`{ "kind": "PodList", "apiVersion": "v1", "metadata": { "resourceVersion": "72425" }, "items": [ { "apiVersion": "v1", "kind": "Pod", "metadata": { "labels": { "app.kubernetes.io/instance": "stack", "pod-template-hash": "5b9c6cf775" }, "name": "coredns-66bff467f8-kpbhk", "namespace": "kube-system" }, "spec": { "containers": [ { "name": "generic-pod" } ] }, "status": { "podIP": "10.10.2.2", "phase": "Running" } }, { "apiVersion": "v1", "kind": "Pod", "metadata": { "labels": { "app.kubernetes.io/instance": "stack", "pod-template-hash": "5b9c6cf775" }, "name": "coredns-66bff467f8-z8czk", "namespace": "kube-system" }, "spec": { "containers": [ { "name": "generic-pod" } ] }, "status": { "podIP": "10.10.2.3", "phase": "Running" } } ]}`), "service": []byte(`{ "kind": "ServiceList", "apiVersion": "v1", "metadata": { "selfLink": "/api/v1/services", "resourceVersion": "60485" }, "items": [ { "metadata": { "name": "kube-dns", "namespace": "kube-system", "labels": { "k8s-app": "kube-dns" } }, "spec": { "ports": [ { "name": "dns", "protocol": "UDP", "port": 53, "targetPort": 53 }, { "name": "dns-tcp", "protocol": "TCP", "port": 53, "targetPort": 53 } ], "selector": { "k8s-app": "kube-dns" }, "clusterIP": "10.96.0.10", "type": "ClusterIP", "sessionAffinity": "None" } } ] }`), }, watchAPIMustAddObjectsByRole: map[string][][]byte{ "service": { []byte(` { "metadata": { "name": "kube-dns", "namespace": "kube-system", "labels": { "k8s-app": "kube-dns", "some-new": "label-value" } }, "spec": { "ports": [ { "name": "dns-tcp", "protocol": "TCP", "port": 53, "targetPort": 53 } ], "selector": { "k8s-app": "kube-dns" }, "clusterIP": "10.96.0.10", "type": "ClusterIP", "sessionAffinity": "None" } } `), }, }, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { watchPublishersByRole := make(map[string]*watchObjectBroadcast) mux := http.NewServeMux() for role, obj := range tc.initAPIObjectsByRole { watchBroadCaster := &watchObjectBroadcast{} watchPublishersByRole[role] = watchBroadCaster apiPath := getAPIPath(getObjectTypeByRole(role), "", "") addAPIURLHandler(t, mux, apiPath, obj, watchBroadCaster) } testAPIServer := httptest.NewServer(mux) tc.sdc.APIServer = testAPIServer.URL ac, err := newAPIConfig(tc.sdc, "", func(metaLabels *promutils.Labels) interface{} { var res []interface{} for _, label := range metaLabels.Labels { res = append(res, label.Name) } return res }) if err != nil { t.Fatalf("unexpected error: %v", err) } tc.sdc.cfg = ac ac.aw.mustStart() defer ac.aw.mustStop() _, err = tc.sdc.GetScrapeWorkObjects() if err != nil { t.Fatalf("unexpected error: %v", err) } // need to wait, for subscribers to start. time.Sleep(80 * time.Millisecond) for role, objs := range tc.watchAPIMustAddObjectsByRole { for _, obj := range objs { watchPublishersByRole[role].pub(obj) } } for _, ch := range watchPublishersByRole { ch.shutdown() } if len(tc.watchAPIMustAddObjectsByRole) > 0 { // updates async, need to wait some time. // i guess, poll is not reliable. time.Sleep(80 * time.Millisecond) } got, err := tc.sdc.GetScrapeWorkObjects() if err != nil { t.Fatalf("unexpected error: %v", err) } if len(got) != tc.expectedTargetsLen { t.Fatalf("unexpected count of objects, got: %d, want: %d", len(got), tc.expectedTargetsLen) } }) } } type watchObjectBroadcast struct { mu sync.Mutex subscribers []chan []byte } func (o *watchObjectBroadcast) pub(msg []byte) { o.mu.Lock() defer o.mu.Unlock() for i := range o.subscribers { c := o.subscribers[i] select { case c <- msg: default: } } } func (o *watchObjectBroadcast) sub() <-chan []byte { c := make(chan []byte, 5) o.mu.Lock() o.subscribers = append(o.subscribers, c) o.mu.Unlock() return c } func (o *watchObjectBroadcast) shutdown() { o.mu.Lock() defer o.mu.Unlock() for i := range o.subscribers { c := o.subscribers[i] close(c) } } func addAPIURLHandler(t *testing.T, mux *http.ServeMux, apiURL string, initObjects []byte, notifier *watchObjectBroadcast) { t.Helper() mux.HandleFunc(apiURL, func(w http.ResponseWriter, r *http.Request) { if needWatch := r.URL.Query().Get("watch"); len(needWatch) > 0 { // start watch handler w.WriteHeader(200) flusher := w.(http.Flusher) flusher.Flush() updateC := notifier.sub() for obj := range updateC { we := WatchEvent{ Type: "ADDED", Object: obj, } szd, err := json.Marshal(we) if err != nil { t.Fatalf("cannot serialize: %v", err) } _, _ = w.Write(szd) flusher.Flush() } return } w.WriteHeader(200) _, _ = w.Write(initObjects) }) }