mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
9930ce1fa9
* feat(vmselect): add support for listing current running queries and canceling specific query * fix(vmselect): change current queries' pid from int64 counter to uuid * feat(vmselect): add auth to internal operations like `/resetRollupResultCache`, `/query/list` and `/query/kill`. add flag `internalAuthKey` for these auth * fix(vmselect): add more info to current queries * review: delete some unnecessary code and use function instead of init * review: returen *queriesMap in newQueriesMap * review: delete unused var in struct queriesMap, add comments to exported functions * review: add return if error occurs * feat(vmselect): truncate query string in current running query list API since the size of query string might be large; use query string's pointer in struct `query` for the same reason; add query info API to get full access of query's info;
341 lines
7.5 KiB
Go
341 lines
7.5 KiB
Go
package promql
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// logSlowQueryDuration is the threshold above which Exec logs a query as slow
// and increments the slow queries counter. A zero value disables the check.
var logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging")
|
|
|
|
type query struct {
|
|
q *string
|
|
ec *EvalConfig
|
|
stopCh chan error
|
|
startAt time.Time
|
|
}
|
|
|
|
type queriesMap struct {
|
|
mu sync.Mutex
|
|
m map[string]query
|
|
}
|
|
|
|
func newQueriesMap() *queriesMap {
|
|
var qm queriesMap
|
|
qm.m = make(map[string]query)
|
|
|
|
return &qm
|
|
}
|
|
|
|
func (qm *queriesMap) Add(q query) string {
|
|
qm.mu.Lock()
|
|
c := uuid.New().String()
|
|
qm.m[c] = q
|
|
qm.mu.Unlock()
|
|
|
|
return c
|
|
}
|
|
|
|
func (qm *queriesMap) Delete(c string) {
|
|
qm.mu.Lock()
|
|
delete(qm.m, c)
|
|
qm.mu.Unlock()
|
|
}
|
|
|
|
// runningQueries is the package-wide registry of queries currently executed by
// Exec. Exec adds an entry on start and deletes it on return.
var runningQueries = newQueriesMap()
|
|
|
|
// truncateQueryLength is the maximum length (as measured by len, i.e. bytes)
// of query text that GetAllRunningQueries reports before appending "...".
const truncateQueryLength = 16
|
|
|
|
// GetAllRunningQueries get all the running queries' list
|
|
func GetAllRunningQueries() map[string]map[string]string {
|
|
all := make(map[string]map[string]string)
|
|
runningQueries.mu.Lock()
|
|
for c, rq := range runningQueries.m {
|
|
m := make(map[string]string)
|
|
if len(*rq.q) > truncateQueryLength {
|
|
m["query"] = (*rq.q)[:truncateQueryLength] + "..."
|
|
} else {
|
|
m["query"] = *rq.q
|
|
}
|
|
m["cost"] = time.Since(rq.startAt).String()
|
|
all[c] = m
|
|
}
|
|
runningQueries.mu.Unlock()
|
|
|
|
return all
|
|
}
|
|
|
|
// GetQueryInfo get all the running queries' info
|
|
func GetQueryInfo(c string) (map[string]string, error) {
|
|
if rq, ok := runningQueries.m[c]; ok {
|
|
m := make(map[string]string)
|
|
m["query"] = *rq.q
|
|
m["start"] = strconv.FormatInt(rq.ec.Start, 10)
|
|
m["end"] = strconv.FormatInt(rq.ec.End, 10)
|
|
m["step"] = strconv.FormatInt(rq.ec.Step, 10)
|
|
m["cost"] = time.Since(rq.startAt).String()
|
|
|
|
return m, nil
|
|
}
|
|
return nil, fmt.Errorf("query of qid {%v} is not running", c)
|
|
}
|
|
|
|
// CancelRunningQuery cancel the given query's execution
|
|
func CancelRunningQuery(c string) error {
|
|
runningQueries.mu.Lock()
|
|
defer runningQueries.mu.Unlock()
|
|
if rq, ok := runningQueries.m[c]; ok {
|
|
rq.stopCh <- fmt.Errorf("cancel query manully")
|
|
return nil
|
|
}
|
|
return fmt.Errorf("query of qid {%v} is not running", c)
|
|
|
|
}
|
|
|
|
// slowQueries counts queries whose execution time in Exec reached or exceeded
// -search.logSlowQueryDuration.
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
|
|
|
// Exec executes q for the given ec.
|
|
func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, error) {
|
|
if *logSlowQueryDuration > 0 {
|
|
startTime := time.Now()
|
|
defer func() {
|
|
d := time.Since(startTime)
|
|
if d >= *logSlowQueryDuration {
|
|
logger.Infof("slow query according to -search.logSlowQueryDuration=%s: duration=%.3f seconds, start=%d, end=%d, step=%d, accountID=%d, projectID=%d, query=%q",
|
|
*logSlowQueryDuration, d.Seconds(), ec.Start/1000, ec.End/1000, ec.Step/1000, ec.AuthToken.AccountID, ec.AuthToken.ProjectID, q)
|
|
slowQueries.Inc()
|
|
}
|
|
}()
|
|
}
|
|
|
|
stopCh := make(chan error, 1)
|
|
resultCh := make(chan []netstorage.Result)
|
|
c := runningQueries.Add(query{
|
|
q: &q,
|
|
ec: ec,
|
|
startAt: time.Now(),
|
|
stopCh: stopCh,
|
|
})
|
|
defer runningQueries.Delete(c)
|
|
|
|
go exec(ec, q, isFirstPointOnly, stopCh, resultCh)
|
|
|
|
select {
|
|
case err := <-stopCh:
|
|
logger.Infof(err.Error())
|
|
return nil, err
|
|
case result := <-resultCh:
|
|
return result, nil
|
|
}
|
|
}
|
|
|
|
func exec(ec *EvalConfig, q string, isFirstPointOnly bool, stopCh chan error, resultCh chan []netstorage.Result) {
|
|
ec.validate()
|
|
|
|
e, err := parsePromQLWithCache(q)
|
|
if err != nil {
|
|
stopCh <- err
|
|
return
|
|
}
|
|
|
|
rv, err := evalExpr(ec, e)
|
|
if err != nil {
|
|
stopCh <- err
|
|
return
|
|
}
|
|
|
|
if isFirstPointOnly {
|
|
// Remove all the points except the first one from every time series.
|
|
for _, ts := range rv {
|
|
ts.Values = ts.Values[:1]
|
|
ts.Timestamps = ts.Timestamps[:1]
|
|
}
|
|
}
|
|
|
|
maySort := maySortResults(e, rv)
|
|
result, err := timeseriesToResult(rv, maySort)
|
|
if err != nil {
|
|
stopCh <- err
|
|
return
|
|
}
|
|
|
|
resultCh <- result
|
|
}
|
|
|
|
func maySortResults(e metricsql.Expr, tss []*timeseries) bool {
|
|
if len(tss) > 100 {
|
|
// There is no sense in sorting a lot of results
|
|
return false
|
|
}
|
|
fe, ok := e.(*metricsql.FuncExpr)
|
|
if !ok {
|
|
return true
|
|
}
|
|
switch fe.Name {
|
|
case "sort", "sort_desc",
|
|
"sort_by_label", "sort_by_label_desc":
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func timeseriesToResult(tss []*timeseries, maySort bool) ([]netstorage.Result, error) {
|
|
tss = removeNaNs(tss)
|
|
result := make([]netstorage.Result, len(tss))
|
|
m := make(map[string]struct{}, len(tss))
|
|
bb := bbPool.Get()
|
|
for i, ts := range tss {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if _, ok := m[string(bb.B)]; ok {
|
|
return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName))
|
|
}
|
|
m[string(bb.B)] = struct{}{}
|
|
|
|
rs := &result[i]
|
|
rs.MetricNameMarshaled = append(rs.MetricNameMarshaled[:0], bb.B...)
|
|
rs.MetricName.CopyFrom(&ts.MetricName)
|
|
rs.Values = append(rs.Values[:0], ts.Values...)
|
|
rs.Timestamps = append(rs.Timestamps[:0], ts.Timestamps...)
|
|
}
|
|
bbPool.Put(bb)
|
|
|
|
if maySort {
|
|
sort.Slice(result, func(i, j int) bool {
|
|
return string(result[i].MetricNameMarshaled) < string(result[j].MetricNameMarshaled)
|
|
})
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func removeNaNs(tss []*timeseries) []*timeseries {
|
|
rvs := tss[:0]
|
|
for _, ts := range tss {
|
|
allNans := true
|
|
for _, v := range ts.Values {
|
|
if !math.IsNaN(v) {
|
|
allNans = false
|
|
break
|
|
}
|
|
}
|
|
if allNans {
|
|
// Skip timeseries with all NaNs.
|
|
continue
|
|
}
|
|
rvs = append(rvs, ts)
|
|
}
|
|
for i := len(rvs); i < len(tss); i++ {
|
|
// Zero unused time series, so GC could reclaim them.
|
|
tss[i] = nil
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func parsePromQLWithCache(q string) (metricsql.Expr, error) {
|
|
pcv := parseCacheV.Get(q)
|
|
if pcv == nil {
|
|
e, err := metricsql.Parse(q)
|
|
pcv = &parseCacheValue{
|
|
e: e,
|
|
err: err,
|
|
}
|
|
parseCacheV.Put(q, pcv)
|
|
}
|
|
if pcv.err != nil {
|
|
return nil, pcv.err
|
|
}
|
|
return pcv.e, nil
|
|
}
|
|
|
|
var parseCacheV = func() *parseCache {
|
|
pc := &parseCache{
|
|
m: make(map[string]*parseCacheValue),
|
|
}
|
|
metrics.NewGauge(`vm_cache_requests_total{type="promql/parse"}`, func() float64 {
|
|
return float64(pc.Requests())
|
|
})
|
|
metrics.NewGauge(`vm_cache_misses_total{type="promql/parse"}`, func() float64 {
|
|
return float64(pc.Misses())
|
|
})
|
|
metrics.NewGauge(`vm_cache_entries{type="promql/parse"}`, func() float64 {
|
|
return float64(pc.Len())
|
|
})
|
|
return pc
|
|
}()
|
|
|
|
// parseCacheMaxLen is the number of entries above which parseCache.Put evicts
// a part of the cache before inserting a new entry.
const parseCacheMaxLen = 10e3
|
|
|
|
type parseCacheValue struct {
|
|
e metricsql.Expr
|
|
err error
|
|
}
|
|
|
|
type parseCache struct {
|
|
// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
|
|
|
requests uint64
|
|
misses uint64
|
|
|
|
m map[string]*parseCacheValue
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (pc *parseCache) Requests() uint64 {
|
|
return atomic.LoadUint64(&pc.requests)
|
|
}
|
|
|
|
func (pc *parseCache) Misses() uint64 {
|
|
return atomic.LoadUint64(&pc.misses)
|
|
}
|
|
|
|
func (pc *parseCache) Len() uint64 {
|
|
pc.mu.RLock()
|
|
n := len(pc.m)
|
|
pc.mu.RUnlock()
|
|
return uint64(n)
|
|
}
|
|
|
|
func (pc *parseCache) Get(q string) *parseCacheValue {
|
|
atomic.AddUint64(&pc.requests, 1)
|
|
|
|
pc.mu.RLock()
|
|
pcv := pc.m[q]
|
|
pc.mu.RUnlock()
|
|
|
|
if pcv == nil {
|
|
atomic.AddUint64(&pc.misses, 1)
|
|
}
|
|
return pcv
|
|
}
|
|
|
|
func (pc *parseCache) Put(q string, pcv *parseCacheValue) {
|
|
pc.mu.Lock()
|
|
overflow := len(pc.m) - parseCacheMaxLen
|
|
if overflow > 0 {
|
|
// Remove 10% of items from the cache.
|
|
overflow = int(float64(len(pc.m)) * 0.1)
|
|
for k := range pc.m {
|
|
delete(pc.m, k)
|
|
overflow--
|
|
if overflow <= 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
pc.m[q] = pcv
|
|
pc.mu.Unlock()
|
|
}
|