VictoriaMetrics/app/vmselect/promql/active_queries.go

117 lines
2.7 KiB
Go
Raw Normal View History

package promql
import (
"fmt"
"net/http"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
)
// ActiveQueriesHandler returns response to /api/v1/status/active_queries
//
// It writes a JSON with active queries to w.
//
// If at is nil, then all the active queries across all the tenants are written.
func ActiveQueriesHandler(at *auth.Token, w http.ResponseWriter, _ *http.Request) {
aqes := activeQueriesV.GetAll()
if at != nil {
// Filter out queries, which do not belong to at.
dst := aqes[:0]
for _, aqe := range aqes {
if aqe.accountID == at.AccountID && aqe.projectID == at.ProjectID {
dst = append(dst, aqe)
}
}
aqes = dst
}
writeActiveQueries(w, aqes)
}
func writeActiveQueries(w http.ResponseWriter, aqes []activeQueryEntry) {
w.Header().Set("Content-Type", "application/json")
sort.Slice(aqes, func(i, j int) bool {
return aqes[i].startTime.Sub(aqes[j].startTime) < 0
})
now := time.Now()
fmt.Fprintf(w, `{"status":"ok","data":[`)
for i, aqe := range aqes {
d := now.Sub(aqe.startTime)
fmt.Fprintf(w, `{"duration":"%.3fs","id":"%016X","remote_addr":%s,"account_id":"%d","project_id":"%d","query":%q,"start":%d,"end":%d,"step":%d}`,
d.Seconds(), aqe.qid, aqe.quotedRemoteAddr, aqe.accountID, aqe.projectID, aqe.q, aqe.start, aqe.end, aqe.step)
if i+1 < len(aqes) {
fmt.Fprintf(w, `,`)
}
}
fmt.Fprintf(w, `]}`)
}
var activeQueriesV = newActiveQueries()
type activeQueries struct {
mu sync.Mutex
m map[uint64]activeQueryEntry
}
type activeQueryEntry struct {
accountID uint32
projectID uint32
start int64
end int64
step int64
qid uint64
quotedRemoteAddr string
q string
startTime time.Time
}
func newActiveQueries() *activeQueries {
return &activeQueries{
m: make(map[uint64]activeQueryEntry),
}
}
func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 {
var aqe activeQueryEntry
aqe.accountID = ec.AuthToken.AccountID
aqe.projectID = ec.AuthToken.ProjectID
aqe.start = ec.Start
aqe.end = ec.End
aqe.step = ec.Step
aqe.qid = nextActiveQueryID.Add(1)
aqe.quotedRemoteAddr = ec.QuotedRemoteAddr
aqe.q = q
aqe.startTime = time.Now()
aq.mu.Lock()
aq.m[aqe.qid] = aqe
aq.mu.Unlock()
return aqe.qid
}
func (aq *activeQueries) Remove(qid uint64) {
aq.mu.Lock()
delete(aq.m, qid)
aq.mu.Unlock()
}
func (aq *activeQueries) GetAll() []activeQueryEntry {
aq.mu.Lock()
aqes := make([]activeQueryEntry, 0, len(aq.m))
for _, aqe := range aq.m {
aqes = append(aqes, aqe)
}
aq.mu.Unlock()
return aqes
}
var nextActiveQueryID = func() *atomic.Uint64 {
var x atomic.Uint64
x.Store(uint64(time.Now().UnixNano()))
return &x
}()