mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
app/vmselect: adds traces for series update API
This commit is contained in:
parent
07394fb847
commit
f5e663c00c
1 changed files with 22 additions and 4 deletions
|
@ -60,12 +60,16 @@ type Result struct {
|
|||
// Values are sorted by Timestamps.
|
||||
Values []float64
|
||||
Timestamps []int64
|
||||
|
||||
// statistic for series updates
|
||||
updateRows int
|
||||
}
|
||||
|
||||
func (r *Result) reset() {
|
||||
r.MetricName.Reset()
|
||||
r.Values = r.Values[:0]
|
||||
r.Timestamps = r.Timestamps[:0]
|
||||
r.updateRows = 0
|
||||
}
|
||||
|
||||
// Results holds results returned from ProcessSearchQuery.
|
||||
|
@ -106,7 +110,8 @@ type timeseriesWork struct {
|
|||
f func(rs *Result, workerID uint) error
|
||||
err error
|
||||
|
||||
rowsProcessed int
|
||||
rowsProcessed int
|
||||
updateRowsProcessed int
|
||||
}
|
||||
|
||||
func (tsw *timeseriesWork) reset() {
|
||||
|
@ -116,6 +121,7 @@ func (tsw *timeseriesWork) reset() {
|
|||
tsw.f = nil
|
||||
tsw.err = nil
|
||||
tsw.rowsProcessed = 0
|
||||
tsw.updateRowsProcessed = 0
|
||||
}
|
||||
|
||||
func getTimeseriesWork() *timeseriesWork {
|
||||
|
@ -146,6 +152,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
|||
atomic.StoreUint32(tsw.mustStop, 1)
|
||||
return fmt.Errorf("error during time series unpacking: %w", err)
|
||||
}
|
||||
tsw.updateRowsProcessed = r.updateRows
|
||||
tsw.rowsProcessed = len(r.Timestamps)
|
||||
if len(r.Timestamps) > 0 {
|
||||
if err := tsw.f(r, workerID); err != nil {
|
||||
|
@ -162,17 +169,23 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
|
|||
// Perform own work at first.
|
||||
rowsProcessed := 0
|
||||
seriesProcessed := 0
|
||||
updateGenerationsProcessed := 0
|
||||
updateRowsProcessed := 0
|
||||
ch := workChs[workerID]
|
||||
for tsw := range ch {
|
||||
tsw.err = tsw.do(&tmpResult.rs, workerID)
|
||||
rowsProcessed += tsw.rowsProcessed
|
||||
updateGenerationsProcessed += len(tsw.pts.updateAddrs)
|
||||
updateRowsProcessed += tsw.updateRowsProcessed
|
||||
seriesProcessed++
|
||||
}
|
||||
qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
|
||||
qt.Printf("own work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
|
||||
|
||||
// Then help others with the remaining work.
|
||||
rowsProcessed = 0
|
||||
seriesProcessed = 0
|
||||
updateGenerationsProcessed = 0
|
||||
updateRowsProcessed = 0
|
||||
for i := uint(1); i < uint(len(workChs)); i++ {
|
||||
idx := (i + workerID) % uint(len(workChs))
|
||||
ch := workChs[idx]
|
||||
|
@ -190,10 +203,12 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
|
|||
}
|
||||
tsw.err = tsw.do(&tmpResult.rs, workerID)
|
||||
rowsProcessed += tsw.rowsProcessed
|
||||
updateGenerationsProcessed += len(tsw.pts.updateAddrs)
|
||||
updateRowsProcessed += tsw.updateRowsProcessed
|
||||
seriesProcessed++
|
||||
}
|
||||
}
|
||||
qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
|
||||
qt.Printf("others work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
|
||||
|
||||
putTmpResult(tmpResult)
|
||||
}
|
||||
|
@ -598,12 +613,15 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
|
|||
// apply updates
|
||||
if len(seriesUpdateSbss) > 0 {
|
||||
var updateDst Result
|
||||
updateRows := 0
|
||||
for _, seriesUpdateSbs := range seriesUpdateSbss {
|
||||
updateDst.reset()
|
||||
mergeSortBlocks(&updateDst, seriesUpdateSbs, dedupInterval)
|
||||
updateRows += len(updateDst.Timestamps)
|
||||
mergeResult(dst, &updateDst)
|
||||
putSortBlocksHeap(seriesUpdateSbs)
|
||||
}
|
||||
dst.updateRows = updateRows
|
||||
}
|
||||
putSortBlocksHeap(sbh)
|
||||
return nil
|
||||
|
@ -1883,7 +1901,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
|
|||
if err != nil {
|
||||
return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
|
||||
}
|
||||
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
|
||||
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d, unique_series_with_updates=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal, len(tbfw.seriesUpdatesByMetricName))
|
||||
|
||||
var rss Results
|
||||
rss.tr = tr
|
||||
|
|
Loading…
Reference in a new issue