app/vmselect: add vm_per_query_{rows,series}_processed_count histograms

Aliaksandr Valialkin 2019-11-23 13:22:55 +02:00
parent b1c3284fd0
commit f8298c7f13


@@ -7,6 +7,7 @@ import (
 	"runtime"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -89,6 +90,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 	doneCh := make(chan error)
 
 	// Start workers.
+	rowsProcessedTotal := uint64(0)
 	for i := 0; i < workersCount; i++ {
 		go func(workerID uint) {
 			rs := getResult()
@@ -96,6 +98,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			maxWorkersCount := gomaxprocs / workersCount
 			var err error
+			rowsProcessed := 0
 			for pts := range workCh {
 				if time.Until(rss.deadline.Deadline) < 0 {
 					err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.Timeout)
@@ -108,8 +111,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 					// Skip empty blocks.
 					continue
 				}
+				rowsProcessed += len(rs.Values)
 				f(rs, workerID)
 			}
+			atomic.AddUint64(&rowsProcessedTotal, uint64(rowsProcessed))
 			// Drain the remaining work
 			for range workCh {
 			}
@@ -121,6 +126,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 	for i := range rss.packedTimeseries {
 		workCh <- &rss.packedTimeseries[i]
 	}
+	seriesProcessedTotal := len(rss.packedTimeseries)
 	rss.packedTimeseries = rss.packedTimeseries[:0]
 	close(workCh)
@@ -131,6 +137,8 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			errors = append(errors, err)
 		}
 	}
+	perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
+	perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
 	if len(errors) > 0 {
 		// Return just the first error, since other errors
 		// is likely duplicate the first error.
@@ -139,6 +147,9 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 	return nil
 }
 
+var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
+var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
+
 var gomaxprocs = runtime.GOMAXPROCS(-1)
 
 type packedTimeseries struct {
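
For context, the diff does three things: each worker counts processed rows in a goroutine-local variable, merges that count into a shared total via sync/atomic exactly once when it finishes (one atomic operation per worker instead of one per block, which avoids contention on a shared counter), and RunParallel records the final totals in two metrics.Histogram instances, giving one histogram sample per query. Below is a minimal standalone sketch of the same pattern, not the VictoriaMetrics code itself: runParallel and the sample data are hypothetical, while metrics.NewHistogram and Histogram.Update are the real github.com/VictoriaMetrics/metrics API used in the diff, and metrics.WritePrometheus comes from the same package.

	package main

	import (
		"os"
		"sync"
		"sync/atomic"

		"github.com/VictoriaMetrics/metrics"
	)

	// perQueryRows mirrors the vm_per_query_rows_processed_count
	// histogram registered in the diff.
	var perQueryRows = metrics.NewHistogram(`vm_per_query_rows_processed_count`)

	// runParallel (hypothetical) processes batches of rows with nWorkers
	// goroutines and records the per-query row total in the histogram.
	func runParallel(batches [][]float64, nWorkers int) {
		workCh := make(chan []float64)
		rowsTotal := uint64(0)

		var wg sync.WaitGroup
		for i := 0; i < nWorkers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				rows := 0 // worker-local counter; no synchronization needed here
				for batch := range workCh {
					rows += len(batch)
				}
				// Merge the local counter into the shared total exactly once.
				atomic.AddUint64(&rowsTotal, uint64(rows))
			}()
		}

		for _, b := range batches {
			workCh <- b
		}
		close(workCh)
		wg.Wait()

		// One histogram sample per query, not per row.
		perQueryRows.Update(float64(rowsTotal))
	}

	func main() {
		runParallel([][]float64{{1, 2, 3}, {4, 5}}, 2)
		// Expose the collected metrics in Prometheus text format.
		metrics.WritePrometheus(os.Stdout, false)
	}

The histogram is then scraped like any other metric; in this package a Histogram is exported as a set of bucket series plus sum and count series, so dashboards can derive per-query row and series quantiles from vm_per_query_rows_processed_count and vm_per_query_series_processed_count.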