mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
Introduce a flag for limiting the number of time series to delete (#7091)
### Describe Your Changes Introduce the `-search.maxDeleteSeries` flag that limits the number of time series that can be deleted with a single `/api/v1/admin/tsdb/delete_series` call. Currently, any number can be deleted and if the number is big (millions) then the operation may result in unaccounted CPU and memory usage spikes which in some cases may result in OOM kill (see #7027). The flag limits the number to 30k by default and the users may override it if needed at the vmstorage start time. --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
parent
80a3c410d4
commit
ed5da38ede
9 changed files with 153 additions and 23 deletions
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -12,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
|
||||
remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
|
@ -251,7 +251,7 @@ func deleteSeries(name, value string) (int, error) {
|
|||
if err := tfs.Add([]byte(name), []byte(value), false, true); err != nil {
|
||||
return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
||||
}
|
||||
return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs})
|
||||
return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs}, 1e3)
|
||||
}
|
||||
|
||||
func TestBuildMatchWithFilter_Failure(t *testing.T) {
|
||||
|
|
|
@ -765,7 +765,7 @@ func putSortBlocksHeap(sbh *sortBlocksHeap) {
|
|||
|
||||
var sbhPool sync.Pool
|
||||
|
||||
// DeleteSeries deletes time series matching the given tagFilterss.
|
||||
// DeleteSeries deletes time series matching the given search query.
|
||||
func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
|
||||
qt = qt.NewChild("delete series: %s", sq)
|
||||
defer qt.Done()
|
||||
|
@ -774,7 +774,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return vmstorage.DeleteSeries(qt, tfss)
|
||||
return vmstorage.DeleteSeries(qt, tfss, sq.MaxMetrics)
|
||||
}
|
||||
|
||||
// LabelNames returns label names matching the given sq until the given deadline.
|
||||
|
|
|
@ -3,7 +3,6 @@ package prometheus
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
"math"
|
||||
"net/http"
|
||||
"runtime"
|
||||
|
@ -27,6 +26,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
"github.com/valyala/fastjson/fastfloat"
|
||||
)
|
||||
|
||||
|
@ -52,6 +52,7 @@ var (
|
|||
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
|
||||
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
|
||||
maxSeriesLimit = flag.Int("search.maxSeries", 30e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage")
|
||||
maxDeleteSeries = flag.Int("search.maxDeleteSeries", 30e3, "The maximum number of time series, which can be deleted using /api/v1/admin/tsdb/delete_series. This option allows limiting memory usage")
|
||||
maxLabelsAPISeries = flag.Int("search.maxLabelsAPISeries", 1e6, "The maximum number of time series, which could be scanned when searching for the matching time series "+
|
||||
"at /api/v1/labels and /api/v1/label/.../values. This option allows limiting memory usage and CPU usage. See also -search.maxLabelsAPIDuration, "+
|
||||
"-search.maxTagKeys, -search.maxTagValues and -search.ignoreExtraFiltersAtLabelsAPI")
|
||||
|
@ -479,7 +480,7 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
|
|||
if !cp.IsDefaultTimeRange() {
|
||||
return fmt.Errorf("start=%d and end=%d args aren't supported. Remove these args from the query in order to delete all the matching metrics", cp.start, cp.end)
|
||||
}
|
||||
sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, 0)
|
||||
sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxDeleteSeries)
|
||||
deletedCount, err := netstorage.DeleteSeries(nil, sq, cp.deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete time series: %w", err)
|
||||
|
|
|
@ -171,9 +171,9 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) {
|
|||
// DeleteSeries deletes series matching tfss.
|
||||
//
|
||||
// Returns the number of deleted series.
|
||||
func DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters) (int, error) {
|
||||
func DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int) (int, error) {
|
||||
WG.Add(1)
|
||||
n, err := Storage.DeleteSeries(qt, tfss)
|
||||
n, err := Storage.DeleteSeries(qt, tfss, maxMetrics)
|
||||
WG.Done()
|
||||
return n, err
|
||||
}
|
||||
|
|
|
@ -1184,7 +1184,10 @@ In this case [forced merge](#forced-merge) may help freeing up storage space.
|
|||
|
||||
It is recommended verifying which metrics will be deleted with the call to `http://<victoria-metrics-addr>:8428/api/v1/series?match[]=<timeseries_selector_for_delete>`
|
||||
before actually deleting the metrics. By default, this query will only scan series in the past 5 minutes, so you may need to
|
||||
adjust `start` and `end` to a suitable range to achieve match hits.
|
||||
adjust `start` and `end` to a suitable range to achieve match hits. Also, if the
|
||||
number of returned time series is rather big you will need to set
|
||||
`-search.maxDeleteSeries` flag (see
|
||||
[Resource usage limits](#resource-usage-limits)).
|
||||
|
||||
The `/api/v1/admin/tsdb/delete_series` handler may be protected with `authKey` if `-deleteAuthKey` command-line flag is set.
|
||||
Note that handler accepts any HTTP method, so sending a `GET` request to `/api/v1/admin/tsdb/delete_series` will result in deletion of time series.
|
||||
|
@ -1721,6 +1724,13 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical
|
|||
of CPU time and memory when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate).
|
||||
In this case it might be useful to set the `-search.maxSeries` to quite low value in order limit CPU and memory usage.
|
||||
See also `-search.maxLabelsAPIDuration` and `-search.maxLabelsAPISeries`.
|
||||
- `-search.maxDeleteSeries` limits the number of unique time series that can be
|
||||
deleted by a single
|
||||
[/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/url-examples/#apiv1admintsdbdelete_series)
|
||||
call. Deleting too many time series may require big amount of CPU and memory
|
||||
and this limit guards against unplanned resource usage spikes. Also see
|
||||
[How to delete time series](#how-to-delete-time-series) section to learn about
|
||||
different ways of deleting series.
|
||||
- `-search.maxTagKeys` limits the number of items, which may be returned from [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels).
|
||||
This endpoint is used mostly by Grafana for auto-completion of label names. Queries to this endpoint may take big amounts of CPU time and memory
|
||||
when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate).
|
||||
|
|
|
@ -52,6 +52,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/metricsql/): consistently return the last non-`NaN` value from [`range_last`](https://docs.victoriametrics.com/metricsql/#range_last) function across all the returned data points. Previously `NaN` data points weren't replaced with the last non-`NaN` value.
|
||||
* BUGFIX: all VictoriaMetrics components: increase default value of `-loggerMaxArgLen` cmd-line flag from 1000 to 5000. This should improve visibility on errors produced by very long queries.
|
||||
* BUGFIX: all VictoriaMetrics components: trim trailing spaces when reading content from `*.passwordFile` and similar flags. It reverts changes introduced at [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2) release. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6986) for details.
|
||||
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): introduce the `-search.maxDeleteSeries` command line flag to limit the number of series that can be deleted by a single `/api/v1/admin/tsdb/delete_series` call. Previously, one could delete any number of series and if this number was big (millions) the deletion could result in OOM. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7027) for details.
|
||||
|
||||
## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0)
|
||||
|
||||
|
|
|
@ -1564,12 +1564,14 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt
|
|||
return dst, false
|
||||
}
|
||||
|
||||
// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss.
|
||||
// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss and
|
||||
// updates or resets all caches where TSIDs and the corresponding MetricIDs may
|
||||
// be stored.
|
||||
//
|
||||
// The caller must reset all the caches which may contain the deleted TSIDs.
|
||||
//
|
||||
// Returns the number of metrics deleted.
|
||||
func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
|
||||
// If the number of the series exceeds maxMetrics, no series will be deleted and
|
||||
// an error will be returned. Otherwise, the funciton returns the number of
|
||||
// series deleted.
|
||||
func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) {
|
||||
qt = qt.NewChild("deleting series for %s", tfss)
|
||||
defer qt.Done()
|
||||
if len(tfss) == 0 {
|
||||
|
@ -1582,7 +1584,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int,
|
|||
MaxTimestamp: (1 << 63) - 1,
|
||||
}
|
||||
is := db.getIndexSearch(noDeadline)
|
||||
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9)
|
||||
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
|
||||
db.putIndexSearch(is)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
@ -1594,7 +1596,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int,
|
|||
db.doExtDB(func(extDB *indexDB) {
|
||||
var n int
|
||||
qtChild := qt.NewChild("deleting series from the previos indexdb")
|
||||
n, err = extDB.DeleteTSIDs(qtChild, tfss)
|
||||
n, err = extDB.DeleteTSIDs(qtChild, tfss, maxMetrics)
|
||||
qtChild.Donef("deleted %d series", n)
|
||||
deletedCount += n
|
||||
})
|
||||
|
|
|
@ -1268,11 +1268,13 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
|
|||
// ErrDeadlineExceeded is returned when the request times out.
|
||||
var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded")
|
||||
|
||||
// DeleteSeries deletes all the series matching the given tfss.
|
||||
// DeleteSeries deletes the series matching the given tfss.
|
||||
//
|
||||
// Returns the number of metrics deleted.
|
||||
func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
|
||||
deletedCount, err := s.idb().DeleteTSIDs(qt, tfss)
|
||||
// If the number of the series exceeds maxMetrics, no series will be deleted and
|
||||
// an error will be returned. Otherwise, the funciton returns the number of
|
||||
// metrics deleted.
|
||||
func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) {
|
||||
deletedCount, err := s.idb().DeleteTSIDs(qt, tfss, maxMetrics)
|
||||
if err != nil {
|
||||
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
||||
}
|
||||
|
|
|
@ -735,7 +735,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
|
|||
if n := metricBlocksCount(tfs); n == 0 {
|
||||
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
|
||||
}
|
||||
deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs})
|
||||
deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete metrics: %w", err)
|
||||
}
|
||||
|
@ -747,7 +747,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
|
|||
}
|
||||
|
||||
// Try deleting empty tfss
|
||||
deletedCount, err = s.DeleteSeries(nil, nil)
|
||||
deletedCount, err = s.DeleteSeries(nil, nil, 1e9)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete empty tfss: %w", err)
|
||||
}
|
||||
|
@ -795,6 +795,120 @@ func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func TestStorageDeleteSeries_TooManyTimeseries(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
const numSeries = 1000
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
|
||||
s := MustOpenStorage(t.Name(), 0, 0, 0)
|
||||
defer s.MustClose()
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
|
||||
tfs := NewTagFilters()
|
||||
if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil {
|
||||
t.Fatalf("unexpected error in TagFilters.Add: %v", err)
|
||||
}
|
||||
maxSeries := numSeries - 1
|
||||
count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries)
|
||||
if err == nil {
|
||||
t.Errorf("expected an error but there hasn't been one")
|
||||
}
|
||||
if count != 0 {
|
||||
t.Errorf("unexpected deleted series count: got %d, want 0", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
tr := TimeRange{
|
||||
MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
}
|
||||
mn := MetricName{MetricGroup: []byte("metric")}
|
||||
mr := MetricRow{
|
||||
MetricNameRaw: mn.marshalRaw(nil),
|
||||
Timestamp: tr.MaxTimestamp,
|
||||
Value: 123,
|
||||
}
|
||||
var (
|
||||
genTSID generationTSID
|
||||
tfssKey []byte
|
||||
)
|
||||
tfs := NewTagFilters()
|
||||
if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil {
|
||||
t.Fatalf("unexpected error in TagFilters.Add: %v", err)
|
||||
}
|
||||
tfss := []*TagFilters{tfs}
|
||||
s := MustOpenStorage(t.Name(), 0, 0, 0)
|
||||
defer s.MustClose()
|
||||
|
||||
// Ensure caches are empty.
|
||||
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
|
||||
t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID)
|
||||
}
|
||||
tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
|
||||
if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok {
|
||||
t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got)
|
||||
}
|
||||
if got := s.getDeletedMetricIDs().Len(); got != 0 {
|
||||
t.Fatalf("deletedMetricIDs cache: unexpected size: got %d, want empty", got)
|
||||
}
|
||||
|
||||
// Add one row, search it, and ensure that the tsidCache and
|
||||
// tagFiltersToMetricIDsCache are not empty but the deletedMetricIDs
|
||||
// cache is still empty.
|
||||
s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err)
|
||||
}
|
||||
wantMetrics := []string{string(mr.MetricNameRaw)}
|
||||
if reflect.DeepEqual(gotMetrics, wantMetrics) {
|
||||
t.Fatalf("SearchMetricNames() unexpected search result: got %v, want %v", gotMetrics, wantMetrics)
|
||||
}
|
||||
|
||||
if !s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
|
||||
t.Fatalf("tsidCache was expected to contain a record but it did not")
|
||||
}
|
||||
metricID := genTSID.TSID.MetricID
|
||||
tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
|
||||
if _, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); !ok {
|
||||
t.Fatalf("tagFiltersToMetricIDsCache was expected to contain a record but it did not")
|
||||
}
|
||||
if got := s.getDeletedMetricIDs().Len(); got != 0 {
|
||||
t.Fatalf("deletedMetricIDs cache unexpected size: got %d, want empty", got)
|
||||
}
|
||||
|
||||
// Delete the metric added earlier and ensure that the tsidCache and
|
||||
// tagFiltersToMetricIDsCache have been reset and the deletedMetricIDs
|
||||
// cache is now contains ID of the deleted metric.
|
||||
numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("DeleteSeries() failed unexpectedly: %v", err)
|
||||
}
|
||||
if got, want := numDeletedSeries, 1; got != want {
|
||||
t.Fatalf("unexpected number of deleted series, got %d, want %d", got, want)
|
||||
}
|
||||
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
|
||||
t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID)
|
||||
}
|
||||
tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
|
||||
if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok {
|
||||
t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got)
|
||||
}
|
||||
if got, want := s.getDeletedMetricIDs().AppendTo(nil), []uint64{metricID}; !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("deletedMetricIDs cache: unexpected contents: got %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
|
||||
path := "TestStorageRegisterMetricNamesSerial"
|
||||
s := MustOpenStorage(path, 0, 0, 0)
|
||||
|
@ -1425,7 +1539,7 @@ func testCountAllMetricNames(s *Storage, tr TimeRange) int {
|
|||
}
|
||||
names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("SeachMetricNames() failed unexpectedly: %v", err))
|
||||
panic(fmt.Sprintf("SearchMetricNames() failed unexpectedly: %v", err))
|
||||
}
|
||||
return len(names)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue