mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
8a91eb25c4
- Add `Active queries` chapter to VMUI docs - Set `Content-Type: json` header inside promql.WriteActiveQueries() handler, in order to be consistent with other request handlers called at app/vmselect/main.go - Pass the request to promql.WriteActiveQueries() handler, so it can change its output depending on the provided request params. This also improves consistency of promql.WriteActiveQueries() args with other request hanlers at app/vmselect/main.go Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4653
90 lines
2 KiB
Go
90 lines
2 KiB
Go
package promql
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// WriteActiveQueries writes active queries to w.
|
|
//
|
|
// The written active queries are sorted in descending order of their exeuction duration.
|
|
func WriteActiveQueries(w http.ResponseWriter, r *http.Request) {
|
|
aqes := activeQueriesV.GetAll()
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
sort.Slice(aqes, func(i, j int) bool {
|
|
return aqes[i].startTime.Sub(aqes[j].startTime) < 0
|
|
})
|
|
now := time.Now()
|
|
fmt.Fprintf(w, `{"status":"ok","data":[`)
|
|
for i, aqe := range aqes {
|
|
d := now.Sub(aqe.startTime)
|
|
fmt.Fprintf(w, `{"duration":"%.3fs","id":"%016X","remote_addr":%s,"query":%q,"start":%d,"end":%d,"step":%d}`,
|
|
d.Seconds(), aqe.qid, aqe.quotedRemoteAddr, aqe.q, aqe.start, aqe.end, aqe.step)
|
|
if i+1 < len(aqes) {
|
|
fmt.Fprintf(w, `,`)
|
|
}
|
|
}
|
|
fmt.Fprintf(w, `]}`)
|
|
}
|
|
|
|
var activeQueriesV = newActiveQueries()
|
|
|
|
type activeQueries struct {
|
|
mu sync.Mutex
|
|
m map[uint64]activeQueryEntry
|
|
}
|
|
|
|
type activeQueryEntry struct {
|
|
start int64
|
|
end int64
|
|
step int64
|
|
qid uint64
|
|
quotedRemoteAddr string
|
|
q string
|
|
startTime time.Time
|
|
}
|
|
|
|
func newActiveQueries() *activeQueries {
|
|
return &activeQueries{
|
|
m: make(map[uint64]activeQueryEntry),
|
|
}
|
|
}
|
|
|
|
func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 {
|
|
var aqe activeQueryEntry
|
|
aqe.start = ec.Start
|
|
aqe.end = ec.End
|
|
aqe.step = ec.Step
|
|
aqe.qid = atomic.AddUint64(&nextActiveQueryID, 1)
|
|
aqe.quotedRemoteAddr = ec.QuotedRemoteAddr
|
|
aqe.q = q
|
|
aqe.startTime = time.Now()
|
|
|
|
aq.mu.Lock()
|
|
aq.m[aqe.qid] = aqe
|
|
aq.mu.Unlock()
|
|
return aqe.qid
|
|
}
|
|
|
|
func (aq *activeQueries) Remove(qid uint64) {
|
|
aq.mu.Lock()
|
|
delete(aq.m, qid)
|
|
aq.mu.Unlock()
|
|
}
|
|
|
|
func (aq *activeQueries) GetAll() []activeQueryEntry {
|
|
aq.mu.Lock()
|
|
aqes := make([]activeQueryEntry, 0, len(aq.m))
|
|
for _, aqe := range aq.m {
|
|
aqes = append(aqes, aqe)
|
|
}
|
|
aq.mu.Unlock()
|
|
return aqes
|
|
}
|
|
|
|
var nextActiveQueryID = uint64(time.Now().UnixNano())
|