Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)
Adds query stats handler (#945)
* Adds query stats handler for the query and query_range APIs: VictoriaMetrics tracks query execution time and exports the stats at the /api/v1/status/queries endpoint with a topN param. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907
* fixed query stats bugs
* improved the queryStats tracker
* improved query stats
* small fix
* fixed tests
* added more tests
* fixed tests on 386 builds
* naming fixes
* added dropping of outdated records
This commit is contained in:
parent ca8919e8e1
commit 76d092c091
5 changed files with 458 additions and 0 deletions
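For context, the new endpoints can be exercised with a short Go program such as the sketch below (not part of the commit). The host and port are assumptions: single-node VictoriaMetrics listens on :8428 by default, while cluster vmselect serves the same paths under its own URL prefix.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// "duration" can be replaced with "avg_duration" or "frequency";
	// topN limits the number of returned entries (the server caps it at 1000).
	resp, err := http.Get("http://localhost:8428/api/v1/status/queries/duration?topN=5")
	if err != nil {
		log.Fatalf("request failed: %s", err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("cannot read response: %s", err)
	}
	fmt.Println(string(body))
}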
@@ -274,6 +274,25 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
			return true
		}
		return true
	case "prometheus/api/v1/status/queries/avg_duration":
		if err := prometheus.QueryStatsHandler(startTime, w, r, "avg_duration"); err != nil {
			sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err))
			return true
		}
		return true
	case "prometheus/api/v1/status/queries/duration":
		if err := prometheus.QueryStatsHandler(startTime, w, r, "duration"); err != nil {
			sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err))
			return true
		}
		return true
	case "prometheus/api/v1/status/queries/frequency":
		if err := prometheus.QueryStatsHandler(startTime, w, r, "frequency"); err != nil {
			sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err))
			return true
		}
		return true

	case "prometheus/api/v1/status/tsdb":
		statusTSDBRequests.Inc()
		if err := prometheus.TSDBStatusHandler(startTime, at, w, r); err != nil {
@@ -1326,3 +1326,37 @@ func getLatencyOffsetMilliseconds() int64 {
	}
	return d
}

// QueryStatsHandler returns statistics for executed queries, aggregated by the given aggregate func name.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907
func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, aggregateBy string) error {
	if err := r.ParseForm(); err != nil {
		return fmt.Errorf("cannot parse form values: %w", err)
	}
	topN := 10
	topNStr := r.FormValue("topN")
	if len(topNStr) > 0 {
		n, err := strconv.Atoi(topNStr)
		if err != nil {
			return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err)
		}
		if n <= 0 {
			n = 1
		}
		if n > 1000 {
			n = 1000
		}
		topN = n
	}
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	bw := bufferedwriter.Get(w)
	defer bufferedwriter.Put(bw)
	promql.WriteQueryStatsResponse(bw, topN, aggregateBy)
	if err := bw.Flush(); err != nil {
		return err
	}
	queryStatsDuration.UpdateDuration(startTime)
	return nil
}

var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/queries"}`)
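The topN handling above reduces to a small clamping rule: default 10 when the param is absent, floor 1, cap 1000. A standalone sketch, with clampTopN as a hypothetical helper name:

package main

import (
	"fmt"
	"strconv"
)

// clampTopN mirrors the normalization in QueryStatsHandler: an empty value
// falls back to 10, and parsed values are clamped to the range [1, 1000].
func clampTopN(s string) (int, error) {
	if s == "" {
		return 10, nil
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("cannot parse `topN` arg %q: %w", s, err)
	}
	if n <= 0 {
		n = 1
	}
	if n > 1000 {
		n = 1000
	}
	return n, nil
}

func main() {
	for _, s := range []string{"", "0", "25", "5000", "oops"} {
		n, err := clampTopN(s)
		fmt.Printf("topN=%q -> %d (err: %v)\n", s, n, err)
	}
}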
@@ -39,6 +39,13 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result,
			}
		}()
	}
	if *maxQueryStatsTrackerItemsCount > 0 {
		start := time.Now()
		defer func() {
			tr := ec.End - ec.Start
			InsertQueryStat(q, tr, start, time.Since(start))
		}()
	}

	ec.validate()
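The hunk above wires stats collection into Exec with a deferred call: the time range is ec.End - ec.Start, and the duration is measured from just before evaluation starts. A distilled, self-contained sketch of the pattern, where recordStat is a hypothetical stand-in for InsertQueryStat:

package main

import (
	"fmt"
	"time"
)

// runInstrumented mirrors the pattern added to Exec: capture the wall clock
// before evaluation, then record the query, its time range and the elapsed
// time in a deferred call once evaluation finishes.
func runInstrumented(q string, start, end int64, run func()) {
	begin := time.Now()
	defer func() {
		recordStat(q, end-start, begin, time.Since(begin))
	}()
	run()
}

// recordStat is a hypothetical stand-in for promql.InsertQueryStat.
func recordStat(q string, tr int64, execTime time.Time, d time.Duration) {
	fmt.Printf("query=%q tr=%dms took=%s\n", q, tr, d)
}

func main() {
	runInstrumented("up", 0, 300000, func() { time.Sleep(10 * time.Millisecond) })
}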
app/vmselect/promql/query_stats.go (new file, 254 lines)
@@ -0,0 +1,254 @@
package promql

import (
	"flag"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/metrics"
)

var (
	maxQueryStatsRecordLifeTime    = flag.Duration("search.MaxQueryStatsRecordLifeTime", 10*time.Minute, "Limits the maximum lifetime of a query stats record. The minimum is 10 seconds")
	maxQueryStatsTrackerItemsCount = flag.Int("search.MaxQueryStatsItems", 1000, "Limits the number of distinct query stats records, keyed by query string and query time range. "+
		"The maximum is 5000 records. A zero value disables query stats recording")
)

var (
	shrinkQueryStatsCalls   = metrics.NewCounter(`vm_query_stats_shrink_calls_total`)
	globalQueryStatsTracker *queryStatsTracker
	gQSTOnce                sync.Once
)

// InsertQueryStat inserts a record into the global query stats tracker
// for the given query string, query time range, execution time and duration.
func InsertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) {
	gQSTOnce.Do(func() {
		initQueryStatsTracker()
	})
	globalQueryStatsTracker.insertQueryStat(query, tr, execTime, duration)
}

// WriteQueryStatsResponse writes query stats to the given writer in JSON format,
// aggregated by the given key.
func WriteQueryStatsResponse(w io.Writer, topN int, aggregateBy string) {
	gQSTOnce.Do(func() {
		initQueryStatsTracker()
	})
	writeJSONQueryStats(w, globalQueryStatsTracker, topN, aggregateBy)
}

// queryStatsTracker holds statistics for all queries;
// records are grouped by query string and query time range.
type queryStatsTracker struct {
	maxQueryLogRecordTime time.Duration
	limit                 int
	queryStatsLocker      sync.Mutex
	qs                    []queryStats
}

// queryStats represents the statistics for a single query group.
type queryStats struct {
	query            string
	queryRange       int64
	queryLastSeen    int64
	queryStatRecords []queryStatRecord
}

// queryStatRecord is a single record of a query execution.
type queryStatRecord struct {
	// duration is the query execution time (end-start).
	duration time.Duration
	// execTime is the execution start as a unix timestamp in seconds.
	execTime int64
}

func initQueryStatsTracker() {
	limit := *maxQueryStatsTrackerItemsCount
	if limit > 5000 {
		limit = 5000
	}
	qlt := *maxQueryStatsRecordLifeTime
	if qlt < time.Second*10 {
		qlt = time.Second * 10
	}
	logger.Infof("enabled query stats tracking, max records count: %d, max query record lifetime: %s", limit, qlt)
	qst := queryStatsTracker{
		limit:                 limit,
		maxQueryLogRecordTime: qlt,
	}
	go func() {
		for {
			time.Sleep(time.Second * 10)
			qst.dropOutdatedRecords()
		}
	}()
	globalQueryStatsTracker = &qst
}

func formatJSONQueryStats(queries []queryStats) string {
	var s strings.Builder
	for i, q := range queries {
		fmt.Fprintf(&s, `{"query": %q,`, q.query)
		fmt.Fprintf(&s, `"query_time_range": %q,`, time.Duration(q.queryRange*1e6))
		fmt.Fprintf(&s, `"cumulative_duration": %q,`, q.Duration())
		if len(q.queryStatRecords) > 0 {
			fmt.Fprintf(&s, `"avg_duration": %q,`, q.Duration()/time.Duration(len(q.queryStatRecords)))
		}
		fmt.Fprintf(&s, `"requests_count": "%d"`, len(q.queryStatRecords))
		s.WriteString(`}`)
		if i != len(queries)-1 {
			s.WriteString(`,`)
		}
	}
	return s.String()
}

func writeJSONQueryStats(w io.Writer, ql *queryStatsTracker, topN int, aggregateBy string) {
	fmt.Fprintf(w, `{"top_n": "%d",`, topN)
	fmt.Fprintf(w, `"stats_max_duration": %q,`, maxQueryStatsRecordLifeTime.String())
	fmt.Fprint(w, `"top": [`)
	switch aggregateBy {
	case "frequency":
		fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByRecordCount(ql, topN)))
	case "duration":
		fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByDuration(ql, topN)))
	case "avg_duration":
		fmt.Fprint(w, formatJSONQueryStats(getTopNQueriesByAvgDuration(ql, topN)))
	default:
		logger.Errorf("BUG: invalid aggregation key=%q", aggregateBy)
		fmt.Fprintf(w, `{"error": "invalid aggregateBy value=%s"}`, aggregateBy)
	}
	fmt.Fprint(w, `]`)
	fmt.Fprint(w, `}`)
}

// dropOutDatedRecords drops query stat records with execTime at or before the
// given unix timestamp t (in seconds). Records are appended in chronological
// order, so no sorting is needed. Must be called with the mutex held.
func (qs *queryStats) dropOutDatedRecords(t int64) {
	// fast path: if even the newest record is outdated, drop everything.
	if len(qs.queryStatRecords) > 0 && qs.queryStatRecords[len(qs.queryStatRecords)-1].execTime < t {
		qs.queryStatRecords = qs.queryStatRecords[:0]
		return
	}
	// remove all elements by default.
	shrinkIndex := len(qs.queryStatRecords)
	for i, v := range qs.queryStatRecords {
		if t < v.execTime {
			shrinkIndex = i
			break
		}
	}
	if shrinkIndex > 0 {
		qs.queryStatRecords = qs.queryStatRecords[shrinkIndex:]
	}
}
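Because records arrive in chronological order, the cutoff semantics above can be checked with a small test sketch (not part of the commit; it assumes placement next to query_stats.go in package promql):

package promql

import "testing"

// TestDropOutDatedRecordsSketch: records with execTime at or before the
// cutoff t are dropped, later ones survive.
func TestDropOutDatedRecordsSketch(t *testing.T) {
	qs := queryStats{queryStatRecords: []queryStatRecord{
		{execTime: 1}, {execTime: 2}, {execTime: 3},
	}}
	qs.dropOutDatedRecords(2)
	if len(qs.queryStatRecords) != 1 || qs.queryStatRecords[0].execTime != 3 {
		t.Fatalf("unexpected records after drop: %+v", qs.queryStatRecords)
	}
}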

// Duration returns the cumulative execution duration for the query.
func (qs *queryStats) Duration() time.Duration {
	var cnt time.Duration
	for _, v := range qs.queryStatRecords {
		cnt += v.duration
	}
	return cnt
}

// shrink removes the shrinkSize least recently seen queries.
// Must be called with the mutex held.
func (qst *queryStatsTracker) shrink(shrinkSize int) {
	if len(qst.qs) < shrinkSize {
		return
	}
	sort.Slice(qst.qs, func(i, j int) bool {
		return qst.qs[i].queryLastSeen < qst.qs[j].queryLastSeen
	})
	qst.qs = qst.qs[shrinkSize:]
}

// dropOutdatedRecords drops outdated records for all tracked queries and
// removes queries that no longer have any records.
func (qst *queryStatsTracker) dropOutdatedRecords() {
	qst.queryStatsLocker.Lock()
	defer qst.queryStatsLocker.Unlock()
	t := time.Now().Add(-qst.maxQueryLogRecordTime).Unix()
	var i int
	for _, v := range qst.qs {
		v.dropOutDatedRecords(t)
		if len(v.queryStatRecords) > 0 {
			qst.qs[i] = v
			i++
		}
	}
	if i == len(qst.qs) {
		return
	}
	qst.qs = qst.qs[:i]
}

func (qst *queryStatsTracker) insertQueryStat(query string, tr int64, execTime time.Time, duration time.Duration) {
	qst.queryStatsLocker.Lock()
	defer qst.queryStatsLocker.Unlock()
	// evict the least recently seen query if the limit is exceeded.
	if len(qst.qs) > qst.limit {
		shrinkQueryStatsCalls.Inc()
		qst.shrink(1)
	}
	// append the record to existing stats, keyed by query string and time range.
	for i, v := range qst.qs {
		if v.query == query && v.queryRange == tr {
			v.queryLastSeen = execTime.Unix()
			v.queryStatRecords = append(v.queryStatRecords, queryStatRecord{execTime: execTime.Unix(), duration: duration})
			qst.qs[i] = v
			return
		}
	}
	qst.qs = append(qst.qs, queryStats{
		queryStatRecords: []queryStatRecord{{execTime: execTime.Unix(), duration: duration}},
		queryLastSeen:    execTime.Unix(),
		query:            query,
		queryRange:       tr,
	})
}
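Worth noting in insertQueryStat: the group key is the pair (query, queryRange), so the same query string with a different time range creates a separate entry. A test sketch illustrating this (not part of the commit):

package promql

import (
	"testing"
	"time"
)

// TestInsertQueryStatGroupingSketch: identical query strings with different
// time ranges stay separate entries; a repeated (query, tr) pair is merged.
func TestInsertQueryStatGroupingSketch(t *testing.T) {
	qst := &queryStatsTracker{limit: 10, maxQueryLogRecordTime: time.Minute}
	qst.insertQueryStat("up", 300000, time.Now(), time.Millisecond)
	qst.insertQueryStat("up", 600000, time.Now(), time.Millisecond)
	qst.insertQueryStat("up", 300000, time.Now(), time.Millisecond)
	if len(qst.qs) != 2 {
		t.Fatalf("want 2 grouped entries, got %d", len(qst.qs))
	}
}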

func getTopNQueriesByAvgDuration(qst *queryStatsTracker, top int) []queryStats {
	return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool {
		lenI := len(qst.qs[i].queryStatRecords)
		lenJ := len(qst.qs[j].queryStatRecords)
		if lenI == 0 || lenJ == 0 {
			return false
		}
		return qst.qs[i].Duration()/time.Duration(lenI) > qst.qs[j].Duration()/time.Duration(lenJ)
	})
}

func getTopNQueriesByRecordCount(qst *queryStatsTracker, top int) []queryStats {
	return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool {
		return len(qst.qs[i].queryStatRecords) > len(qst.qs[j].queryStatRecords)
	})
}

func getTopNQueriesByDuration(qst *queryStatsTracker, top int) []queryStats {
	return getTopNQueryStatsItemsWithFilter(qst, top, func(i, j int) bool {
		return qst.qs[i].Duration() > qst.qs[j].Duration()
	})
}

// getTopNQueryStatsItemsWithFilter sorts the tracked queries with the given
// less function and returns a copy of the first top entries.
func getTopNQueryStatsItemsWithFilter(qst *queryStatsTracker, top int, filterFunc func(i, j int) bool) []queryStats {
	qst.queryStatsLocker.Lock()
	defer qst.queryStatsLocker.Unlock()
	if top > len(qst.qs) {
		top = len(qst.qs)
	}
	sort.Slice(qst.qs, filterFunc)
	result := make([]queryStats, 0, top)
	result = append(result, qst.qs[:top]...)
	return result
}
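Finally, a usage sketch tying the pieces together (not part of the commit; it assumes compilation inside package promql with default flag values): record a few executions, then render the top-N report with WriteQueryStatsResponse.

package promql

import (
	"os"
	"time"
)

// ExampleWriteQueryStatsResponse is a usage sketch only.
func ExampleWriteQueryStatsResponse() {
	// tr is the query time range in milliseconds (here 5m).
	InsertQueryStat(`up`, 300000, time.Now(), 5*time.Millisecond)
	InsertQueryStat(`up`, 300000, time.Now(), 7*time.Millisecond)
	// Writes {"top_n": "5","stats_max_duration": ...,"top": [...]} to stdout.
	WriteQueryStatsResponse(os.Stdout, 5, "frequency")
}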
app/vmselect/promql/query_stats_test.go (new file, 144 lines)
@@ -0,0 +1,144 @@
package promql

import (
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"
)

func TestQueryLoggerShrink(t *testing.T) {
	f := func(addItemCount, limit, expectedLen int) {
		t.Helper()
		qst := &queryStatsTracker{
			limit:                 limit,
			maxQueryLogRecordTime: time.Second * 5,
		}
		for i := 0; i < addItemCount; i++ {
			qst.insertQueryStat(fmt.Sprintf("random-n-%d", i), int64(i), time.Now().Add(-time.Second), 500+time.Duration(i))
		}
		if len(qst.qs) != expectedLen {
			t.Fatalf("unexpected len got=%d for queryStats slice, want=%d", len(qst.qs), expectedLen)
		}
	}
	f(10, 5, 6)
	f(30, 10, 11)
	f(15, 15, 15)
}

func TestGetTopNQueriesByDuration(t *testing.T) {
	f := func(topN int, expectedQueryStats []queryStats) {
		t.Helper()
		ql := &queryStatsTracker{
			limit:                 25,
			maxQueryLogRecordTime: time.Second * 5,
		}
		queriesDurations := []int{16, 4, 5, 10}
		for i, v := range queriesDurations {
			ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v))
		}
		got := getTopNQueriesByDuration(ql, topN)

		if len(got) != len(expectedQueryStats) {
			t.Fatalf("unexpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats))
		}
		for i, gotR := range got {
			if gotR.query != expectedQueryStats[i].query {
				t.Fatalf("unexpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query)
			}
		}
	}
	f(1, []queryStats{{query: "query-n-0"}})
	f(2, []queryStats{{query: "query-n-0"}, {query: "query-n-3"}})
}

func TestGetTopNQueriesByCount(t *testing.T) {
	f := func(topN int, expectedQueryStats []queryStats) {
		t.Helper()
		ql := &queryStatsTracker{
			limit:                 25,
			maxQueryLogRecordTime: time.Second * 5,
		}
		queriesCounts := []int{1, 4, 5, 11}
		for i, v := range queriesCounts {
			for ic := 0; ic < v; ic++ {
				ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v))
			}
		}

		got := getTopNQueriesByRecordCount(ql, topN)

		if len(got) != len(expectedQueryStats) {
			t.Fatalf("unexpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats))
		}
		for i, gotR := range got {
			if gotR.query != expectedQueryStats[i].query {
				t.Fatalf("unexpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query)
			}
		}
	}
	f(1, []queryStats{{query: "query-n-3"}})
	f(2, []queryStats{{query: "query-n-3"}, {query: "query-n-2"}})
}

func TestGetTopNQueriesByAverageDuration(t *testing.T) {
	f := func(topN int, expectedQueryStats []queryStats) {
		t.Helper()
		ql := &queryStatsTracker{
			limit:                 25,
			maxQueryLogRecordTime: time.Second * 5,
		}
		queriesDurations := []int{4, 25, 14, 10}
		for i, v := range queriesDurations {
			ql.insertQueryStat(fmt.Sprintf("query-n-%d", i), int64(0), time.Now(), time.Second*time.Duration(v))
		}

		got := getTopNQueriesByAvgDuration(ql, topN)

		if len(got) != len(expectedQueryStats) {
			t.Fatalf("unexpected len of result, got: %d, want: %d", len(got), len(expectedQueryStats))
		}
		for i, gotR := range got {
			if gotR.query != expectedQueryStats[i].query {
				t.Fatalf("unexpected query: %q at position: %d, want: %q", gotR.query, i, expectedQueryStats[i].query)
			}
		}
	}
	f(1, []queryStats{{query: "query-n-1"}})
	f(2, []queryStats{{query: "query-n-1"}, {query: "query-n-2"}})
}

func TestWriteJSONQueryStats(t *testing.T) {
	qst := queryStatsTracker{
		limit:                 100,
		maxQueryLogRecordTime: time.Minute * 5,
	}
	t1 := time.Now()
	qst.insertQueryStat("sum(rate(rps_total)[1m]) by(service)", 360, t1, time.Microsecond*100)
	qst.insertQueryStat("up", 360, t1, time.Microsecond)
	qst.insertQueryStat("up", 360, t1, time.Microsecond)
	qst.insertQueryStat("up", 360, t1, time.Microsecond)

	f := func(t *testing.T, wantResp, aggregateBy string) {
		var got strings.Builder
		writeJSONQueryStats(&got, &qst, 5, aggregateBy)
		if !reflect.DeepEqual(got.String(), wantResp) {
			t.Fatalf("unexpected response, \ngot: %s,\nwant: %s", got.String(), wantResp)
		}
	}

	t.Run("aggregateByDuration", func(t *testing.T) {
		f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumulative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumulative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`,
			"duration")
	})
	t.Run("aggregateByFrequency", func(t *testing.T) {
		f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "up","query_time_range": "360ms","cumulative_duration": "3µs","avg_duration": "1µs","requests_count": "3"},{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumulative_duration": "100µs","avg_duration": "100µs","requests_count": "1"}]}`,
			"frequency")
	})
	t.Run("aggregateByAvgDuration", func(t *testing.T) {
		f(t, `{"top_n": "5","stats_max_duration": "10m0s","top": [{"query": "sum(rate(rps_total)[1m]) by(service)","query_time_range": "360ms","cumulative_duration": "100µs","avg_duration": "100µs","requests_count": "1"},{"query": "up","query_time_range": "360ms","cumulative_duration": "3µs","avg_duration": "1µs","requests_count": "3"}]}`,
			"avg_duration")
	})
}