mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vmselect: add vm_per_query_{rows,series}_processed_count
histograms
This commit is contained in:
parent
0827bb6ce5
commit
4bb88843cf
1 changed files with 10 additions and 0 deletions
|
@ -92,6 +92,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|||
doneCh := make(chan error)
|
||||
|
||||
// Start workers.
|
||||
rowsProcessedTotal := uint64(0)
|
||||
for i := 0; i < workersCount; i++ {
|
||||
go func(workerID uint) {
|
||||
rs := getResult()
|
||||
|
@ -99,6 +100,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|||
maxWorkersCount := gomaxprocs / workersCount
|
||||
|
||||
var err error
|
||||
rowsProcessed := 0
|
||||
for pts := range workCh {
|
||||
if time.Until(rss.deadline.Deadline) < 0 {
|
||||
err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.Timeout)
|
||||
|
@ -111,8 +113,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|||
// Skip empty blocks.
|
||||
continue
|
||||
}
|
||||
rowsProcessed += len(rs.Values)
|
||||
f(rs, workerID)
|
||||
}
|
||||
atomic.AddUint64(&rowsProcessedTotal, uint64(rowsProcessed))
|
||||
// Drain the remaining work
|
||||
for range workCh {
|
||||
}
|
||||
|
@ -124,6 +128,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|||
for i := range rss.packedTimeseries {
|
||||
workCh <- &rss.packedTimeseries[i]
|
||||
}
|
||||
seriesProcessedTotal := len(rss.packedTimeseries)
|
||||
rss.packedTimeseries = rss.packedTimeseries[:0]
|
||||
close(workCh)
|
||||
|
||||
|
@ -134,6 +139,8 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
|
||||
perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
|
||||
if len(errors) > 0 {
|
||||
// Return just the first error, since other errors
|
||||
// is likely duplicate the first error.
|
||||
|
@ -142,6 +149,9 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
|
||||
var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
|
||||
|
||||
var gomaxprocs = runtime.GOMAXPROCS(-1)
|
||||
|
||||
type packedTimeseries struct {
|
||||
|
|
Loading…
Reference in a new issue