mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
app/vmselect: export seriesFetched
stat for /query responses (#3925)
The change adds a new field `seriesFetched` to EvalConfig object. Since EvalConfig object can be copied inside `Exec`, `seriesFetched` is a pointer which can be updated by all copied objects. The reason for having stats is that other components, like vmalert, could benefit from this information. Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
86a98fa131
commit
10ab086366
5 changed files with 71 additions and 23 deletions
|
@ -884,7 +884,8 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
|
|||
qtDone := func() {
|
||||
qt.Donef("query=%s, time=%d: series=%d", query, start, len(result))
|
||||
}
|
||||
WriteQueryResponse(bw, ec.IsPartialResponse.Load(), result, qt, qtDone)
|
||||
|
||||
WriteQueryResponse(bw, ec.IsPartialResponse.Load(), result, qt, qtDone, ec.SeriesFetched())
|
||||
if err := bw.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot flush query response to remote client: %w", err)
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
{% stripspace %}
|
||||
QueryResponse generates response for /api/v1/query.
|
||||
See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
|
||||
{% func QueryResponse(isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) %}
|
||||
{% func QueryResponse(isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) %}
|
||||
{
|
||||
{% code seriesCount := len(rs) %}
|
||||
"status":"success",
|
||||
|
@ -29,6 +29,9 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
|
|||
{% endfor %}
|
||||
{% endif %}
|
||||
]
|
||||
},
|
||||
"stats":{
|
||||
"seriesFetched": "{%d seriesFetched %}"
|
||||
}
|
||||
{% code
|
||||
qt.Printf("generate /api/v1/query response for series=%d", seriesCount)
|
||||
|
|
|
@ -26,7 +26,7 @@ var (
|
|||
)
|
||||
|
||||
//line app/vmselect/prometheus/query_response.qtpl:9
|
||||
func StreamQueryResponse(qw422016 *qt422016.Writer, isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) {
|
||||
func StreamQueryResponse(qw422016 *qt422016.Writer, isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) {
|
||||
//line app/vmselect/prometheus/query_response.qtpl:9
|
||||
qw422016.N().S(`{`)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:11
|
||||
|
@ -81,40 +81,44 @@ func StreamQueryResponse(qw422016 *qt422016.Writer, isPartial bool, rs []netstor
|
|||
//line app/vmselect/prometheus/query_response.qtpl:30
|
||||
}
|
||||
//line app/vmselect/prometheus/query_response.qtpl:30
|
||||
qw422016.N().S(`]}`)
|
||||
qw422016.N().S(`]},"stats":{"seriesFetched": "`)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:34
|
||||
qw422016.N().D(seriesFetched)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:34
|
||||
qw422016.N().S(`"}`)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:37
|
||||
qt.Printf("generate /api/v1/query response for series=%d", seriesCount)
|
||||
qtDone()
|
||||
|
||||
//line app/vmselect/prometheus/query_response.qtpl:37
|
||||
//line app/vmselect/prometheus/query_response.qtpl:40
|
||||
streamdumpQueryTrace(qw422016, qt)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:37
|
||||
//line app/vmselect/prometheus/query_response.qtpl:40
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
func WriteQueryResponse(qq422016 qtio422016.Writer, isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) {
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
func WriteQueryResponse(qq422016 qtio422016.Writer, isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) {
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
StreamQueryResponse(qw422016, isPartial, rs, qt, qtDone)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
StreamQueryResponse(qw422016, isPartial, rs, qt, qtDone, seriesFetched)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
func QueryResponse(isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) string {
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
func QueryResponse(isPartial bool, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) string {
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
WriteQueryResponse(qb422016, isPartial, rs, qt, qtDone)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
WriteQueryResponse(qb422016, isPartial, rs, qt, qtDone, seriesFetched)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/query_response.qtpl:39
|
||||
//line app/vmselect/prometheus/query_response.qtpl:42
|
||||
}
|
||||
|
|
|
@ -142,6 +142,12 @@ type EvalConfig struct {
|
|||
|
||||
timestamps []int64
|
||||
timestampsOnce sync.Once
|
||||
|
||||
// seriesFetched stores the number of time series fetched
|
||||
// from the storage during the evaluation.
|
||||
// It is defined as a pointer since EvalConfig can be forked
|
||||
// during the evaluation but we want to keep its state.
|
||||
seriesFetched *int
|
||||
}
|
||||
|
||||
// copyEvalConfig returns src copy.
|
||||
|
@ -162,10 +168,28 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
|||
ec.DenyPartialResponse = src.DenyPartialResponse
|
||||
ec.IsPartialResponse.Store(src.IsPartialResponse.Load())
|
||||
|
||||
ec.seriesFetched = src.seriesFetched
|
||||
|
||||
// do not copy src.timestamps - they must be generated again.
|
||||
return &ec
|
||||
}
|
||||
|
||||
func (ec *EvalConfig) addStats(series int) {
|
||||
if ec.seriesFetched == nil {
|
||||
ec.seriesFetched = new(int)
|
||||
}
|
||||
*ec.seriesFetched += series
|
||||
}
|
||||
|
||||
// SeriesFetched returns the number of series fetched from storages
|
||||
// during the evaluation.
|
||||
func (ec *EvalConfig) SeriesFetched() int {
|
||||
if ec.seriesFetched == nil {
|
||||
return 0
|
||||
}
|
||||
return *ec.seriesFetched
|
||||
}
|
||||
|
||||
func (ec *EvalConfig) updateIsPartialResponse(isPartialResponse bool) {
|
||||
ec.IsPartialResponse.CompareAndSwap(false, isPartialResponse)
|
||||
}
|
||||
|
@ -1095,6 +1119,7 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
|
|||
tss := mergeTimeseries(tssCached, nil, start, ec)
|
||||
return tss, nil
|
||||
}
|
||||
ec.addStats(rssLen)
|
||||
|
||||
// Verify timeseries fit available memory after the rollup.
|
||||
// Take into account points from tssCached.
|
||||
|
|
|
@ -76,3 +76,18 @@ func TestValidateMaxPointsPerSeriesSuccess(t *testing.T) {
|
|||
f(1659962171908, 1659966077742, 5000, 800)
|
||||
f(1659962150000, 1659966070000, 10000, 393)
|
||||
}
|
||||
|
||||
func TestEvalConfig_SeriesFetched(t *testing.T) {
|
||||
ec := &EvalConfig{}
|
||||
ec.addStats(1)
|
||||
|
||||
if ec.SeriesFetched() != 1 {
|
||||
t.Fatalf("expected to get 1; got %d instead", ec.SeriesFetched())
|
||||
}
|
||||
|
||||
ecNew := copyEvalConfig(ec)
|
||||
ecNew.addStats(3)
|
||||
if ec.SeriesFetched() != 4 {
|
||||
t.Fatalf("expected to get 4; got %d instead", ec.SeriesFetched())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue