From ba514284f152aa4ab30207e49be0774da1ea973d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 27 Jun 2022 12:53:46 +0300 Subject: [PATCH] lib/storage: add querytracer to more contexts querytracer has been added to the following storage.Storage methods: - RegisterMetricNames - DeleteMetrics - SearchTagValueSuffixes - SearchGraphitePaths --- app/vmselect/graphite/tags_api.go | 2 +- app/vmselect/netstorage/netstorage.go | 22 +++++++++++----------- app/vmstorage/main.go | 16 ++++++++-------- lib/storage/index_db.go | 19 +++++++++++++++---- lib/storage/storage.go | 25 ++++++++++++++----------- lib/storage/storage_test.go | 8 ++++---- 6 files changed, 53 insertions(+), 39 deletions(-) diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index 2606ec063..e1e43cf79 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -129,7 +129,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels) mr.Timestamp = ct } - if err := vmstorage.RegisterMetricNames(mrs); err != nil { + if err := vmstorage.RegisterMetricNames(nil, mrs); err != nil { return fmt.Errorf("cannot register paths: %w", err) } diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 25d1c2524..f0de6c690 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -604,11 +604,11 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return 0, err } - return vmstorage.DeleteMetrics(tfss) + return vmstorage.DeleteMetrics(qt, tfss) } // LabelNames returns label names matching the given sq until the given deadline. @@ -625,7 +625,7 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return nil, err } @@ -701,7 +701,7 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return nil, err } @@ -751,7 +751,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagV if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } - suffixes, err := vmstorage.SearchTagValueSuffixes(tr, []byte(tagKey), []byte(tagValuePrefix), delimiter, *maxTagValueSuffixesPerSearch, deadline.Deadline()) + suffixes, err := vmstorage.SearchTagValueSuffixes(qt, tr, []byte(tagKey), []byte(tagValuePrefix), delimiter, *maxTagValueSuffixesPerSearch, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w", tagKey, tagValuePrefix, delimiter, tr.String(), err) @@ -777,7 +777,7 @@ func TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel stri MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return nil, err } @@ -837,7 +837,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear if err := vmstorage.CheckTimeRange(tr); err != nil { return err } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return err } @@ -951,7 +951,7 @@ func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline if err := vmstorage.CheckTimeRange(tr); err != nil { return nil, err } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return nil, err } @@ -981,7 +981,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, fetchDa if err := vmstorage.CheckTimeRange(tr); err != nil { return nil, err } - tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline) + tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline) if err != nil { return nil, err } @@ -1074,7 +1074,7 @@ type blockRef struct { addr tmpBlockAddr } -func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetrics int, deadline searchutils.Deadline) ([]*storage.TagFilters, error) { +func setupTfss(qt *querytracer.Tracer, tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetrics int, deadline searchutils.Deadline) ([]*storage.TagFilters, error) { tfss := make([]*storage.TagFilters, 0, len(tagFilterss)) for _, tagFilters := range tagFilterss { tfs := storage.NewTagFilters() @@ -1082,7 +1082,7 @@ func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetri tf := &tagFilters[i] if string(tf.Key) == "__graphite__" { query := tf.Value - paths, err := vmstorage.SearchGraphitePaths(tr, query, maxMetrics, deadline.Deadline()) + paths, err := vmstorage.SearchGraphitePaths(qt, tr, query, maxMetrics, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index b59713629..552e45883 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -155,9 +155,9 @@ func AddRows(mrs []storage.MetricRow) error { var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value") // RegisterMetricNames registers all the metrics from mrs in the storage. -func RegisterMetricNames(mrs []storage.MetricRow) error { +func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error { WG.Add(1) - err := Storage.RegisterMetricNames(mrs) + err := Storage.RegisterMetricNames(qt, mrs) WG.Done() return err } @@ -165,9 +165,9 @@ func RegisterMetricNames(mrs []storage.MetricRow) error { // DeleteMetrics deletes metrics matching tfss. // // Returns the number of deleted metrics. -func DeleteMetrics(tfss []*storage.TagFilters) (int, error) { +func DeleteMetrics(qt *querytracer.Tracer, tfss []*storage.TagFilters) (int, error) { WG.Add(1) - n, err := Storage.DeleteMetrics(tfss) + n, err := Storage.DeleteMetrics(qt, tfss) WG.Done() return n, err } @@ -200,17 +200,17 @@ func SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName s // SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr. // // This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs. -func SearchTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { +func SearchTagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { WG.Add(1) - suffixes, err := Storage.SearchTagValueSuffixes(tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline) + suffixes, err := Storage.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline) WG.Done() return suffixes, err } // SearchGraphitePaths returns all the metric names matching the given Graphite query. -func SearchGraphitePaths(tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) { +func SearchGraphitePaths(qt *querytracer.Tracer, tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) { WG.Add(1) - paths, err := Storage.SearchGraphitePaths(tr, query, maxPaths, deadline) + paths, err := Storage.SearchGraphitePaths(qt, tr, query, maxPaths, deadline) WG.Done() return paths, err } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d02e85db1..17860c8fd 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1038,7 +1038,11 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer // This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs. // // If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found. -func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { +func (db *indexDB) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { + qt = qt.NewChild("search tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c, maxTagValueSuffixes=%d", + &tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes) + defer qt.Done() + // TODO: cache results? tvss := make(map[string]struct{}) @@ -1051,7 +1055,9 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [ if len(tvss) < maxTagValueSuffixes { ok := db.doExtDB(func(extDB *indexDB) { is := extDB.getIndexSearch(deadline) + qtChild := qt.NewChild("search tag value suffixes in the previous indexdb") err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes) + qtChild.Done() extDB.putIndexSearch(is) }) if ok && err != nil { @@ -1068,6 +1074,7 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [ suffixes = suffixes[:maxTagValueSuffixes] } // Do not sort suffixes, since they must be sorted by vmselect. + qt.Printf("found %d suffixes", len(suffixes)) return suffixes, nil } @@ -1557,7 +1564,9 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt // The caller must reset all the caches which may contain the deleted TSIDs. // // Returns the number of metrics deleted. -func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { +func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) { + qt = qt.NewChild("deleting series for %s", tfss) + defer qt.Done() if len(tfss) == 0 { return 0, nil } @@ -1568,7 +1577,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch(noDeadline) - metricIDs, err := is.searchMetricIDs(nil, tfss, tr, 2e9) + metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9) db.putIndexSearch(is) if err != nil { return 0, err @@ -1581,7 +1590,9 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { deletedCount := len(metricIDs) if db.doExtDB(func(extDB *indexDB) { var n int - n, err = extDB.DeleteTSIDs(tfss) + qtChild := qt.NewChild("deleting series from the previos indexdb") + n, err = extDB.DeleteTSIDs(qtChild, tfss) + qtChild.Donef("deleted %d series", n) deletedCount += n }) { if err != nil { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index cde974619..79faef092 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1271,8 +1271,8 @@ var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded") // DeleteMetrics deletes all the metrics matching the given tfss. // // Returns the number of metrics deleted. -func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) { - deletedCount, err := s.idb().DeleteTSIDs(tfss) +func (s *Storage) DeleteMetrics(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) { + deletedCount, err := s.idb().DeleteTSIDs(qt, tfss) if err != nil { return deletedCount, fmt.Errorf("cannot delete tsids: %w", err) } @@ -1301,14 +1301,15 @@ func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer // This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs. // // If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned. -func (s *Storage) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { - return s.idb().SearchTagValueSuffixes(tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline) +func (s *Storage) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix []byte, + delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { + return s.idb().SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline) } // SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr. -func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) { +func (s *Storage) SearchGraphitePaths(qt *querytracer.Tracer, tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) { query = replaceAlternateRegexpsWithGraphiteWildcards(query) - return s.searchGraphitePaths(tr, nil, query, maxPaths, deadline) + return s.searchGraphitePaths(qt, tr, nil, query, maxPaths, deadline) } // replaceAlternateRegexpsWithGraphiteWildcards replaces (foo|..|bar) with {foo,...,bar} in b and returns the new value. @@ -1353,12 +1354,12 @@ func replaceAlternateRegexpsWithGraphiteWildcards(b []byte) []byte { } } -func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) { +func (s *Storage) searchGraphitePaths(qt *querytracer.Tracer, tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) { n := bytes.IndexAny(qTail, "*[{") if n < 0 { // Verify that qHead matches a metric name. qHead = append(qHead, qTail...) - suffixes, err := s.SearchTagValueSuffixes(tr, nil, qHead, '.', 1, deadline) + suffixes, err := s.SearchTagValueSuffixes(qt, tr, nil, qHead, '.', 1, deadline) if err != nil { return nil, err } @@ -1373,7 +1374,7 @@ func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPath return []string{string(qHead)}, nil } qHead = append(qHead, qTail[:n]...) - suffixes, err := s.SearchTagValueSuffixes(tr, nil, qHead, '.', maxPaths, deadline) + suffixes, err := s.SearchTagValueSuffixes(qt, tr, nil, qHead, '.', maxPaths, deadline) if err != nil { return nil, err } @@ -1410,7 +1411,7 @@ func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPath continue } qHead = append(qHead[:qHeadLen], suffix...) - ps, err := s.searchGraphitePaths(tr, qHead, qTail, maxPaths, deadline) + ps, err := s.searchGraphitePaths(qt, tr, qHead, qTail, maxPaths, deadline) if err != nil { return nil, err } @@ -1665,7 +1666,9 @@ var ( // // The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp. // Th MetricRow.Value field is ignored. -func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { +func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error { + qt = qt.NewChild("registering %d series", len(mrs)) + defer qt.Done() var metricName []byte var genTSID generationTSID mn := GetMetricName() diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 7460eca3a..b31660122 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -654,7 +654,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { if n := metricBlocksCount(tfs); n == 0 { return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs) } - deletedCount, err := s.DeleteMetrics([]*TagFilters{tfs}) + deletedCount, err := s.DeleteMetrics(nil, []*TagFilters{tfs}) if err != nil { return fmt.Errorf("cannot delete metrics: %w", err) } @@ -666,7 +666,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { } // Try deleting empty tfss - deletedCount, err = s.DeleteMetrics(nil) + deletedCount, err = s.DeleteMetrics(nil, nil) if err != nil { return fmt.Errorf("cannot delete empty tfss: %w", err) } @@ -783,8 +783,8 @@ func testStorageRegisterMetricNames(s *Storage) error { } mrs = append(mrs, mr) } - if err := s.RegisterMetricNames(mrs); err != nil { - return fmt.Errorf("unexpected error in AddMetrics: %w", err) + if err := s.RegisterMetricNames(nil, mrs); err != nil { + return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err) } } var addIDsExpected []string