app/vmselect: export seriesFetched stat for /query responses (#3925)

The change adds a new field `seriesFetched` to EvalConfig object.
Since EvalConfig object can be copied inside `Exec`,
`seriesFetched` is a pointer which can be updated by all copied
objects.

The reason for having stats is that other components, like vmalert,
could benefit from this information.

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2023-03-27 17:51:33 +02:00 committed by Aliaksandr Valialkin
parent 3214b1c315
commit 4021aa11b5
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
6 changed files with 72 additions and 24 deletions

View file

@ -786,7 +786,8 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
qtDone := func() {
qt.Donef("query=%s, time=%d: series=%d", query, start, len(result))
}
WriteQueryResponse(bw, result, qt, qtDone)
WriteQueryResponse(bw, result, qt, qtDone, ec.SeriesFetched())
if err := bw.Flush(); err != nil {
return fmt.Errorf("cannot flush query response to remote client: %w", err)
}

View file

@ -6,7 +6,7 @@
{% stripspace %}
QueryResponse generates response for /api/v1/query.
See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
{% func QueryResponse(rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) %}
{% func QueryResponse(rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) %}
{
{% code seriesCount := len(rs) %}
"status":"success",
@ -28,6 +28,9 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
{% endfor %}
{% endif %}
]
},
"stats":{
"seriesFetched": "{%d seriesFetched %}"
}
{% code
qt.Printf("generate /api/v1/query response for series=%d", seriesCount)

View file

@ -26,7 +26,7 @@ var (
)
//line app/vmselect/prometheus/query_response.qtpl:9
func StreamQueryResponse(qw422016 *qt422016.Writer, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) {
func StreamQueryResponse(qw422016 *qt422016.Writer, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) {
//line app/vmselect/prometheus/query_response.qtpl:9
qw422016.N().S(`{`)
//line app/vmselect/prometheus/query_response.qtpl:11
@ -69,40 +69,44 @@ func StreamQueryResponse(qw422016 *qt422016.Writer, rs []netstorage.Result, qt *
//line app/vmselect/prometheus/query_response.qtpl:29
}
//line app/vmselect/prometheus/query_response.qtpl:29
qw422016.N().S(`]}`)
qw422016.N().S(`]},"stats":{"seriesFetched": "`)
//line app/vmselect/prometheus/query_response.qtpl:33
qw422016.N().D(seriesFetched)
//line app/vmselect/prometheus/query_response.qtpl:33
qw422016.N().S(`"}`)
//line app/vmselect/prometheus/query_response.qtpl:36
qt.Printf("generate /api/v1/query response for series=%d", seriesCount)
qtDone()
//line app/vmselect/prometheus/query_response.qtpl:36
//line app/vmselect/prometheus/query_response.qtpl:39
streamdumpQueryTrace(qw422016, qt)
//line app/vmselect/prometheus/query_response.qtpl:36
//line app/vmselect/prometheus/query_response.qtpl:39
qw422016.N().S(`}`)
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
}
//line app/vmselect/prometheus/query_response.qtpl:38
func WriteQueryResponse(qq422016 qtio422016.Writer, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) {
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
func WriteQueryResponse(qq422016 qtio422016.Writer, rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) {
//line app/vmselect/prometheus/query_response.qtpl:41
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/query_response.qtpl:38
StreamQueryResponse(qw422016, rs, qt, qtDone)
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
StreamQueryResponse(qw422016, rs, qt, qtDone, seriesFetched)
//line app/vmselect/prometheus/query_response.qtpl:41
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
}
//line app/vmselect/prometheus/query_response.qtpl:38
func QueryResponse(rs []netstorage.Result, qt *querytracer.Tracer, qtDone func()) string {
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
func QueryResponse(rs []netstorage.Result, qt *querytracer.Tracer, qtDone func(), seriesFetched int) string {
//line app/vmselect/prometheus/query_response.qtpl:41
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/query_response.qtpl:38
WriteQueryResponse(qb422016, rs, qt, qtDone)
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
WriteQueryResponse(qb422016, rs, qt, qtDone, seriesFetched)
//line app/vmselect/prometheus/query_response.qtpl:41
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
return qs422016
//line app/vmselect/prometheus/query_response.qtpl:38
//line app/vmselect/prometheus/query_response.qtpl:41
}

View file

@ -134,6 +134,12 @@ type EvalConfig struct {
timestamps []int64
timestampsOnce sync.Once
// seriesFetched stores the number of time series fetched
// from the storage during the evaluation.
// It is defined as a pointer since EvalConfig can be forked
// during the evaluation but we want to keep its state.
seriesFetched *int
}
// copyEvalConfig returns src copy.
@ -151,10 +157,28 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
ec.GetRequestURI = src.GetRequestURI
ec.seriesFetched = src.seriesFetched
// do not copy src.timestamps - they must be generated again.
return &ec
}
func (ec *EvalConfig) addStats(series int) {
if ec.seriesFetched == nil {
ec.seriesFetched = new(int)
}
*ec.seriesFetched += series
}
// SeriesFetched returns the number of series fetched from storages
// during the evaluation.
func (ec *EvalConfig) SeriesFetched() int {
if ec.seriesFetched == nil {
return 0
}
return *ec.seriesFetched
}
func (ec *EvalConfig) validate() {
if ec.Start > ec.End {
logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End)
@ -1077,6 +1101,7 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
tss := mergeTimeseries(tssCached, nil, start, ec)
return tss, nil
}
ec.addStats(rssLen)
// Verify timeseries fit available memory after the rollup.
// Take into account points from tssCached.

View file

@ -76,3 +76,18 @@ func TestValidateMaxPointsPerSeriesSuccess(t *testing.T) {
f(1659962171908, 1659966077742, 5000, 800)
f(1659962150000, 1659966070000, 10000, 393)
}
func TestEvalConfig_SeriesFetched(t *testing.T) {
ec := &EvalConfig{}
ec.addStats(1)
if ec.SeriesFetched() != 1 {
t.Fatalf("expected to get 1; got %d instead", ec.SeriesFetched())
}
ecNew := copyEvalConfig(ec)
ecNew.addStats(3)
if ec.SeriesFetched() != 4 {
t.Fatalf("expected to get 4; got %d instead", ec.SeriesFetched())
}
}