From 00deb69e28e10e37288738f877db7b37a7824d60 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Apr 2021 02:28:53 +0300 Subject: [PATCH 01/16] docs: ordering fix --- docs/Articles.md | 2 +- docs/Home.md | 2 +- docs/Release-Guide.md | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/Articles.md b/docs/Articles.md index 7995d47874..68b0f95c17 100644 --- a/docs/Articles.md +++ b/docs/Articles.md @@ -1,5 +1,5 @@ --- -sort: 18 +sort: 16 --- # Articles diff --git a/docs/Home.md b/docs/Home.md index a7d5a718b3..66adcb8034 100644 --- a/docs/Home.md +++ b/docs/Home.md @@ -1,5 +1,5 @@ --- -sort: 20 +sort: 21 --- # Docs diff --git a/docs/Release-Guide.md b/docs/Release-Guide.md index 0eb6e00928..cf3fdc7f77 100644 --- a/docs/Release-Guide.md +++ b/docs/Release-Guide.md @@ -1,5 +1,5 @@ --- -sort: 16 +sort: 18 --- # Release process guidance From d6f44977a7b73f5c9a489633eb3b1e42757ec304 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Sun, 25 Apr 2021 09:34:28 +0100 Subject: [PATCH 02/16] docs: update per tenant stats page (#1246) --- docs/PerTenantStatistic.md | 53 +++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/docs/PerTenantStatistic.md b/docs/PerTenantStatistic.md index 29af7172e4..c9dbad2a25 100644 --- a/docs/PerTenantStatistic.md +++ b/docs/PerTenantStatistic.md @@ -2,31 +2,35 @@ sort: 19 --- -## VictoriaMetrics Cluster Per Tenant Statistic +# VictoriaMetrics Cluster Per Tenant Statistic cluster-per-tenant-stat -The enterprise version of VictoriaMetrics cluster exposes the usage statistics for each tenant. - -When the next statistic is exposed: - +VictoriaMetrics cluster for enterprise provides various metrics and statistics usage per tenant: - `vminsert` - - * `vm_tenant_inserted_rows_total` - the ingestion rate by tenant + * `vm_tenant_inserted_rows_total` - total number of inserted rows. Find out which tenant + puts the most of the pressure on the storage. + - `vmselect` - - * `vm_tenant_select_requests_duration_ms_total` - query latency by tenant. It can be useful to identify the tenant with the heaviest queries - * `vm_tenant_select_requests_total` - total requests. You can calculate request rate (qps) with this metric + * `vm_tenant_select_requests_duration_ms_total` - query latency. + Helps to identify tenants with the heaviest queries. + * `vm_tenant_select_requests_total` - total number of requests. + Discover which tenant sends the most of the queries and how it changes with time. - `vmstorage` - * `vm_tenant_active_timeseries` - the number of active timeseries - * `vm_tenant_used_tenant_bytes` - the disk space consumed by the metrics for a particular tenant - * `vm_tenant_timeseries_created_total` - the total number for timeseries by tenant + * `vm_tenant_active_timeseries` - number of active time series. + This metric correlates with memory usage, so can be used to find the most expensive + tenant in terms of memory. + * `vm_tenant_used_tenant_bytes` - disk space usage. Helps to track disk space usage + per tenant. + * `vm_tenant_timeseries_created_total` - number of new time series created. Helps to track + the churn rate per tenant, or identify inefficient usage of the system. +Collect the metrics by any scrape agent you like (`vmagent`, `victoriametrics`, Prometheus, etc) and put into TSDB. +It is ok to use existing cluster for storing such metrics, but make sure to use a different tenant for it to avoid collisions. +Or just run a separate TSDB (VM single, Promethes, etc.) 
to keep the data isolated from the main cluster. -The information should be scraped by the agent (`vmagent`, `victoriametrics`, prometheus, etc) and stored in the TSDB. This can be the same cluster but a different tenant however, we encourage the use of one more instance of TSDB (more lightweight, eg. VM single) for the monitoring of monitoring. - -the config example for statistic scraping +Example of the scraping configuration for statistic is the following: ```yaml scrape_configs: @@ -36,20 +40,23 @@ scrape_configs: - targets: ['vmselect:8481','vmstorage:8482','vminsert:8480'] ``` -### Visualization +## Visualization -Visualisation of statistics can be done in Grafana using this dashboard [link](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster/dashboards/clusterbytenant.json) +Visualisation of statistics can be done in Grafana using the following +[dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster/dashboards/clusterbytenant.json). -### Integration with vmgateway +## Integration with vmgateway -Per Tenant Statistics are the source data for the `vmgateway` rate limiter. More information can be found [here](https://docs.victoriametrics.com/vmgateway.html) +`vmgateway` supports integration with Per Tenant Statistics data for rate limiting purposes. +More information can be found [here](https://docs.victoriametrics.com/vmgateway.html) -### Integration with vmalert +## Integration with vmalert -You can generate alerts based on each tenants' resource usage and notify the system/users that they are reaching the limits. +You can generate alerts based on each tenant's resource usage and send notifications +to prevent limits exhaustion. -Here is an example of an alert for high churn rate by the tenant +Here is an alert example for high churn rate by the tenant: ```yaml From 87018650ddabdbbcf1946d36393ad10ae56fb6dd Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Sun, 25 Apr 2021 23:03:22 +0100 Subject: [PATCH 03/16] vmalert: keep the returned timestamp when persisting recording rule (#1245) Previously, vmalert used `lastExecTime` timestamp when writing recording rules to the remote storage. This may be incorrect, if vmalert uses `datasource.lookback` flag, which means rule's expression will be executed at some moment in the past. To avoid such situations, vmalert now will use returned timestamp instead of `lastExecTime`. 
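The fix boils down to stamping each recorded sample with the timestamp returned by the datasource instead of the rule's wall-clock execution time, so a `-datasource.lookback` offset is preserved in the written series. A minimal, self-contained sketch of that idea, using illustrative types rather than vmalert's real ones (the actual one-line change is in the diff below):

```go
package main

import (
	"fmt"
	"time"
)

// sample mimics a datasource query result: a value plus the timestamp
// (unix seconds) at which the datasource evaluated the expression.
// These names are illustrative only, not vmalert's actual types.
type sample struct {
	Value     float64
	Timestamp int64
}

// point is what would be written to remote storage.
type point struct {
	Value float64
	TS    time.Time
}

// toPoint stamps the recorded value with the sample's own timestamp.
// With -datasource.lookback the expression is evaluated at a moment in
// the past, so using the wall-clock "last exec time" would misplace the
// recorded point.
func toPoint(s sample) point {
	return point{Value: s.Value, TS: time.Unix(s.Timestamp, 0)}
}

func main() {
	lookback := 5 * time.Minute
	evalTime := time.Now().Add(-lookback) // moment the datasource actually evaluated
	s := sample{Value: 42, Timestamp: evalTime.Unix()}

	p := toPoint(s)
	fmt.Printf("recorded value %.0f at %s (not at exec time %s)\n",
		p.Value, p.TS.Format(time.RFC3339), time.Now().Format(time.RFC3339))
}
```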
--- app/vmalert/recording.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 4f075cdf12..5e8f780f07 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -100,7 +100,7 @@ func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics)) var tss []prompbmarshal.TimeSeries for _, r := range qMetrics { - ts := rr.toTimeSeries(r, rr.lastExecTime) + ts := rr.toTimeSeries(r, time.Unix(r.Timestamp, 0)) h := hashTimeSeries(ts) if _, ok := duplicates[h]; ok { rr.lastExecError = errDuplicate From 60947fb2d59d3b55ea1a8bc9fce08e8ff79759c5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 27 Apr 2021 00:23:25 +0300 Subject: [PATCH 04/16] lib/persistentqueue: eliminate possible data race when obtaining vm_persistentqueue_bytes_pending metric value --- docs/CHANGELOG.md | 1 + lib/persistentqueue/fastqueue.go | 8 ++++++++ lib/persistentqueue/persistentqueue.go | 5 ----- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6003ded54e..bfd6364958 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -12,6 +12,7 @@ Thanks to @johnseekins! * BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047). +* BUGFIX: vmagent: eliminate possible data race when obtaining value for the metric `vm_persistentqueue_bytes_pending`. The data race could result in incorrect value for this metric. * BUGFIX: vmstorage: remove empty directories on startup. Such directories can be left after unclean shutdown on NFS storage. Previously such directories could lead to crashloop until manually removed. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142). diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 7dda73a862..9c99f2b01b 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -1,11 +1,13 @@ package persistentqueue import ( + "fmt" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" ) // FastQueue is fast persistent queue, which prefers sending data via memory. 
@@ -47,6 +49,12 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int } fq.cond.L = &fq.mu fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 { + fq.mu.Lock() + n := fq.pq.GetPendingBytes() + fq.mu.Unlock() + return float64(n) + }) pendingBytes := fq.GetPendingBytes() logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes) return fq diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 4c082a9f87..474c559a53 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -61,8 +61,6 @@ type queue struct { blocksRead *metrics.Counter bytesRead *metrics.Counter - - bytesPending *metrics.Gauge } // ResetIfEmpty resets q if it is empty. @@ -173,9 +171,6 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path)) q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path)) q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path)) - q.bytesPending = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 { - return float64(q.GetPendingBytes()) - }) cleanOnError := func() { if q.reader != nil { From f89c1f7f49d074dca5dcaf85f9ef42a0e402486a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 27 Apr 2021 13:31:48 +0300 Subject: [PATCH 05/16] lib/storage: typo fix in info message when deleting the part outside the configured retention Previously the message was displaying incorrect retention time --- lib/storage/partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 0196b916b9..3d7f43fac8 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1363,7 +1363,7 @@ func (pt *partition) removeStaleParts() { pt.snapshotLock.RLock() var removeWG sync.WaitGroup for pw := range m { - logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000) + logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000) removeWG.Add(1) fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done) } From 2ec7d8b38441548dba6bc4d4c0690604d3412395 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 27 Apr 2021 14:57:48 +0300 Subject: [PATCH 06/16] lib/promscrape/discovery/kubernetes: fix a deadlock introduced in eddba29664eb63214ca529bfa5d24426b91d70cd Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 Thanks to @f41gh7 for providing the initial idea for deadlock fix at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1248 --- lib/promscrape/discovery/kubernetes/api_watcher.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index fe9fb071df..92a9a6cd66 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -74,6 +74,11 @@ func newAPIWatcher(apiServer string, ac 
*promauth.Config, sdc *SDConfig, swcFunc func (aw *apiWatcher) mustStart() { aw.gw.startWatchersForRole(aw.role, aw) + if aw.role == "endpoints" || aw.role == "endpointslices" { + // endpoints and endpointslices watchers query pod and service objects. So start watchers for these roles as well. + aw.gw.startWatchersForRole("pod", nil) + aw.gw.startWatchersForRole("service", nil) + } } func (aw *apiWatcher) mustStop() { @@ -280,7 +285,6 @@ func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object { key := namespace + "/" + name - gw.startWatchersForRole(role, nil) uws := gw.getURLWatchers() for _, uw := range uws { if uw.role != role { @@ -309,9 +313,6 @@ func (gw *groupWatcher) refreshEndpointsLabels(namespace, key string) { } func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { - // There is no need in starting url watcher for the given role, - // since there is no (namespace, key) object yet for this role. - // gw.startWatchersForRole(role, nil) uws := gw.getURLWatchers() for _, uw := range uws { if uw.role != role { @@ -470,6 +471,10 @@ func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) { func (uw *urlWatcher) registerPendingAPIWatchers() { uw.mu.Lock() + if len(uw.awsPending) == 0 { + uw.mu.Unlock() + return + } awsPending := make([]*apiWatcher, 0, len(uw.awsPending)) for aw := range uw.awsPending { awsPending = append(awsPending, aw) From 56b6b893ce821eaa14ce6a6730b26e0f342b3670 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 27 Apr 2021 15:36:31 +0300 Subject: [PATCH 07/16] lib/mergeset: split rows ingestion among multiple shards This improves rows ingestion on systems with many CPU cores by reducing lock contention. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244 Thanks to @waldoweng for the original idea and draft implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243 --- docs/CHANGELOG.md | 1 + lib/mergeset/table.go | 171 ++++++++++++++++++++++++++++----------- lib/storage/partition.go | 53 ++++++------ 3 files changed, 150 insertions(+), 75 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bfd6364958..5c0270c62a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,7 @@ sort: 15 * FEATURE: vmauth: add ability to set madatory query args in `url_prefix`. For example, `url_prefix: http://vm:8428/?extra_label=team=dev` would add `extra_label=team=dev` query arg to all the incoming requests. See [the example](https://docs.victoriametrics.com/vmauth.html#auth-config) for more details. * FEATURE: add OpenTSDB migration option to vmctl. See more details [here](https://docs.victoriametrics.com/vmctl#migrating-data-from-opentsdb). Thanks to @johnseekins! +* FEATURE: improved new time series registration speed on systems with many CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244). Thanks to @waldoweng for the idea and [draft implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243). * BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. 
Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047). diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a5ce4d4ff6..b6bf3f470a 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -103,9 +103,10 @@ type Table struct { partsLock sync.Mutex parts []*partWrapper - rawItemsBlocks []*inmemoryBlock - rawItemsLock sync.Mutex - rawItemsLastFlushTime uint64 + // rawItems contains recently added items that haven't been converted to parts yet. + // + // rawItems aren't used in search for performance reasons + rawItems rawItemsShards snapshotLock sync.RWMutex @@ -124,6 +125,97 @@ type Table struct { rawItemsPendingFlushesWG syncwg.WaitGroup } +type rawItemsShards struct { + shardIdx uint64 + + // shards reduce lock contention when adding rows on multi-CPU systems. + shards []rawItemsShard +} + +// The number of shards for rawItems per table. +// +// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. +var rawItemsShardsPerTable = cgroup.AvailableCPUs() + +const maxBlocksPerShard = 512 + +func (riss *rawItemsShards) init() { + riss.shards = make([]rawItemsShard, rawItemsShardsPerTable) +} + +func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error { + n := atomic.AddUint64(&riss.shardIdx, 1) + shards := riss.shards + idx := n % uint64(len(shards)) + shard := &shards[idx] + return shard.addItems(tb, items) +} + +func (riss *rawItemsShards) Len() int { + n := 0 + for i := range riss.shards { + n += riss.shards[i].Len() + } + return n +} + +type rawItemsShard struct { + mu sync.Mutex + ibs []*inmemoryBlock + lastFlushTime uint64 +} + +func (ris *rawItemsShard) Len() int { + ris.mu.Lock() + n := 0 + for _, ib := range ris.ibs { + n += len(ib.items) + } + ris.mu.Unlock() + return n +} + +func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error { + var err error + var blocksToMerge []*inmemoryBlock + + ris.mu.Lock() + ibs := ris.ibs + if len(ibs) == 0 { + ib := getInmemoryBlock() + ibs = append(ibs, ib) + ris.ibs = ibs + } + ib := ibs[len(ibs)-1] + for _, item := range items { + if !ib.Add(item) { + ib = getInmemoryBlock() + if !ib.Add(item) { + putInmemoryBlock(ib) + err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item)) + break + } + ibs = append(ibs, ib) + ris.ibs = ibs + } + } + if len(ibs) >= maxBlocksPerShard { + blocksToMerge = ibs + ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard) + ris.lastFlushTime = fasttime.UnixTimestamp() + } + ris.mu.Unlock() + + if blocksToMerge == nil { + // Fast path. + return err + } + + // Slow path: merge blocksToMerge. 
+ tb.mergeRawItemsBlocks(blocksToMerge) + return err +} + type partWrapper struct { p *part @@ -195,6 +287,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb flockF: flockF, stopCh: make(chan struct{}), } + tb.rawItems.init() tb.startPartMergers() tb.startRawItemsFlusher() @@ -314,11 +407,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.ItemsMerged += atomic.LoadUint64(&tb.itemsMerged) m.AssistedMerges += atomic.LoadUint64(&tb.assistedMerges) - tb.rawItemsLock.Lock() - for _, ib := range tb.rawItemsBlocks { - m.PendingItems += uint64(len(ib.items)) - } - tb.rawItemsLock.Unlock() + m.PendingItems += uint64(tb.rawItems.Len()) tb.partsLock.Lock() m.PartsCount += uint64(len(tb.parts)) @@ -352,42 +441,10 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { // AddItems adds the given items to the tb. func (tb *Table) AddItems(items [][]byte) error { - var err error - var blocksToMerge []*inmemoryBlock - - tb.rawItemsLock.Lock() - if len(tb.rawItemsBlocks) == 0 { - ib := getInmemoryBlock() - tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib) + if err := tb.rawItems.addItems(tb, items); err != nil { + return fmt.Errorf("cannot insert data into %q: %w", tb.path, err) } - ib := tb.rawItemsBlocks[len(tb.rawItemsBlocks)-1] - for _, item := range items { - if !ib.Add(item) { - ib = getInmemoryBlock() - if !ib.Add(item) { - putInmemoryBlock(ib) - err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock on %q; it looks like the item is too large? len(item)=%d", - item, tb.path, len(item)) - break - } - tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib) - } - } - if len(tb.rawItemsBlocks) >= 512 { - blocksToMerge = tb.rawItemsBlocks - tb.rawItemsBlocks = nil - tb.rawItemsLastFlushTime = fasttime.UnixTimestamp() - } - tb.rawItemsLock.Unlock() - - if blocksToMerge == nil { - // Fast path. - return err - } - - // Slow path: merge blocksToMerge. - tb.mergeRawItemsBlocks(blocksToMerge) - return err + return nil } // getParts appends parts snapshot to dst and returns it. 
@@ -522,9 +579,25 @@ func (tb *Table) DebugFlush() { } func (tb *Table) flushRawItems(isFinal bool) { + tb.rawItems.flush(tb, isFinal) +} + +func (riss *rawItemsShards) flush(tb *Table, isFinal bool) { tb.rawItemsPendingFlushesWG.Add(1) defer tb.rawItemsPendingFlushesWG.Done() + var wg sync.WaitGroup + wg.Add(len(riss.shards)) + for i := range riss.shards { + go func(ris *rawItemsShard) { + ris.flush(tb, isFinal) + wg.Done() + }(&riss.shards[i]) + } + wg.Wait() +} + +func (ris *rawItemsShard) flush(tb *Table, isFinal bool) { mustFlush := false currentTime := fasttime.UnixTimestamp() flushSeconds := int64(rawItemsFlushInterval.Seconds()) @@ -533,14 +606,14 @@ func (tb *Table) flushRawItems(isFinal bool) { } var blocksToMerge []*inmemoryBlock - tb.rawItemsLock.Lock() - if isFinal || currentTime-tb.rawItemsLastFlushTime > uint64(flushSeconds) { + ris.mu.Lock() + if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) { mustFlush = true - blocksToMerge = tb.rawItemsBlocks - tb.rawItemsBlocks = nil - tb.rawItemsLastFlushTime = currentTime + blocksToMerge = ris.ibs + ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard) + ris.lastFlushTime = currentTime } - tb.rawItemsLock.Unlock() + ris.mu.Unlock() if mustFlush { tb.mergeRawItemsBlocks(blocksToMerge) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 3d7f43fac8..07bb23ca2d 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -437,54 +437,49 @@ func (pt *partition) AddRows(rows []rawRow) { } type rawRowsShards struct { - lock sync.Mutex - shardIdx int + shardIdx uint64 // Shards reduce lock contention when adding rows on multi-CPU systems. shards []rawRowsShard } -func (rrs *rawRowsShards) init() { - rrs.shards = make([]rawRowsShard, rawRowsShardsPerPartition) +func (rrss *rawRowsShards) init() { + rrss.shards = make([]rawRowsShard, rawRowsShardsPerPartition) } -func (rrs *rawRowsShards) addRows(pt *partition, rows []rawRow) { - rrs.lock.Lock() - rrs.shardIdx++ - if rrs.shardIdx >= len(rrs.shards) { - rrs.shardIdx = 0 - } - shard := &rrs.shards[rrs.shardIdx] - rrs.lock.Unlock() - +func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) { + n := atomic.AddUint64(&rrss.shardIdx, 1) + shards := rrss.shards + idx := n % uint64(len(shards)) + shard := &shards[idx] shard.addRows(pt, rows) } -func (rrs *rawRowsShards) Len() int { +func (rrss *rawRowsShards) Len() int { n := 0 - for i := range rrs.shards[:] { - n += rrs.shards[i].Len() + for i := range rrss.shards[:] { + n += rrss.shards[i].Len() } return n } type rawRowsShard struct { - lock sync.Mutex + mu sync.Mutex rows []rawRow lastFlushTime uint64 } func (rrs *rawRowsShard) Len() int { - rrs.lock.Lock() + rrs.mu.Lock() n := len(rrs.rows) - rrs.lock.Unlock() + rrs.mu.Unlock() return n } func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { var rrss []*rawRows - rrs.lock.Lock() + rrs.mu.Lock() if cap(rrs.rows) == 0 { rrs.rows = getRawRowsMaxSize().rows } @@ -506,7 +501,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { rrss = append(rrss, rr) rrs.lastFlushTime = fasttime.UnixTimestamp() } - rrs.lock.Unlock() + rrs.mu.Unlock() for _, rr := range rrss { pt.addRowsPart(rr.rows) @@ -752,10 +747,16 @@ func (pt *partition) flushRawRows(isFinal bool) { pt.rawRows.flush(pt, isFinal) } -func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) { - for i := range rrs.shards[:] { - rrs.shards[i].flush(pt, isFinal) +func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) { + var wg sync.WaitGroup + 
wg.Add(len(rrss.shards)) + for i := range rrss.shards { + go func(rrs *rawRowsShard) { + rrs.flush(pt, isFinal) + wg.Done() + }(&rrss.shards[i]) } + wg.Wait() } func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) { @@ -766,12 +767,12 @@ func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) { flushSeconds = 1 } - rrs.lock.Lock() + rrs.mu.Lock() if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) { rr = getRawRowsMaxSize() rrs.rows, rr.rows = rr.rows, rrs.rows } - rrs.lock.Unlock() + rrs.mu.Unlock() if rr != nil { pt.addRowsPart(rr.rows) From 87179c68395b3c0e8589e48873866265ebd4c34a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 27 Apr 2021 16:41:22 +0300 Subject: [PATCH 08/16] lib/{storage,mergeset}: fix `unaligned 64-bit atomic operation` panic for 32-bit architectures The panic has been introduced in 56b6b893ce821eaa14ce6a6730b26e0f342b3670 --- lib/mergeset/table.go | 6 +++--- lib/storage/partition.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index b6bf3f470a..ddd0bac405 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -126,7 +126,7 @@ type Table struct { } type rawItemsShards struct { - shardIdx uint64 + shardIdx uint32 // shards reduce lock contention when adding rows on multi-CPU systems. shards []rawItemsShard @@ -144,9 +144,9 @@ func (riss *rawItemsShards) init() { } func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error { - n := atomic.AddUint64(&riss.shardIdx, 1) + n := atomic.AddUint32(&riss.shardIdx, 1) shards := riss.shards - idx := n % uint64(len(shards)) + idx := n % uint32(len(shards)) shard := &shards[idx] return shard.addItems(tb, items) } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 07bb23ca2d..4bf6f833be 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -437,7 +437,7 @@ func (pt *partition) AddRows(rows []rawRow) { } type rawRowsShards struct { - shardIdx uint64 + shardIdx uint32 // Shards reduce lock contention when adding rows on multi-CPU systems. shards []rawRowsShard @@ -448,9 +448,9 @@ func (rrss *rawRowsShards) init() { } func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) { - n := atomic.AddUint64(&rrss.shardIdx, 1) + n := atomic.AddUint32(&rrss.shardIdx, 1) shards := rrss.shards - idx := n % uint64(len(shards)) + idx := n % uint32(len(shards)) shard := &shards[idx] shard.addRows(pt, rows) } From 15609ee447572d59505b8deead6143fa10ec8b5d Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 28 Apr 2021 23:41:15 +0300 Subject: [PATCH 09/16] changes vmalert Querier with per rule querier (#1249) * changes vmalert Querier with per rule querier it allows to changes some parametrs based on rule setting for instance - alert type, tenant for cluster version or event endpoint url. 
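Before the diff, here is a minimal sketch of the builder pattern this patch introduces, with simplified, hypothetical names; the real interfaces are `datasource.QuerierBuilder` and `datasource.QuerierParams` shown in the diff below:

```go
package main

import (
	"context"
	"fmt"
)

// querier executes a query; querierBuilder produces a querier tuned
// with per-rule parameters. Names are simplified for illustration.
type querier interface {
	Query(ctx context.Context, query string) ([]float64, error)
}

type querierParams struct {
	dataSourceType string // e.g. "prometheus" or "graphite"
}

type querierBuilder interface {
	buildWithParams(p querierParams) querier
}

// storage is shared (one HTTP client, one base URL); each rule gets a
// cheap per-rule clone that differs only in its parameters.
type storage struct {
	baseURL        string
	dataSourceType string
}

func (s *storage) buildWithParams(p querierParams) querier {
	clone := *s
	if p.dataSourceType != "" {
		clone.dataSourceType = p.dataSourceType
	}
	return &clone
}

func (s *storage) Query(ctx context.Context, query string) ([]float64, error) {
	// A real implementation would pick the query path by dataSourceType.
	fmt.Printf("querying %s via %s engine: %s\n", s.baseURL, s.dataSourceType, query)
	return []float64{1}, nil
}

func main() {
	// The base URL is a placeholder for illustration only.
	var b querierBuilder = &storage{baseURL: "http://vmselect:8481", dataSourceType: "prometheus"}

	// Each rule builds its own querier at construction time, so per-rule
	// settings no longer have to be passed into every Exec call.
	promQ := b.buildWithParams(querierParams{dataSourceType: "prometheus"})
	graphiteQ := b.buildWithParams(querierParams{dataSourceType: "graphite"})

	_, _ = promQ.Query(context.Background(), `up == 0`)
	_, _ = graphiteQ.Query(context.Background(), `alias(app.metric, "m")`)
}
```

With a per-rule querier built once at rule construction, `Exec` no longer needs a `datasource.Querier` argument, and per-rule settings such as the datasource type travel with the querier itself, as the diff below shows.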
--- app/vmalert/alerting.go | 15 ++++++----- app/vmalert/alerting_test.go | 14 ++++++---- app/vmalert/datasource/datasource.go | 7 ++++- app/vmalert/datasource/init.go | 2 +- app/vmalert/datasource/vm.go | 40 +++++++++++++++++++++++++--- app/vmalert/datasource/vm_test.go | 19 +++++++------ app/vmalert/group.go | 20 +++++++------- app/vmalert/group_test.go | 28 ++++++++++++++----- app/vmalert/helpers_test.go | 9 ++++++- app/vmalert/main.go | 8 +++--- app/vmalert/manager.go | 11 ++++---- app/vmalert/manager_test.go | 8 +++--- app/vmalert/recording.go | 9 ++++--- app/vmalert/recording_test.go | 9 ++++--- app/vmalert/remoteread/init.go | 2 +- app/vmalert/rule.go | 3 +-- 16 files changed, 139 insertions(+), 65 deletions(-) diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index 9eff009bcf..c454d66bc3 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -29,6 +29,8 @@ type AlertingRule struct { GroupID uint64 GroupName string + q datasource.Querier + // guard status fields mu sync.RWMutex // stores list of active alerts @@ -49,7 +51,7 @@ type alertingRuleMetrics struct { active *gauge } -func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule { +func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *AlertingRule { ar := &AlertingRule{ Type: cfg.Type, RuleID: cfg.ID, @@ -60,6 +62,7 @@ func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule { Annotations: cfg.Annotations, GroupID: group.ID(), GroupName: group.Name, + q: qb.BuildWithParams(datasource.QuerierParams{DataSourceType: &cfg.Type}), alerts: make(map[uint64]*notifier.Alert), metrics: &alertingRuleMetrics{}, } @@ -121,8 +124,8 @@ func (ar *AlertingRule) ID() uint64 { // Exec executes AlertingRule expression via the given Querier. // Based on the Querier results AlertingRule maintains notifier.Alerts -func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) { - qMetrics, err := q.Query(ctx, ar.Expr, ar.Type) +func (ar *AlertingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) { + qMetrics, err := ar.q.Query(ctx, ar.Expr) ar.mu.Lock() defer ar.mu.Unlock() @@ -139,7 +142,7 @@ func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series b } } - qFn := func(query string) ([]datasource.Metric, error) { return q.Query(ctx, query, ar.Type) } + qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) } updated := make(map[uint64]struct{}) // update list of active alerts for _, m := range qMetrics { @@ -407,7 +410,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb return fmt.Errorf("querier is nil") } - qFn := func(query string) ([]datasource.Metric, error) { return q.Query(ctx, query, ar.Type) } + qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) } // account for external labels in filter var labelsFilter string @@ -420,7 +423,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb // remote write protocol which is used for state persistence in vmalert. 
expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])", alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds())) - qMetrics, err := q.Query(ctx, expr, ar.Type) + qMetrics, err := q.Query(ctx, expr) if err != nil { return err } diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index e5f10cc31b..ea5bb51dcf 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -294,11 +294,12 @@ func TestAlertingRule_Exec(t *testing.T) { for _, tc := range testCases { t.Run(tc.rule.Name, func(t *testing.T) { fq := &fakeQuerier{} + tc.rule.q = fq tc.rule.GroupID = fakeGroup.ID() for _, step := range tc.steps { fq.reset() fq.add(step...) - if _, err := tc.rule.Exec(context.TODO(), fq, false); err != nil { + if _, err := tc.rule.Exec(context.TODO(), false); err != nil { t.Fatalf("unexpected err: %s", err) } // artificial delay between applying steps @@ -410,6 +411,7 @@ func TestAlertingRule_Restore(t *testing.T) { t.Run(tc.rule.Name, func(t *testing.T) { fq := &fakeQuerier{} tc.rule.GroupID = fakeGroup.ID() + tc.rule.q = fq fq.add(tc.metrics...) if err := tc.rule.Restore(context.TODO(), fq, time.Hour, nil); err != nil { t.Fatalf("unexpected err: %s", err) @@ -437,17 +439,18 @@ func TestAlertingRule_Exec_Negative(t *testing.T) { fq := &fakeQuerier{} ar := newTestAlertingRule("test", 0) ar.Labels = map[string]string{"job": "test"} + ar.q = fq // successful attempt fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) - _, err := ar.Exec(context.TODO(), fq, false) + _, err := ar.Exec(context.TODO(), false) if err != nil { t.Fatal(err) } // label `job` will collide with rule extra label and will make both time series equal fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz")) - _, err = ar.Exec(context.TODO(), fq, false) + _, err = ar.Exec(context.TODO(), false) if !errors.Is(err, errDuplicate) { t.Fatalf("expected to have %s error; got %s", errDuplicate, err) } @@ -456,7 +459,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) { expErr := "connection reset by peer" fq.setErr(errors.New(expErr)) - _, err = ar.Exec(context.TODO(), fq, false) + _, err = ar.Exec(context.TODO(), false) if err == nil { t.Fatalf("expected to get err; got nil") } @@ -544,8 +547,9 @@ func TestAlertingRule_Template(t *testing.T) { t.Run(tc.rule.Name, func(t *testing.T) { fq := &fakeQuerier{} tc.rule.GroupID = fakeGroup.ID() + tc.rule.q = fq fq.add(tc.metrics...) - if _, err := tc.rule.Exec(context.TODO(), fq, false); err != nil { + if _, err := tc.rule.Exec(context.TODO(), false); err != nil { t.Fatalf("unexpected err: %s", err) } for hash, expAlert := range tc.expAlerts { diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index 4625d09eed..fbdd36abcf 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -4,11 +4,16 @@ import ( "context" ) +// QuerierBuilder builds Querier with given params. 
+type QuerierBuilder interface { + BuildWithParams(params QuerierParams) Querier +} + // Querier interface wraps Query method which // executes given query and returns list of Metrics // as result type Querier interface { - Query(ctx context.Context, query string, engine Type) ([]Metric, error) + Query(ctx context.Context, query string) ([]Metric, error) } // Metric is the basic entity which should be return by datasource diff --git a/app/vmalert/datasource/init.go b/app/vmalert/datasource/init.go index 9d500fa387..7d27b9d55e 100644 --- a/app/vmalert/datasource/init.go +++ b/app/vmalert/datasource/init.go @@ -28,7 +28,7 @@ var ( ) // Init creates a Querier from provided flag values. -func Init() (Querier, error) { +func Init() (QuerierBuilder, error) { if *addr == "" { return nil, fmt.Errorf("datasource.url is empty") } diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index e06b3af3d8..6e92db2c17 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -81,6 +81,7 @@ type VMStorage struct { appendTypePrefix bool lookBack time.Duration queryStep time.Duration + dataSourceType Type } const queryPath = "/api/v1/query" @@ -89,6 +90,38 @@ const graphitePath = "/render" const prometheusPrefix = "/prometheus" const graphitePrefix = "/graphite" +// QuerierParams params for Querier. +type QuerierParams struct { + DataSourceType *Type +} + +// Clone makes clone of VMStorage, shares http client. +func (s *VMStorage) Clone() *VMStorage { + return &VMStorage{ + c: s.c, + datasourceURL: s.datasourceURL, + basicAuthUser: s.basicAuthUser, + basicAuthPass: s.basicAuthPass, + lookBack: s.lookBack, + queryStep: s.queryStep, + appendTypePrefix: s.appendTypePrefix, + dataSourceType: s.dataSourceType, + } +} + +// ApplyParams - changes given querier params. +func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage { + if params.DataSourceType != nil { + s.dataSourceType = *params.DataSourceType + } + return s +} + +// BuildWithParams - implements interface. 
+func (s *VMStorage) BuildWithParams(params QuerierParams) Querier { + return s.Clone().ApplyParams(params) +} + // NewVMStorage is a constructor for VMStorage func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Duration, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage { return &VMStorage{ @@ -99,18 +132,19 @@ func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Du appendTypePrefix: appendTypePrefix, lookBack: lookBack, queryStep: queryStep, + dataSourceType: NewPrometheusType(), } } // Query reads metrics from datasource by given query and type -func (s *VMStorage) Query(ctx context.Context, query string, dataSourceType Type) ([]Metric, error) { - switch dataSourceType.name { +func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { + switch s.dataSourceType.name { case "", prometheusType: return s.queryDataSource(ctx, query, s.setPrometheusReqParams, parsePrometheusResponse) case graphiteType: return s.queryDataSource(ctx, query, s.setGraphiteReqParams, parseGraphiteResponse) default: - return nil, fmt.Errorf("engine not found: %q", dataSourceType) + return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name) } } diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index 2c61481330..9ed26ef749 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -70,25 +70,25 @@ func TestVMSelectQuery(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, false, srv.Client()) - if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil { + if _, err := am.Query(ctx, query); err == nil { t.Fatalf("expected connection error got nil") } - if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil { + if _, err := am.Query(ctx, query); err == nil { t.Fatalf("expected invalid response status error got nil") } - if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil { + if _, err := am.Query(ctx, query); err == nil { t.Fatalf("expected response body error got nil") } - if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil { + if _, err := am.Query(ctx, query); err == nil { t.Fatalf("expected error status got nil") } - if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil { + if _, err := am.Query(ctx, query); err == nil { t.Fatalf("expected unknown status got nil") } - if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil { + if _, err := am.Query(ctx, query); err == nil { t.Fatalf("expected non-vector resultType error got nil") } - m, err := am.Query(ctx, query, NewPrometheusType()) + m, err := am.Query(ctx, query) if err != nil { t.Fatalf("unexpected %s", err) } @@ -106,7 +106,10 @@ func TestVMSelectQuery(t *testing.T) { m[0].Labels[0].Name != expected.Labels[0].Name { t.Fatalf("unexpected metric %+v want %+v", m[0], expected) } - m, err = am.Query(ctx, queryRender, NewGraphiteType()) + + dst := NewGraphiteType() + q := am.BuildWithParams(QuerierParams{&dst}) + m, err = q.Query(ctx, queryRender) if err != nil { t.Fatalf("unexpected %s", err) } diff --git a/app/vmalert/group.go b/app/vmalert/group.go index e8a73b25f9..f9ded5d861 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -49,7 +49,7 @@ func newGroupMetrics(name, file string) *groupMetrics { return m } -func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string]string) *Group { +func 
newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group { g := &Group{ Type: cfg.Type, Name: cfg.Name, @@ -81,17 +81,17 @@ func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string } r.Labels[k] = v } - rules[i] = g.newRule(r) + rules[i] = g.newRule(qb, r) } g.Rules = rules return g } -func (g *Group) newRule(rule config.Rule) Rule { +func (g *Group) newRule(qb datasource.QuerierBuilder, rule config.Rule) Rule { if rule.Alert != "" { - return newAlertingRule(g, rule) + return newAlertingRule(qb, g, rule) } - return newRecordingRule(g, rule) + return newRecordingRule(qb, g, rule) } // ID return unique group ID that consists of @@ -106,7 +106,7 @@ func (g *Group) ID() uint64 { } // Restore restores alerts state for group rules -func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error { +func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, lookback time.Duration, labels map[string]string) error { for _, rule := range g.Rules { rr, ok := rule.(*AlertingRule) if !ok { @@ -115,6 +115,7 @@ func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time if rr.For < 1 { continue } + q := qb.BuildWithParams(datasource.QuerierParams{}) if err := rr.Restore(ctx, q, lookback, labels); err != nil { return fmt.Errorf("error while restoring rule %q: %w", rule, err) } @@ -189,7 +190,7 @@ func (g *Group) close() { var skipRandSleepOnGroupStart bool -func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { +func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewrite.Client) { defer func() { close(g.finishedCh) }() // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics. 
@@ -213,7 +214,7 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not } logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) - e := &executor{querier, nts, rw} + e := &executor{nts, rw} t := time.NewTicker(g.Interval) defer t.Stop() for { @@ -256,7 +257,6 @@ func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []not } type executor struct { - querier datasource.Querier notifiers []notifier.Notifier rw *remotewrite.Client } @@ -310,7 +310,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter execDuration.UpdateDuration(execStart) }() - tss, err := rule.Exec(ctx, e.querier, returnSeries) + tss, err := rule.Exec(ctx, returnSeries) if err != nil { execErrors.Inc() return fmt.Errorf("rule %q: failed to execute: %w", rule, err) diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index 093817abf6..7f1103278c 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" ) @@ -105,20 +107,32 @@ func TestUpdateWith(t *testing.T) { {Record: "foo5"}, }, }, + { + "update datasource type", + []config.Rule{ + {Alert: "foo1", Type: datasource.NewPrometheusType()}, + {Alert: "foo3", Type: datasource.NewGraphiteType()}, + }, + []config.Rule{ + {Alert: "foo1", Type: datasource.NewGraphiteType()}, + {Alert: "foo10", Type: datasource.NewPrometheusType()}, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { g := &Group{Name: "test"} + qb := &fakeQuerier{} for _, r := range tc.currentRules { r.ID = config.HashRule(r) - g.Rules = append(g.Rules, g.newRule(r)) + g.Rules = append(g.Rules, g.newRule(qb, r)) } ng := &Group{Name: "test"} for _, r := range tc.newRules { r.ID = config.HashRule(r) - ng.Rules = append(ng.Rules, ng.newRule(r)) + ng.Rules = append(ng.Rules, ng.newRule(qb, r)) } err := g.updateWith(ng) @@ -156,11 +170,11 @@ func TestGroupStart(t *testing.T) { t.Fatalf("failed to parse rules: %s", err) } const evalInterval = time.Millisecond - g := newGroup(groups[0], evalInterval, map[string]string{"cluster": "east-1"}) - g.Concurrency = 2 - - fn := &fakeNotifier{} fs := &fakeQuerier{} + fn := &fakeNotifier{} + + g := newGroup(groups[0], fs, evalInterval, map[string]string{"cluster": "east-1"}) + g.Concurrency = 2 const inst1, inst2, job = "foo", "bar", "baz" m1 := metricWithLabels(t, "instance", inst1, "job", job) @@ -195,7 +209,7 @@ func TestGroupStart(t *testing.T) { fs.add(m1) fs.add(m2) go func() { - g.start(context.Background(), fs, []notifier.Notifier{fn}, nil) + g.start(context.Background(), []notifier.Notifier{fn}, nil) close(finished) }() diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go index 3f879d4661..9c31222df2 100644 --- a/app/vmalert/helpers_test.go +++ b/app/vmalert/helpers_test.go @@ -38,7 +38,11 @@ func (fq *fakeQuerier) add(metrics ...datasource.Metric) { fq.Unlock() } -func (fq *fakeQuerier) Query(_ context.Context, _ string, _ datasource.Type) ([]datasource.Metric, error) { +func (fq *fakeQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Querier { + return fq +} + +func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) { fq.Lock() defer fq.Unlock() if fq.err != nil { @@ -160,6 +164,9 @@ func 
compareAlertingRules(t *testing.T, a, b *AlertingRule) error { if !reflect.DeepEqual(a.Labels, b.Labels) { return fmt.Errorf("expected to have labels %#v; got %#v", a.Labels, b.Labels) } + if a.Type.String() != b.Type.String() { + return fmt.Errorf("expected to have Type %#v; got %#v", a.Type.String(), b.Type.String()) + } return nil } diff --git a/app/vmalert/main.go b/app/vmalert/main.go index a76fe5869a..a2a5ff75a1 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -140,10 +140,10 @@ func newManager(ctx context.Context) (*manager, error) { } manager := &manager{ - groups: make(map[uint64]*Group), - querier: q, - notifiers: nts, - labels: map[string]string{}, + groups: make(map[uint64]*Group), + querierBuilder: q, + notifiers: nts, + labels: map[string]string{}, } rw, err := remotewrite.Init(ctx) if err != nil { diff --git a/app/vmalert/manager.go b/app/vmalert/manager.go index baaf0effb5..cd248f5bd2 100644 --- a/app/vmalert/manager.go +++ b/app/vmalert/manager.go @@ -15,11 +15,12 @@ import ( // manager controls group states type manager struct { - querier datasource.Querier - notifiers []notifier.Notifier + querierBuilder datasource.QuerierBuilder + notifiers []notifier.Notifier rw *remotewrite.Client - rr datasource.Querier + // remote read builder. + rr datasource.QuerierBuilder wg sync.WaitGroup labels map[string]string @@ -74,7 +75,7 @@ func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) { m.wg.Add(1) id := group.ID() go func() { - group.start(ctx, m.querier, m.notifiers, m.rw) + group.start(ctx, m.notifiers, m.rw) m.wg.Done() }() m.groups[id] = group @@ -89,7 +90,7 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida groupsRegistry := make(map[uint64]*Group) for _, cfg := range groupsCfg { - ng := newGroup(cfg, *evaluationInterval, m.labels) + ng := newGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels) groupsRegistry[ng.ID()] = ng } diff --git a/app/vmalert/manager_test.go b/app/vmalert/manager_test.go index 8762ef6cb3..984505a3a4 100644 --- a/app/vmalert/manager_test.go +++ b/app/vmalert/manager_test.go @@ -37,9 +37,9 @@ func TestManagerEmptyRulesDir(t *testing.T) { // Should be executed with -race flag func TestManagerUpdateConcurrent(t *testing.T) { m := &manager{ - groups: make(map[uint64]*Group), - querier: &fakeQuerier{}, - notifiers: []notifier.Notifier{&fakeNotifier{}}, + groups: make(map[uint64]*Group), + querierBuilder: &fakeQuerier{}, + notifiers: []notifier.Notifier{&fakeNotifier{}}, } paths := []string{ "config/testdata/dir/rules0-good.rules", @@ -242,7 +242,7 @@ func TestManagerUpdate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) - m := &manager{groups: make(map[uint64]*Group), querier: &fakeQuerier{}} + m := &manager{groups: make(map[uint64]*Group), querierBuilder: &fakeQuerier{}} path := []string{tc.initPath} if err := m.update(ctx, path, true, true, false); err != nil { t.Fatalf("failed to complete initial rules update: %s", err) diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 5e8f780f07..698c19b86b 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -25,6 +25,8 @@ type RecordingRule struct { Labels map[string]string GroupID uint64 + q datasource.Querier + // guard status fields mu sync.RWMutex // stores last moment of time Exec was called @@ -52,7 +54,7 @@ func (rr *RecordingRule) ID() uint64 { return rr.RuleID } -func newRecordingRule(group *Group, cfg 
config.Rule) *RecordingRule { +func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *RecordingRule { rr := &RecordingRule{ Type: cfg.Type, RuleID: cfg.ID, @@ -61,6 +63,7 @@ func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule { Labels: cfg.Labels, GroupID: group.ID(), metrics: &recordingRuleMetrics{}, + q: qb.BuildWithParams(datasource.QuerierParams{DataSourceType: &cfg.Type}), } labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID()) @@ -82,12 +85,12 @@ func (rr *RecordingRule) Close() { } // Exec executes RecordingRule expression via the given Querier. -func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) { +func (rr *RecordingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) { if !series { return nil, nil } - qMetrics, err := q.Query(ctx, rr.Expr, rr.Type) + qMetrics, err := rr.q.Query(ctx, rr.Expr) rr.mu.Lock() defer rr.mu.Unlock() diff --git a/app/vmalert/recording_test.go b/app/vmalert/recording_test.go index 1ab35295c9..80877b6ff9 100644 --- a/app/vmalert/recording_test.go +++ b/app/vmalert/recording_test.go @@ -76,7 +76,8 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) { t.Run(tc.rule.Name, func(t *testing.T) { fq := &fakeQuerier{} fq.add(tc.metrics...) - tss, err := tc.rule.Exec(context.TODO(), fq, true) + tc.rule.q = fq + tss, err := tc.rule.Exec(context.TODO(), true) if err != nil { t.Fatalf("unexpected Exec err: %s", err) } @@ -95,8 +96,8 @@ func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) { fq := &fakeQuerier{} expErr := "connection reset by peer" fq.setErr(errors.New(expErr)) - - _, err := rr.Exec(context.TODO(), fq, true) + rr.q = fq + _, err := rr.Exec(context.TODO(), true) if err == nil { t.Fatalf("expected to get err; got nil") } @@ -111,7 +112,7 @@ func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) { fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo")) fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar")) - _, err = rr.Exec(context.TODO(), fq, true) + _, err = rr.Exec(context.TODO(), true) if err == nil { t.Fatalf("expected to get err; got nil") } diff --git a/app/vmalert/remoteread/init.go b/app/vmalert/remoteread/init.go index 4a2153c107..3900989b4a 100644 --- a/app/vmalert/remoteread/init.go +++ b/app/vmalert/remoteread/init.go @@ -26,7 +26,7 @@ var ( // Init creates a Querier from provided flag values. // Returns nil if addr flag wasn't set. -func Init() (datasource.Querier, error) { +func Init() (datasource.QuerierBuilder, error) { if *addr == "" { return nil, nil } diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index 293e929bf7..7aa3ddff0a 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -4,7 +4,6 @@ import ( "context" "errors" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -18,7 +17,7 @@ type Rule interface { // Exec executes the rule with given context // and Querier. If returnSeries is true, Exec // may return TimeSeries as result of execution - Exec(ctx context.Context, q datasource.Querier, returnSeries bool) ([]prompbmarshal.TimeSeries, error) + Exec(ctx context.Context, returnSeries bool) ([]prompbmarshal.TimeSeries, error) // UpdateWith performs modification of current Rule // with fields of the given Rule. 
UpdateWith(Rule) error From 4e5a88114a971e98da41cc359ebd172f1b963276 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Thu, 29 Apr 2021 10:10:24 +0300 Subject: [PATCH 10/16] vmagent kubernetes_sd tests (#1253) * first part of tests for kubernetes sd * makes linter happy * added more test cases * adds pub/sub for tests --- .../discovery/kubernetes/api_watcher_test.go | 851 ++++++++++++++++++ 1 file changed, 851 insertions(+) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher_test.go b/lib/promscrape/discovery/kubernetes/api_watcher_test.go index f971512cfb..e0262cea56 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher_test.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher_test.go @@ -1,8 +1,13 @@ package kubernetes import ( + "encoding/json" + "net/http" + "net/http/httptest" "reflect" + "sync" "testing" + "time" ) func TestGetAPIPathsWithNamespaces(t *testing.T) { @@ -175,3 +180,849 @@ func TestParseBookmark(t *testing.T) { t.Fatalf("unexpected resourceVersion; got %q; want %q", bm.Metadata.ResourceVersion, expectedResourceVersion) } } + +func TestGetScrapeWorkObjects(t *testing.T) { + type testCase struct { + name string + sdc *SDConfig + expectedTargetsLen int + initAPIObjectsByRole map[string][]byte + // will be added for watching api. + watchAPIMustAddObjectsByRole map[string][][]byte + } + cases := []testCase{ + { + name: "simple 1 pod with update 1", + sdc: &SDConfig{ + Role: "pod", + }, + expectedTargetsLen: 2, + initAPIObjectsByRole: map[string][]byte{ + "pod": []byte(`{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "stack-name-1", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "generic-pod" + } + ] + }, + "status": { + "podIP": "10.10.2.2", + "phase": "Running" + } +}]}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "pod": { + []byte(`{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "stack-next-2", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "generic-pod-2" + } + ] + }, + "status": { + "podIP": "10.10.2.5", + "phase": "Running" + } +}`), + }, + }, + }, + { + name: "endpoints with service update", + sdc: &SDConfig{ + Role: "endpoints", + }, + expectedTargetsLen: 2, + initAPIObjectsByRole: map[string][]byte{ + "service": []byte(`{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": []}`), + "endpoints": []byte(`{ + "kind": "EndpointsList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "annotations": { + "endpoints.kubernetes.io/last-change-trigger-time": "2021-04-27T02:06:55Z" + }, + "labels": { + "app.kubernetes.io/managed-by": "Helm" + }, + "name": "stack-kube-state-metrics", + "namespace": "default" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "10.244.0.5", + "nodeName": "kind-control-plane", + "targetRef": { + "kind": "Pod", + "name": "stack-kube-state-metrics-db5879bf8-bg78p", + "namespace": "default" + } + } + ], + "ports": [ + { + "name": "http", + "port": 8080, + "protocol": "TCP" + } + ] + } + ] +} +]}`), + "pod": []byte(`{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": { + 
"resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack" + }, + "name": "stack-kube-state-metrics-db5879bf8-bg78p", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8", + "name": "kube-state-metrics", + "ports": [ + { + "containerPort": 8080, + "protocol": "TCP" + } + ] + }, + { + "image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8", + "name": "kube-state-metrics-2", + "ports": [ + { + "containerPort": 8085, + "protocol": "TCP" + } + ] + } + ] + }, + "status": { + "phase": "Running", + "podIP": "10.244.0.5" + } +} +]}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "service": { + []byte(`{ + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "annotations": { + "meta.helm.sh/release-name": "stack" + }, + "labels": { + "app.kubernetes.io/managed-by": "Helm", + "app.kubernetes.io/name": "kube-state-metrics" + }, + "name": "stack-kube-state-metrics", + "namespace": "default" + }, + "spec": { + "clusterIP": "10.97.109.249", + "ports": [ + { + "name": "http", + "port": 8080, + "protocol": "TCP", + "targetPort": 8080 + } + ], + "selector": { + "app.kubernetes.io/instance": "stack", + "app.kubernetes.io/name": "kube-state-metrics" + }, + "type": "ClusterIP" + } +}`), + }, + }, + }, + { + name: "get nodes", + sdc: &SDConfig{Role: "node"}, + expectedTargetsLen: 2, + initAPIObjectsByRole: map[string][]byte{ + "node": []byte(`{ + "kind": "NodeList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/nodes", + "resourceVersion": "22627" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Node", + "metadata": { + "annotations": { + "kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock" + }, + "labels": { + "beta.kubernetes.io/arch": "amd64", + "beta.kubernetes.io/os": "linux" + }, + "name": "kind-control-plane-new" + }, + "status": { + "addresses": [ + { + "address": "10.10.2.5", + "type": "InternalIP" + }, + { + "address": "kind-control-plane", + "type": "Hostname" + } + ] + } +} +]}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "node": { + []byte(`{ + "apiVersion": "v1", + "kind": "Node", + "metadata": { + "annotations": { + "kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock" + }, + "labels": { + "beta.kubernetes.io/arch": "amd64", + "beta.kubernetes.io/os": "linux" + }, + "name": "kind-control-plane" + }, + "status": { + "addresses": [ + { + "address": "10.10.2.2", + "type": "InternalIP" + }, + { + "address": "kind-control-plane", + "type": "Hostname" + } + ] + } +}`), + }, + }, + }, + { + name: "2 service with 2 added", + sdc: &SDConfig{Role: "service"}, + expectedTargetsLen: 4, + initAPIObjectsByRole: map[string][]byte{ + "service": []byte(`{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/services", + "resourceVersion": "60485" + }, + "items": [ + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "dns", + "protocol": "UDP", + "port": 53, + "targetPort": 53 + }, + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + } + } + ] +}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "service": { + 
[]byte(`{ + "metadata": { + "name": "another-service-1", + "namespace": "default", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "some-app-1-tcp", + "protocol": "TCP", + "port": 1053, + "targetPort": 1053 + } + ], + "selector": { + "k8s-app": "some-app-1" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP" + } +}`), + []byte(`{ + "metadata": { + "name": "another-service-2", + "namespace": "default", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "some-app-2-tcp", + "protocol": "TCP", + "port": 1053, + "targetPort": 1053 + } + ], + "selector": { + "k8s-app": "some-app-2" + }, + "clusterIP": "10.96.0.15", + "type": "ClusterIP" + } +}`), + }, + }, + }, + { + name: "1 ingress with 2 add", + expectedTargetsLen: 3, + sdc: &SDConfig{ + Role: "ingress", + }, + initAPIObjectsByRole: map[string][]byte{ + "ingress": []byte(`{ + "kind": "IngressList", + "apiVersion": "extensions/v1beta1", + "metadata": { + "selfLink": "/apis/extensions/v1beta1/ingresses", + "resourceVersion": "351452" + }, + "items": [ + { + "metadata": { + "name": "test-ingress", + "namespace": "default" + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 80 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.2" + } + ] + } + } + } + ] +}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "ingress": { + []byte(`{ + "metadata": { + "name": "test-ingress-1", + "namespace": "default" + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 801 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.3" + } + ] + } + } +}`), + []byte(`{ + "metadata": { + "name": "test-ingress-2", + "namespace": "default" + }, + "spec": { + "backend": { + "serviceName": "testsvc", + "servicePort": 802 + }, + "rules": [ + { + "host": "foobar" + } + ] + }, + "status": { + "loadBalancer": { + "ingress": [ + { + "ip": "172.17.0.3" + } + ] + } + } +}`), + }, + }, + }, + { + name: "7 endpointslices slice with 1 service update", + sdc: &SDConfig{ + Role: "endpointslices", + }, + expectedTargetsLen: 7, + initAPIObjectsByRole: map[string][]byte{ + "endpointslices": []byte(`{ + "kind": "EndpointSliceList", + "apiVersion": "discovery.k8s.io/v1beta1", + "metadata": { + "selfLink": "/apis/discovery.k8s.io/v1beta1/endpointslices", + "resourceVersion": "1177" + }, + "items": [ + { + "metadata": { + "name": "kubernetes", + "namespace": "default", + "labels": { + "kubernetes.io/service-name": "kubernetes" + } + }, + "addressType": "IPv4", + "endpoints": [ + { + "addresses": [ + "172.18.0.2" + ], + "conditions": { + "ready": true + } + } + ], + "ports": [ + { + "name": "https", + "protocol": "TCP", + "port": 6443 + } + ] + }, + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "kubernetes.io/service-name": "kube-dns" + } + }, + "addressType": "IPv4", + "endpoints": [ + { + "addresses": [ + "10.244.0.3" + ], + "conditions": { + "ready": true + }, + "targetRef": { + "kind": "Pod", + "namespace": "kube-system", + "name": "coredns-66bff467f8-z8czk", + "uid": "36a545ff-dbba-4192-a5f6-1dbb0c21c73d", + "resourceVersion": "603" + }, + "topology": { + "kubernetes.io/hostname": "kind-control-plane" + } + }, + { + "addresses": [ + "10.244.0.4" + ], + "conditions": { + "ready": true + }, + "targetRef": { + "kind": "Pod", + "namespace": "kube-system", + "name": 
"coredns-66bff467f8-kpbhk", + "uid": "db38d8b4-847a-4e82-874c-fe444fba2718", + "resourceVersion": "576" + }, + "topology": { + "kubernetes.io/hostname": "kind-control-plane" + } + } + ], + "ports": [ + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53 + }, + { + "name": "metrics", + "protocol": "TCP", + "port": 9153 + }, + { + "name": "dns", + "protocol": "UDP", + "port": 53 + } + ] + } + ] +}`), + "pod": []byte(`{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": { + "resourceVersion": "72425" + }, + "items": [ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "coredns-66bff467f8-kpbhk", + "namespace": "kube-system" + }, + "spec": { + "containers": [ + { + "name": "generic-pod" + } + ] + }, + "status": { + "podIP": "10.10.2.2", + "phase": "Running" + } +}, +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "labels": { + "app.kubernetes.io/instance": "stack", + "pod-template-hash": "5b9c6cf775" + }, + "name": "coredns-66bff467f8-z8czk", + "namespace": "kube-system" + }, + "spec": { + "containers": [ + { + "name": "generic-pod" + } + ] + }, + "status": { + "podIP": "10.10.2.3", + "phase": "Running" + } +} +]}`), + "service": []byte(`{ + "kind": "ServiceList", + "apiVersion": "v1", + "metadata": { + "selfLink": "/api/v1/services", + "resourceVersion": "60485" + }, + "items": [ + { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "k8s-app": "kube-dns" + } + }, + "spec": { + "ports": [ + { + "name": "dns", + "protocol": "UDP", + "port": 53, + "targetPort": 53 + }, + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + } + } + ] +}`), + }, + watchAPIMustAddObjectsByRole: map[string][][]byte{ + "service": { + []byte(` { + "metadata": { + "name": "kube-dns", + "namespace": "kube-system", + "labels": { + "k8s-app": "kube-dns", + "some-new": "label-value" + } + }, + "spec": { + "ports": [ + { + "name": "dns-tcp", + "protocol": "TCP", + "port": 53, + "targetPort": 53 + } + ], + "selector": { + "k8s-app": "kube-dns" + }, + "clusterIP": "10.96.0.10", + "type": "ClusterIP", + "sessionAffinity": "None" + } + } +`), + }, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + watchPublishersByRole := make(map[string]*watchObjectBroadcast) + mux := http.NewServeMux() + for role, obj := range tc.initAPIObjectsByRole { + watchBroadCaster := &watchObjectBroadcast{} + watchPublishersByRole[role] = watchBroadCaster + apiPath := getAPIPath(getObjectTypeByRole(role), "", "") + addAPIURLHandler(t, mux, apiPath, obj, watchBroadCaster) + } + testAPIServer := httptest.NewServer(mux) + tc.sdc.APIServer = testAPIServer.URL + ac, err := newAPIConfig(tc.sdc, "", func(metaLabels map[string]string) interface{} { + var res []interface{} + for k := range metaLabels { + res = append(res, k) + } + return res + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + tc.sdc.cfg = ac + ac.aw.mustStart() + defer ac.aw.mustStop() + _, err = tc.sdc.GetScrapeWorkObjects() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // need to wait, for subscribers to start. 
+ time.Sleep(80 * time.Millisecond) + for role, objs := range tc.watchAPIMustAddObjectsByRole { + for _, obj := range objs { + watchPublishersByRole[role].pub(obj) + } + } + for _, ch := range watchPublishersByRole { + ch.shutdown() + } + if len(tc.watchAPIMustAddObjectsByRole) > 0 { + // updates async, need to wait some time. + // i guess, poll is not reliable. + time.Sleep(80 * time.Millisecond) + } + got, err := tc.sdc.GetScrapeWorkObjects() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != tc.expectedTargetsLen { + t.Fatalf("unexpected count of objects, got: %d, want: %d", len(got), tc.expectedTargetsLen) + } + }) + } +} + +type watchObjectBroadcast struct { + mu sync.Mutex + subscribers []chan []byte +} + +func (o *watchObjectBroadcast) pub(msg []byte) { + o.mu.Lock() + defer o.mu.Unlock() + for i := range o.subscribers { + c := o.subscribers[i] + select { + case c <- msg: + default: + } + } +} + +func (o *watchObjectBroadcast) sub() <-chan []byte { + c := make(chan []byte, 5) + o.mu.Lock() + o.subscribers = append(o.subscribers, c) + o.mu.Unlock() + return c +} + +func (o *watchObjectBroadcast) shutdown() { + o.mu.Lock() + defer o.mu.Unlock() + for i := range o.subscribers { + c := o.subscribers[i] + close(c) + } +} + +func addAPIURLHandler(t *testing.T, mux *http.ServeMux, apiURL string, initObjects []byte, notifier *watchObjectBroadcast) { + t.Helper() + mux.HandleFunc(apiURL, func(w http.ResponseWriter, r *http.Request) { + if needWatch := r.URL.Query().Get("watch"); len(needWatch) > 0 { + // start watch handler + w.WriteHeader(200) + flusher := w.(http.Flusher) + flusher.Flush() + updateC := notifier.sub() + for obj := range updateC { + we := WatchEvent{ + Type: "ADDED", + Object: obj, + } + szd, err := json.Marshal(we) + if err != nil { + t.Fatalf("cannot serialize: %v", err) + } + _, _ = w.Write(szd) + flusher.Flush() + } + return + } + w.WriteHeader(200) + _, _ = w.Write(initObjects) + }) +} From 25b8d71df595af0a380ba36eaeb8e3688781299d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 29 Apr 2021 09:47:51 +0300 Subject: [PATCH 11/16] vendor: update github.com/klauspost/compress from v1.12.1 to v1.12.2 --- go.mod | 2 +- go.sum | 4 +- .../klauspost/compress/flate/level5.go | 17 ++- .../klauspost/compress/flate/level6.go | 25 ++++ .../klauspost/compress/zstd/README.md | 24 ++-- .../klauspost/compress/zstd/enc_best.go | 14 ++ .../klauspost/compress/zstd/enc_better.go | 73 ++++++++++- .../github.com/klauspost/compress/zstd/zip.go | 120 ++++++++++++++++++ vendor/modules.txt | 2 +- 9 files changed, 260 insertions(+), 21 deletions(-) create mode 100644 vendor/github.com/klauspost/compress/zstd/zip.go diff --git a/go.mod b/go.mod index 9bdbcfd3da..59799f1000 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.3 github.com/influxdata/influxdb v1.8.5 - github.com/klauspost/compress v1.12.1 + github.com/klauspost/compress v1.12.2 github.com/prometheus/client_golang v1.10.0 // indirect github.com/prometheus/common v0.21.0 // indirect github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9 diff --git a/go.sum b/go.sum index cc86cd06e9..e168f0462a 100644 --- a/go.sum +++ b/go.sum @@ -517,8 +517,8 @@ github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.0/go.mod 
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.12.1 h1:/+xsCsk06wE38cyiqOR/o7U2fSftcH72xD+BQXmja/g= -github.com/klauspost/compress v1.12.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= diff --git a/vendor/github.com/klauspost/compress/flate/level5.go b/vendor/github.com/klauspost/compress/flate/level5.go index d513f1ffd3..293a3a320b 100644 --- a/vendor/github.com/klauspost/compress/flate/level5.go +++ b/vendor/github.com/klauspost/compress/flate/level5.go @@ -182,12 +182,27 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { // match. But, prior to the match, src[nextEmit:s] are unmatched. Emit // them as literal bytes. - // Extend the 4-byte match as long as possible. if l == 0 { + // Extend the 4-byte match as long as possible. l = e.matchlenLong(s+4, t+4, src) + 4 } else if l == maxMatchLength { l += e.matchlenLong(s+l, t+l, src) } + + // Try to locate a better match by checking the end of best match... + if sAt := s + l; l < 30 && sAt < sLimit { + eLong := e.bTable[hash7(load6432(src, sAt), tableBits)].Cur.offset + // Test current + t2 := eLong - e.cur - l + off := s - t2 + if t2 >= 0 && off < maxMatchOffset && off > 0 { + if l2 := e.matchlenLong(s, t2, src); l2 > l { + t = t2 + l = l2 + } + } + } + // Extend backwards for t > 0 && s > nextEmit && src[t-1] == src[s-1] { s-- diff --git a/vendor/github.com/klauspost/compress/flate/level6.go b/vendor/github.com/klauspost/compress/flate/level6.go index a52c80ea45..a709977ec4 100644 --- a/vendor/github.com/klauspost/compress/flate/level6.go +++ b/vendor/github.com/klauspost/compress/flate/level6.go @@ -211,6 +211,31 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { l += e.matchlenLong(s+l, t+l, src) } + // Try to locate a better match by checking the end-of-match... 
+ if sAt := s + l; sAt < sLimit { + eLong := &e.bTable[hash7(load6432(src, sAt), tableBits)] + // Test current + t2 := eLong.Cur.offset - e.cur - l + off := s - t2 + if off < maxMatchOffset { + if off > 0 && t2 >= 0 { + if l2 := e.matchlenLong(s, t2, src); l2 > l { + t = t2 + l = l2 + } + } + // Test next: + t2 = eLong.Prev.offset - e.cur - l + off := s - t2 + if off > 0 && off < maxMatchOffset && t2 >= 0 { + if l2 := e.matchlenLong(s, t2, src); l2 > l { + t = t2 + l = l2 + } + } + } + } + // Extend backwards for t > 0 && s > nextEmit && src[t-1] == src[s-1] { s-- diff --git a/vendor/github.com/klauspost/compress/zstd/README.md b/vendor/github.com/klauspost/compress/zstd/README.md index 7680bfe1dd..e7d7eb0ae4 100644 --- a/vendor/github.com/klauspost/compress/zstd/README.md +++ b/vendor/github.com/klauspost/compress/zstd/README.md @@ -152,8 +152,8 @@ This package: file out level insize outsize millis mb/s silesia.tar zskp 1 211947520 73101992 643 313.87 silesia.tar zskp 2 211947520 67504318 969 208.38 -silesia.tar zskp 3 211947520 65177448 1899 106.44 -silesia.tar zskp 4 211947520 61381950 8115 24.91 +silesia.tar zskp 3 211947520 64595893 2007 100.68 +silesia.tar zskp 4 211947520 60995370 7691 26.28 cgo zstd: silesia.tar zstd 1 211947520 73605392 543 371.56 @@ -171,8 +171,8 @@ https://files.klauspost.com/compress/gob-stream.7z file out level insize outsize millis mb/s gob-stream zskp 1 1911399616 235022249 3088 590.30 gob-stream zskp 2 1911399616 205669791 3786 481.34 -gob-stream zskp 3 1911399616 185792019 9324 195.48 -gob-stream zskp 4 1911399616 171537212 32113 56.76 +gob-stream zskp 3 1911399616 175034659 9636 189.17 +gob-stream zskp 4 1911399616 167273881 29337 62.13 gob-stream zstd 1 1911399616 249810424 2637 691.26 gob-stream zstd 3 1911399616 208192146 3490 522.31 gob-stream zstd 6 1911399616 193632038 6687 272.56 @@ -187,8 +187,8 @@ http://mattmahoney.net/dc/textdata.html file out level insize outsize millis mb/s enwik9 zskp 1 1000000000 343848582 3609 264.18 enwik9 zskp 2 1000000000 317276632 5746 165.97 -enwik9 zskp 3 1000000000 294540704 11725 81.34 -enwik9 zskp 4 1000000000 276609671 44029 21.66 +enwik9 zskp 3 1000000000 292243069 12162 78.41 +enwik9 zskp 4 1000000000 275241169 36430 26.18 enwik9 zstd 1 1000000000 358072021 3110 306.65 enwik9 zstd 3 1000000000 313734672 4784 199.35 enwik9 zstd 6 1000000000 295138875 10290 92.68 @@ -202,8 +202,8 @@ https://files.klauspost.com/compress/github-june-2days-2019.json.zst file out level insize outsize millis mb/s github-june-2days-2019.json zskp 1 6273951764 699045015 10620 563.40 github-june-2days-2019.json zskp 2 6273951764 617881763 11687 511.96 -github-june-2days-2019.json zskp 3 6273951764 537511906 29252 204.54 -github-june-2days-2019.json zskp 4 6273951764 512796117 97791 61.18 +github-june-2days-2019.json zskp 3 6273951764 524340691 34043 175.75 +github-june-2days-2019.json zskp 4 6273951764 503314661 93811 63.78 github-june-2days-2019.json zstd 1 6273951764 766284037 8450 708.00 github-june-2days-2019.json zstd 3 6273951764 661889476 10927 547.57 github-june-2days-2019.json zstd 6 6273951764 642756859 22996 260.18 @@ -217,8 +217,8 @@ https://files.klauspost.com/compress/rawstudio-mint14.7z file out level insize outsize millis mb/s rawstudio-mint14.tar zskp 1 8558382592 3667489370 20210 403.84 rawstudio-mint14.tar zskp 2 8558382592 3364592300 31873 256.07 -rawstudio-mint14.tar zskp 3 8558382592 3224594213 71751 113.75 -rawstudio-mint14.tar zskp 4 8558382592 3027332295 486243 16.79 +rawstudio-mint14.tar zskp 3 8558382592 
3158085214 77675 105.08 +rawstudio-mint14.tar zskp 4 8558382592 3020370044 404956 20.16 rawstudio-mint14.tar zstd 1 8558382592 3609250104 17136 476.27 rawstudio-mint14.tar zstd 3 8558382592 3341679997 29262 278.92 rawstudio-mint14.tar zstd 6 8558382592 3235846406 77904 104.77 @@ -232,8 +232,8 @@ https://files.klauspost.com/compress/nyc-taxi-data-10M.csv.zst file out level insize outsize millis mb/s nyc-taxi-data-10M.csv zskp 1 3325605752 641339945 8925 355.35 nyc-taxi-data-10M.csv zskp 2 3325605752 591748091 11268 281.44 -nyc-taxi-data-10M.csv zskp 3 3325605752 538490114 19880 159.53 -nyc-taxi-data-10M.csv zskp 4 3325605752 495986829 89368 35.49 +nyc-taxi-data-10M.csv zskp 3 3325605752 530289687 25239 125.66 +nyc-taxi-data-10M.csv zskp 4 3325605752 490907191 65939 48.10 nyc-taxi-data-10M.csv zstd 1 3325605752 687399637 8233 385.18 nyc-taxi-data-10M.csv zstd 3 3325605752 598514411 10065 315.07 nyc-taxi-data-10M.csv zstd 6 3325605752 570522953 20038 158.27 diff --git a/vendor/github.com/klauspost/compress/zstd/enc_best.go b/vendor/github.com/klauspost/compress/zstd/enc_best.go index fe3625c5f5..dc1eed5f00 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_best.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_best.go @@ -220,6 +220,20 @@ encodeLoop: best = bestOf(best, matchAt(candidateL.prev-e.cur, s, uint32(cv), -1)) best = bestOf(best, matchAt(candidateL2.offset-e.cur, s+1, uint32(cv2), -1)) best = bestOf(best, matchAt(candidateL2.prev-e.cur, s+1, uint32(cv2), -1)) + + // See if we can find a better match by checking where the current best ends. + // Use that offset to see if we can find a better full match. + if sAt := best.s + best.length; sAt < sLimit { + nextHashL := hash8(load6432(src, sAt), bestLongTableBits) + candidateEnd := e.longTable[nextHashL] + if pos := candidateEnd.offset - e.cur - best.length; pos >= 0 { + bestEnd := bestOf(best, matchAt(pos, best.s, load3232(src, best.s), -1)) + if pos := candidateEnd.prev - e.cur - best.length; pos >= 0 { + bestEnd = bestOf(bestEnd, matchAt(pos, best.s, load3232(src, best.s), -1)) + } + best = bestEnd + } + } } // We have a match, we can store the forward value diff --git a/vendor/github.com/klauspost/compress/zstd/enc_better.go b/vendor/github.com/klauspost/compress/zstd/enc_better.go index c2ce4a2bac..604954290e 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_better.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_better.go @@ -412,8 +412,41 @@ encodeLoop: cv = load6432(src, s) } - // A 4-byte match has been found. Update recent offsets. - // We'll later see if more than 4 bytes. + // Try to find a better match by searching for a long match at the end of the current best match + if true && s+matched < sLimit { + nextHashL := hash8(load6432(src, s+matched), betterLongTableBits) + cv := load3232(src, s) + candidateL := e.longTable[nextHashL] + coffsetL := candidateL.offset - e.cur - matched + if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) { + // Found a long match, at least 4 bytes. + matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4 + if matchedNext > matched { + t = coffsetL + matched = matchedNext + if debugMatches { + println("long match at end-of-match") + } + } + } + + // Check prev long... + if true { + coffsetL = candidateL.prev - e.cur - matched + if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) { + // Found a long match, at least 4 bytes. 
+ matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4 + if matchedNext > matched { + t = coffsetL + matched = matchedNext + if debugMatches { + println("prev long match at end-of-match") + } + } + } + } + } + // A match has been found. Update recent offsets. offset2 = offset1 offset1 = s - t @@ -905,9 +938,41 @@ encodeLoop: } cv = load6432(src, s) } + // Try to find a better match by searching for a long match at the end of the current best match + if s+matched < sLimit { + nextHashL := hash8(load6432(src, s+matched), betterLongTableBits) + cv := load3232(src, s) + candidateL := e.longTable[nextHashL] + coffsetL := candidateL.offset - e.cur - matched + if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) { + // Found a long match, at least 4 bytes. + matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4 + if matchedNext > matched { + t = coffsetL + matched = matchedNext + if debugMatches { + println("long match at end-of-match") + } + } + } - // A 4-byte match has been found. Update recent offsets. - // We'll later see if more than 4 bytes. + // Check prev long... + if true { + coffsetL = candidateL.prev - e.cur - matched + if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) { + // Found a long match, at least 4 bytes. + matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4 + if matchedNext > matched { + t = coffsetL + matched = matchedNext + if debugMatches { + println("prev long match at end-of-match") + } + } + } + } + } + // A match has been found. Update recent offsets. offset2 = offset1 offset1 = s - t diff --git a/vendor/github.com/klauspost/compress/zstd/zip.go b/vendor/github.com/klauspost/compress/zstd/zip.go new file mode 100644 index 0000000000..e35a0a2f89 --- /dev/null +++ b/vendor/github.com/klauspost/compress/zstd/zip.go @@ -0,0 +1,120 @@ +// Copyright 2019+ Klaus Post. All rights reserved. +// License information can be found in the LICENSE file. + +package zstd + +import ( + "errors" + "io" + "sync" +) + +// ZipMethodWinZip is the method for Zstandard compressed data inside Zip files for WinZip. +// See https://www.winzip.com/win/en/comp_info.html +const ZipMethodWinZip = 93 + +// ZipMethodPKWare is the method number used by PKWARE to indicate Zstandard compression. +// See https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.7.TXT +const ZipMethodPKWare = 20 + +var zipReaderPool sync.Pool + +// newZipReader cannot be used since we would leak goroutines... 
+func newZipReader(r io.Reader) io.ReadCloser { + dec, ok := zipReaderPool.Get().(*Decoder) + if ok { + dec.Reset(r) + } else { + d, err := NewReader(r, WithDecoderConcurrency(1), WithDecoderLowmem(true)) + if err != nil { + panic(err) + } + dec = d + } + return &pooledZipReader{dec: dec} +} + +type pooledZipReader struct { + mu sync.Mutex // guards Close and Read + dec *Decoder +} + +func (r *pooledZipReader) Read(p []byte) (n int, err error) { + r.mu.Lock() + defer r.mu.Unlock() + if r.dec == nil { + return 0, errors.New("Read after Close") + } + dec, err := r.dec.Read(p) + + return dec, err +} + +func (r *pooledZipReader) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + var err error + if r.dec != nil { + err = r.dec.Reset(nil) + zipReaderPool.Put(r.dec) + r.dec = nil + } + return err +} + +type pooledZipWriter struct { + mu sync.Mutex // guards Close and Read + enc *Encoder +} + +func (w *pooledZipWriter) Write(p []byte) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() + if w.enc == nil { + return 0, errors.New("Write after Close") + } + return w.enc.Write(p) +} + +func (w *pooledZipWriter) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + var err error + if w.enc != nil { + err = w.enc.Close() + zipReaderPool.Put(w.enc) + w.enc = nil + } + return err +} + +// ZipCompressor returns a compressor that can be registered with zip libraries. +// The provided encoder options will be used on all encodes. +func ZipCompressor(opts ...EOption) func(w io.Writer) (io.WriteCloser, error) { + var pool sync.Pool + return func(w io.Writer) (io.WriteCloser, error) { + enc, ok := pool.Get().(*Encoder) + if ok { + enc.Reset(w) + } else { + var err error + enc, err = NewWriter(w, opts...) + if err != nil { + return nil, err + } + } + return &pooledZipWriter{enc: enc}, nil + } +} + +// ZipDecompressor returns a decompressor that can be registered with zip libraries. +// See ZipCompressor for example. +func ZipDecompressor() func(r io.Reader) io.ReadCloser { + return func(r io.Reader) io.ReadCloser { + d, err := NewReader(r, WithDecoderConcurrency(1), WithDecoderLowmem(true)) + if err != nil { + panic(err) + } + return d.IOReadCloser() + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 579c6e72ee..ce0259eb6b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -125,7 +125,7 @@ github.com/jmespath/go-jmespath github.com/jstemmer/go-junit-report github.com/jstemmer/go-junit-report/formatter github.com/jstemmer/go-junit-report/parser -# github.com/klauspost/compress v1.12.1 +# github.com/klauspost/compress v1.12.2 ## explicit github.com/klauspost/compress/flate github.com/klauspost/compress/fse From 2ab1266593708e072ed26d842e5a88eeb747abb6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 29 Apr 2021 10:14:24 +0300 Subject: [PATCH 12/16] lib/promscrape/discovery/kubernetes: remove a mutex at urlWatcher - use groupWatcher mutex for accessing all the urlWatcher children This simplifies the code a bit and reduces the probability of improper mutex handling and deadlocks. 
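To illustrate the locking model this change moves to, here is a minimal, self-contained Go sketch (simplified, with invented type and method names — not the actual VictoriaMetrics code) of a parent-owned mutex guarding all child state, with `*Locked` helpers that assume the lock is already held:

```go
package main

import (
	"fmt"
	"sync"
)

// groupWatcher owns the single mutex; all urlWatcher state below it is
// accessed only while mu is held, so the children need no locks of their own.
type groupWatcher struct {
	mu sync.Mutex
	m  map[string]*urlWatcher
}

// urlWatcher fields must be accessed under groupWatcher.mu.
type urlWatcher struct {
	objectsByKey map[string]string
}

// addObjectLocked assumes gw.mu is already held by the caller.
func (gw *groupWatcher) addObjectLocked(apiURL, key, value string) {
	uw := gw.m[apiURL]
	if uw == nil {
		uw = &urlWatcher{objectsByKey: make(map[string]string)}
		gw.m[apiURL] = uw
	}
	uw.objectsByKey[key] = value
}

// AddObject is the only entry point that takes the lock; everything below it
// stays lock-free and therefore cannot deadlock on a second mutex.
func (gw *groupWatcher) AddObject(apiURL, key, value string) {
	gw.mu.Lock()
	defer gw.mu.Unlock()
	gw.addObjectLocked(apiURL, key, value)
}

func main() {
	gw := &groupWatcher{m: make(map[string]*urlWatcher)}
	gw.AddObject("/api/v1/pods", "default/pod-1", "Pod")
	fmt.Println(len(gw.m), len(gw.m["/api/v1/pods"].objectsByKey)) // 1 1
}
```

A single coarse lock on the parent makes the `*Locked` naming contract explicit and removes the lock-ordering questions that appear when every child carries its own mutex.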
--- .../discovery/kubernetes/api_watcher.go | 194 ++++++------------ .../discovery/kubernetes/endpoints.go | 4 +- .../discovery/kubernetes/endpointslices.go | 4 +- 3 files changed, 62 insertions(+), 140 deletions(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 92a9a6cd66..dd7ba75ca1 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -33,6 +33,8 @@ type WatchEvent struct { // object is any Kubernetes object. type object interface { key() string + + // getTargetLabels must be called under gw.mu lock. getTargetLabels(gw *groupWatcher) []map[string]string } @@ -231,62 +233,13 @@ var ( }) ) -// getObjectByRole returns an object with the given (namespace, name) key and the given role. -func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object { +func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object { if gw == nil { // this is needed for testing return nil } - o := gw.getCachedObjectByRole(role, namespace, name) - if o != nil { - // Fast path: the object has been found in the cache. - return o - } - - // The object wasn't found in the cache. Try querying it directly from API server. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details. - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc() - objectType := getObjectTypeByRole(role) - path := getAPIPath(objectType, namespace, "") - path += "/" + name - requestURL := gw.apiServer + path - resp, err := gw.doRequest(requestURL) - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err) - return nil - } - data, err := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("cannot read response from %q: %s", requestURL, err) - return nil - } - if resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc() - return nil - } - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data) - return nil - } - parseObject, _ := getObjectParsersForRole(role) - o, err = parseObject(data) - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc() - logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data) - return nil - } - // There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself. 
- return o -} - -func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object { key := namespace + "/" + name - uws := gw.getURLWatchers() - for _, uw := range uws { + for _, uw := range gw.m { if uw.role != role { // Role mismatch continue @@ -295,26 +248,26 @@ func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) obje // Namespace mismatch continue } - uw.mu.Lock() - o := uw.objectsByKey[key] - uw.mu.Unlock() - if o != nil { + if o := uw.objectsByKey[key]; o != nil { return o } } return nil } -func (gw *groupWatcher) refreshEndpointsLabels(namespace, key string) { +func (gw *groupWatcher) refreshEndpointsLabelsLocked(namespace, key string) { // Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 - gw.refreshObjectLabels("endpoints", namespace, key) - gw.refreshObjectLabels("endpointslices", namespace, key) + gw.refreshObjectLabelsLocked("endpoints", namespace, key) + gw.refreshObjectLabelsLocked("endpointslices", namespace, key) } -func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { - uws := gw.getURLWatchers() - for _, uw := range uws { +func (gw *groupWatcher) refreshObjectLabelsLocked(role, namespace, key string) { + for _, uw := range gw.m { + if len(uw.aws) == 0 { + // No apiWatchers to notify + continue + } if uw.role != role { // Role mismatch continue @@ -323,16 +276,9 @@ func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { // Namespace mismatch continue } - var aws []*apiWatcher - uw.mu.Lock() - o := uw.objectsByKey[key] - if o != nil { - aws = uw.getAPIWatchersLocked() - } - uw.mu.Unlock() - if len(aws) > 0 { + if o := uw.objectsByKey[key]; o != nil { labels := o.getTargetLabels(gw) - for _, aw := range aws { + for aw := range uw.aws { aw.setScrapeWorks(namespace, key, labels) } } @@ -350,14 +296,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) { uw = newURLWatcher(role, namespaces[i], apiURL, gw) gw.m[apiURL] = uw } + if aw != nil { + uw.subscribeAPIWatcherLocked(aw) + } gw.mu.Unlock() if needStart { uw.reloadObjects() go uw.watchForUpdates() } - if aw != nil { - uw.subscribeAPIWatcher(aw) - } } } @@ -374,30 +320,24 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) { } func (gw *groupWatcher) registerPendingAPIWatchers() { - uws := gw.getURLWatchers() - for _, uw := range uws { - uw.registerPendingAPIWatchers() - } -} - -func (gw *groupWatcher) getURLWatchers() []*urlWatcher { gw.mu.Lock() - uws := make([]*urlWatcher, 0, len(gw.m)) + defer gw.mu.Unlock() for _, uw := range gw.m { - uws = append(uws, uw) + uw.registerPendingAPIWatchersLocked() } - gw.mu.Unlock() - return uws } func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { - uws := gw.getURLWatchers() - for _, uw := range uws { - uw.unsubscribeAPIWatcher(aw) + gw.mu.Lock() + defer gw.mu.Unlock() + for _, uw := range gw.m { + uw.unsubscribeAPIWatcherLocked(aw) } } // urlWatcher watches for an apiURL and updates object states in objectsByKey. +// +// urlWatcher fields must be accessed under gw.mu lock. type urlWatcher struct { role string namespace string @@ -407,14 +347,11 @@ type urlWatcher struct { parseObject parseObjectFunc parseObjectList parseObjectListFunc - // mu protects aws, awsPending, objectsByKey and resourceVersion - mu sync.Mutex - // awsPending contains pending apiWatcher objects, which are registered in a batch. 
// Batch registering saves CPU time needed for registering big number of Kubernetes objects // shared among big number of scrape jobs, since per-object labels are generated only once // for all the scrape jobs (each scrape job is associated with a single apiWatcher). - // See reloadScrapeWorksForAPIWatchers for details. + // See reloadScrapeWorksForAPIWatchersLocked for details. awsPending map[*apiWatcher]struct{} // aws contains registered apiWatcher objects @@ -458,37 +395,31 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher return uw } -func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) { - uw.mu.Lock() +func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) { if _, ok := uw.aws[aw]; !ok { if _, ok := uw.awsPending[aw]; !ok { uw.awsPending[aw] = struct{}{} metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc() } } - uw.mu.Unlock() } -func (uw *urlWatcher) registerPendingAPIWatchers() { - uw.mu.Lock() +func (uw *urlWatcher) registerPendingAPIWatchersLocked() { if len(uw.awsPending) == 0 { - uw.mu.Unlock() return } awsPending := make([]*apiWatcher, 0, len(uw.awsPending)) for aw := range uw.awsPending { awsPending = append(awsPending, aw) - delete(uw.awsPending, aw) uw.aws[aw] = struct{}{} } - uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey) - uw.mu.Unlock() + uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending) + uw.awsPending = make(map[*apiWatcher]struct{}) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending)) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending)) } -func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { - uw.mu.Lock() +func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) { if _, ok := uw.awsPending[aw]; ok { delete(uw.awsPending, aw) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec() @@ -497,20 +428,19 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { delete(uw.aws, aw) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec() } - uw.mu.Unlock() } func (uw *urlWatcher) setResourceVersion(resourceVersion string) { - uw.mu.Lock() + uw.gw.mu.Lock() uw.resourceVersion = resourceVersion - uw.mu.Unlock() + uw.gw.mu.Unlock() } // reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. func (uw *urlWatcher) reloadObjects() string { - uw.mu.Lock() + uw.gw.mu.Lock() resourceVersion := uw.resourceVersion - uw.mu.Unlock() + uw.gw.mu.Unlock() if resourceVersion != "" { // Fast path - there is no need in reloading the objects. 
return resourceVersion @@ -535,7 +465,7 @@ func (uw *urlWatcher) reloadObjects() string { return "" } - uw.mu.Lock() + uw.gw.mu.Lock() var updated, removed, added int for key := range uw.objectsByKey { if o, ok := objectsByKey[key]; ok { @@ -556,31 +486,34 @@ func (uw *urlWatcher) reloadObjects() string { uw.objectsRemoved.Add(removed) uw.objectsAdded.Add(added) uw.objectsCount.Add(added - removed) + uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws) uw.resourceVersion = metadata.ResourceVersion - aws := uw.getAPIWatchersLocked() - uw.mu.Unlock() - - uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey) if uw.role == "service" { // Refresh endpoints labels for the corresponding services as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 for key := range objectsByKey { - uw.gw.refreshEndpointsLabels(uw.namespace, key) + uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key) } } + uw.gw.mu.Unlock() + logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) return uw.resourceVersion } -func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) { - if len(aws) == 0 { +func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) { + if len(awsMap) == 0 { return } + aws := make([]*apiWatcher, 0, len(awsMap)) + for aw := range awsMap { + aws = append(aws, aw) + } swosByKey := make([]map[string][]interface{}, len(aws)) for i := range aws { swosByKey[i] = make(map[string][]interface{}) } - for key, o := range objectsByKey { + for key, o := range uw.objectsByKey { labels := o.getTargetLabels(uw.gw) for i, aw := range aws { swos := aw.getScrapeWorkObjectsForLabels(labels) @@ -594,15 +527,6 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objects } } -func (uw *urlWatcher) getAPIWatchersLocked() []*apiWatcher { - awsMap := uw.aws - aws := make([]*apiWatcher, 0, len(awsMap)) - for aw := range awsMap { - aws = append(aws, aw) - } - return aws -} - // watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. // // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes @@ -678,7 +602,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { return err } key := o.key() - uw.mu.Lock() + uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; !ok { uw.objectsCount.Inc() uw.objectsAdded.Inc() @@ -686,41 +610,39 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { uw.objectsUpdated.Inc() } uw.objectsByKey[key] = o - aws := uw.getAPIWatchersLocked() - uw.mu.Unlock() - if len(aws) > 0 { + if len(uw.aws) > 0 { labels := o.getTargetLabels(uw.gw) - for _, aw := range aws { + for aw := range uw.aws { aw.setScrapeWorks(uw.namespace, key, labels) } } if uw.role == "service" { // Refresh endpoints labels for the corresponding service as Prometheus does. 
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 - uw.gw.refreshEndpointsLabels(uw.namespace, key) + uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key) } + uw.gw.mu.Unlock() case "DELETED": o, err := uw.parseObject(we.Object) if err != nil { return err } key := o.key() - uw.mu.Lock() + uw.gw.mu.Lock() if _, ok := uw.objectsByKey[key]; ok { uw.objectsCount.Dec() uw.objectsRemoved.Inc() delete(uw.objectsByKey, key) } - aws := uw.getAPIWatchersLocked() - uw.mu.Unlock() - for _, aw := range aws { + for aw := range uw.aws { aw.removeScrapeWorks(uw.namespace, key) } if uw.role == "service" { // Refresh endpoints labels for the corresponding service as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 - uw.gw.refreshEndpointsLabels(uw.namespace, key) + uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key) } + uw.gw.mu.Unlock() case "BOOKMARK": // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks bm, err := parseBookmark(we.Object) diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index af4b16b5e0..68b5c3c3df 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -92,7 +92,7 @@ type EndpointPort struct { // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string { var svc *Service - if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { + if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { svc = o.(*Service) } podPortsSeen := make(map[*Pod][]int) @@ -140,7 +140,7 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher, for _, ea := range eas { var p *Pod if ea.TargetRef.Name != "" { - if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { + if o := gw.getObjectByRoleLocked("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { p = o.(*Pod) } } diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index 5e1961e926..4ac24b4750 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -39,14 +39,14 @@ func parseEndpointSlice(data []byte) (object, error) { // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string { var svc *Service - if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { + if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { svc = o.(*Service) } podPortsSeen := make(map[*Pod][]int) var ms []map[string]string for _, ess := range eps.Endpoints { var p *Pod - if o := gw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { + if o := gw.getObjectByRoleLocked("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { p = o.(*Pod) } for _, epp := range eps.Ports { From 6fa5981e68006a77311c1a74bbfa6fddf6bf3ad0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 30 Apr 2021 09:19:08 +0300 Subject: [PATCH 13/16] app/vmagent: list user-visible endpoints at `http://vmagent:8429/` While at it, use common WriteAPIHelp function for the listing in 
vmagent, vmalert and victoria-metrics --- app/victoria-metrics/main.go | 19 +++++-------------- app/vmagent/main.go | 10 +++++++++- app/vmalert/web.go | 20 +++++++------------- docs/CHANGELOG.md | 1 + lib/httpserver/httpserver.go | 10 ++++++++++ 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index 6c2793caa3..72bde8dc90 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -3,10 +3,8 @@ package main import ( "flag" "fmt" - "io" "net/http" "os" - "path" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" @@ -89,14 +87,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { if r.Method != "GET" { return false } - fmt.Fprintf(w, "

<h2>Single-node VictoriaMetrics.</h2></br>") + fmt.Fprintf(w, "<h2>Single-node VictoriaMetrics</h2></br>") fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/'>https://docs.victoriametrics.com/</a></br>") - fmt.Fprintf(w, "Useful endpoints:</br>") - writeAPIHelp(w, [][]string{ + fmt.Fprintf(w, "Useful endpoints:</br>
") + httpserver.WriteAPIHelp(w, [][2]string{ {"/targets", "discovered targets list"}, {"/api/v1/targets", "advanced information about discovered targets in JSON format"}, {"/metrics", "available service metrics"}, {"/api/v1/status/tsdb", "tsdb status page"}, + {"/api/v1/status/top_queries", "top queries"}, + {"/api/v1/status/active_queries", "active queries"}, }) return true } @@ -112,15 +112,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return false } -func writeAPIHelp(w io.Writer, pathList [][]string) { - pathPrefix := httpserver.GetPathPrefix() - for _, p := range pathList { - p, doc := p[0], p[1] - p = path.Join(pathPrefix, p) - fmt.Fprintf(w, "%q - %s
", p, p, doc) - } -} - func usage() { const s = ` victoria-metrics is a time series database and monitoring solution. diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 8d33df888f..22be00942c 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -148,7 +148,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { if r.Method != "GET" { return false } - fmt.Fprintf(w, "vmagent - see docs at https://docs.victoriametrics.com/vmagent.html") + fmt.Fprintf(w, "

<h2>vmagent</h2>") + fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/vmagent.html'>https://docs.victoriametrics.com/vmagent.html</a></br>") + fmt.Fprintf(w, "Useful endpoints:</br>
") + httpserver.WriteAPIHelp(w, [][2]string{ + {"/targets", "discovered targets list"}, + {"/api/v1/targets", "advanced information about discovered targets in JSON format"}, + {"/metrics", "available service metrics"}, + {"/-/reload", "reload configuration"}, + }) return true } path := strings.Replace(r.URL.Path, "//", "/", -1) diff --git a/app/vmalert/web.go b/app/vmalert/web.go index 8fc71359e6..51ff476a71 100644 --- a/app/vmalert/web.go +++ b/app/vmalert/web.go @@ -17,25 +17,19 @@ type requestHandler struct { m *manager } -var pathList = [][]string{ - {"/api/v1/groups", "list all loaded groups and rules"}, - {"/api/v1/alerts", "list all active alerts"}, - {"/api/v1/groupID/alertID/status", "get alert status by ID"}, - // /metrics is served by httpserver by default - {"/metrics", "list of application metrics"}, - {"/-/reload", "reload configuration"}, -} - func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool { switch r.URL.Path { case "/": if r.Method != "GET" { return false } - for _, path := range pathList { - p, doc := path[0], path[1] - fmt.Fprintf(w, "%q - %s
", p, p, doc) - } + httpserver.WriteAPIHelp(w, [][2]string{ + {"/api/v1/groups", "list all loaded groups and rules"}, + {"/api/v1/alerts", "list all active alerts"}, + {"/api/v1/groupID/alertID/status", "get alert status by ID"}, + {"/metrics", "list of application metrics"}, + {"/-/reload", "reload configuration"}, + }) return true case "/api/v1/groups": data, err := rh.listGroups() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5c0270c62a..b1603ead8d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -10,6 +10,7 @@ sort: 15 * FEATURE: add OpenTSDB migration option to vmctl. See more details [here](https://docs.victoriametrics.com/vmctl#migrating-data-from-opentsdb). Thanks to @johnseekins! * FEATURE: improved new time series registration speed on systems with many CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244). Thanks to @waldoweng for the idea and [draft implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243). +* FEATURE: vmagent: list user-visible endpoints at `http://vmagent:8429/`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1251). * BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047). diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index 5c163bb2f1..51a69e3a8f 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "net/http/pprof" + "path" "runtime" "strconv" "strings" @@ -589,3 +590,12 @@ func IsTLS() bool { func GetPathPrefix() string { return *pathPrefix } + +// WriteAPIHelp writes pathList to w in HTML format. +func WriteAPIHelp(w io.Writer, pathList [][2]string) { + for _, p := range pathList { + p, doc := p[0], p[1] + p = path.Join(*pathPrefix, p) + fmt.Fprintf(w, "%s - %s
", p, p, doc) + } +} From f3a048288e22d8931f12748982ea2a03c37e30e4 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Fri, 30 Apr 2021 07:46:03 +0100 Subject: [PATCH 14/16] Vmalert: adjust `time` param for datasource queries according to `evaluationInterval` (#1257) * Simplify arguments list for fn `queryDataSource` to improve readbility * vmalert: adjust `time` param according to rule evaluation interval With this change, vmalert will start to use rule's evaluation interval for truncating the `time` param. This is mostly needed to produce consistent time series with timestamps unaffected by vmalert start time. Now, timestamp becomes predictable. Additionally, adjustment is similar to what Grafana does for plotting range graphs. Hence, recording rule series and recording rule expression plotted in grafana suppose to become similar in most of cases. --- app/vmalert/alerting.go | 9 +- app/vmalert/datasource/vm.go | 73 ++++++++----- app/vmalert/datasource/vm_test.go | 170 ++++++++++++++++++++++++++---- app/vmalert/recording.go | 5 +- 4 files changed, 209 insertions(+), 48 deletions(-) diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index c454d66bc3..75c80f2de8 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -62,9 +62,12 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule Annotations: cfg.Annotations, GroupID: group.ID(), GroupName: group.Name, - q: qb.BuildWithParams(datasource.QuerierParams{DataSourceType: &cfg.Type}), - alerts: make(map[uint64]*notifier.Alert), - metrics: &alertingRuleMetrics{}, + q: qb.BuildWithParams(datasource.QuerierParams{ + DataSourceType: &cfg.Type, + EvaluationInterval: group.Interval, + }), + alerts: make(map[uint64]*notifier.Alert), + metrics: &alertingRuleMetrics{}, } labels := fmt.Sprintf(`alertname=%q, group=%q, id="%d"`, ar.Name, group.Name, ar.ID()) diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index 6e92db2c17..f7a149c53a 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -81,7 +81,9 @@ type VMStorage struct { appendTypePrefix bool lookBack time.Duration queryStep time.Duration - dataSourceType Type + + dataSourceType Type + evaluationInterval time.Duration } const queryPath = "/api/v1/query" @@ -92,7 +94,8 @@ const graphitePrefix = "/graphite" // QuerierParams params for Querier. type QuerierParams struct { - DataSourceType *Type + DataSourceType *Type + EvaluationInterval time.Duration } // Clone makes clone of VMStorage, shares http client. 
@@ -113,6 +116,7 @@ func (s *VMStorage) Clone() *VMStorage { func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage { if params.DataSourceType != nil { s.dataSourceType = *params.DataSourceType + s.evaluationInterval = params.EvaluationInterval } return s } @@ -136,24 +140,27 @@ func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Du } } -// Query reads metrics from datasource by given query and type +// Query executes the given query and returns parsed response func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { - switch s.dataSourceType.name { - case "", prometheusType: - return s.queryDataSource(ctx, query, s.setPrometheusReqParams, parsePrometheusResponse) - case graphiteType: - return s.queryDataSource(ctx, query, s.setGraphiteReqParams, parseGraphiteResponse) - default: - return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name) + req, err := s.prepareReq(query, time.Now()) + if err != nil { + return nil, err } + + resp, err := s.do(ctx, req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + parseFn := parsePrometheusResponse + if s.dataSourceType.name != prometheusType { + parseFn = parseGraphiteResponse + } + return parseFn(req, resp) } -func (s *VMStorage) queryDataSource( - ctx context.Context, - query string, - setReqParams func(r *http.Request, query string), - processResponse func(r *http.Request, resp *http.Response, - ) ([]Metric, error)) ([]Metric, error) { +func (s *VMStorage) prepareReq(query string, timestamp time.Time) (*http.Request, error) { req, err := http.NewRequest("POST", s.datasourceURL, nil) if err != nil { return nil, err @@ -162,20 +169,32 @@ func (s *VMStorage) queryDataSource( if s.basicAuthPass != "" { req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass) } - setReqParams(req, query) + + switch s.dataSourceType.name { + case "", prometheusType: + s.setPrometheusReqParams(req, query, timestamp) + case graphiteType: + s.setGraphiteReqParams(req, query, timestamp) + default: + return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name) + } + return req, nil +} + +func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) { resp, err := s.c.Do(req.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("error getting response from %s: %w", req.URL, err) } - defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { body, _ := ioutil.ReadAll(resp.Body) - return nil, fmt.Errorf("datasource returns unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body) + _ = resp.Body.Close() + return nil, fmt.Errorf("unexpected response code %d for %s. 
Response body %s", resp.StatusCode, req.URL, body) } - return processResponse(req, resp) + return resp, nil } -func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) { +func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string, timestamp time.Time) { if s.appendTypePrefix { r.URL.Path += prometheusPrefix } @@ -183,16 +202,20 @@ func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) { q := r.URL.Query() q.Set("query", query) if s.lookBack > 0 { - lookBack := time.Now().Add(-s.lookBack) - q.Set("time", fmt.Sprintf("%d", lookBack.Unix())) + timestamp = timestamp.Add(-s.lookBack) } + if s.evaluationInterval > 0 { + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232 + timestamp = timestamp.Truncate(s.evaluationInterval) + } + q.Set("time", fmt.Sprintf("%d", timestamp.Unix())) if s.queryStep > 0 { q.Set("step", s.queryStep.String()) } r.URL.RawQuery = q.Encode() } -func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string) { +func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string, timestamp time.Time) { if s.appendTypePrefix { r.URL.Path += graphitePrefix } @@ -202,7 +225,7 @@ func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string) { q.Set("target", query) from := "-5min" if s.lookBack > 0 { - lookBack := time.Now().Add(-s.lookBack) + lookBack := timestamp.Add(-s.lookBack) from = strconv.FormatInt(lookBack.Unix(), 10) } q.Set("from", from) diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index 9ed26ef749..f9bad36bee 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -2,8 +2,10 @@ package datasource import ( "context" + "fmt" "net/http" "net/http/httptest" + "reflect" "strconv" "testing" "time" @@ -69,26 +71,31 @@ func TestVMSelectQuery(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, false, srv.Client()) - if _, err := am.Query(ctx, query); err == nil { + + s := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, false, srv.Client()) + + p := NewPrometheusType() + pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second}) + + if _, err := pq.Query(ctx, query); err == nil { t.Fatalf("expected connection error got nil") } - if _, err := am.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query); err == nil { t.Fatalf("expected invalid response status error got nil") } - if _, err := am.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query); err == nil { t.Fatalf("expected response body error got nil") } - if _, err := am.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query); err == nil { t.Fatalf("expected error status got nil") } - if _, err := am.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query); err == nil { t.Fatalf("expected unknown status got nil") } - if _, err := am.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query); err == nil { t.Fatalf("expected non-vector resultType error got nil") } - m, err := am.Query(ctx, query) + m, err := pq.Query(ctx, query) if err != nil { t.Fatalf("unexpected %s", err) } @@ -100,16 +107,14 @@ func TestVMSelectQuery(t *testing.T) { Timestamp: 1583786142, Value: 13763, } - if m[0].Timestamp != expected.Timestamp && - m[0].Value != expected.Value && - m[0].Labels[0].Value != expected.Labels[0].Value && - m[0].Labels[0].Name != expected.Labels[0].Name { + if 
!reflect.DeepEqual(m[0], expected) { t.Fatalf("unexpected metric %+v want %+v", m[0], expected) } - dst := NewGraphiteType() - q := am.BuildWithParams(QuerierParams{&dst}) - m, err = q.Query(ctx, queryRender) + g := NewGraphiteType() + gq := s.BuildWithParams(QuerierParams{DataSourceType: &g}) + + m, err = gq.Query(ctx, queryRender) if err != nil { t.Fatalf("unexpected %s", err) } @@ -121,10 +126,137 @@ func TestVMSelectQuery(t *testing.T) { Timestamp: 1611758403, Value: 10, } - if m[0].Timestamp != expected.Timestamp && - m[0].Value != expected.Value && - m[0].Labels[0].Value != expected.Labels[0].Value && - m[0].Labels[0].Name != expected.Labels[0].Name { + if !reflect.DeepEqual(m[0], expected) { t.Fatalf("unexpected metric %+v want %+v", m[0], expected) } } + +func TestPrepareReq(t *testing.T) { + query := "up" + timestamp := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC) + testCases := []struct { + name string + vm *VMStorage + checkFn func(t *testing.T, r *http.Request) + }{ + { + "prometheus path", + &VMStorage{ + dataSourceType: NewPrometheusType(), + }, + func(t *testing.T, r *http.Request) { + checkEqualString(t, queryPath, r.URL.Path) + }, + }, + { + "prometheus prefix", + &VMStorage{ + dataSourceType: NewPrometheusType(), + appendTypePrefix: true, + }, + func(t *testing.T, r *http.Request) { + checkEqualString(t, prometheusPrefix+queryPath, r.URL.Path) + }, + }, + { + "graphite path", + &VMStorage{ + dataSourceType: NewGraphiteType(), + }, + func(t *testing.T, r *http.Request) { + checkEqualString(t, graphitePath, r.URL.Path) + }, + }, + { + "graphite prefix", + &VMStorage{ + dataSourceType: NewGraphiteType(), + appendTypePrefix: true, + }, + func(t *testing.T, r *http.Request) { + checkEqualString(t, graphitePrefix+graphitePath, r.URL.Path) + }, + }, + { + "default params", + &VMStorage{}, + func(t *testing.T, r *http.Request) { + exp := fmt.Sprintf("query=%s&time=%d", query, timestamp.Unix()) + checkEqualString(t, exp, r.URL.RawQuery) + }, + }, + { + "basic auth", + &VMStorage{ + basicAuthUser: "foo", + basicAuthPass: "bar", + }, + func(t *testing.T, r *http.Request) { + u, p, _ := r.BasicAuth() + checkEqualString(t, "foo", u) + checkEqualString(t, "bar", p) + }, + }, + { + "lookback", + &VMStorage{ + lookBack: time.Minute, + }, + func(t *testing.T, r *http.Request) { + exp := fmt.Sprintf("query=%s&time=%d", query, timestamp.Add(-time.Minute).Unix()) + checkEqualString(t, exp, r.URL.RawQuery) + }, + }, + { + "evaluation interval", + &VMStorage{ + evaluationInterval: 15 * time.Second, + }, + func(t *testing.T, r *http.Request) { + tt := timestamp.Truncate(15 * time.Second) + exp := fmt.Sprintf("query=%s&time=%d", query, tt.Unix()) + checkEqualString(t, exp, r.URL.RawQuery) + }, + }, + { + "lookback + evaluation interval", + &VMStorage{ + lookBack: time.Minute, + evaluationInterval: 15 * time.Second, + }, + func(t *testing.T, r *http.Request) { + tt := timestamp.Add(-time.Minute) + tt = tt.Truncate(15 * time.Second) + exp := fmt.Sprintf("query=%s&time=%d", query, tt.Unix()) + checkEqualString(t, exp, r.URL.RawQuery) + }, + }, + { + "step", + &VMStorage{ + queryStep: time.Minute, + }, + func(t *testing.T, r *http.Request) { + exp := fmt.Sprintf("query=%s&step=%v&time=%d", query, time.Minute, timestamp.Unix()) + checkEqualString(t, exp, r.URL.RawQuery) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req, err := tc.vm.prepareReq(query, timestamp) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + tc.checkFn(t, req) + }) + } +} + 
+func checkEqualString(t *testing.T, exp, got string) { + t.Helper() + if got != exp { + t.Errorf("expected to get %q; got %q", exp, got) + } +} diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 698c19b86b..fbdeb7d59a 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -63,7 +63,10 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul Labels: cfg.Labels, GroupID: group.ID(), metrics: &recordingRuleMetrics{}, - q: qb.BuildWithParams(datasource.QuerierParams{DataSourceType: &cfg.Type}), + q: qb.BuildWithParams(datasource.QuerierParams{ + DataSourceType: &cfg.Type, + EvaluationInterval: group.Interval, + }), } labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID()) From 4394dc6cbb957866d34c5838107501ef744bfc8a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 30 Apr 2021 09:54:36 +0300 Subject: [PATCH 15/16] docs/CHANGELOG.md: document the change from f3a048288e22d8931f12748982ea2a03c37e30e4 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232 --- app/vmalert/datasource/vm.go | 4 +++- docs/CHANGELOG.md | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index f7a149c53a..defab53543 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -151,7 +151,9 @@ func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { if err != nil { return nil, err } - defer resp.Body.Close() + defer func() { + _ = resp.Body.Close() + }() parseFn := parsePrometheusResponse if s.dataSourceType.name != prometheusType { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b1603ead8d..162ca7c719 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,6 +11,7 @@ sort: 15 Thanks to @johnseekins! * FEATURE: improved new time series registration speed on systems with many CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244). Thanks to @waldoweng for the idea and [draft implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243). * FEATURE: vmagent: list user-visible endpoints at `http://vmagent:8429/`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1251). +* FEATURE: vmalert: use the same technique as Grafana for determining evaluation timestamps for recording rules. This should make consistent graphs for series generated by recording rules compared to graphs generated for queries from recording rules in Grafana. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232). * BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047). 
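The vmalert changes above boil down to two adjustments to the evaluation timestamp that ends up in the `time` query parameter: shift it back by `-datasource.lookback`, then align it to the group's evaluation interval, mirroring how Grafana aligns query timestamps to the step. The sketch below is illustrative only and not part of the patch set; the standalone `evalTimestamp` helper and the concrete durations (1m lookback, 15s interval) are assumptions chosen to mirror the logic of `setPrometheusReqParams`.

```go
// Illustrative sketch (not part of the patches): how the `time` query
// parameter could be derived from the rule's evaluation time, assuming a
// -datasource.lookback of 1m and a group evaluation interval of 15s.
package main

import (
	"fmt"
	"time"
)

// evalTimestamp is a hypothetical helper mirroring setPrometheusReqParams:
// shift the evaluation time back by the lookback window, then align it to
// the evaluation interval (see issue #1232).
func evalTimestamp(ts time.Time, lookBack, evalInterval time.Duration) time.Time {
	if lookBack > 0 {
		ts = ts.Add(-lookBack)
	}
	if evalInterval > 0 {
		ts = ts.Truncate(evalInterval)
	}
	return ts
}

func main() {
	now := time.Date(2021, 4, 30, 10, 0, 7, 0, time.UTC)
	ts := evalTimestamp(now, time.Minute, 15*time.Second)
	// Prints 2021-04-30T09:59:00Z: 10:00:07 minus 1m is 09:59:07,
	// truncated down to a 15s boundary.
	fmt.Println(ts.UTC().Format(time.RFC3339))
}
```

Because `Truncate` rounds down to a multiple of the interval, every evaluation that falls into the same 15s bucket resolves to the same timestamp, which is what keeps graphs built from recording rules consistent with the equivalent ad-hoc queries in Grafana.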
From afca7b430ce7f78449c357660806045476670a84 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Fri, 30 Apr 2021 08:01:05 +0100 Subject: [PATCH 16/16] vmalert: use rule's `evaluationInterval` as `step` param by default (#1258) User still can override param by specifying `datasource.queryStep` flag. --- app/vmalert/datasource/init.go | 3 ++- app/vmalert/datasource/vm.go | 4 ++++ app/vmalert/datasource/vm_test.go | 12 +++++++----- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/app/vmalert/datasource/init.go b/app/vmalert/datasource/init.go index 7d27b9d55e..62603f749b 100644 --- a/app/vmalert/datasource/init.go +++ b/app/vmalert/datasource/init.go @@ -23,7 +23,8 @@ var ( lookBack = flag.Duration("datasource.lookback", 0, `Lookback defines how far into the past to look when evaluating queries. For example, if the datasource.lookback=5m then param "time" with value now()-5m will be added to every query.`) queryStep = flag.Duration("datasource.queryStep", 0, "queryStep defines how far a value can fallback to when evaluating queries. "+ - "For example, if datasource.queryStep=15s then param \"step\" with value \"15s\" will be added to every query.") + "For example, if datasource.queryStep=15s then param \"step\" with value \"15s\" will be added to every query."+ + "If queryStep isn't specified, rule's evaluationInterval will be used instead.") maxIdleConnections = flag.Int("datasource.maxIdleConnections", 100, `Defines the number of idle (keep-alive connections) to each configured datasource. Consider setting this value equal to the value: groups_total * group.concurrency. Too low a value may result in a high number of sockets in TIME_WAIT state.`) ) diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index defab53543..2c23a06dcb 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -209,9 +209,13 @@ func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string, timest if s.evaluationInterval > 0 { // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232 timestamp = timestamp.Truncate(s.evaluationInterval) + // set step as evaluationInterval by default + q.Set("step", s.evaluationInterval.String()) } q.Set("time", fmt.Sprintf("%d", timestamp.Unix())) + if s.queryStep > 0 { + // override step with user-specified value q.Set("step", s.queryStep.String()) } r.URL.RawQuery = q.Encode() diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index f9bad36bee..db030cd5ac 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -213,8 +213,9 @@ func TestPrepareReq(t *testing.T) { evaluationInterval: 15 * time.Second, }, func(t *testing.T, r *http.Request) { - tt := timestamp.Truncate(15 * time.Second) - exp := fmt.Sprintf("query=%s&time=%d", query, tt.Unix()) + evalInterval := 15 * time.Second + tt := timestamp.Truncate(evalInterval) + exp := fmt.Sprintf("query=%s&step=%v&time=%d", query, evalInterval, tt.Unix()) checkEqualString(t, exp, r.URL.RawQuery) }, }, @@ -225,14 +226,15 @@ func TestPrepareReq(t *testing.T) { evaluationInterval: 15 * time.Second, }, func(t *testing.T, r *http.Request) { + evalInterval := 15 * time.Second tt := timestamp.Add(-time.Minute) - tt = tt.Truncate(15 * time.Second) - exp := fmt.Sprintf("query=%s&time=%d", query, tt.Unix()) + tt = tt.Truncate(evalInterval) + exp := fmt.Sprintf("query=%s&step=%v&time=%d", query, evalInterval, tt.Unix()) checkEqualString(t, exp, r.URL.RawQuery) }, }, { - "step", + "step override", 
&VMStorage{ queryStep: time.Minute, },