From 91c2b5b24d04b036dde50296ae3f95e8e76d12c9 Mon Sep 17 00:00:00 2001
From: Artem Fetishev <149964189+rtm0@users.noreply.github.com>
Date: Mon, 30 Sep 2024 12:43:11 +0200
Subject: [PATCH] Introduce a flag for limiting the number of time series to delete (cluster version) (#7112)

### Describe Your Changes

Introduce the `-search.maxDeleteSeries` flag that limits the number of time
series that can be deleted with a single `/api/v1/admin/tsdb/delete_series`
call.

Currently, any number of series can be deleted in one call, and if that number
is big (millions), the operation may cause unaccounted CPU and memory usage
spikes, which in some cases results in an OOM kill (see #7027). The flag limits
the number to 1e6 by default, and users may override it if needed at vmselect
start time.

Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7027

---------

Signed-off-by: Artem Fetishev
---
 app/vmselect/prometheus/prometheus.go |   5 +-
 app/vmstorage/servers/vmselect.go     |   2 +-
 docs/Cluster-VictoriaMetrics.md       |   7 ++
 lib/storage/index_db.go               |  16 ++--
 lib/storage/storage.go                |  10 ++-
 lib/storage/storage_test.go           | 120 +++++++++++++++++++++++++-
 6 files changed, 144 insertions(+), 16 deletions(-)

diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 3405ac81ef..5e761a69b7 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -3,7 +3,6 @@ package prometheus
 import (
 	"flag"
 	"fmt"
-	"github.com/VictoriaMetrics/metricsql"
 	"math"
 	"net"
 	"net/http"
@@ -30,6 +29,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
+	"github.com/VictoriaMetrics/metricsql"
 	"github.com/valyala/fastjson/fastfloat"
 )
 
@@ -56,6 +56,7 @@ var (
 	maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
 	maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
 	maxSeriesLimit = flag.Int("search.maxSeries", 30e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage")
+	maxDeleteSeries = flag.Int("search.maxDeleteSeries", 1e6, "The maximum number of time series, which can be deleted using /api/v1/admin/tsdb/delete_series. This option allows limiting memory usage")
 	maxLabelsAPISeries = flag.Int("search.maxLabelsAPISeries", 1e6, "The maximum number of time series, which could be scanned when searching for the matching time series "+
 		"at /api/v1/labels and /api/v1/label/.../values. This option allows limiting memory usage and CPU usage. See also -search.maxLabelsAPIDuration, "+
 		"-search.maxTagKeys, -search.maxTagValues and -search.ignoreExtraFiltersAtLabelsAPI")
@@ -493,7 +494,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
 	if !cp.IsDefaultTimeRange() {
 		return fmt.Errorf("start=%d and end=%d args aren't supported. Remove these args from the query in order to delete all the matching metrics", cp.start, cp.end)
 	}
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, 0)
+	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxDeleteSeries)
 	deletedCount, err := netstorage.DeleteSeries(nil, sq, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot delete time series: %w", err)
diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go
index ba119eeedf..5a30851a17 100644
--- a/app/vmstorage/servers/vmselect.go
+++ b/app/vmstorage/servers/vmselect.go
@@ -154,7 +154,7 @@ func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.Search
 	if len(tfss) == 0 {
 		return 0, fmt.Errorf("missing tag filters")
 	}
-	return api.s.DeleteSeries(qt, tfss)
+	return api.s.DeleteSeries(qt, tfss, maxMetrics)
 }
 
 func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md
index e843d051f3..50707cbbc2 100644
--- a/docs/Cluster-VictoriaMetrics.md
+++ b/docs/Cluster-VictoriaMetrics.md
@@ -706,6 +706,13 @@ Some workloads may need fine-grained resource usage limits. In these cases the f
   Queries to this endpoint may take big amounts of CPU time and memory at `vmstorage` and `vmselect`
   when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate).
   In this case it might be useful to set the `-search.maxSeries` to quite low value in order limit CPU and memory usage.
+- `-search.maxDeleteSeries` at `vmselect` limits the number of unique time
+  series that can be deleted by a single
+  [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/url-examples/#apiv1admintsdbdelete_series)
+  call. Deleting too many time series may require big amounts of CPU and memory
+  at `vmstorage`, and this limit guards against unplanned resource usage spikes.
+  Also see the [How to delete time series](#how-to-delete-time-series) section to
+  learn about different ways of deleting series.
 - `-search.maxTagKeys` at `vmstorage` limits the number of items, which may be returned from [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels).
   This endpoint is used mostly by Grafana for auto-completion of label names. Queries to this endpoint may take big amounts of CPU time and memory at `vmstorage` and `vmselect`
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 25c1b400f2..819206df87 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -1716,12 +1716,14 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64, accoun
 	return dst, false
 }
 
-// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss.
+// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss and
+// updates or resets all caches where TSIDs and the corresponding MetricIDs may
+// be stored.
 //
-// The caller must reset all the caches which may contain the deleted TSIDs.
-//
-// Returns the number of metrics deleted.
-func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
+// If the number of the series exceeds maxMetrics, no series will be deleted and
+// an error will be returned. Otherwise, the function returns the number of
+// series deleted.
+func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) {
 	qt = qt.NewChild("deleting series for %s", tfss)
 	defer qt.Done()
 	if len(tfss) == 0 {
@@ -1734,7 +1736,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int,
 		MaxTimestamp: (1 << 63) - 1,
 	}
 	is := db.getIndexSearch(tfss[0].accountID, tfss[0].projectID, noDeadline)
-	metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9)
+	metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
 	db.putIndexSearch(is)
 	if err != nil {
 		return 0, err
@@ -1746,7 +1748,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int,
 	db.doExtDB(func(extDB *indexDB) {
 		var n int
 		qtChild := qt.NewChild("deleting series from the previos indexdb")
-		n, err = extDB.DeleteTSIDs(qtChild, tfss)
+		n, err = extDB.DeleteTSIDs(qtChild, tfss, maxMetrics)
 		qtChild.Donef("deleted %d series", n)
 		deletedCount += n
 	})
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 8d0ed4833c..543d5b89bf 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -1335,11 +1335,13 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, accountID, project
 // ErrDeadlineExceeded is returned when the request times out.
 var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded")
 
-// DeleteSeries deletes all the series matching the given tfss.
+// DeleteSeries deletes the series matching the given tfss.
 //
-// Returns the number of metrics deleted.
-func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
-	deletedCount, err := s.idb().DeleteTSIDs(qt, tfss)
+// If the number of the series exceeds maxMetrics, no series will be deleted and
+// an error will be returned. Otherwise, the function returns the number of
+// metrics deleted.
+func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) {
+	deletedCount, err := s.idb().DeleteTSIDs(qt, tfss, maxMetrics)
 	if err != nil {
 		return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
 	}
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 624dda9901..81cd56995d 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -824,7 +824,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
 	if n := metricBlocksCount(tfs); n == 0 {
 		return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
 	}
-	deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs})
+	deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9)
 	if err != nil {
 		return fmt.Errorf("cannot delete metrics: %w", err)
 	}
@@ -836,7 +836,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
 	}
 
 	// Try deleting empty tfss
-	deletedCount, err = s.DeleteSeries(nil, nil)
+	deletedCount, err = s.DeleteSeries(nil, nil, 1e9)
 	if err != nil {
 		return fmt.Errorf("cannot delete empty tfss: %w", err)
 	}
@@ -884,6 +884,122 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
 	return nil
 }
 
+func TestStorageDeleteSeries_TooManyTimeseries(t *testing.T) {
+	defer testRemoveAll(t)
+
+	const numSeries = 1000
+	rng := rand.New(rand.NewSource(1))
+	var accountID uint32 = 1
+	var projectID uint32 = 2
+	mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, accountID, projectID, numSeries, "metric", TimeRange{
+		MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(),
+		MaxTimestamp: time.Now().UnixMilli(),
+	})
+
+	s := MustOpenStorage(t.Name(), 0, 0, 0)
+	defer s.MustClose()
+	s.AddRows(mrs, defaultPrecisionBits)
+	s.DebugFlush()
+
+	tfs := NewTagFilters(accountID, projectID)
+	if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil {
+		t.Fatalf("unexpected error in TagFilters.Add: %v", err)
+	}
+	maxSeries := numSeries - 1
+	count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries)
+	if err == nil {
+		t.Errorf("expected an error but there hasn't been one")
+	}
+	if count != 0 {
+		t.Errorf("unexpected deleted series count: got %d, want 0", count)
+	}
+}
+
+func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
+	defer testRemoveAll(t)
+
+	tr := TimeRange{
+		MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(),
+		MaxTimestamp: time.Now().UnixMilli(),
+	}
+	mn := MetricName{MetricGroup: []byte("metric")}
+	mr := MetricRow{
+		MetricNameRaw: mn.marshalRaw(nil),
+		Timestamp:     tr.MaxTimestamp,
+		Value:         123,
+	}
+	var (
+		genTSID generationTSID
+		tfssKey []byte
+	)
+	tfs := NewTagFilters(0, 0)
+	if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil {
+		t.Fatalf("unexpected error in TagFilters.Add: %v", err)
+	}
+	tfss := []*TagFilters{tfs}
+	s := MustOpenStorage(t.Name(), 0, 0, 0)
+	defer s.MustClose()
+
+	// Ensure caches are empty.
+	if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+		t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID)
+	}
+	tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
+	if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok {
+		t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got)
+	}
+	if got := s.getDeletedMetricIDs().Len(); got != 0 {
+		t.Fatalf("deletedMetricIDs cache: unexpected size: got %d, want empty", got)
+	}
+
+	// Add one row, search it, and ensure that the tsidCache and
+	// tagFiltersToMetricIDsCache are not empty but the deletedMetricIDs
+	// cache is still empty.
+	s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
+	s.DebugFlush()
+	gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline)
+	if err != nil {
+		t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err)
+	}
+	wantMetrics := []string{string(mr.MetricNameRaw)}
+	if reflect.DeepEqual(gotMetrics, wantMetrics) {
+		t.Fatalf("SearchMetricNames() unexpected search result: got %v, want %v", gotMetrics, wantMetrics)
+	}
+
+	if !s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+		t.Fatalf("tsidCache was expected to contain a record but it did not")
+	}
+	metricID := genTSID.TSID.MetricID
+	tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
+	if _, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); !ok {
+		t.Fatalf("tagFiltersToMetricIDsCache was expected to contain a record but it did not")
+	}
+	if got := s.getDeletedMetricIDs().Len(); got != 0 {
+		t.Fatalf("deletedMetricIDs cache unexpected size: got %d, want empty", got)
+	}
+
+	// Delete the metric added earlier and ensure that the tsidCache and
+	// tagFiltersToMetricIDsCache have been reset and the deletedMetricIDs
+	// cache now contains the ID of the deleted metric.
+	numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1)
+	if err != nil {
+		t.Fatalf("DeleteSeries() failed unexpectedly: %v", err)
+	}
+	if got, want := numDeletedSeries, 1; got != want {
+		t.Fatalf("unexpected number of deleted series, got %d, want %d", got, want)
+	}
+	if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+		t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID)
+	}
+	tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
+	if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok {
+		t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got)
+	}
+	if got, want := s.getDeletedMetricIDs().AppendTo(nil), []uint64{metricID}; !reflect.DeepEqual(got, want) {
+		t.Fatalf("deletedMetricIDs cache: unexpected contents: got %v, want %v", got, want)
+	}
+}
+
 func TestStorageRegisterMetricNamesSerial(t *testing.T) {
 	path := "TestStorageRegisterMetricNamesSerial"
 	s := MustOpenStorage(path, 0, 0, 0)