diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 3405ac81ef..5e761a69b7 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,7 +3,6 @@ package prometheus import ( "flag" "fmt" - "github.com/VictoriaMetrics/metricsql" "math" "net" "net/http" @@ -30,6 +29,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/metricsql" "github.com/valyala/fastjson/fastfloat" ) @@ -56,6 +56,7 @@ var ( maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage") maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage") maxSeriesLimit = flag.Int("search.maxSeries", 30e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage") + maxDeleteSeries = flag.Int("search.maxDeleteSeries", 1e6, "The maximum number of time series, which can be deleted using /api/v1/admin/tsdb/delete_series. This option allows limiting memory usage") maxLabelsAPISeries = flag.Int("search.maxLabelsAPISeries", 1e6, "The maximum number of time series, which could be scanned when searching for the matching time series "+ "at /api/v1/labels and /api/v1/label/.../values. This option allows limiting memory usage and CPU usage. See also -search.maxLabelsAPIDuration, "+ "-search.maxTagKeys, -search.maxTagValues and -search.ignoreExtraFiltersAtLabelsAPI") @@ -493,7 +494,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error { if !cp.IsDefaultTimeRange() { return fmt.Errorf("start=%d and end=%d args aren't supported. Remove these args from the query in order to delete all the matching metrics", cp.start, cp.end) } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, 0) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxDeleteSeries) deletedCount, err := netstorage.DeleteSeries(nil, sq, cp.deadline) if err != nil { return fmt.Errorf("cannot delete time series: %w", err) diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go index ba119eeedf..5a30851a17 100644 --- a/app/vmstorage/servers/vmselect.go +++ b/app/vmstorage/servers/vmselect.go @@ -154,7 +154,7 @@ func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.Search if len(tfss) == 0 { return 0, fmt.Errorf("missing tag filters") } - return api.s.DeleteSeries(qt, tfss) + return api.s.DeleteSeries(qt, tfss, maxMetrics) } func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error { diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index e843d051f3..50707cbbc2 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -706,6 +706,13 @@ Some workloads may need fine-grained resource usage limits. In these cases the f Queries to this endpoint may take big amounts of CPU time and memory at `vmstorage` and `vmselect` when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate). In this case it might be useful to set the `-search.maxSeries` to quite low value in order limit CPU and memory usage. +- `-search.maxDeleteSeries` at `vmselect` limits the number of unique time + series that can be deleted by a single + [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/url-examples/#apiv1admintsdbdelete_series) + call. Deleting too many time series may require big amount of CPU and memory + at `vmstorage` and this limit guards against unplanned resource usage spikes. + Also see [How to delete time series](#how-to-delete-time-series) section to + learn about different ways of deleting series. - `-search.maxTagKeys` at `vmstorage` limits the number of items, which may be returned from [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels). This endpoint is used mostly by Grafana for auto-completion of label names. Queries to this endpoint may take big amounts of CPU time and memory at `vmstorage` and `vmselect` diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 25c1b400f2..819206df87 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1716,12 +1716,14 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64, accoun return dst, false } -// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss. +// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss and +// updates or resets all caches where TSIDs and the corresponding MetricIDs may +// be stored. // -// The caller must reset all the caches which may contain the deleted TSIDs. -// -// Returns the number of metrics deleted. -func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) { +// If the number of the series exceeds maxMetrics, no series will be deleted and +// an error will be returned. Otherwise, the funciton returns the number of +// series deleted. +func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) { qt = qt.NewChild("deleting series for %s", tfss) defer qt.Done() if len(tfss) == 0 { @@ -1734,7 +1736,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch(tfss[0].accountID, tfss[0].projectID, noDeadline) - metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9) + metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics) db.putIndexSearch(is) if err != nil { return 0, err @@ -1746,7 +1748,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, db.doExtDB(func(extDB *indexDB) { var n int qtChild := qt.NewChild("deleting series from the previos indexdb") - n, err = extDB.DeleteTSIDs(qtChild, tfss) + n, err = extDB.DeleteTSIDs(qtChild, tfss, maxMetrics) qtChild.Donef("deleted %d series", n) deletedCount += n }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 8d0ed4833c..543d5b89bf 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1335,11 +1335,13 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, accountID, project // ErrDeadlineExceeded is returned when the request times out. var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded") -// DeleteSeries deletes all the series matching the given tfss. +// DeleteSeries deletes the series matching the given tfss. // -// Returns the number of metrics deleted. -func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) { - deletedCount, err := s.idb().DeleteTSIDs(qt, tfss) +// If the number of the series exceeds maxMetrics, no series will be deleted and +// an error will be returned. Otherwise, the funciton returns the number of +// metrics deleted. +func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) { + deletedCount, err := s.idb().DeleteTSIDs(qt, tfss, maxMetrics) if err != nil { return deletedCount, fmt.Errorf("cannot delete tsids: %w", err) } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 624dda9901..81cd56995d 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -824,7 +824,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { if n := metricBlocksCount(tfs); n == 0 { return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs) } - deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}) + deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9) if err != nil { return fmt.Errorf("cannot delete metrics: %w", err) } @@ -836,7 +836,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { } // Try deleting empty tfss - deletedCount, err = s.DeleteSeries(nil, nil) + deletedCount, err = s.DeleteSeries(nil, nil, 1e9) if err != nil { return fmt.Errorf("cannot delete empty tfss: %w", err) } @@ -884,6 +884,122 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error { return nil } +func TestStorageDeleteSeries_TooManyTimeseries(t *testing.T) { + defer testRemoveAll(t) + + const numSeries = 1000 + rng := rand.New(rand.NewSource(1)) + var accountID uint32 = 1 + var projectID uint32 = 2 + mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, accountID, projectID, numSeries, "metric", TimeRange{ + MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(), + MaxTimestamp: time.Now().UnixMilli(), + }) + + s := MustOpenStorage(t.Name(), 0, 0, 0) + defer s.MustClose() + s.AddRows(mrs, defaultPrecisionBits) + s.DebugFlush() + + tfs := NewTagFilters(accountID, projectID) + if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil { + t.Fatalf("unexpected error in TagFilters.Add: %v", err) + } + maxSeries := numSeries - 1 + count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries) + if err == nil { + t.Errorf("expected an error but there hasn't been one") + } + if count != 0 { + t.Errorf("unexpected deleted series count: got %d, want 0", count) + } +} + +func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) { + defer testRemoveAll(t) + + tr := TimeRange{ + MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(), + MaxTimestamp: time.Now().UnixMilli(), + } + mn := MetricName{MetricGroup: []byte("metric")} + mr := MetricRow{ + MetricNameRaw: mn.marshalRaw(nil), + Timestamp: tr.MaxTimestamp, + Value: 123, + } + var ( + genTSID generationTSID + tfssKey []byte + ) + tfs := NewTagFilters(0, 0) + if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil { + t.Fatalf("unexpected error in TagFilters.Add: %v", err) + } + tfss := []*TagFilters{tfs} + s := MustOpenStorage(t.Name(), 0, 0, 0) + defer s.MustClose() + + // Ensure caches are empty. + if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { + t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID) + } + tfssKey = marshalTagFiltersKey(nil, tfss, tr, true) + if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok { + t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got) + } + if got := s.getDeletedMetricIDs().Len(); got != 0 { + t.Fatalf("deletedMetricIDs cache: unexpected size: got %d, want empty", got) + } + + // Add one row, search it, and ensure that the tsidCache and + // tagFiltersToMetricIDsCache are not empty but the deletedMetricIDs + // cache is still empty. + s.AddRows([]MetricRow{mr}, defaultPrecisionBits) + s.DebugFlush() + gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline) + if err != nil { + t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err) + } + wantMetrics := []string{string(mr.MetricNameRaw)} + if reflect.DeepEqual(gotMetrics, wantMetrics) { + t.Fatalf("SearchMetricNames() unexpected search result: got %v, want %v", gotMetrics, wantMetrics) + } + + if !s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { + t.Fatalf("tsidCache was expected to contain a record but it did not") + } + metricID := genTSID.TSID.MetricID + tfssKey = marshalTagFiltersKey(nil, tfss, tr, true) + if _, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); !ok { + t.Fatalf("tagFiltersToMetricIDsCache was expected to contain a record but it did not") + } + if got := s.getDeletedMetricIDs().Len(); got != 0 { + t.Fatalf("deletedMetricIDs cache unexpected size: got %d, want empty", got) + } + + // Delete the metric added earlier and ensure that the tsidCache and + // tagFiltersToMetricIDsCache have been reset and the deletedMetricIDs + // cache is now contains ID of the deleted metric. + numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1) + if err != nil { + t.Fatalf("DeleteSeries() failed unexpectedly: %v", err) + } + if got, want := numDeletedSeries, 1; got != want { + t.Fatalf("unexpected number of deleted series, got %d, want %d", got, want) + } + if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { + t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID) + } + tfssKey = marshalTagFiltersKey(nil, tfss, tr, true) + if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok { + t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got) + } + if got, want := s.getDeletedMetricIDs().AppendTo(nil), []uint64{metricID}; !reflect.DeepEqual(got, want) { + t.Fatalf("deletedMetricIDs cache: unexpected contents: got %v, want %v", got, want) + } +} + func TestStorageRegisterMetricNamesSerial(t *testing.T) { path := "TestStorageRegisterMetricNamesSerial" s := MustOpenStorage(path, 0, 0, 0)