app/vmselect/prometheus: remove stale NaN for range vector query

This commit is contained in:
hao.peng 2024-10-30 16:49:39 +08:00
parent 9ea5ceffdb
commit dd2da3bce5

View file

@ -12,6 +12,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql" "github.com/VictoriaMetrics/metricsql"
"github.com/valyala/fastjson/fastfloat" "github.com/valyala/fastjson/fastfloat"
@ -22,7 +23,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -312,6 +312,7 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error { func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error {
var dropStale bool
bw := bufferedwriter.Get(w) bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
sw := newScalableWriter(bw) sw := newScalableWriter(bw)
@ -329,6 +330,7 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
return sw.maybeFlushBuffer(bb) return sw.maybeFlushBuffer(bb)
} }
} else if format == "promapi" { } else if format == "promapi" {
dropStale = true
WriteExportPromAPIHeader(bw) WriteExportPromAPIHeader(bw)
var firstLineOnce atomic.Bool var firstLineOnce atomic.Bool
var firstLineSent atomic.Bool var firstLineSent atomic.Bool
@ -403,12 +405,17 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
} }
xb := exportBlockPool.Get().(*exportBlock) xb := exportBlockPool.Get().(*exportBlock)
xb.mn = &rs.MetricName xb.mn = &rs.MetricName
for i, v := range rs.Values { if dropStale {
if decimal.IsStaleNaN(v) { for i, v := range rs.Values {
continue if decimal.IsStaleNaN(v) {
continue
}
xb.values = append(xb.values, v)
xb.timestamps = append(xb.timestamps, rs.Timestamps[i])
} }
xb.values = append(xb.values, v) } else {
xb.timestamps = append(xb.timestamps, rs.Timestamps[i]) xb.timestamps = rs.Timestamps
xb.values = rs.Values
} }
if err := writeLineFunc(xb, workerID); err != nil { if err := writeLineFunc(xb, workerID); err != nil {
return err return err