mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: add querytracer to more contexts
querytracer has been added to the following storage.Storage methods: - RegisterMetricNames - DeleteMetrics - SearchTagValueSuffixes - SearchGraphitePaths
This commit is contained in:
parent
134751e43e
commit
ba514284f1
6 changed files with 53 additions and 39 deletions
|
@ -129,7 +129,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
|
|||
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
|
||||
mr.Timestamp = ct
|
||||
}
|
||||
if err := vmstorage.RegisterMetricNames(mrs); err != nil {
|
||||
if err := vmstorage.RegisterMetricNames(nil, mrs); err != nil {
|
||||
return fmt.Errorf("cannot register paths: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -604,11 +604,11 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
|||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return vmstorage.DeleteMetrics(tfss)
|
||||
return vmstorage.DeleteMetrics(qt, tfss)
|
||||
}
|
||||
|
||||
// LabelNames returns label names matching the given sq until the given deadline.
|
||||
|
@ -625,7 +625,7 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
|
|||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -701,7 +701,7 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
|
|||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -751,7 +751,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagV
|
|||
if deadline.Exceeded() {
|
||||
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
suffixes, err := vmstorage.SearchTagValueSuffixes(tr, []byte(tagKey), []byte(tagValuePrefix), delimiter, *maxTagValueSuffixesPerSearch, deadline.Deadline())
|
||||
suffixes, err := vmstorage.SearchTagValueSuffixes(qt, tr, []byte(tagKey), []byte(tagValuePrefix), delimiter, *maxTagValueSuffixesPerSearch, deadline.Deadline())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w",
|
||||
tagKey, tagValuePrefix, delimiter, tr.String(), err)
|
||||
|
@ -777,7 +777,7 @@ func TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel stri
|
|||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -837,7 +837,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
|||
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
||||
return err
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -951,7 +951,7 @@ func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline
|
|||
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -981,7 +981,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, fetchDa
|
|||
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1074,7 +1074,7 @@ type blockRef struct {
|
|||
addr tmpBlockAddr
|
||||
}
|
||||
|
||||
func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetrics int, deadline searchutils.Deadline) ([]*storage.TagFilters, error) {
|
||||
func setupTfss(qt *querytracer.Tracer, tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetrics int, deadline searchutils.Deadline) ([]*storage.TagFilters, error) {
|
||||
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
|
||||
for _, tagFilters := range tagFilterss {
|
||||
tfs := storage.NewTagFilters()
|
||||
|
@ -1082,7 +1082,7 @@ func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetri
|
|||
tf := &tagFilters[i]
|
||||
if string(tf.Key) == "__graphite__" {
|
||||
query := tf.Value
|
||||
paths, err := vmstorage.SearchGraphitePaths(tr, query, maxMetrics, deadline.Deadline())
|
||||
paths, err := vmstorage.SearchGraphitePaths(qt, tr, query, maxMetrics, deadline.Deadline())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
||||
}
|
||||
|
|
|
@ -155,9 +155,9 @@ func AddRows(mrs []storage.MetricRow) error {
|
|||
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
|
||||
|
||||
// RegisterMetricNames registers all the metrics from mrs in the storage.
|
||||
func RegisterMetricNames(mrs []storage.MetricRow) error {
|
||||
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error {
|
||||
WG.Add(1)
|
||||
err := Storage.RegisterMetricNames(mrs)
|
||||
err := Storage.RegisterMetricNames(qt, mrs)
|
||||
WG.Done()
|
||||
return err
|
||||
}
|
||||
|
@ -165,9 +165,9 @@ func RegisterMetricNames(mrs []storage.MetricRow) error {
|
|||
// DeleteMetrics deletes metrics matching tfss.
|
||||
//
|
||||
// Returns the number of deleted metrics.
|
||||
func DeleteMetrics(tfss []*storage.TagFilters) (int, error) {
|
||||
func DeleteMetrics(qt *querytracer.Tracer, tfss []*storage.TagFilters) (int, error) {
|
||||
WG.Add(1)
|
||||
n, err := Storage.DeleteMetrics(tfss)
|
||||
n, err := Storage.DeleteMetrics(qt, tfss)
|
||||
WG.Done()
|
||||
return n, err
|
||||
}
|
||||
|
@ -200,17 +200,17 @@ func SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName s
|
|||
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
|
||||
//
|
||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
func SearchTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
func SearchTagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
WG.Add(1)
|
||||
suffixes, err := Storage.SearchTagValueSuffixes(tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||
suffixes, err := Storage.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||
WG.Done()
|
||||
return suffixes, err
|
||||
}
|
||||
|
||||
// SearchGraphitePaths returns all the metric names matching the given Graphite query.
|
||||
func SearchGraphitePaths(tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
func SearchGraphitePaths(qt *querytracer.Tracer, tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
WG.Add(1)
|
||||
paths, err := Storage.SearchGraphitePaths(tr, query, maxPaths, deadline)
|
||||
paths, err := Storage.SearchGraphitePaths(qt, tr, query, maxPaths, deadline)
|
||||
WG.Done()
|
||||
return paths, err
|
||||
}
|
||||
|
|
|
@ -1038,7 +1038,11 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer
|
|||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
//
|
||||
// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found.
|
||||
func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
func (db *indexDB) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
qt = qt.NewChild("search tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c, maxTagValueSuffixes=%d",
|
||||
&tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
defer qt.Done()
|
||||
|
||||
// TODO: cache results?
|
||||
|
||||
tvss := make(map[string]struct{})
|
||||
|
@ -1051,7 +1055,9 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [
|
|||
if len(tvss) < maxTagValueSuffixes {
|
||||
ok := db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(deadline)
|
||||
qtChild := qt.NewChild("search tag value suffixes in the previous indexdb")
|
||||
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
qtChild.Done()
|
||||
extDB.putIndexSearch(is)
|
||||
})
|
||||
if ok && err != nil {
|
||||
|
@ -1068,6 +1074,7 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [
|
|||
suffixes = suffixes[:maxTagValueSuffixes]
|
||||
}
|
||||
// Do not sort suffixes, since they must be sorted by vmselect.
|
||||
qt.Printf("found %d suffixes", len(suffixes))
|
||||
return suffixes, nil
|
||||
}
|
||||
|
||||
|
@ -1557,7 +1564,9 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt
|
|||
// The caller must reset all the caches which may contain the deleted TSIDs.
|
||||
//
|
||||
// Returns the number of metrics deleted.
|
||||
func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||
func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
|
||||
qt = qt.NewChild("deleting series for %s", tfss)
|
||||
defer qt.Done()
|
||||
if len(tfss) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -1568,7 +1577,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
|||
MaxTimestamp: (1 << 63) - 1,
|
||||
}
|
||||
is := db.getIndexSearch(noDeadline)
|
||||
metricIDs, err := is.searchMetricIDs(nil, tfss, tr, 2e9)
|
||||
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9)
|
||||
db.putIndexSearch(is)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
@ -1581,7 +1590,9 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
|||
deletedCount := len(metricIDs)
|
||||
if db.doExtDB(func(extDB *indexDB) {
|
||||
var n int
|
||||
n, err = extDB.DeleteTSIDs(tfss)
|
||||
qtChild := qt.NewChild("deleting series from the previos indexdb")
|
||||
n, err = extDB.DeleteTSIDs(qtChild, tfss)
|
||||
qtChild.Donef("deleted %d series", n)
|
||||
deletedCount += n
|
||||
}) {
|
||||
if err != nil {
|
||||
|
|
|
@ -1271,8 +1271,8 @@ var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded")
|
|||
// DeleteMetrics deletes all the metrics matching the given tfss.
|
||||
//
|
||||
// Returns the number of metrics deleted.
|
||||
func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
|
||||
deletedCount, err := s.idb().DeleteTSIDs(tfss)
|
||||
func (s *Storage) DeleteMetrics(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
|
||||
deletedCount, err := s.idb().DeleteTSIDs(qt, tfss)
|
||||
if err != nil {
|
||||
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
||||
}
|
||||
|
@ -1301,14 +1301,15 @@ func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer
|
|||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
//
|
||||
// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned.
|
||||
func (s *Storage) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
return s.idb().SearchTagValueSuffixes(tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||
func (s *Storage) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix []byte,
|
||||
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
return s.idb().SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||
}
|
||||
|
||||
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
|
||||
func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
func (s *Storage) SearchGraphitePaths(qt *querytracer.Tracer, tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
query = replaceAlternateRegexpsWithGraphiteWildcards(query)
|
||||
return s.searchGraphitePaths(tr, nil, query, maxPaths, deadline)
|
||||
return s.searchGraphitePaths(qt, tr, nil, query, maxPaths, deadline)
|
||||
}
|
||||
|
||||
// replaceAlternateRegexpsWithGraphiteWildcards replaces (foo|..|bar) with {foo,...,bar} in b and returns the new value.
|
||||
|
@ -1353,12 +1354,12 @@ func replaceAlternateRegexpsWithGraphiteWildcards(b []byte) []byte {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
func (s *Storage) searchGraphitePaths(qt *querytracer.Tracer, tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
n := bytes.IndexAny(qTail, "*[{")
|
||||
if n < 0 {
|
||||
// Verify that qHead matches a metric name.
|
||||
qHead = append(qHead, qTail...)
|
||||
suffixes, err := s.SearchTagValueSuffixes(tr, nil, qHead, '.', 1, deadline)
|
||||
suffixes, err := s.SearchTagValueSuffixes(qt, tr, nil, qHead, '.', 1, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1373,7 +1374,7 @@ func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPath
|
|||
return []string{string(qHead)}, nil
|
||||
}
|
||||
qHead = append(qHead, qTail[:n]...)
|
||||
suffixes, err := s.SearchTagValueSuffixes(tr, nil, qHead, '.', maxPaths, deadline)
|
||||
suffixes, err := s.SearchTagValueSuffixes(qt, tr, nil, qHead, '.', maxPaths, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1410,7 +1411,7 @@ func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPath
|
|||
continue
|
||||
}
|
||||
qHead = append(qHead[:qHeadLen], suffix...)
|
||||
ps, err := s.searchGraphitePaths(tr, qHead, qTail, maxPaths, deadline)
|
||||
ps, err := s.searchGraphitePaths(qt, tr, qHead, qTail, maxPaths, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1665,7 +1666,9 @@ var (
|
|||
//
|
||||
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
|
||||
// Th MetricRow.Value field is ignored.
|
||||
func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
|
||||
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
|
||||
qt = qt.NewChild("registering %d series", len(mrs))
|
||||
defer qt.Done()
|
||||
var metricName []byte
|
||||
var genTSID generationTSID
|
||||
mn := GetMetricName()
|
||||
|
|
|
@ -654,7 +654,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
|||
if n := metricBlocksCount(tfs); n == 0 {
|
||||
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
|
||||
}
|
||||
deletedCount, err := s.DeleteMetrics([]*TagFilters{tfs})
|
||||
deletedCount, err := s.DeleteMetrics(nil, []*TagFilters{tfs})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete metrics: %w", err)
|
||||
}
|
||||
|
@ -666,7 +666,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
|||
}
|
||||
|
||||
// Try deleting empty tfss
|
||||
deletedCount, err = s.DeleteMetrics(nil)
|
||||
deletedCount, err = s.DeleteMetrics(nil, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete empty tfss: %w", err)
|
||||
}
|
||||
|
@ -783,8 +783,8 @@ func testStorageRegisterMetricNames(s *Storage) error {
|
|||
}
|
||||
mrs = append(mrs, mr)
|
||||
}
|
||||
if err := s.RegisterMetricNames(mrs); err != nil {
|
||||
return fmt.Errorf("unexpected error in AddMetrics: %w", err)
|
||||
if err := s.RegisterMetricNames(nil, mrs); err != nil {
|
||||
return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
|
||||
}
|
||||
}
|
||||
var addIDsExpected []string
|
||||
|
|
Loading…
Reference in a new issue