diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index 874f8774f..74f57e3bf 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "log" "net/http" "os" @@ -12,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" @@ -251,7 +251,7 @@ func deleteSeries(name, value string) (int, error) { if err := tfs.Add([]byte(name), []byte(value), false, true); err != nil { return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err) } - return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs}) + return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs}, 1e3) } func TestBuildMatchWithFilter_Failure(t *testing.T) { diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 3492ac37d..7c6733bf4 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -765,7 +765,7 @@ func putSortBlocksHeap(sbh *sortBlocksHeap) { var sbhPool sync.Pool -// DeleteSeries deletes time series matching the given tagFilterss. +// DeleteSeries deletes time series matching the given search query. 
func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) { qt = qt.NewChild("delete series: %s", sq) defer qt.Done() @@ -774,7 +774,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear if err != nil { return 0, err } - return vmstorage.DeleteSeries(qt, tfss) + return vmstorage.DeleteSeries(qt, tfss, sq.MaxMetrics) } // LabelNames returns label names matching the given sq until the given deadline. diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index cca6113df..413a58d4d 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,7 +3,6 @@ package prometheus import ( "flag" "fmt" - "github.com/VictoriaMetrics/metricsql" "math" "net/http" "runtime" @@ -27,6 +26,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/metricsql" "github.com/valyala/fastjson/fastfloat" ) @@ -52,6 +52,7 @@ var ( maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage") maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage") maxSeriesLimit = flag.Int("search.maxSeries", 30e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage") + maxDeleteSeries = flag.Int("search.maxDeleteSeries", 30e3, "The maximum number of time series, which can be deleted using /api/v1/admin/tsdb/delete_series. 
This option allows limiting memory usage") maxLabelsAPISeries = flag.Int("search.maxLabelsAPISeries", 1e6, "The maximum number of time series, which could be scanned when searching for the matching time series "+ "at /api/v1/labels and /api/v1/label/.../values. This option allows limiting memory usage and CPU usage. See also -search.maxLabelsAPIDuration, "+ "-search.maxTagKeys, -search.maxTagValues and -search.ignoreExtraFiltersAtLabelsAPI") @@ -479,7 +480,7 @@ func DeleteHandler(startTime time.Time, r *http.Request) error { if !cp.IsDefaultTimeRange() { return fmt.Errorf("start=%d and end=%d args aren't supported. Remove these args from the query in order to delete all the matching metrics", cp.start, cp.end) } - sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, 0) + sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxDeleteSeries) deletedCount, err := netstorage.DeleteSeries(nil, sq, cp.deadline) if err != nil { return fmt.Errorf("cannot delete time series: %w", err) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index a4d7c4f97..ad471899d 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -171,9 +171,9 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) { // DeleteSeries deletes series matching tfss. // // Returns the number of deleted series. -func DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters) (int, error) { +func DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int) (int, error) { WG.Add(1) - n, err := Storage.DeleteSeries(qt, tfss) + n, err := Storage.DeleteSeries(qt, tfss, maxMetrics) WG.Done() return n, err } diff --git a/docs/README.md b/docs/README.md index f0e2e1af1..91168c8ee 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1184,7 +1184,10 @@ In this case [forced merge](#forced-merge) may help freeing up storage space. 
It is recommended verifying which metrics will be deleted with the call to `http://:8428/api/v1/series?match[]=` before actually deleting the metrics. By default, this query will only scan series in the past 5 minutes, so you may need to -adjust `start` and `end` to a suitable range to achieve match hits. +adjust `start` and `end` to a suitable range to achieve match hits. Also, if the +number of returned time series is rather big you will need to set +`-search.maxDeleteSeries` flag (see +[Resource usage limits](#resource-usage-limits)). The `/api/v1/admin/tsdb/delete_series` handler may be protected with `authKey` if `-deleteAuthKey` command-line flag is set. Note that handler accepts any HTTP method, so sending a `GET` request to `/api/v1/admin/tsdb/delete_series` will result in deletion of time series. @@ -1721,6 +1724,13 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical of CPU time and memory when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate). In this case it might be useful to set the `-search.maxSeries` to quite low value in order limit CPU and memory usage. See also `-search.maxLabelsAPIDuration` and `-search.maxLabelsAPISeries`. +- `-search.maxDeleteSeries` limits the number of unique time series that can be + deleted by a single + [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/url-examples/#apiv1admintsdbdelete_series) + call. Deleting too many time series may require big amount of CPU and memory + and this limit guards against unplanned resource usage spikes. Also see + [How to delete time series](#how-to-delete-time-series) section to learn about + different ways of deleting series. - `-search.maxTagKeys` limits the number of items, which may be returned from [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels). 
This endpoint is used mostly by Grafana for auto-completion of label names. Queries to this endpoint may take big amounts of CPU time and memory when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate). diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index c8808af14..196a5ae1d 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -52,6 +52,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/metricsql/): consistently return the last non-`NaN` value from [`range_last`](https://docs.victoriametrics.com/metricsql/#range_last) function across all the returned data points. Previously `NaN` data points weren't replaced with the last non-`NaN` value. * BUGFIX: all VictoriaMetrics components: increase default value of `-loggerMaxArgLen` cmd-line flag from 1000 to 5000. This should improve visibility on errors produced by very long queries. * BUGFIX: all VictoriaMetrics components: trim trailing spaces when reading content from `*.passwordFile` and similar flags. It reverts changes introduced at [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2) release. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6986) for details. +* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): introduce the `-search.maxDeleteSeries` command line flag to limit the number of series that can be deleted by a single `/api/v1/admin/tsdb/delete_series` call. Previously, one could delete any number of series and if this number was big (millions) the deletion could result in OOM. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7027) for details. 
## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d27e92ea1..1eeb1e680 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1564,12 +1564,14 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt return dst, false } -// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss. +// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss and +// updates or resets all caches where TSIDs and the corresponding MetricIDs may +// be stored. // -// The caller must reset all the caches which may contain the deleted TSIDs. -// -// Returns the number of metrics deleted. -func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) { +// If the number of the series exceeds maxMetrics, no series will be deleted and +// an error will be returned. Otherwise, the function returns the number of +// series deleted. 
+func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) { qt = qt.NewChild("deleting series for %s", tfss) defer qt.Done() if len(tfss) == 0 { @@ -1582,7 +1584,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch(noDeadline) - metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9) + metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics) db.putIndexSearch(is) if err != nil { return 0, err @@ -1594,7 +1596,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, db.doExtDB(func(extDB *indexDB) { var n int qtChild := qt.NewChild("deleting series from the previos indexdb") - n, err = extDB.DeleteTSIDs(qtChild, tfss) + n, err = extDB.DeleteTSIDs(qtChild, tfss, maxMetrics) qtChild.Donef("deleted %d series", n) deletedCount += n }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index bf0e3a233..46362a38a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1268,11 +1268,13 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin // ErrDeadlineExceeded is returned when the request times out. var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded") -// DeleteSeries deletes all the series matching the given tfss. +// DeleteSeries deletes the series matching the given tfss. // -// Returns the number of metrics deleted. -func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) { - deletedCount, err := s.idb().DeleteTSIDs(qt, tfss) +// If the number of the series exceeds maxMetrics, no series will be deleted and +// an error will be returned. Otherwise, the function returns the number of +// metrics deleted. 
+func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) { + deletedCount, err := s.idb().DeleteTSIDs(qt, tfss, maxMetrics) if err != nil { return deletedCount, fmt.Errorf("cannot delete tsids: %w", err) } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 8ccccc4ee..4396fcb43 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -735,7 +735,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { if n := metricBlocksCount(tfs); n == 0 { return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs) } - deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}) + deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9) if err != nil { return fmt.Errorf("cannot delete metrics: %w", err) } @@ -747,7 +747,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { } // Try deleting empty tfss - deletedCount, err = s.DeleteSeries(nil, nil) + deletedCount, err = s.DeleteSeries(nil, nil, 1e9) if err != nil { return fmt.Errorf("cannot delete empty tfss: %w", err) } @@ -795,6 +795,120 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error { return nil } +func TestStorageDeleteSeries_TooManyTimeseries(t *testing.T) { + defer testRemoveAll(t) + + const numSeries = 1000 + rng := rand.New(rand.NewSource(1)) + mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{ + MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(), + MaxTimestamp: time.Now().UnixMilli(), + }) + + s := MustOpenStorage(t.Name(), 0, 0, 0) + defer s.MustClose() + s.AddRows(mrs, defaultPrecisionBits) + s.DebugFlush() + + tfs := NewTagFilters() + if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil { + t.Fatalf("unexpected error in TagFilters.Add: %v", err) + } + maxSeries := numSeries - 1 + count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries) + if err == nil { + t.Errorf("expected an error 
but there hasn't been one") + } + if count != 0 { + t.Errorf("unexpected deleted series count: got %d, want 0", count) + } +} + +func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) { + defer testRemoveAll(t) + + tr := TimeRange{ + MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(), + MaxTimestamp: time.Now().UnixMilli(), + } + mn := MetricName{MetricGroup: []byte("metric")} + mr := MetricRow{ + MetricNameRaw: mn.marshalRaw(nil), + Timestamp: tr.MaxTimestamp, + Value: 123, + } + var ( + genTSID generationTSID + tfssKey []byte + ) + tfs := NewTagFilters() + if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil { + t.Fatalf("unexpected error in TagFilters.Add: %v", err) + } + tfss := []*TagFilters{tfs} + s := MustOpenStorage(t.Name(), 0, 0, 0) + defer s.MustClose() + + // Ensure caches are empty. + if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { + t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID) + } + tfssKey = marshalTagFiltersKey(nil, tfss, tr, true) + if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok { + t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got) + } + if got := s.getDeletedMetricIDs().Len(); got != 0 { + t.Fatalf("deletedMetricIDs cache: unexpected size: got %d, want empty", got) + } + + // Add one row, search it, and ensure that the tsidCache and + // tagFiltersToMetricIDsCache are not empty but the deletedMetricIDs + // cache is still empty. 
+ s.AddRows([]MetricRow{mr}, defaultPrecisionBits) + s.DebugFlush() + gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline) + if err != nil { + t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err) + } + wantMetrics := []string{string(mr.MetricNameRaw)} + if !reflect.DeepEqual(gotMetrics, wantMetrics) { + t.Fatalf("SearchMetricNames() unexpected search result: got %v, want %v", gotMetrics, wantMetrics) + } + + if !s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { + t.Fatalf("tsidCache was expected to contain a record but it did not") + } + metricID := genTSID.TSID.MetricID + tfssKey = marshalTagFiltersKey(nil, tfss, tr, true) + if _, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); !ok { + t.Fatalf("tagFiltersToMetricIDsCache was expected to contain a record but it did not") + } + if got := s.getDeletedMetricIDs().Len(); got != 0 { + t.Fatalf("deletedMetricIDs cache unexpected size: got %d, want empty", got) + } + + // Delete the metric added earlier and ensure that the tsidCache and + // tagFiltersToMetricIDsCache have been reset and the deletedMetricIDs + // cache now contains the ID of the deleted metric. 
+ numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1) + if err != nil { + t.Fatalf("DeleteSeries() failed unexpectedly: %v", err) + } + if got, want := numDeletedSeries, 1; got != want { + t.Fatalf("unexpected number of deleted series, got %d, want %d", got, want) + } + if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { + t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID) + } + tfssKey = marshalTagFiltersKey(nil, tfss, tr, true) + if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok { + t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got) + } + if got, want := s.getDeletedMetricIDs().AppendTo(nil), []uint64{metricID}; !reflect.DeepEqual(got, want) { + t.Fatalf("deletedMetricIDs cache: unexpected contents: got %v, want %v", got, want) + } +} + func TestStorageRegisterMetricNamesSerial(t *testing.T) { path := "TestStorageRegisterMetricNamesSerial" s := MustOpenStorage(path, 0, 0, 0) @@ -1425,7 +1539,7 @@ func testCountAllMetricNames(s *Storage, tr TimeRange) int { } names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline) if err != nil { - panic(fmt.Sprintf("SeachMetricNames() failed unexpectedly: %v", err)) + panic(fmt.Sprintf("SearchMetricNames() failed unexpectedly: %v", err)) } return len(names) }