diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go index 08cb5589e..a224f448d 100644 --- a/app/vmstorage/servers/vmselect.go +++ b/app/vmstorage/servers/vmselect.go @@ -5,8 +5,11 @@ import ( "fmt" "net/http" "sync" + "sync/atomic" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -39,8 +42,14 @@ var ( ) var ( - maxUniqueTimeseriesValue int - maxUniqueTimeseriesValueOnce sync.Once + maxUniqueTimeseriesValue int + maxUniqueTimeseriesValueOnce sync.Once + readMetricIDsSearch = metrics.NewHistogram(`vm_series_read_per_query{type="search"}`) + readMetricIDsSearchMetricNames = metrics.NewHistogram(`vm_series_read_per_query{type="search_metric_names"}`) + readMetricIDsSearchLabelValues = metrics.NewHistogram(`vm_series_read_per_query{type="search_label_values"}`) + readMetricIDsSearchLabelNames = metrics.NewHistogram(`vm_series_read_per_query{type="search_label_names"}`) + readMetricIDsTSDBStatus = metrics.NewHistogram(`vm_series_read_per_query{type="tsdb_status"}`) + readMetricIDsDeleteSeries = metrics.NewHistogram(`vm_series_read_per_query{type="delete_series"}`) ) // NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s. @@ -79,7 +88,9 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQu return nil, fmt.Errorf("missing tag filters") } bi := getBlockIterator() - bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline) + readMetricIDs := atomic.Uint64{} + bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline, &readMetricIDs) + readMetricIDsSearch.Update(float64(readMetricIDs.Load())) if err := bi.sr.Error(); err != nil { bi.MustClose() return nil, err @@ -97,7 +108,10 @@ func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.S if len(tfss) == 0 { return nil, fmt.Errorf("missing tag filters") } - return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline) + readMetricIDs := atomic.Uint64{} + res, err := api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline, &readMetricIDs) + readMetricIDsSearchMetricNames.Update(float64(readMetricIDs.Load())) + return res, err } func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) { @@ -107,7 +121,10 @@ func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQ if err != nil { return nil, err } - return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline) + readMetricIDs := atomic.Uint64{} + res, err := api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline, &readMetricIDs) + readMetricIDsSearchLabelValues.Update(float64(readMetricIDs.Load())) + return res, err } func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, @@ -130,7 +147,10 @@ func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQu if err != nil { return nil, err } - return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline) + readMetricIDs := atomic.Uint64{} + res, err := api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline, &readMetricIDs) + readMetricIDsSearchLabelNames.Update(float64(readMetricIDs.Load())) + return res, err } func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) { @@ -149,7 +169,11 @@ func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQu return nil, err } date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000) - return api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline) + + readMetricIDs := atomic.Uint64{} + status, err := api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline, &readMetricIDs) + readMetricIDsTSDBStatus.Update(float64(readMetricIDs.Load())) + return status, err } func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) { @@ -162,7 +186,11 @@ func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.Search if len(tfss) == 0 { return 0, fmt.Errorf("missing tag filters") } - return api.s.DeleteSeries(qt, tfss, maxMetrics) + + readMetricIDs := atomic.Uint64{} + res, err := api.s.DeleteSeries(qt, tfss, maxMetrics, &readMetricIDs) + readMetricIDsDeleteSeries.Update(float64(readMetricIDs.Load())) + return res, err } func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error { diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index 76f19c5fe..41e1d7202 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -18,6 +18,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add `vm_series_read_per_query` metric to expose number of series matched per query. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7029) for the details. + * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): drop rows that do not belong to the current series during import. The dropped rows should belong to another series whose tags are a superset of the current series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7301) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7330). Thanks to @dpedu for reporting and cooperating with the test. * BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): keep the order of resulting time series when `limit_offset` is applied. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7068). * BUGFIX: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): properly handle xFilesFactor=0 for `transformRemoveEmptySeries` function. See [this PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7337) for details. diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 333725b8d..79e4061ed 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -640,14 +640,14 @@ var indexItemsPool sync.Pool // SearchLabelNamesWithFiltersOnTimeRange returns all the label names, which match the given tfss on the given tr. func (db *indexDB) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, tr TimeRange, - maxLabelNames, maxMetrics int, deadline uint64) ([]string, error) { + maxLabelNames, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) ([]string, error) { qt = qt.NewChild("search for label names: filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", tfss, &tr, maxLabelNames, maxMetrics) defer qt.Done() lns := make(map[string]struct{}) qtChild := qt.NewChild("search for label names in the current indexdb") is := db.getIndexSearch(accountID, projectID, deadline) - err := is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics) + err := is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics, readMetricIDs) db.putIndexSearch(is) qtChild.Donef("found %d label names", len(lns)) if err != nil { @@ -658,7 +658,7 @@ func (db *indexDB) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer qtChild := qt.NewChild("search for label names in the previous indexdb") lnsLen := len(lns) is := extDB.getIndexSearch(accountID, projectID, deadline) - err = is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics) + err = is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics, readMetricIDs) extDB.putIndexSearch(is) qtChild.Donef("found %d additional label names", len(lns)-lnsLen) }) @@ -675,12 +675,12 @@ func (db *indexDB) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer return labelNames, nil } -func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int) error { +func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int, readMetricIDs *atomic.Uint64) error { minDate := uint64(tr.MinTimestamp) / msecPerDay maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch { qtChild := qt.NewChild("search for label names in global index: filters=%s", tfss) - err := is.searchLabelNamesWithFiltersOnDate(qtChild, lns, tfss, 0, maxLabelNames, maxMetrics) + err := is.searchLabelNamesWithFiltersOnDate(qtChild, lns, tfss, 0, maxLabelNames, maxMetrics, readMetricIDs) qtChild.Done() return err } @@ -698,7 +698,7 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tr }() lnsLocal := make(map[string]struct{}) isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline) - err := isLocal.searchLabelNamesWithFiltersOnDate(qtChild, lnsLocal, tfss, date, maxLabelNames, maxMetrics) + err := isLocal.searchLabelNamesWithFiltersOnDate(qtChild, lnsLocal, tfss, date, maxLabelNames, maxMetrics, readMetricIDs) is.db.putIndexSearch(isLocal) mu.Lock() defer mu.Unlock() @@ -723,8 +723,8 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tr return errGlobal } -func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, date uint64, maxLabelNames, maxMetrics int) error { - filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics) +func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, date uint64, maxLabelNames, maxMetrics int, readMetricIDs *atomic.Uint64) error { + filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics, readMetricIDs) if err != nil { return err } @@ -987,14 +987,14 @@ func (is *indexSearch) searchTenantsOnDate(tenants map[string]struct{}, date uin // SearchLabelValuesWithFiltersOnTimeRange returns label values for the given labelName, tfss and tr. func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, accountID, projectID uint32, labelName string, tfss []*TagFilters, tr TimeRange, - maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) { + maxLabelValues, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) ([]string, error) { qt = qt.NewChild("search for label values: labelName=%q, filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", labelName, tfss, &tr, maxLabelValues, maxMetrics) defer qt.Done() lvs := make(map[string]struct{}) qtChild := qt.NewChild("search for label values in the current indexdb") is := db.getIndexSearch(accountID, projectID, deadline) - err := is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics) + err := is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics, readMetricIDs) db.putIndexSearch(is) qtChild.Donef("found %d label values", len(lvs)) if err != nil { @@ -1004,7 +1004,7 @@ func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Trace qtChild := qt.NewChild("search for label values in the previous indexdb") lvsLen := len(lvs) is := extDB.getIndexSearch(accountID, projectID, deadline) - err = is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics) + err = is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics, readMetricIDs) extDB.putIndexSearch(is) qtChild.Donef("found %d additional label values", len(lvs)-lvsLen) }) @@ -1027,12 +1027,12 @@ func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Trace } func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters, - tr TimeRange, maxLabelValues, maxMetrics int) error { + tr TimeRange, maxLabelValues, maxMetrics int, readMetricIDs *atomic.Uint64) error { minDate := uint64(tr.MinTimestamp) / msecPerDay maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch { qtChild := qt.NewChild("search for label values in global index: labelName=%q, filters=%s", labelName, tfss) - err := is.searchLabelValuesWithFiltersOnDate(qtChild, lvs, labelName, tfss, 0, maxLabelValues, maxMetrics) + err := is.searchLabelValuesWithFiltersOnDate(qtChild, lvs, labelName, tfss, 0, maxLabelValues, maxMetrics, readMetricIDs) qtChild.Done() return err } @@ -1050,7 +1050,7 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.T }() lvsLocal := make(map[string]struct{}) isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline) - err := isLocal.searchLabelValuesWithFiltersOnDate(qtChild, lvsLocal, labelName, tfss, date, maxLabelValues, maxMetrics) + err := isLocal.searchLabelValuesWithFiltersOnDate(qtChild, lvsLocal, labelName, tfss, date, maxLabelValues, maxMetrics, readMetricIDs) is.db.putIndexSearch(isLocal) mu.Lock() defer mu.Unlock() @@ -1076,8 +1076,8 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.T } func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters, - date uint64, maxLabelValues, maxMetrics int) error { - filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics) + date uint64, maxLabelValues, maxMetrics int, readMetricIDs *atomic.Uint64) error { + filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics, readMetricIDs) if err != nil { return err } @@ -1417,11 +1417,11 @@ func (is *indexSearch) getSeriesCount() (uint64, error) { } // GetTSDBStatus returns topN entries for tsdb status for the given tfss, date and focusLabel. -func (db *indexDB) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) { +func (db *indexDB) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) (*TSDBStatus, error) { qtChild := qt.NewChild("collect tsdb stats in the current indexdb") is := db.getIndexSearch(accountID, projectID, deadline) - status, err := is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics) + status, err := is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics, readMetricIDs) qtChild.Done() db.putIndexSearch(is) if err != nil { @@ -1433,7 +1433,7 @@ func (db *indexDB) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID ui db.doExtDB(func(extDB *indexDB) { qtChild := qt.NewChild("collect tsdb stats in the previous indexdb") is := extDB.getIndexSearch(accountID, projectID, deadline) - status, err = is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics) + status, err = is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics, readMetricIDs) qtChild.Done() extDB.putIndexSearch(is) }) @@ -1444,8 +1444,8 @@ func (db *indexDB) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID ui } // getTSDBStatus returns topN entries for tsdb status for the given tfss, date and focusLabel. -func (is *indexSearch) getTSDBStatus(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int) (*TSDBStatus, error) { - filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics) +func (is *indexSearch) getTSDBStatus(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, readMetricIDs *atomic.Uint64) (*TSDBStatus, error) { + filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics, readMetricIDs) if err != nil { return nil, err } @@ -1723,7 +1723,7 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64, accoun // If the number of the series exceeds maxMetrics, no series will be deleted and // an error will be returned. Otherwise, the funciton returns the number of // series deleted. -func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) { +func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int, readMetricIDs *atomic.Uint64) (int, error) { qt = qt.NewChild("deleting series for %s", tfss) defer qt.Done() if len(tfss) == 0 { @@ -1736,7 +1736,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMe MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch(tfss[0].accountID, tfss[0].projectID, noDeadline) - metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics) + metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics, readMetricIDs) db.putIndexSearch(is) if err != nil { return 0, err @@ -1748,7 +1748,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, maxMe db.doExtDB(func(extDB *indexDB) { var n int qtChild := qt.NewChild("deleting series from the previos indexdb") - n, err = extDB.DeleteTSIDs(qtChild, tfss, maxMetrics) + n, err = extDB.DeleteTSIDs(qtChild, tfss, maxMetrics, readMetricIDs) qtChild.Donef("deleted %d series", n) deletedCount += n }) @@ -1827,7 +1827,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { // searchMetricIDs returns metricIDs for the given tfss and tr. // // The returned metricIDs are sorted. -func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) { +func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) ([]uint64, error) { qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr) defer qt.Done() @@ -1851,7 +1851,7 @@ func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, t accountID := tfss[0].accountID projectID := tfss[0].projectID is := db.getIndexSearch(accountID, projectID, deadline) - localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics) + localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics, readMetricIDs) db.putIndexSearch(is) if err != nil { return nil, fmt.Errorf("error when searching for metricIDs in the current indexdb: %w", err) @@ -1874,7 +1874,7 @@ func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, t return } is := extDB.getIndexSearch(accountID, projectID, deadline) - extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics) + extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics, readMetricIDs) extDB.putIndexSearch(is) extDB.putMetricIDsToTagFiltersCache(qtChild, extMetricIDs, tfKeyExtBuf.B) }) @@ -2373,7 +2373,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) return true, nil } -func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, maxMetrics int) (*uint64set.Set, error) { +func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, maxMetrics int, readMetricIDs *atomic.Uint64) (*uint64set.Set, error) { if len(tfss) == 0 { return nil, nil } @@ -2385,7 +2385,7 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, // Search for metricIDs on the whole time range. tr.MaxTimestamp = timestampFromTime(time.Now()) } - metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics) + metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics, readMetricIDs) if err != nil { return nil, err } @@ -2395,8 +2395,8 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, // searchMetricIDs returns metricIDs for the given tfss and tr. // // The returned metricIDs are sorted. -func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { - metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics) +func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, readMetricIDs *atomic.Uint64) ([]uint64, error) { + metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics, readMetricIDs) if err != nil { return nil, err } @@ -2431,7 +2431,7 @@ func errTooManyTimeseries(maxMetrics int) error { "see https://docs.victoriametrics.com/#resource-usage-limits", maxMetrics) } -func (is *indexSearch) searchMetricIDsInternal(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) (*uint64set.Set, error) { +func (is *indexSearch) searchMetricIDsInternal(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, readMetricIDs *atomic.Uint64) (*uint64set.Set, error) { qt = qt.NewChild("search for metric ids: filters=%s, timeRange=%s, maxMetrics=%d", tfss, &tr, maxMetrics) defer qt.Done() @@ -2459,6 +2459,7 @@ func (is *indexSearch) searchMetricIDsInternal(qt *querytracer.Tracer, tfss []*T prevMetricIDsLen := metricIDs.Len() err := is.updateMetricIDsForTagFilters(qtChild, metricIDs, tfs, tr, maxMetrics+1) qtChild.Donef("updated %d metric ids", metricIDs.Len()-prevMetricIDsLen) + readMetricIDs.Add(uint64(metricIDs.Len() - prevMetricIDsLen)) if err != nil { return nil, err } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 9c6e5c10e..621a5fa01 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -790,7 +790,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten } // Test SearchLabelValuesWithFiltersOnTimeRange - lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, mn.AccountID, mn.ProjectID, "__name__", nil, TimeRange{}, 1e5, 1e9, noDeadline) + lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, mn.AccountID, mn.ProjectID, "__name__", nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange(labelName=%q): %w", "__name__", err) } @@ -804,7 +804,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten } for i := range mn.Tags { tag := &mn.Tags[i] - lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, mn.AccountID, mn.ProjectID, string(tag.Key), nil, TimeRange{}, 1e5, 1e9, noDeadline) + lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, mn.AccountID, mn.ProjectID, string(tag.Key), nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange(labelName=%q): %w", tag.Key, err) } @@ -817,7 +817,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten // Test SearchLabelNamesWithFiltersOnTimeRange (empty filters, global time range) for k, labelNames := range allLabelNames { - lns, err := db.SearchLabelNamesWithFiltersOnTimeRange(nil, k.AccountID, k.ProjectID, nil, TimeRange{}, 1e5, 1e9, noDeadline) + lns, err := db.SearchLabelNamesWithFiltersOnTimeRange(nil, k.AccountID, k.ProjectID, nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err) } @@ -1090,7 +1090,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten } func searchTSIDsInTest(db *indexDB, tfss []*TagFilters, tr TimeRange) ([]TSID, error) { - metricIDs, err := db.searchMetricIDs(nil, tfss, tr, 1e5, noDeadline) + metricIDs, err := db.searchMetricIDs(nil, tfss, tr, 1e5, noDeadline, &atomic.Uint64{}) if err != nil { return nil, err } @@ -1817,7 +1817,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { MinTimestamp: int64(now) - msecPerDay, MaxTimestamp: int64(now), } - lns, err := db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, tr, 10000, 1e9, noDeadline) + lns, err := db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, tr, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(timeRange=%s): %s", &tr, err) } @@ -1827,7 +1827,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelValuesWithFiltersOnTimeRange with the specified time range. - lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", nil, tr, 10000, 1e9, noDeadline) + lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", nil, tr, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(timeRange=%s): %s", &tr, err) } @@ -1864,7 +1864,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelNamesWithFiltersOnTimeRange with the specified filter. - lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline) + lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(filters=%s): %s", tfs, err) } @@ -1874,7 +1874,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelNamesWithFiltersOnTimeRange with the specified filter and time range. - lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline) + lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(filters=%s, timeRange=%s): %s", tfs, &tr, err) } @@ -1884,7 +1884,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelNamesWithFiltersOnTimeRange with filters on metric name and time range. - lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline) + lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(filters=%s, timeRange=%s): %s", tfs, &tr, err) } @@ -1894,7 +1894,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelValuesWithFiltersOnTimeRange with the specified filter. - lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline) + lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(filters=%s): %s", tfs, err) } @@ -1904,7 +1904,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelValuesWithFiltersOnTimeRange with the specified filter and time range. - lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline) + lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(filters=%s, timeRange=%s): %s", tfs, &tr, err) } @@ -1914,7 +1914,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check SearchLabelValuesWithFiltersOnTimeRange with filters on metric name and time range. - lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline) + lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "", []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(filters=%s, timeRange=%s): %s", tfs, &tr, err) } @@ -1938,7 +1938,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check GetTSDBStatus with nil filters. - status, err := db.GetTSDBStatus(nil, accountID, projectID, nil, baseDate, "day", 5, 1e6, noDeadline) + status, err := db.GetTSDBStatus(nil, accountID, projectID, nil, baseDate, "day", 5, 1e6, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in GetTSDBStatus with nil filters: %s", err) } @@ -2052,7 +2052,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil { t.Fatalf("cannot add filter: %s", err) } - status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline) + status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in GetTSDBStatus: %s", err) } @@ -2078,7 +2078,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check GetTSDBStatus with non-nil filter, which matches all the series on a global time range - status, err = db.GetTSDBStatus(nil, accountID, projectID, nil, 0, "day", 5, 1e6, noDeadline) + status, err = db.GetTSDBStatus(nil, accountID, projectID, nil, 0, "day", 5, 1e6, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in GetTSDBStatus: %s", err) } @@ -2133,7 +2133,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { if err := tfs.Add([]byte("UniqueId"), []byte("0|1|3"), false, true); err != nil { t.Fatalf("cannot add filter: %s", err) } - status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline) + status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in GetTSDBStatus: %s", err) } @@ -2159,7 +2159,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check GetTSDBStatus with non-nil filter on global time range, which matches only 15 series - status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, 0, "", 5, 1e6, noDeadline) + status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, 0, "", 5, 1e6, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in GetTSDBStatus: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 29e28cf7b..b55ea6f0f 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -146,7 +146,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { } for i := 0; i < b.N; i++ { is := db.getIndexSearch(tfs.accountID, tfs.projectID, noDeadline) - metricIDs, err := is.searchMetricIDs(nil, tfss, tr, 2e9) + metricIDs, err := is.searchMetricIDs(nil, tfss, tr, 2e9, &atomic.Uint64{}) db.putIndexSearch(is) if err != nil { b.Fatalf("unexpected error in searchMetricIDs: %s", err) diff --git a/lib/storage/search.go b/lib/storage/search.go index b24940ad4..d9292cd41 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "strings" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -177,7 +178,7 @@ func (s *Search) reset() { // MustClose must be called when the search is done. // // Init returns the upper bound on the number of found time series. -func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) int { +func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) int { qt = qt.NewChild("init series search: filters=%s, timeRange=%s", tfss, &tr) defer qt.Done() if s.needClosing { @@ -194,7 +195,7 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte s.needClosing = true var tsids []TSID - metricIDs, err := s.idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline) + metricIDs, err := s.idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline, readMetricIDs) if err == nil && len(metricIDs) > 0 && len(tfss) > 0 { accountID := tfss[0].accountID projectID := tfss[0].projectID diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 29b371a61..89f611a60 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -8,6 +8,7 @@ import ( "reflect" "regexp" "sort" + "sync/atomic" "testing" "testing/quick" "time" @@ -204,7 +205,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun } // Search - s.Init(nil, st, []*TagFilters{tfs}, tr, 1e5, noDeadline) + s.Init(nil, st, []*TagFilters{tfs}, tr, 1e5, noDeadline, &atomic.Uint64{}) var mbs []metricBlock for s.NextMetricBlock() { var b Block diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 0581282be..7c24883d1 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1197,11 +1197,11 @@ func nextRetentionDeadlineSeconds(atSecs, retentionSecs, offsetSecs int64) int64 // SearchMetricNames returns marshaled metric names matching the given tfss on the given tr. // // The marshaled metric names must be unmarshaled via MetricName.UnmarshalString(). -func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) { +func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) ([]string, error) { qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr) defer qt.Done() - metricIDs, err := s.idb().searchMetricIDs(qt, tfss, tr, maxMetrics, deadline) + metricIDs, err := s.idb().searchMetricIDs(qt, tfss, tr, maxMetrics, deadline, readMetricIDs) if err != nil { return nil, err } @@ -1337,8 +1337,8 @@ var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded") // If the number of the series exceeds maxMetrics, no series will be deleted and // an error will be returned. Otherwise, the funciton returns the number of // metrics deleted. -func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) { - deletedCount, err := s.idb().DeleteTSIDs(qt, tfss, maxMetrics) +func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int, readMetricIDs *atomic.Uint64) (int, error) { + deletedCount, err := s.idb().DeleteTSIDs(qt, tfss, maxMetrics, readMetricIDs) if err != nil { return deletedCount, fmt.Errorf("cannot delete tsids: %w", err) } @@ -1352,14 +1352,14 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMe // SearchLabelNamesWithFiltersOnTimeRange searches for label names matching the given tfss on tr. func (s *Storage) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, tr TimeRange, - maxLabelNames, maxMetrics int, deadline uint64, + maxLabelNames, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64, ) ([]string, error) { - return s.idb().SearchLabelNamesWithFiltersOnTimeRange(qt, accountID, projectID, tfss, tr, maxLabelNames, maxMetrics, deadline) + return s.idb().SearchLabelNamesWithFiltersOnTimeRange(qt, accountID, projectID, tfss, tr, maxLabelNames, maxMetrics, deadline, readMetricIDs) } // SearchLabelValuesWithFiltersOnTimeRange searches for label values for the given labelName, filters and tr. func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, accountID, projectID uint32, labelName string, tfss []*TagFilters, - tr TimeRange, maxLabelValues, maxMetrics int, deadline uint64, + tr TimeRange, maxLabelValues, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64, ) ([]string, error) { idb := s.idb() @@ -1371,7 +1371,7 @@ func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer // tfss contains only a single filter on labelName. It is faster searching for label values // without any filters and limits and then later applying the filter and the limit to the found label values. qt.Printf("search for up to %d values for the label %q on the time range %s", maxMetrics, labelName, &tr) - lvs, err := idb.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, nil, tr, maxMetrics, maxMetrics, deadline) + lvs, err := idb.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, nil, tr, maxMetrics, maxMetrics, deadline, readMetricIDs) if err != nil { return nil, err } @@ -1394,7 +1394,7 @@ func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer qt.Printf("fall back to slow search because only a subset of label values is found") } - return idb.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline) + return idb.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline, readMetricIDs) } func filterLabelValues(accountID, projectID uint32, lvs []string, tf *tagFilter, key string) []string { @@ -1616,8 +1616,8 @@ func (s *Storage) SearchTenants(qt *querytracer.Tracer, tr TimeRange, deadline u } // GetTSDBStatus returns TSDB status data for /api/v1/status/tsdb -func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) { - return s.idb().GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline) +func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64, readMetricIDs *atomic.Uint64) (*TSDBStatus, error) { + return s.idb().GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline, readMetricIDs) } // MetricRow is a metric to insert into storage. diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index d1ead9084..51d004bd2 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -8,6 +8,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "testing" "testing/quick" "time" @@ -671,7 +672,7 @@ func TestStorageDeleteSeries(t *testing.T) { s := MustOpenStorage(path, 0, 0, 0) // Verify no label names exist - lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, 0, 0, nil, TimeRange{}, 1e5, 1e9, noDeadline) + lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, 0, 0, nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err) } @@ -720,7 +721,7 @@ func TestStorageDeleteSeries(t *testing.T) { }) // Verify no more tag keys exist - lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, 0, 0, nil, TimeRange{}, 1e5, 1e9, noDeadline) + lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, 0, 0, nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange after the test: %s", err) } @@ -779,7 +780,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { s.DebugFlush() // Verify tag values exist - tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline) + tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange before metrics removal: %w", err) } @@ -788,7 +789,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { } // Verify tag keys exist - lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, TimeRange{}, 1e5, 1e9, noDeadline) + lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange before metrics removal: %w", err) } @@ -804,7 +805,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { metricBlocksCount := func(tfs *TagFilters) int { // Verify the number of blocks n := 0 - sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline) + sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline, &atomic.Uint64{}) for sr.NextMetricBlock() { n++ } @@ -823,7 +824,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { if n := metricBlocksCount(tfs); n == 0 { return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs) } - deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9) + deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9, &atomic.Uint64{}) if err != nil { return fmt.Errorf("cannot delete metrics: %w", err) } @@ -835,7 +836,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { } // Try deleting empty tfss - deletedCount, err = s.DeleteSeries(nil, nil, 1e9) + deletedCount, err = s.DeleteSeries(nil, nil, 1e9, &atomic.Uint64{}) if err != nil { return fmt.Errorf("cannot delete empty tfss: %w", err) } @@ -852,7 +853,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error { if n := metricBlocksCount(tfs); n != 0 { return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n) } - tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline) + tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange after all the metrics are removed: %w", err) } @@ -905,7 +906,7 @@ func TestStorageDeleteSeries_TooManyTimeseries(t *testing.T) { t.Fatalf("unexpected error in TagFilters.Add: %v", err) } maxSeries := numSeries - 1 - count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries) + count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries, &atomic.Uint64{}) if err == nil { t.Errorf("expected an error but there hasn't been one") } @@ -956,7 +957,7 @@ func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) { // cache is still empty. s.AddRows([]MetricRow{mr}, defaultPrecisionBits) s.DebugFlush() - gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline) + gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err) } @@ -980,7 +981,7 @@ func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) { // Delete the metric added earlier and ensure that the tsidCache and // tagFiltersToMetricIDsCache have been reset and the deletedMetricIDs // cache is now contains ID of the deleted metric. - numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1) + numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1, &atomic.Uint64{}) if err != nil { t.Fatalf("DeleteSeries() failed unexpectedly: %v", err) } @@ -1084,7 +1085,7 @@ func testStorageRegisterMetricNames(s *Storage) error { "instance", "job", } - lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, TimeRange{}, 100, 1e9, noDeadline) + lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, TimeRange{}, 100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err) } @@ -1094,7 +1095,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } // Verify that SearchLabelNamesWithFiltersOnTimeRange returns empty results for incorrect accountID, projectID - lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, nil, TimeRange{}, 100, 1e9, noDeadline) + lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, nil, TimeRange{}, 100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchTagKeys for incorrect accountID, projectID: %w", err) } @@ -1110,7 +1111,7 @@ func testStorageRegisterMetricNames(s *Storage) error { MinTimestamp: start, MaxTimestamp: end, } - lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, tr, 100, 1e9, noDeadline) + lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID, projectID, nil, tr, 100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err) } @@ -1120,7 +1121,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } // Verify that SearchLabelNamesWithFiltersOnTimeRange with the specified time range returns empty results for incrorrect accountID, projectID - lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, nil, tr, 100, 1e9, noDeadline) + lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, nil, tr, 100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchTagKeysOnTimeRange for incorrect accountID, projectID: %w", err) } @@ -1129,7 +1130,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } // Verify that SearchLabelValuesWithFiltersOnTimeRange returns correct result. - addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline) + addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err) } @@ -1139,7 +1140,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } // Verify that SearchLabelValuesWithFiltersOnTimeRange return empty results for incorrect accountID, projectID - addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline) + addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchTagValues for incorrect accountID, projectID: %w", err) } @@ -1148,7 +1149,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } // Verify that SearchLabelValuesWithFiltersOnTimeRange with the specified time range returns correct result. - addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "add_id", nil, tr, addsCount+100, 1e9, noDeadline) + addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID, projectID, "add_id", nil, tr, addsCount+100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err) } @@ -1158,7 +1159,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } // Verify that SearchLabelValuesWithFiltersOnTimeRange returns empty results for incorrect accountID, projectID - addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, "addd_id", nil, tr, addsCount+100, 1e9, noDeadline) + addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, accountID+1, projectID+1, "addd_id", nil, tr, addsCount+100, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange for incorrect accoundID, projectID: %w", err) } @@ -1171,7 +1172,7 @@ func testStorageRegisterMetricNames(s *Storage) error { if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil { return fmt.Errorf("unexpected error in TagFilters.Add: %w", err) } - metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline) + metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchMetricNames: %w", err) } @@ -1198,7 +1199,7 @@ func testStorageRegisterMetricNames(s *Storage) error { if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil { return fmt.Errorf("unexpected error in TagFilters.Add: %w", err) } - metricNames, err = s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline) + metricNames, err = s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline, &atomic.Uint64{}) if err != nil { return fmt.Errorf("error in SearchMetricNames for incorrect accountID, projectID: %w", err) } @@ -1475,7 +1476,7 @@ func testCountAllMetricNamesNoExtDB(tfss *TagFilters, is *indexSearch, tr TimeRa if err := tfss.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err)) } - metricIDs, err := is.searchMetricIDs(nil, []*TagFilters{tfss}, tr, 1e9) + metricIDs, err := is.searchMetricIDs(nil, []*TagFilters{tfss}, tr, 1e9, &atomic.Uint64{}) if err != nil { panic(fmt.Sprintf("searchMetricIDs failed unexpectedly: %v", err)) } @@ -1718,7 +1719,7 @@ func testCountAllMetricNames(s *Storage, accountID, projectID uint32, tr TimeRan if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err)) } - names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline) + names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline, &atomic.Uint64{}) if err != nil { panic(fmt.Sprintf("SeachMetricNames() failed unexpectedly: %v", err)) } @@ -1775,7 +1776,7 @@ func TestStorageSearchMetricNames_TooManyTimeseries(t *testing.T) { tfss = append(tfss, tfs) } - names, err := s.SearchMetricNames(nil, tfss, opts.tr, opts.maxMetrics, noDeadline) + names, err := s.SearchMetricNames(nil, tfss, opts.tr, opts.maxMetrics, noDeadline, &atomic.Uint64{}) gotErr := err != nil if gotErr != opts.wantErr { t.Errorf("SeachMetricNames(%v, %v, %d): unexpected error: got %v, want error to happen %v", []any{ @@ -1949,7 +1950,8 @@ func testCountAllMetricIDs(s *Storage, tr TimeRange) int { if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err)) } - ids, err := s.idb().searchMetricIDs(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline) + readMetricIDs := atomic.Uint64{} + ids, err := s.idb().searchMetricIDs(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline, &readMetricIDs) if err != nil { panic(fmt.Sprintf("seachMetricIDs() failed unexpectedly: %s", err)) } @@ -2254,7 +2256,7 @@ func assertCounts(t *testing.T, s *Storage, want *counts, strict bool) { for date, wantStatus := range want.dateTSDBStatuses { dt := time.UnixMilli(int64(date) * msecPerDay).UTC() - gotStatus, err := s.GetTSDBStatus(nil, 0, 0, nil, date, "", 10, 1e6, noDeadline) + gotStatus, err := s.GetTSDBStatus(nil, 0, 0, nil, date, "", 10, 1e6, noDeadline, &atomic.Uint64{}) if err != nil { t.Fatalf("GetTSDBStatus(%v) failed unexpectedly: %v", dt, err) }