diff --git a/lib/promscrape/discovery/kubernetes/api_watcher_test.go b/lib/promscrape/discovery/kubernetes/api_watcher_test.go index f971512cf..e0262cea5 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher_test.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher_test.go @@ -1,8 +1,13 @@ package kubernetes import ( + "encoding/json" + "net/http" + "net/http/httptest" "reflect" + "sync" "testing" + "time" ) func TestGetAPIPathsWithNamespaces(t *testing.T) { @@ -175,3 +180,849 @@ func TestParseBookmark(t *testing.T) { t.Fatalf("unexpected resourceVersion; got %q; want %q", bm.Metadata.ResourceVersion, expectedResourceVersion) } } + +func TestGetScrapeWorkObjects(t *testing.T) { + type testCase struct { + name string + sdc *SDConfig + expectedTargetsLen int + initAPIObjectsByRole map[string][]byte + // will be added for watching api. + watchAPIMustAddObjectsByRole map[string][][]byte + } + cases := []testCase{ + { + name: "simple 1 pod with update 1", + sdc: &SDConfig{ + Role: "pod", + }, + expectedTargetsLen: 2, + initAPIObjectsByRole: map[string][]byte{ + "pod": []byte(`{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "stack-name-1", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "generic-pod" + } + ] + }, + "status": { + "podIP": "10.10.2.2", + "phase": "Running" + } +}]}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "pod": { + []byte(`{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "stack-next-2", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "generic-pod-2" + } + ] + }, + "status": { + "podIP": "10.10.2.5", + "phase": "Running" + } +}`), + }, + }, + }, + { + name: "endpoints with service update", + sdc: &SDConfig{ + Role: "endpoints", + }, + expectedTargetsLen: 2, + initAPIObjectsByRole: map[string][]byte{ + "service": []byte(`{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": []}`), + "endpoints": []byte(`{ + "kind": "EndpointsList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "annotations": { + "endpoints.kubernetes.io/last-change-trigger-time": "2021-04-27T02:06:55Z" + }, + "labels": { + "app.kubernetes.io/managed-by": "Helm" + }, + "name": "stack-kube-state-metrics", + "namespace": "default" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "10.244.0.5", + "nodeName": "kind-control-plane", + "targetRef": { + "kind": "Pod", + "name": "stack-kube-state-metrics-db5879bf8-bg78p", + "namespace": "default" + } + } + ], + "ports": [ + { + "name": "http", + "port": 8080, + "protocol": "TCP" + } + ] + } + ] +} +]}`), + "pod": []byte(`{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack" + }, + "name": "stack-kube-state-metrics-db5879bf8-bg78p", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8", + "name": "kube-state-metrics", + "ports": [ + { + "containerPort": 8080, + "protocol": "TCP" + } + ] + }, + { + "image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8", + "name": "kube-state-metrics-2", + "ports": [ + { + "containerPort": 8085, + "protocol": "TCP" + } + ] + } + ] + }, + "status": { + "phase": "Running", + "podIP": "10.244.0.5" + } +} +]}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "service": { + []byte(`{ + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "annotations": { + "meta.helm.sh/release-name": "stack" + }, + "labels": { + "app.kubernetes.io/managed-by": "Helm", + "app.kubernetes.io/name": "kube-state-metrics" + }, + "name": "stack-kube-state-metrics", + "namespace": "default" + }, + "spec": { + "clusterIP": "10.97.109.249", + "ports": [ + { + "name": "http", + "port": 8080, + "protocol": "TCP", + "targetPort": 8080 + } + ], + "selector": { + "app.kubernetes.io/instance": "stack", + "app.kubernetes.io/name": "kube-state-metrics" + }, + "type": "ClusterIP" + } +}`), + }, + }, + }, + { + name: "get nodes", + sdc: &SDConfig{Role: "node"}, + expectedTargetsLen: 2, + initAPIObjectsByRole: map[string][]byte{ + "node": []byte(`{ + "kind": "NodeList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/nodes", + "resourceVersion": "22627" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Node", + "metadata": { + "annotations": { + "kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock" + }, + "labels": { + "beta.kubernetes.io/arch": "amd64", + "beta.kubernetes.io/os": "linux" + }, + "name": "kind-control-plane-new" + }, + "status": { + "addresses": [ + { + "address": "10.10.2.5", + "type": "InternalIP" + }, + { + "address": "kind-control-plane", + "type": "Hostname" + } + ] + } +} +]}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "node": { + []byte(`{ + "apiVersion": "v1", + "kind": "Node", + "metadata": { + "annotations": { + "kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock" + }, + "labels": { + "beta.kubernetes.io/arch": "amd64", + "beta.kubernetes.io/os": "linux" + }, + "name": "kind-control-plane" + }, + "status": { + "addresses": [ + { + "address": "10.10.2.2", + "type": "InternalIP" + }, + { + "address": "kind-control-plane", + "type": "Hostname" + } + ] + } +}`), + }, + }, + }, + { + name: "2 service with 2 added", + sdc: &SDConfig{Role: "service"}, + expectedTargetsLen: 4, + initAPIObjectsByRole: map[string][]byte{ + "service": []byte(`{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/services", + "resourceVersion": "60485" + }, + "items": [ + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "dns", + "protocol": "UDP", + "port": 53, + "targetPort": 53 + }, + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + } + } + ] +}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "service": { + []byte(`{ + "metadata": { + "name": "another-service-1", + "namespace": "default", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "some-app-1-tcp", + "protocol": "TCP", + "port": 1053, + "targetPort": 1053 + } + ], + "selector": { + "k8s-app": "some-app-1" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP" + } +}`), + []byte(`{ + "metadata": { + "name": "another-service-2", + "namespace": "default", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "some-app-2-tcp", + "protocol": "TCP", + "port": 1053, + "targetPort": 1053 + } + ], + "selector": { + "k8s-app": "some-app-2" + }, + "clusterIP": "10.96.0.15", + "type": "ClusterIP" + } +}`), + }, + }, + }, + { + name: "1 ingress with 2 add", + expectedTargetsLen: 3, + sdc: &SDConfig{ + Role: "ingress", + }, + initAPIObjectsByRole: map[string][]byte{ + "ingress": []byte(`{ + "kind": "IngressList", + "apiVersion": "extensions/v1beta1", + "metadata": { + "selfLink": "/apis/extensions/v1beta1/ingresses", + "resourceVersion": "351452" + }, + "items": [ + { + "metadata": { + "name": "test-ingress", + "namespace": "default" + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 80 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.2" + } + ] + } + } + } + ] +}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "ingress": { + []byte(`{ + "metadata": { + "name": "test-ingress-1", + "namespace": "default" + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 801 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.3" + } + ] + } + } +}`), + []byte(`{ + "metadata": { + "name": "test-ingress-2", + "namespace": "default" + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 802 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.3" + } + ] + } + } +}`), + }, + }, + }, + { + name: "7 endpointslices slice with 1 service update", + sdc: &SDConfig{ + Role: "endpointslices", + }, + expectedTargetsLen: 7, + initAPIObjectsByRole: map[string][]byte{ + "endpointslices": []byte(`{ + "kind": "EndpointSliceList", + "apiVersion": "discovery.k8s.io/v1beta1", + "metadata": { + "selfLink": "/apis/discovery.k8s.io/v1beta1/endpointslices", + "resourceVersion": "1177" + }, + "items": [ + { + "metadata": { + "name": "kubernetes", + "namespace": "default", + "labels": { + "kubernetes.io/service-name": "kubernetes" + } + }, + "addressType": "IPv4", + "endpoints": [ + { + "addresses": [ + "172.18.0.2" + ], + "conditions": { + "ready": true + } + } + ], + "ports": [ + { + "name": "https", + "protocol": "TCP", + "port": 6443 + } + ] + }, + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "kubernetes.io/service-name": "kube-dns" + } + }, + "addressType": "IPv4", + "endpoints": [ + { + "addresses": [ + "10.244.0.3" + ], + "conditions": { + "ready": true + }, + "targetRef": { + "kind": "Pod", + "namespace": "kube-system", + "name": "coredns-66bff467f8-z8czk", + "uid": "36a545ff-dbba-4192-a5f6-1dbb0c21c73d", + "resourceVersion": "603" + }, + "topology": { + "kubernetes.io/hostname": "kind-control-plane" + } + }, + { + "addresses": [ + "10.244.0.4" + ], + "conditions": { + "ready": true + }, + "targetRef": { + "kind": "Pod", + "namespace": "kube-system", + "name": "coredns-66bff467f8-kpbhk", + "uid": "db38d8b4-847a-4e82-874c-fe444fba2718", + "resourceVersion": "576" + }, + "topology": { + "kubernetes.io/hostname": "kind-control-plane" + } + } + ], + "ports": [ + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53 + }, + { + "name": "metrics", + "protocol": "TCP", + "port": 9153 + }, + { + "name": "dns", + "protocol": "UDP", + "port": 53 + } + ] + } + ] +}`), + "pod": []byte(`{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "coredns-66bff467f8-kpbhk", + "namespace": "kube-system" + }, + "spec": { + "containers": [ + { + "name": "generic-pod" + } + ] + }, + "status": { + "podIP": "10.10.2.2", + "phase": "Running" + } +}, +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "coredns-66bff467f8-z8czk", + "namespace": "kube-system" + }, + "spec": { + "containers": [ + { + "name": "generic-pod" + } + ] + }, + "status": { + "podIP": "10.10.2.3", + "phase": "Running" + } +} +]}`), + "service": []byte(`{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/services", + "resourceVersion": "60485" + }, + "items": [ + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "dns", + "protocol": "UDP", + "port": 53, + "targetPort": 53 + }, + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + } + } + ] +}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "service": { + []byte(` { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "k8s-app": "kube-dns", + "some-new": "label-value" + } + }, + "spec": { + "ports": [ + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + } + } +`), + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + watchPublishersByRole := make(map[string]*watchObjectBroadcast) + mux := http.NewServeMux() + for role, obj := range tc.initAPIObjectsByRole { + watchBroadCaster := &watchObjectBroadcast{} + watchPublishersByRole[role] = watchBroadCaster + apiPath := getAPIPath(getObjectTypeByRole(role), "", "") + addAPIURLHandler(t, mux, apiPath, obj, watchBroadCaster) + } + testAPIServer := httptest.NewServer(mux) + tc.sdc.APIServer = testAPIServer.URL + ac, err := newAPIConfig(tc.sdc, "", func(metaLabels map[string]string) interface{} { + var res []interface{} + for k := range metaLabels { + res = append(res, k) + } + return res + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + tc.sdc.cfg = ac + ac.aw.mustStart() + defer ac.aw.mustStop() + _, err = tc.sdc.GetScrapeWorkObjects() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // need to wait, for subscribers to start. + time.Sleep(80 * time.Millisecond) + for role, objs := range tc.watchAPIMustAddObjectsByRole { + for _, obj := range objs { + watchPublishersByRole[role].pub(obj) + } + } + for _, ch := range watchPublishersByRole { + ch.shutdown() + } + if len(tc.watchAPIMustAddObjectsByRole) > 0 { + // updates async, need to wait some time. + // i guess, poll is not reliable. + time.Sleep(80 * time.Millisecond) + } + got, err := tc.sdc.GetScrapeWorkObjects() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != tc.expectedTargetsLen { + t.Fatalf("unexpected count of objects, got: %d, want: %d", len(got), tc.expectedTargetsLen) + } + }) + } +} + +type watchObjectBroadcast struct { + mu sync.Mutex + subscribers []chan []byte +} + +func (o *watchObjectBroadcast) pub(msg []byte) { + o.mu.Lock() + defer o.mu.Unlock() + for i := range o.subscribers { + c := o.subscribers[i] + select { + case c <- msg: + default: + } + } +} + +func (o *watchObjectBroadcast) sub() <-chan []byte { + c := make(chan []byte, 5) + o.mu.Lock() + o.subscribers = append(o.subscribers, c) + o.mu.Unlock() + return c +} + +func (o *watchObjectBroadcast) shutdown() { + o.mu.Lock() + defer o.mu.Unlock() + for i := range o.subscribers { + c := o.subscribers[i] + close(c) + } +} + +func addAPIURLHandler(t *testing.T, mux *http.ServeMux, apiURL string, initObjects []byte, notifier *watchObjectBroadcast) { + t.Helper() + mux.HandleFunc(apiURL, func(w http.ResponseWriter, r *http.Request) { + if needWatch := r.URL.Query().Get("watch"); len(needWatch) > 0 { + // start watch handler + w.WriteHeader(200) + flusher := w.(http.Flusher) + flusher.Flush() + updateC := notifier.sub() + for obj := range updateC { + we := WatchEvent{ + Type: "ADDED", + Object: obj, + } + szd, err := json.Marshal(we) + if err != nil { + t.Fatalf("cannot serialize: %v", err) + } + _, _ = w.Write(szd) + flusher.Flush() + } + return + } + w.WriteHeader(200) + _, _ = w.Write(initObjects) + }) +}