From 1de15ad490dbde84ad2a657f3b65a6311991f372 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 25 Dec 2020 12:03:13 +0300 Subject: [PATCH 1/7] adds escape for CRLF (#984) at external.alert.source - \n and \r symbols was url encoded, instead of direct usage. replace it from "\n" to `\n` allows to skip url encoding. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890 --- app/vmalert/README.md | 2 +- app/vmalert/main.go | 2 +- app/vmalert/notifier/template_func.go | 4 ++++ docs/vmalert.md | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/app/vmalert/README.md b/app/vmalert/README.md index 6f705db6f..aba8231d6 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -202,7 +202,7 @@ The shortlist of configuration flags is the following: How often to evaluate the rules (default 1m0s) -external.alert.source string External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service. - eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used + eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used -external.label array Optional label in the form 'name=value' to add to all generated recording rules and alerts. Pass multiple -label flags in order to add multiple label sets. Supports array of values separated by comma or specified via multiple flags. diff --git a/app/vmalert/main.go b/app/vmalert/main.go index ea6922d86..d04584eb2 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -41,7 +41,7 @@ Rule files may contain %{ENV_VAR} placeholders, which are substituted by the cor validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions via MetricsQL engine") externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service. -eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`) +eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`) externalLabels = flagutil.NewArray("external.label", "Optional label in the form 'name=value' to add to all generated recording rules and alerts. "+ "Pass multiple -label flags in order to add multiple label sets.") diff --git a/app/vmalert/notifier/template_func.go b/app/vmalert/notifier/template_func.go index e643d5880..043339b87 100644 --- a/app/vmalert/notifier/template_func.go +++ b/app/vmalert/notifier/template_func.go @@ -167,6 +167,10 @@ func InitTemplateFunc(externalURL *url.URL) { "queryEscape": func(q string) string { return url.QueryEscape(q) }, + "crlfEscape": func(q string) string { + q = strings.Replace(q, "\n", `\n`, -1) + return strings.Replace(q, "\r", `\r`, -1) + }, "quotesEscape": func(q string) string { return strings.Replace(q, `"`, `\"`, -1) }, diff --git a/docs/vmalert.md b/docs/vmalert.md index 6f705db6f..aba8231d6 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -202,7 +202,7 @@ The shortlist of configuration flags is the following: How often to evaluate the rules (default 1m0s) -external.alert.source string External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service. - eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used + eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used -external.label array Optional label in the form 'name=value' to add to all generated recording rules and alerts. Pass multiple -label flags in order to add multiple label sets. Supports array of values separated by comma or specified via multiple flags. From 932e53522d30dd1415df3e0c746ce87b3c00297b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Dec 2020 11:11:59 +0200 Subject: [PATCH 2/7] docs/CHANGELOG.md: mention that vmalert now properly escapes multi-line queries when passing to Grafana A follow-up for 1de15ad490dbde84ad2a657f3b65a6311991f372 See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890 --- docs/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 193529981..a03f6fe08 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,7 @@ * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`. +* BUGFIX: vmalert: properly escape multiline queries when passing them to Grafana. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890 * BUGFIX: vmagent: set missing `__meta_kubernetes_service_*` labels in `kubernetes_sd_config` for `endpoints` and `endpointslices` roles. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982 From 490c69c64e22bf65b193100a6f34eda218b983fb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Dec 2020 11:45:47 +0200 Subject: [PATCH 3/7] lib/storage: wait for pending transactions before closing and dropping the partition This deflakes `make test-full-386` test --- lib/fs/fs.go | 4 ++++ lib/storage/partition.go | 27 ++++++++++++++++++++------- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/lib/fs/fs.go b/lib/fs/fs.go index fb05fa876..9e0cefb13 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -186,6 +186,8 @@ func mustSyncParentDirIfExists(path string) { // MustRemoveAll removes path with all the contents. // +// It properly fsyncs the parent directory after path removal. +// // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . func MustRemoveAll(path string) { _ = mustRemoveAll(path, func() {}) @@ -193,6 +195,8 @@ func MustRemoveAll(path string) { // MustRemoveAllWithDoneCallback removes path with all the contents. // +// It properly fsyncs the parent directory after path removal. +// // done is called after the path is successfully removed. // // done may be called after the function returns for NFS path. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 7ed6ca89f..472c532c8 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -240,6 +240,9 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str // The pt must be detached from table before calling pt.Drop. func (pt *partition) Drop() { logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath) + // Wait until all the pending transaction deletions are finished before removing partition directories. + pendingTxnDeletionsWG.Wait() + fs.MustRemoveAll(pt.smallPartsPath) fs.MustRemoveAll(pt.bigPartsPath) logger.Infof("partition %q has been dropped", pt.name) @@ -643,6 +646,9 @@ func (pt *partition) PutParts(pws []*partWrapper) { func (pt *partition) MustClose() { close(pt.stopCh) + // Wait until all the pending transaction deletions are finished. + pendingTxnDeletionsWG.Wait() + logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath) startTime := time.Now() pt.stalePartsRemoverWG.Wait() @@ -1352,13 +1358,14 @@ func (pt *partition) removeStaleParts() { pt.snapshotLock.RLock() var removeWG sync.WaitGroup for pw := range m { - removeWG.Add(1) logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000) + removeWG.Add(1) fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done) } removeWG.Wait() - fs.MustSyncPath(pt.smallPartsPath) - fs.MustSyncPath(pt.bigPartsPath) + // There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath, + // since they should be automatically called inside fs.MustRemoveAllWithDoneCallback. + pt.snapshotLock.RUnlock() // Remove partition references from removed parts. @@ -1738,9 +1745,15 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } } else { // Just remove srcPath. - fs.MustRemoveAll(srcPath) + removeWG.Add(1) + fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done) } + // Flush pathPrefix* directory metadata to the underying storage, + // so the moved files become visible there. + fs.MustSyncPath(pathPrefix1) + fs.MustSyncPath(pathPrefix2) + pendingTxnDeletionsWG.Add(1) go func() { defer pendingTxnDeletionsWG.Done() @@ -1748,9 +1761,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . removeWG.Wait() - // Flush pathPrefix* directory metadata to the underying storage. - fs.MustSyncPath(pathPrefix1) - fs.MustSyncPath(pathPrefix2) + // There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal, + // since it is already called by fs.MustRemoveAllWithDoneCallback. + if err := os.Remove(txnPath); err != nil { logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) } From 86630350bfbeec900bc61c010cf54568428d727a Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 25 Dec 2020 17:42:05 +0300 Subject: [PATCH 4/7] Adds query stats handler (#945) * Adds query stat handler, for query and query_range api, victoriametrics tracks query execution time, stats are expored at /api/v1/status/queries endpoint with topN param https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 * fixed query stats bugs * improves queryStats tracker * improves query stat * small fix * fix tests * added more tests * fixes 386 tests * naming fixes * adds drop for outdated records --- app/vmselect/main.go | 19 ++ app/vmselect/prometheus/prometheus.go | 34 ++++ app/vmselect/promql/exec.go | 7 + app/vmselect/promql/query_stats.go | 254 ++++++++++++++++++++++++ app/vmselect/promql/query_stats_test.go | 144 ++++++++++++++ 5 files changed, 458 insertions(+) create mode 100644 app/vmselect/promql/query_stats.go create mode 100644 app/vmselect/promql/query_stats_test.go diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 5847113ca..da79de4d9 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -198,6 +198,25 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true } return true + case "/api/v1/status/queries/avg_duration": + if err := prometheus.QueryStatsHandler(startTime, w, r, "avg_duration"); err != nil { + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + case "/api/v1/status/queries/duration": + if err := prometheus.QueryStatsHandler(startTime, w, r, "duration"); err != nil { + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + case "/api/v1/status/queries/frequency": + if err := prometheus.QueryStatsHandler(startTime, w, r, "frequency"); err != nil { + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true + case "/api/v1/status/tsdb": statusTSDBRequests.Inc() if err := prometheus.TSDBStatusHandler(startTime, w, r); err != nil { diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index e2c5cd704..0c3e5cad3 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -1250,3 +1250,37 @@ func getLatencyOffsetMilliseconds() int64 { } return d } + +// QueryStatsHandler - returns statistics for queries executions with given aggregate func name. +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 +func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, aggregateBy string) error { + if err := r.ParseForm(); err != nil { + return fmt.Errorf("cannot parse form values: %w", err) + } + topN := 10 + topNStr := r.FormValue("topN") + if len(topNStr) > 0 { + n, err := strconv.Atoi(topNStr) + if err != nil { + return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err) + } + if n <= 0 { + n = 1 + } + if n > 1000 { + n = 1000 + } + topN = n + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + promql.WriteQueryStatsResponse(bw, topN, aggregateBy) + if err := bw.Flush(); err != nil { + return err + } + queryStatsDuration.UpdateDuration(startTime) + return nil +} + +var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/queries"}`) diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index c41fbac00..d6fc18ea1 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -39,6 +39,13 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, } }() } + if *maxQueryStatsTrackerItemsCount > 0 { + start := time.Now() + defer func() { + tr := ec.End - ec.Start + InsertQueryStat(q, tr, start, time.Since(start)) + }() + } ec.validate() diff --git a/app/vmselect/promql/query_stats.go b/app/vmselect/promql/query_stats.go new file mode 100644 index 000000000..f7496ea79 --- /dev/null +++ b/app/vmselect/promql/query_stats.go @@ -0,0 +1,254 @@ +package promql + +import ( + "flag" + "fmt" + "io" + "sort" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" +) + +var ( + maxQueryStatsRecordLifeTime = flag.Duration("search.MaxQueryStatsRecordLifeTime", 10*time.Minute, "Limits maximum lifetime for query stats record. With minimum 10 seconds") + maxQueryStatsTrackerItemsCount = flag.Int("search.MaxQueryStatsItems", 1000, "Limits count for distinct query stat records, keyed by query name and query time range. "+ + "With Maximum 5000 records. Zero value disables query stats recording") +) + +var ( + shrinkQueryStatsCalls = metrics.NewCounter(`vm_query_stats_shrink_calls_total`) + globalQueryStatsTracker *queryStatsTracker + gQSTOnce sync.Once +) + +// InsertQueryStat - inserts query stats record to global query stats tracker +// with given query name, query time-range, execution time and its duration. +func InsertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { + gQSTOnce.Do(func() { + initQueryStatsTracker() + }) + globalQueryStatsTracker.insertQueryStat(query, tr, execTime, duration) +} + +// WriteQueryStatsResponse - writes query stats to given writer in json format with given aggregate key. +func WriteQueryStatsResponse(w io.Writer, topN int, aggregateBy string) { + gQSTOnce.Do(func() { + initQueryStatsTracker() + }) + writeJSONQueryStats(w, globalQueryStatsTracker, topN, aggregateBy) +} + +// queryStatsTracker - hold statistics for all queries, +// query name and query range is a group key. +type queryStatsTracker struct { + maxQueryLogRecordTime time.Duration + limit int + queryStatsLocker sync.Mutex + qs []queryStats +} + +// queryStats - represent single query statistic. +type queryStats struct { + query string + queryRange int64 + queryLastSeen int64 + queryStatRecords []queryStatRecord +} + +// queryStatRecord - one record of query stat. +type queryStatRecord struct { + // end-start + duration time.Duration + // in seconds as unix_ts. + execTime int64 +} + +func initQueryStatsTracker() { + limit := *maxQueryStatsTrackerItemsCount + if limit > 5000 { + limit = 5000 + } + qlt := *maxQueryStatsRecordLifeTime + if qlt == 0 { + qlt = time.Second * 10 + } + logger.Infof("enabled query stats tracking, max records count: %d, max query record lifetime: %s", limit, qlt) + qst := queryStatsTracker{ + limit: limit, + maxQueryLogRecordTime: qlt, + } + go func() { + for { + time.Sleep(time.Second * 10) + qst.dropOutdatedRecords() + } + }() + globalQueryStatsTracker = &qst +} + +func formatJSONQueryStats(queries []queryStats) string { + var s strings.Builder + for i, q := range queries { + fmt.Fprintf(&s, `{"query": %q,`, q.query) + fmt.Fprintf(&s, `"query_time_range": %q,`, time.Duration(q.queryRange*1e6)) + fmt.Fprintf(&s, `"cumalative_duration": %q,`, q.Duration()) + if len(q.queryStatRecords) > 0 { + fmt.Fprintf(&s, `"avg_duration": %q,`, q.Duration()/time.Duration(len(q.queryStatRecords))) + } + fmt.Fprintf(&s, `"requests_count": "%d"`, len(q.queryStatRecords)) + s.WriteString(`}`) + if i != len(queries)-1 { + s.WriteString(`,`) + } + + } + return s.String() +} + +func writeJSONQueryStats(w io.Writer, ql *queryStatsTracker, topN int, aggregateBy string) { + fmt.Fprintf(w, `{"top_n": "%d",`, topN) + fmt.Fprintf(w, `"stats_max_duration": %q,`, maxQueryStatsRecordLifeTime.String()) + fmt.Fprint(w, `"top": [`) + switch aggregateBy { + case "frequency": + fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByRecordCount(ql, topN))) + case "duration": + fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByDuration(ql, topN))) + case "avg_duration": + fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByAvgDuration(ql, topN))) + default: + logger.Errorf("invalid aggregation key=%q, report bug", aggregateBy) + fmt.Fprintf(w, `{"error": "invalid aggregateBy value=%s"}`, aggregateBy) + } + fmt.Fprint(w, `]`) + fmt.Fprint(w, `}`) +} + +// drops query stats records less then given time in seconds. +// no need to sort +// its added in chronological order. +// must be called with mutex. +func (qs *queryStats) dropOutDatedRecords(t int64) { + // fast path + // compare time with last elem. + if len(qs.queryStatRecords) > 0 && qs.queryStatRecords[len(qs.queryStatRecords)-1].execTime < t { + qs.queryStatRecords = qs.queryStatRecords[:0] + return + } + // remove all elements by default. + shrinkIndex := len(qs.queryStatRecords) + for i, v := range qs.queryStatRecords { + if t < v.execTime { + shrinkIndex = i + break + } + } + if shrinkIndex > 0 { + qs.queryStatRecords = qs.queryStatRecords[shrinkIndex:] + } +} + +// calculates cumulative duration for query. +func (qs *queryStats) Duration() time.Duration { + var cnt time.Duration + for _, v := range qs.queryStatRecords { + cnt += v.duration + } + return cnt +} + +// must be called with mutex, +// shrinks slice by the last added query with given shrinkSize. +func (qst *queryStatsTracker) shrink(shrinkSize int) { + if len(qst.qs) < shrinkSize { + return + } + sort.Slice(qst.qs, func(i, j int) bool { + return qst.qs[i].queryLastSeen < qst.qs[j].queryLastSeen + }) + qst.qs = qst.qs[shrinkSize:] +} + +// drop outdated keys. +func (qst *queryStatsTracker) dropOutdatedRecords() { + qst.queryStatsLocker.Lock() + defer qst.queryStatsLocker.Unlock() + t := time.Now().Add(-qst.maxQueryLogRecordTime).Unix() + var i int + for _, v := range qst.qs { + v.dropOutDatedRecords(t) + if len(v.queryStatRecords) > 0 { + qst.qs[i] = v + i++ + } + } + if i == len(qst.qs) { + return + } + qst.qs = qst.qs[:i] +} + +func (qst *queryStatsTracker) insertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { + qst.queryStatsLocker.Lock() + defer qst.queryStatsLocker.Unlock() + // shrink old queries. + if len(qst.qs) > qst.limit { + shrinkQueryStatsCalls.Inc() + qst.shrink(1) + } + // add record to exist stats, keyed by query string and time-range. + for i, v := range qst.qs { + if v.query == query && v.queryRange == tr { + v.queryLastSeen = execTime.Unix() + v.queryStatRecords = append(v.queryStatRecords, queryStatRecord{execTime: execTime.Unix(), duration: duration}) + qst.qs[i] = v + return + } + } + qst.qs = append(qst.qs, queryStats{ + queryStatRecords: []queryStatRecord{{execTime: execTime.Unix(), duration: duration}}, + queryLastSeen: execTime.Unix(), + query: query, + queryRange: tr, + }) + +} + +func getTopNQueriesByAvgDuration(qst *queryStatsTracker, top int) []queryStats { + return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { + lenI := len(qst.qs[i].queryStatRecords) + lenJ := len(qst.qs[j].queryStatRecords) + if lenI == 0 || lenJ == 0 { + return false + } + return qst.qs[i].Duration()/time.Duration(lenI) > qst.qs[j].Duration()/time.Duration(lenJ) + }) +} + +func getTopNQueriesByRecordCount(qst *queryStatsTracker, top int) []queryStats { + return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { + return len(qst.qs[i].queryStatRecords) > len(qst.qs[j].queryStatRecords) + }) +} + +func getTopNQueriesByDuration(qst *queryStatsTracker, top int) []queryStats { + return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { + return qst.qs[i].Duration() > qst.qs[j].Duration() + }) +} + +func getTopNQueryStatsItemsWithFilter(qst *queryStatsTracker, top int, filterFunc func(i, j int) bool) []queryStats { + qst.queryStatsLocker.Lock() + defer qst.queryStatsLocker.Unlock() + if top > len(qst.qs) { + top = len(qst.qs) + } + sort.Slice(qst.qs, filterFunc) + result := make([]queryStats, 0, top) + result = append(result, qst.qs[:top]...) + return result +} diff --git a/app/vmselect/promql/query_stats_test.go b/app/vmselect/promql/query_stats_test.go new file mode 100644 index 000000000..eec36c0fa --- /dev/null +++ b/app/vmselect/promql/query_stats_test.go @@ -0,0 +1,144 @@ +package promql + +import ( + "fmt" + "reflect" + "strings" + "testing" + "time" +) + +func TestQueryLoggerShrink(t *testing.T) { + f := func(addItemCount, limit, expectedLen int) { + t.Helper() + qst := &queryStatsTracker{ + limit: limit, + maxQueryLogRecordTime: time.Second * 5, + } + for i := 0; i < addItemCount; i++ { + qst.insertQueryStat(fmt.Sprintf("random-n-%d", i), int64(i), time.Now().Add(-time.Second), 500+time.Duration(i)) + } + if len(qst.qs) != expectedLen { + t.Fatalf("unxpected len got=%d, for queryStats slice, want=%d", len(qst.qs), expectedLen) + } + } + f(10, 5, 6) + f(30, 10, 11) + f(15, 15, 15) +} + +func TestGetTopNQueriesByDuration(t *testing.T) { + f := func(topN int, expectedQueryStats []queryStats) { + t.Helper() + ql := &queryStatsTracker{ + limit: 25, + maxQueryLogRecordTime: time.Second * 5, + } + queriesDurations := []int{16, 4, 5, 10} + for i, v := range queriesDurations { + ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) + } + got := getTopNQueriesByAvgDuration(ql, topN) + + if len(got) != len(expectedQueryStats) { + t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) + } + for i, gotR := range got { + if gotR.query != expectedQueryStats[i].query { + t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) + } + } + } + f(1, []queryStats{{query: "query-n-0"}}) + f(2, []queryStats{{query: "query-n-0"}, {query: "query-n-3"}}) +} + +func TestGetTopNQueriesByCount(t *testing.T) { + f := func(topN int, expectedQueryStats []queryStats) { + t.Helper() + ql := &queryStatsTracker{ + limit: 25, + maxQueryLogRecordTime: time.Second * 5, + } + queriesCounts := []int{1, 4, 5, 11} + for i, v := range queriesCounts { + for ic := 0; ic < v; ic++ { + ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) + } + } + + got := getTopNQueriesByRecordCount(ql, topN) + + if len(got) != len(expectedQueryStats) { + t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) + } + for i, gotR := range got { + if gotR.query != expectedQueryStats[i].query { + t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) + } + } + } + f(1, []queryStats{{query: "query-n-3"}}) + f(2, []queryStats{{query: "query-n-3"}, {query: "query-n-2"}}) +} + +func TestGetTopNQueriesByAverageDuration(t *testing.T) { + f := func(topN int, expectedQueryStats []queryStats) { + t.Helper() + ql := &queryStatsTracker{ + limit: 25, + maxQueryLogRecordTime: time.Second * 5, + } + queriesQurations := []int{4, 25, 14, 10} + for i, v := range queriesQurations { + ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) + } + + got := getTopNQueriesByAvgDuration(ql, topN) + + if len(got) != len(expectedQueryStats) { + t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) + } + for i, gotR := range got { + if gotR.query != expectedQueryStats[i].query { + t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) + } + } + } + f(1, []queryStats{{query: "query-n-1"}}) + f(2, []queryStats{{query: "query-n-1"}, {query: "query-n-2"}}) +} + +func TestWriteJSONQueryStats(t *testing.T) { + qst := queryStatsTracker{ + limit: 100, + maxQueryLogRecordTime: time.Minute * 5, + } + t1 := time.Now() + qst.insertQueryStat("sum(rate(rps_total)[1m]) by(service)", 360, t1, time.Microsecond*100) + qst.insertQueryStat("up", 360, t1, time.Microsecond) + qst.insertQueryStat("up", 360, t1, time.Microsecond) + qst.insertQueryStat("up", 360, t1, time.Microsecond) + + f := func(t *testing.T, wantResp, aggregateBy string) { + var got strings.Builder + writeJSONQueryStats(&got, &qst, 5, aggregateBy) + if !reflect.DeepEqual(got.String(), wantResp) { + t.Fatalf("unexpected response, \ngot: %s,\nwant: %s", got.String(), wantResp) + } + } + + t.Run("aggregateByDuration", func(t *testing.T) { + f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, + "duration") + }) + t.Run("aggregateByfrequency", func(t *testing.T) { + f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"},{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"}]}`, + "frequency") + }) + t.Run("aggregateByDuration", func(t *testing.T) { + f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, + "avg_duration") + }) + +} From fb338c50a3738b90838b63a30dd5ff72795fdbc2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Dec 2020 16:40:20 +0200 Subject: [PATCH 5/7] app/victoria-metrics: show usage info when incorrect command-line flag is passed to executable --- app/victoria-metrics/main.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index 517c10aea..61f3e61c8 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -35,6 +36,7 @@ var ( func main() { // Write flags and help message to stdout, since it is easier to grep or pipe. flag.CommandLine.SetOutput(os.Stdout) + flag.Usage = usage envflag.Parse() buildinfo.Init() logger.Init() @@ -115,3 +117,12 @@ func writeAPIHelp(w io.Writer, pathList [][]string) { fmt.Fprintf(w, "%q - %s
", p, p, doc) } } + +func usage() { + const s = ` +victoria-metrics is a time series database and monitoring solution. + +See the docs at https://victoriametrics.github.io/ +` + flagutil.Usage(s) +} From 59183f66d0adfe342b133347996387ab4c215548 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Dec 2020 16:44:26 +0200 Subject: [PATCH 6/7] app/vmselect: refactor `/api/v1/stats/top_queries` Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 --- README.md | 12 +- app/vmselect/main.go | 22 +- app/vmselect/prometheus/prometheus.go | 22 +- app/vmselect/promql/exec.go | 8 +- app/vmselect/promql/query_stats.go | 254 ------------------------ app/vmselect/promql/query_stats_test.go | 144 -------------- app/vmselect/querystats/querystats.go | 247 +++++++++++++++++++++++ docs/CHANGELOG.md | 1 + docs/Single-server-VictoriaMetrics.md | 12 +- 9 files changed, 287 insertions(+), 435 deletions(-) delete mode 100644 app/vmselect/promql/query_stats.go delete mode 100644 app/vmselect/promql/query_stats_test.go create mode 100644 app/vmselect/querystats/querystats.go diff --git a/README.md b/README.md index a8544b656..d57e82571 100644 --- a/README.md +++ b/README.md @@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178) Additionally VictoriaMetrics provides the following handlers: -* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes: +* `/api/v1/series/count` - returns the total number of time series in the database. Some notes: * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series; * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions; -* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. -* `/api/v1/status/active_queries` - it returns a list of currently running queries. +* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. +* `/api/v1/status/active_queries` - returns a list of currently running queries. +* `/api/v1/status/top_queries` - returns the following query lists: + * the most frequently executed queries - `topByCount` + * queries with the biggest average execution duration - `topByAvgDuration` + * queries that took the most time for execution - `topBySumDuration` + The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg. + For example, request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, which were executed during the last 30 seconds. ## Graphite API usage diff --git a/app/vmselect/main.go b/app/vmselect/main.go index da79de4d9..4f78ab3cd 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -198,25 +198,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true } return true - case "/api/v1/status/queries/avg_duration": - if err := prometheus.QueryStatsHandler(startTime, w, r, "avg_duration"); err != nil { + case "/api/v1/status/top_queries": + topQueriesRequests.Inc() + if err := prometheus.QueryStatsHandler(startTime, w, r); err != nil { + topQueriesErrors.Inc() sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) return true } return true - case "/api/v1/status/queries/duration": - if err := prometheus.QueryStatsHandler(startTime, w, r, "duration"); err != nil { - sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) - return true - } - return true - case "/api/v1/status/queries/frequency": - if err := prometheus.QueryStatsHandler(startTime, w, r, "frequency"); err != nil { - sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) - return true - } - return true - case "/api/v1/status/tsdb": statusTSDBRequests.Inc() if err := prometheus.TSDBStatusHandler(startTime, w, r); err != nil { @@ -435,6 +424,9 @@ var ( labelsCountRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/labels/count"}`) labelsCountErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/labels/count"}`) + topQueriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/top_queries"}`) + topQueriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/top_queries"}`) + statusTSDBRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/tsdb"}`) statusTSDBErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/tsdb"}`) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 0c3e5cad3..d7a1a5a88 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" @@ -1251,31 +1252,28 @@ func getLatencyOffsetMilliseconds() int64 { return d } -// QueryStatsHandler - returns statistics for queries executions with given aggregate func name. -// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 -func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, aggregateBy string) error { +// QueryStatsHandler returns query stats at `/api/v1/status/top_queries` +func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } - topN := 10 + topN := 20 topNStr := r.FormValue("topN") if len(topNStr) > 0 { n, err := strconv.Atoi(topNStr) if err != nil { return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err) } - if n <= 0 { - n = 1 - } - if n > 1000 { - n = 1000 - } topN = n } + maxLifetimeMsecs, err := searchutils.GetDuration(r, "maxLifetime", 10*60*1000) + if err != nil { + return fmt.Errorf("cannot parse `maxLifetime` arg: %w", err) + } w.Header().Set("Content-Type", "application/json; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - promql.WriteQueryStatsResponse(bw, topN, aggregateBy) + querystats.WriteJSONQueryStats(bw, topN, time.Duration(maxLifetimeMsecs)*time.Millisecond) if err := bw.Flush(); err != nil { return err } @@ -1283,4 +1281,4 @@ func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque return nil } -var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/queries"}`) +var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`) diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index d6fc18ea1..adb42b0a6 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -11,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" @@ -39,11 +40,10 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, } }() } - if *maxQueryStatsTrackerItemsCount > 0 { - start := time.Now() + if querystats.Enabled() { + startTime := time.Now() defer func() { - tr := ec.End - ec.Start - InsertQueryStat(q, tr, start, time.Since(start)) + querystats.RegisterQuery(q, ec.End-ec.Start, startTime) }() } diff --git a/app/vmselect/promql/query_stats.go b/app/vmselect/promql/query_stats.go deleted file mode 100644 index f7496ea79..000000000 --- a/app/vmselect/promql/query_stats.go +++ /dev/null @@ -1,254 +0,0 @@ -package promql - -import ( - "flag" - "fmt" - "io" - "sort" - "strings" - "sync" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/metrics" -) - -var ( - maxQueryStatsRecordLifeTime = flag.Duration("search.MaxQueryStatsRecordLifeTime", 10*time.Minute, "Limits maximum lifetime for query stats record. With minimum 10 seconds") - maxQueryStatsTrackerItemsCount = flag.Int("search.MaxQueryStatsItems", 1000, "Limits count for distinct query stat records, keyed by query name and query time range. "+ - "With Maximum 5000 records. Zero value disables query stats recording") -) - -var ( - shrinkQueryStatsCalls = metrics.NewCounter(`vm_query_stats_shrink_calls_total`) - globalQueryStatsTracker *queryStatsTracker - gQSTOnce sync.Once -) - -// InsertQueryStat - inserts query stats record to global query stats tracker -// with given query name, query time-range, execution time and its duration. -func InsertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { - gQSTOnce.Do(func() { - initQueryStatsTracker() - }) - globalQueryStatsTracker.insertQueryStat(query, tr, execTime, duration) -} - -// WriteQueryStatsResponse - writes query stats to given writer in json format with given aggregate key. -func WriteQueryStatsResponse(w io.Writer, topN int, aggregateBy string) { - gQSTOnce.Do(func() { - initQueryStatsTracker() - }) - writeJSONQueryStats(w, globalQueryStatsTracker, topN, aggregateBy) -} - -// queryStatsTracker - hold statistics for all queries, -// query name and query range is a group key. -type queryStatsTracker struct { - maxQueryLogRecordTime time.Duration - limit int - queryStatsLocker sync.Mutex - qs []queryStats -} - -// queryStats - represent single query statistic. -type queryStats struct { - query string - queryRange int64 - queryLastSeen int64 - queryStatRecords []queryStatRecord -} - -// queryStatRecord - one record of query stat. -type queryStatRecord struct { - // end-start - duration time.Duration - // in seconds as unix_ts. - execTime int64 -} - -func initQueryStatsTracker() { - limit := *maxQueryStatsTrackerItemsCount - if limit > 5000 { - limit = 5000 - } - qlt := *maxQueryStatsRecordLifeTime - if qlt == 0 { - qlt = time.Second * 10 - } - logger.Infof("enabled query stats tracking, max records count: %d, max query record lifetime: %s", limit, qlt) - qst := queryStatsTracker{ - limit: limit, - maxQueryLogRecordTime: qlt, - } - go func() { - for { - time.Sleep(time.Second * 10) - qst.dropOutdatedRecords() - } - }() - globalQueryStatsTracker = &qst -} - -func formatJSONQueryStats(queries []queryStats) string { - var s strings.Builder - for i, q := range queries { - fmt.Fprintf(&s, `{"query": %q,`, q.query) - fmt.Fprintf(&s, `"query_time_range": %q,`, time.Duration(q.queryRange*1e6)) - fmt.Fprintf(&s, `"cumalative_duration": %q,`, q.Duration()) - if len(q.queryStatRecords) > 0 { - fmt.Fprintf(&s, `"avg_duration": %q,`, q.Duration()/time.Duration(len(q.queryStatRecords))) - } - fmt.Fprintf(&s, `"requests_count": "%d"`, len(q.queryStatRecords)) - s.WriteString(`}`) - if i != len(queries)-1 { - s.WriteString(`,`) - } - - } - return s.String() -} - -func writeJSONQueryStats(w io.Writer, ql *queryStatsTracker, topN int, aggregateBy string) { - fmt.Fprintf(w, `{"top_n": "%d",`, topN) - fmt.Fprintf(w, `"stats_max_duration": %q,`, maxQueryStatsRecordLifeTime.String()) - fmt.Fprint(w, `"top": [`) - switch aggregateBy { - case "frequency": - fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByRecordCount(ql, topN))) - case "duration": - fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByDuration(ql, topN))) - case "avg_duration": - fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByAvgDuration(ql, topN))) - default: - logger.Errorf("invalid aggregation key=%q, report bug", aggregateBy) - fmt.Fprintf(w, `{"error": "invalid aggregateBy value=%s"}`, aggregateBy) - } - fmt.Fprint(w, `]`) - fmt.Fprint(w, `}`) -} - -// drops query stats records less then given time in seconds. -// no need to sort -// its added in chronological order. -// must be called with mutex. -func (qs *queryStats) dropOutDatedRecords(t int64) { - // fast path - // compare time with last elem. - if len(qs.queryStatRecords) > 0 && qs.queryStatRecords[len(qs.queryStatRecords)-1].execTime < t { - qs.queryStatRecords = qs.queryStatRecords[:0] - return - } - // remove all elements by default. - shrinkIndex := len(qs.queryStatRecords) - for i, v := range qs.queryStatRecords { - if t < v.execTime { - shrinkIndex = i - break - } - } - if shrinkIndex > 0 { - qs.queryStatRecords = qs.queryStatRecords[shrinkIndex:] - } -} - -// calculates cumulative duration for query. -func (qs *queryStats) Duration() time.Duration { - var cnt time.Duration - for _, v := range qs.queryStatRecords { - cnt += v.duration - } - return cnt -} - -// must be called with mutex, -// shrinks slice by the last added query with given shrinkSize. -func (qst *queryStatsTracker) shrink(shrinkSize int) { - if len(qst.qs) < shrinkSize { - return - } - sort.Slice(qst.qs, func(i, j int) bool { - return qst.qs[i].queryLastSeen < qst.qs[j].queryLastSeen - }) - qst.qs = qst.qs[shrinkSize:] -} - -// drop outdated keys. -func (qst *queryStatsTracker) dropOutdatedRecords() { - qst.queryStatsLocker.Lock() - defer qst.queryStatsLocker.Unlock() - t := time.Now().Add(-qst.maxQueryLogRecordTime).Unix() - var i int - for _, v := range qst.qs { - v.dropOutDatedRecords(t) - if len(v.queryStatRecords) > 0 { - qst.qs[i] = v - i++ - } - } - if i == len(qst.qs) { - return - } - qst.qs = qst.qs[:i] -} - -func (qst *queryStatsTracker) insertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) { - qst.queryStatsLocker.Lock() - defer qst.queryStatsLocker.Unlock() - // shrink old queries. - if len(qst.qs) > qst.limit { - shrinkQueryStatsCalls.Inc() - qst.shrink(1) - } - // add record to exist stats, keyed by query string and time-range. - for i, v := range qst.qs { - if v.query == query && v.queryRange == tr { - v.queryLastSeen = execTime.Unix() - v.queryStatRecords = append(v.queryStatRecords, queryStatRecord{execTime: execTime.Unix(), duration: duration}) - qst.qs[i] = v - return - } - } - qst.qs = append(qst.qs, queryStats{ - queryStatRecords: []queryStatRecord{{execTime: execTime.Unix(), duration: duration}}, - queryLastSeen: execTime.Unix(), - query: query, - queryRange: tr, - }) - -} - -func getTopNQueriesByAvgDuration(qst *queryStatsTracker, top int) []queryStats { - return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { - lenI := len(qst.qs[i].queryStatRecords) - lenJ := len(qst.qs[j].queryStatRecords) - if lenI == 0 || lenJ == 0 { - return false - } - return qst.qs[i].Duration()/time.Duration(lenI) > qst.qs[j].Duration()/time.Duration(lenJ) - }) -} - -func getTopNQueriesByRecordCount(qst *queryStatsTracker, top int) []queryStats { - return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { - return len(qst.qs[i].queryStatRecords) > len(qst.qs[j].queryStatRecords) - }) -} - -func getTopNQueriesByDuration(qst *queryStatsTracker, top int) []queryStats { - return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool { - return qst.qs[i].Duration() > qst.qs[j].Duration() - }) -} - -func getTopNQueryStatsItemsWithFilter(qst *queryStatsTracker, top int, filterFunc func(i, j int) bool) []queryStats { - qst.queryStatsLocker.Lock() - defer qst.queryStatsLocker.Unlock() - if top > len(qst.qs) { - top = len(qst.qs) - } - sort.Slice(qst.qs, filterFunc) - result := make([]queryStats, 0, top) - result = append(result, qst.qs[:top]...) - return result -} diff --git a/app/vmselect/promql/query_stats_test.go b/app/vmselect/promql/query_stats_test.go deleted file mode 100644 index eec36c0fa..000000000 --- a/app/vmselect/promql/query_stats_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package promql - -import ( - "fmt" - "reflect" - "strings" - "testing" - "time" -) - -func TestQueryLoggerShrink(t *testing.T) { - f := func(addItemCount, limit, expectedLen int) { - t.Helper() - qst := &queryStatsTracker{ - limit: limit, - maxQueryLogRecordTime: time.Second * 5, - } - for i := 0; i < addItemCount; i++ { - qst.insertQueryStat(fmt.Sprintf("random-n-%d", i), int64(i), time.Now().Add(-time.Second), 500+time.Duration(i)) - } - if len(qst.qs) != expectedLen { - t.Fatalf("unxpected len got=%d, for queryStats slice, want=%d", len(qst.qs), expectedLen) - } - } - f(10, 5, 6) - f(30, 10, 11) - f(15, 15, 15) -} - -func TestGetTopNQueriesByDuration(t *testing.T) { - f := func(topN int, expectedQueryStats []queryStats) { - t.Helper() - ql := &queryStatsTracker{ - limit: 25, - maxQueryLogRecordTime: time.Second * 5, - } - queriesDurations := []int{16, 4, 5, 10} - for i, v := range queriesDurations { - ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) - } - got := getTopNQueriesByAvgDuration(ql, topN) - - if len(got) != len(expectedQueryStats) { - t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) - } - for i, gotR := range got { - if gotR.query != expectedQueryStats[i].query { - t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) - } - } - } - f(1, []queryStats{{query: "query-n-0"}}) - f(2, []queryStats{{query: "query-n-0"}, {query: "query-n-3"}}) -} - -func TestGetTopNQueriesByCount(t *testing.T) { - f := func(topN int, expectedQueryStats []queryStats) { - t.Helper() - ql := &queryStatsTracker{ - limit: 25, - maxQueryLogRecordTime: time.Second * 5, - } - queriesCounts := []int{1, 4, 5, 11} - for i, v := range queriesCounts { - for ic := 0; ic < v; ic++ { - ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) - } - } - - got := getTopNQueriesByRecordCount(ql, topN) - - if len(got) != len(expectedQueryStats) { - t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) - } - for i, gotR := range got { - if gotR.query != expectedQueryStats[i].query { - t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) - } - } - } - f(1, []queryStats{{query: "query-n-3"}}) - f(2, []queryStats{{query: "query-n-3"}, {query: "query-n-2"}}) -} - -func TestGetTopNQueriesByAverageDuration(t *testing.T) { - f := func(topN int, expectedQueryStats []queryStats) { - t.Helper() - ql := &queryStatsTracker{ - limit: 25, - maxQueryLogRecordTime: time.Second * 5, - } - queriesQurations := []int{4, 25, 14, 10} - for i, v := range queriesQurations { - ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v)) - } - - got := getTopNQueriesByAvgDuration(ql, topN) - - if len(got) != len(expectedQueryStats) { - t.Fatalf("unxpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats)) - } - for i, gotR := range got { - if gotR.query != expectedQueryStats[i].query { - t.Fatalf("unxpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query) - } - } - } - f(1, []queryStats{{query: "query-n-1"}}) - f(2, []queryStats{{query: "query-n-1"}, {query: "query-n-2"}}) -} - -func TestWriteJSONQueryStats(t *testing.T) { - qst := queryStatsTracker{ - limit: 100, - maxQueryLogRecordTime: time.Minute * 5, - } - t1 := time.Now() - qst.insertQueryStat("sum(rate(rps_total)[1m]) by(service)", 360, t1, time.Microsecond*100) - qst.insertQueryStat("up", 360, t1, time.Microsecond) - qst.insertQueryStat("up", 360, t1, time.Microsecond) - qst.insertQueryStat("up", 360, t1, time.Microsecond) - - f := func(t *testing.T, wantResp, aggregateBy string) { - var got strings.Builder - writeJSONQueryStats(&got, &qst, 5, aggregateBy) - if !reflect.DeepEqual(got.String(), wantResp) { - t.Fatalf("unexpected response, \ngot: %s,\nwant: %s", got.String(), wantResp) - } - } - - t.Run("aggregateByDuration", func(t *testing.T) { - f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, - "duration") - }) - t.Run("aggregateByfrequency", func(t *testing.T) { - f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"},{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"}]}`, - "frequency") - }) - t.Run("aggregateByDuration", func(t *testing.T) { - f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumalative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumalative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`, - "avg_duration") - }) - -} diff --git a/app/vmselect/querystats/querystats.go b/app/vmselect/querystats/querystats.go new file mode 100644 index 000000000..b4fa385d6 --- /dev/null +++ b/app/vmselect/querystats/querystats.go @@ -0,0 +1,247 @@ +package querystats + +import ( + "flag" + "fmt" + "io" + "sort" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +var ( + lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 100000, "Query stats for `/api/v1/status/top_queries` is tracked on this number of last queries. "+ + "Zero value disables query stats tracking") + minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", 0, "The minimum duration for queries to track in query stats at `/api/v1/status/top_queries`. "+ + "Queries with lower duration are ignored in query stats") +) + +var ( + qsTracker *queryStatsTracker + initOnce sync.Once +) + +// Enabled returns true of query stats tracking is enabled. +func Enabled() bool { + return *lastQueriesCount > 0 +} + +// RegisterQuery registers the query on the given timeRangeMsecs, which has been started at startTime. +// +// RegisterQuery must be called when the query is finished. +func RegisterQuery(query string, timeRangeMsecs int64, startTime time.Time) { + initOnce.Do(initQueryStats) + qsTracker.registerQuery(query, timeRangeMsecs, startTime) +} + +// WriteJSONQueryStats writes query stats to given writer in json format. +func WriteJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { + initOnce.Do(initQueryStats) + qsTracker.writeJSONQueryStats(w, topN, maxLifetime) +} + +// queryStatsTracker holds statistics for queries +type queryStatsTracker struct { + mu sync.Mutex + a []queryStatRecord + nextIdx uint +} + +type queryStatRecord struct { + query string + timeRangeSecs int64 + registerTime time.Time + duration time.Duration +} + +type queryStatKey struct { + query string + timeRangeSecs int64 +} + +func initQueryStats() { + recordsCount := *lastQueriesCount + if recordsCount <= 0 { + recordsCount = 1 + } else { + logger.Infof("enabled query stats tracking at `/api/v1/status/top_queries` with -search.queryStats.lastQueriesCount=%d, -search.queryStats.minQueryDuration=%s", + *lastQueriesCount, *minQueryDuration) + } + qsTracker = &queryStatsTracker{ + a: make([]queryStatRecord, recordsCount), + } +} + +func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { + fmt.Fprintf(w, `{"topN":"%d","maxLifetime":%q,`, topN, maxLifetime) + fmt.Fprintf(w, `"search.queryStats.lastQueriesCount":%d,`, *lastQueriesCount) + fmt.Fprintf(w, `"search.queryStats.minQueryDuration":%q,`, *minQueryDuration) + fmt.Fprintf(w, `"topByCount":[`) + topByCount := qst.getTopByCount(topN, maxLifetime) + for i, r := range topByCount { + fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"count":%d}`, r.query, r.timeRangeSecs, r.count) + if i+1 < len(topByCount) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `],"topByAvgDuration":[`) + topByAvgDuration := qst.getTopByAvgDuration(topN, maxLifetime) + for i, r := range topByAvgDuration { + fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"avgDurationSeconds":%.3f}`, r.query, r.timeRangeSecs, r.duration.Seconds()) + if i+1 < len(topByAvgDuration) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `],"topBySumDuration":[`) + topBySumDuration := qst.getTopBySumDuration(topN, maxLifetime) + for i, r := range topBySumDuration { + fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"sumDurationSeconds":%.3f}`, r.query, r.timeRangeSecs, r.duration.Seconds()) + if i+1 < len(topBySumDuration) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `]}`) +} + +func (qst *queryStatsTracker) registerQuery(query string, timeRangeMsecs int64, startTime time.Time) { + registerTime := time.Now() + duration := registerTime.Sub(startTime) + if duration < *minQueryDuration { + return + } + + qst.mu.Lock() + defer qst.mu.Unlock() + + a := qst.a + idx := qst.nextIdx + if idx >= uint(len(a)) { + idx = 0 + } + qst.nextIdx = idx + 1 + r := &a[idx] + r.query = query + r.timeRangeSecs = timeRangeMsecs / 1000 + r.registerTime = registerTime + r.duration = duration +} + +func (qst *queryStatsTracker) getTopByCount(topN int, maxLifetime time.Duration) []queryStatByCount { + currentTime := time.Now() + qst.mu.Lock() + m := make(map[queryStatKey]int) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + m[k] = m[k] + 1 + } + qst.mu.Unlock() + + var a []queryStatByCount + for k, count := range m { + a = append(a, queryStatByCount{ + query: k.query, + timeRangeSecs: k.timeRangeSecs, + count: count, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].count > a[j].count + }) + if len(a) > topN { + a = a[:topN] + } + return a +} + +type queryStatByCount struct { + query string + timeRangeSecs int64 + count int +} + +func (qst *queryStatsTracker) getTopByAvgDuration(topN int, maxLifetime time.Duration) []queryStatByDuration { + currentTime := time.Now() + qst.mu.Lock() + type countSum struct { + count int + sum time.Duration + } + m := make(map[queryStatKey]countSum) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + ks := m[k] + ks.count++ + ks.sum += r.duration + m[k] = ks + } + qst.mu.Unlock() + + var a []queryStatByDuration + for k, ks := range m { + a = append(a, queryStatByDuration{ + query: k.query, + timeRangeSecs: k.timeRangeSecs, + duration: ks.sum / time.Duration(ks.count), + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].duration > a[j].duration + }) + if len(a) > topN { + a = a[:topN] + } + return a +} + +type queryStatByDuration struct { + query string + timeRangeSecs int64 + duration time.Duration +} + +func (qst *queryStatsTracker) getTopBySumDuration(topN int, maxLifetime time.Duration) []queryStatByDuration { + currentTime := time.Now() + qst.mu.Lock() + m := make(map[queryStatKey]time.Duration) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + m[k] = m[k] + r.duration + } + qst.mu.Unlock() + + var a []queryStatByDuration + for k, d := range m { + a = append(a, queryStatByDuration{ + query: k.query, + timeRangeSecs: k.timeRangeSecs, + duration: d, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].duration > a[j].duration + }) + if len(a) > topN { + a = a[:topN] + } + return a +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a03f6fe08..df7d27cc9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,7 @@ # tip +* FEATURE: add `/api/v1/status/top_queries` handler, which returns the most frequently executed queries and queries that took the most time for execution. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 * FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index a8544b656..d57e82571 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178) Additionally VictoriaMetrics provides the following handlers: -* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes: +* `/api/v1/series/count` - returns the total number of time series in the database. Some notes: * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series; * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions; -* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. -* `/api/v1/status/active_queries` - it returns a list of currently running queries. +* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. +* `/api/v1/status/active_queries` - returns a list of currently running queries. +* `/api/v1/status/top_queries` - returns the following query lists: + * the most frequently executed queries - `topByCount` + * queries with the biggest average execution duration - `topByAvgDuration` + * queries that took the most time for execution - `topBySumDuration` + The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg. + For example, request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, which were executed during the last 30 seconds. ## Graphite API usage From ad4e6a92837409122d76ba91bdd7ac3c4abc0fa3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 25 Dec 2020 17:39:50 +0200 Subject: [PATCH 7/7] app/vmselect/querystats: reduce the default number of last queries to track from 100K to 20K This should reduce memory usage in constrained environments --- app/vmselect/querystats/querystats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/vmselect/querystats/querystats.go b/app/vmselect/querystats/querystats.go index b4fa385d6..5f7a32559 100644 --- a/app/vmselect/querystats/querystats.go +++ b/app/vmselect/querystats/querystats.go @@ -12,7 +12,7 @@ import ( ) var ( - lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 100000, "Query stats for `/api/v1/status/top_queries` is tracked on this number of last queries. "+ + lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 20000, "Query stats for `/api/v1/status/top_queries` is tracked on this number of last queries. "+ "Zero value disables query stats tracking") minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", 0, "The minimum duration for queries to track in query stats at `/api/v1/status/top_queries`. "+ "Queries with lower duration are ignored in query stats")