From f5e663c00ce750f864ab86f4f61fd97a92053da8 Mon Sep 17 00:00:00 2001
From: f41gh7
Date: Tue, 3 Oct 2023 14:57:03 +0200
Subject: [PATCH] app/vmselect: adds traces for series update API

---
 app/vmselect/netstorage/netstorage.go | 26 ++++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 2b6ce4542..9c45916f7 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -60,12 +60,16 @@ type Result struct {
 	// Values are sorted by Timestamps.
 	Values     []float64
 	Timestamps []int64
+
+	// statistic for series updates
+	updateRows int
 }
 
 func (r *Result) reset() {
 	r.MetricName.Reset()
 	r.Values = r.Values[:0]
 	r.Timestamps = r.Timestamps[:0]
+	r.updateRows = 0
 }
 
 // Results holds results returned from ProcessSearchQuery.
@@ -106,7 +110,8 @@ type timeseriesWork struct {
 	f        func(rs *Result, workerID uint) error
 	err      error
 
-	rowsProcessed int
+	rowsProcessed       int
+	updateRowsProcessed int
 }
 
 func (tsw *timeseriesWork) reset() {
@@ -116,6 +121,7 @@ func (tsw *timeseriesWork) reset() {
 	tsw.f = nil
 	tsw.err = nil
 	tsw.rowsProcessed = 0
+	tsw.updateRowsProcessed = 0
 }
 
 func getTimeseriesWork() *timeseriesWork {
@@ -146,6 +152,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 		atomic.StoreUint32(tsw.mustStop, 1)
 		return fmt.Errorf("error during time series unpacking: %w", err)
 	}
+	tsw.updateRowsProcessed = r.updateRows
 	tsw.rowsProcessed = len(r.Timestamps)
 	if len(r.Timestamps) > 0 {
 		if err := tsw.f(r, workerID); err != nil {
@@ -162,17 +169,23 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
 	// Perform own work at first.
 	rowsProcessed := 0
 	seriesProcessed := 0
+	updateGenerationsProcessed := 0
+	updateRowsProcessed := 0
 	ch := workChs[workerID]
 	for tsw := range ch {
 		tsw.err = tsw.do(&tmpResult.rs, workerID)
 		rowsProcessed += tsw.rowsProcessed
+		updateGenerationsProcessed += len(tsw.pts.updateAddrs)
+		updateRowsProcessed += tsw.updateRowsProcessed
 		seriesProcessed++
 	}
-	qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
+	qt.Printf("own work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
 
 	// Then help others with the remaining work.
 	rowsProcessed = 0
 	seriesProcessed = 0
+	updateGenerationsProcessed = 0
+	updateRowsProcessed = 0
 	for i := uint(1); i < uint(len(workChs)); i++ {
 		idx := (i + workerID) % uint(len(workChs))
 		ch := workChs[idx]
@@ -190,10 +203,12 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
 			}
 			tsw.err = tsw.do(&tmpResult.rs, workerID)
 			rowsProcessed += tsw.rowsProcessed
+			updateGenerationsProcessed += len(tsw.pts.updateAddrs)
+			updateRowsProcessed += tsw.updateRowsProcessed
 			seriesProcessed++
 		}
 	}
-	qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
+	qt.Printf("others work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
 
 	putTmpResult(tmpResult)
 }
@@ -598,12 +613,15 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
 	// apply updates
 	if len(seriesUpdateSbss) > 0 {
 		var updateDst Result
+		updateRows := 0
 		for _, seriesUpdateSbs := range seriesUpdateSbss {
 			updateDst.reset()
 			mergeSortBlocks(&updateDst, seriesUpdateSbs, dedupInterval)
+			updateRows += len(updateDst.Timestamps)
 			mergeResult(dst, &updateDst)
 			putSortBlocksHeap(seriesUpdateSbs)
 		}
+		dst.updateRows = updateRows
 	}
 	putSortBlocksHeap(sbh)
 	return nil
@@ -1883,7 +1901,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 	if err != nil {
 		return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
 	}
-	qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
+	qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d, unique_series_with_updates=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal, len(tbfw.seriesUpdatesByMetricName))
 	var rss Results
 	rss.tr = tr