2019-05-22 21:16:55 +00:00
package prometheus
import (
"flag"
"fmt"
"math"
2021-09-15 15:04:28 +00:00
"net"
2019-05-22 21:16:55 +00:00
"net/http"
2022-10-01 19:05:43 +00:00
"runtime"
2019-05-22 21:16:55 +00:00
"strconv"
2020-10-12 17:01:51 +00:00
"strings"
2019-08-04 20:06:55 +00:00
"sync"
2022-10-01 19:05:43 +00:00
"sync/atomic"
2019-05-22 21:16:55 +00:00
"time"
2020-09-27 20:17:14 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2020-12-25 14:44:26 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
2020-09-10 21:29:26 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2020-09-26 01:29:45 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-14 19:01:51 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-07-31 15:00:21 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2022-05-31 23:31:40 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2020-03-10 19:45:15 +00:00
"github.com/valyala/fastjson/fastfloat"
2019-05-22 21:16:55 +00:00
)
var (
2020-02-12 12:00:38 +00:00
latencyOffset = flag . Duration ( "search.latencyOffset" , time . Second * 30 , "The time when data points become visible in query results after the collection. " +
2022-12-17 00:54:57 +00:00
"It can be overridden on per-query basis via latency_offset arg. " +
2019-10-28 10:30:50 +00:00
"Too small value can result in incomplete last points for query results" )
2020-09-10 21:29:26 +00:00
maxQueryLen = flagutil . NewBytes ( "search.maxQueryLen" , 16 * 1024 , "The maximum search query length in bytes" )
2021-06-10 05:32:52 +00:00
maxLookback = flag . Duration ( "search.maxLookback" , 0 , "Synonym to -search.lookback-delta from Prometheus. " +
2020-04-20 16:25:32 +00:00
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. " +
"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons" )
maxStalenessInterval = flag . Duration ( "search.maxStalenessInterval" , 0 , "The maximum interval for staleness calculations. " +
2020-04-20 16:41:59 +00:00
"By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning " +
2020-04-20 16:25:32 +00:00
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. " +
2022-06-22 11:17:02 +00:00
"See also '-search.setLookbackToStep' flag" )
setLookbackToStep = flag . Bool ( "search.setLookbackToStep" , false , "Whether to fix lookback interval to 'step' query arg value. " +
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored" )
2021-01-19 20:55:46 +00:00
maxStepForPointsAdjustment = flag . Duration ( "search.maxStepForPointsAdjustment" , time . Minute , "The maximum step when /api/v1/query_range handler adjusts " +
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data" )
2022-10-01 15:26:05 +00:00
selectNodes = flagutil . NewArrayString ( "selectNode" , "Comma-separated addresses of vmselect nodes; usage: -selectNode=vmselect-host1,...,vmselect-hostN" )
2022-03-26 08:17:37 +00:00
2022-08-24 12:25:18 +00:00
maxUniqueTimeseries = flag . Int ( "search.maxUniqueTimeseries" , 300e3 , "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage" )
maxFederateSeries = flag . Int ( "search.maxFederateSeries" , 1e6 , "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage" )
maxExportSeries = flag . Int ( "search.maxExportSeries" , 10e6 , "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage" )
maxTSDBStatusSeries = flag . Int ( "search.maxTSDBStatusSeries" , 10e6 , "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage" )
maxSeriesLimit = flag . Int ( "search.maxSeries" , 30e3 , "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage" )
maxPointsPerTimeseries = flag . Int ( "search.maxPointsPerTimeseries" , 30e3 , "The maximum points per a single timeseries returned from /api/v1/query_range. " +
"This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points " +
"returned to graphing UI such as VMUI or Grafana. There is no sense in setting this limit to values bigger than the horizontal resolution of the graph" )
2019-05-22 21:16:55 +00:00
)
// defaultStep is the fallback step used if not set.
//
// FederateHandler uses it as the lookback window when getMaxLookback
// returns a non-positive value.
// NOTE(review): the value looks like 5 minutes expressed in milliseconds — confirm the unit.
const defaultStep = 5 * 60 * 1000
2022-12-15 00:01:33 +00:00
// ExpandWithExprs handles the request to /expand-with-exprs.
//
// It reads the `query` form value and renders the response for it
// via WriteExpandWithExprsResponse into a pooled buffered writer.
func ExpandWithExprs(w http.ResponseWriter, r *http.Request) {
	out := bufferedwriter.Get(w)
	defer bufferedwriter.Put(out)

	WriteExpandWithExprsResponse(out, r.FormValue("query"))
	// The flush error is deliberately ignored: the handler has no error return.
	_ = out.Flush()
}
2019-05-22 21:16:55 +00:00
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
2020-02-04 14:13:59 +00:00
func FederateHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer federateDuration . UpdateDuration ( startTime )
2022-06-09 17:13:04 +00:00
cp , err := getCommonParams ( r , startTime , true )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2022-06-09 17:13:04 +00:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2022-06-09 17:13:04 +00:00
if lookbackDelta <= 0 {
lookbackDelta = defaultStep
2019-05-22 21:16:55 +00:00
}
2022-06-09 17:13:04 +00:00
if cp . IsDefaultTimeRange ( ) {
cp . start = cp . end - lookbackDelta
2019-05-22 21:16:55 +00:00
}
2022-06-09 17:13:04 +00:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , * maxFederateSeries )
2020-11-14 10:36:21 +00:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2022-07-05 21:11:59 +00:00
rss , isPartial , err := netstorage . ProcessSearchQuery ( nil , denyPartialResponse , sq , cp . deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 21:16:55 +00:00
}
2020-11-14 10:36:21 +00:00
if isPartial {
return fmt . Errorf ( "cannot export federated metrics, because some of vmstorage nodes are unavailable" )
2019-06-29 22:27:03 +00:00
}
2019-05-22 21:16:55 +00:00
2020-11-13 08:25:39 +00:00
w . Header ( ) . Set ( "Content-Type" , "text/plain; charset=utf-8" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-10-01 19:05:43 +00:00
sw := newScalableWriter ( bw )
2022-05-31 23:31:40 +00:00
err = rss . RunParallel ( nil , func ( rs * netstorage . Result , workerID uint ) error {
2020-09-27 20:17:14 +00:00
if err := bw . Error ( ) ; err != nil {
return err
}
2022-10-01 19:05:43 +00:00
bb := sw . getBuffer ( workerID )
2020-09-27 20:17:14 +00:00
WriteFederate ( bb , rs )
2022-10-01 19:05:43 +00:00
return sw . maybeFlushBuffer ( bb )
2020-09-27 20:17:14 +00:00
} )
2019-05-22 21:16:55 +00:00
if err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "error during sending data to remote client: %w" , err )
2019-05-22 21:16:55 +00:00
}
2022-10-01 19:05:43 +00:00
return sw . flush ( )
2019-05-22 21:16:55 +00:00
}
// federateDuration tracks the duration of /federate requests.
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
2020-10-12 17:01:51 +00:00
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer exportCSVDuration . UpdateDuration ( startTime )
2022-06-09 17:13:04 +00:00
cp , err := getExportParams ( r , startTime )
if err != nil {
return err
}
2020-10-12 17:01:51 +00:00
format := r . FormValue ( "format" )
if len ( format ) == 0 {
2021-04-20 17:16:17 +00:00
return fmt . Errorf ( "missing `format` arg; see https://docs.victoriametrics.com/#how-to-export-csv-data" )
2020-10-12 17:01:51 +00:00
}
fieldNames := strings . Split ( format , "," )
2021-12-17 08:56:03 +00:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2022-05-03 12:52:50 +00:00
2022-06-09 17:13:04 +00:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , * maxExportSeries )
2020-11-13 08:25:39 +00:00
w . Header ( ) . Set ( "Content-Type" , "text/csv; charset=utf-8" )
2020-10-12 17:01:51 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-10-01 19:05:43 +00:00
sw := newScalableWriter ( bw )
writeCSVLine := func ( xb * exportBlock , workerID uint ) error {
2021-12-17 08:56:03 +00:00
if len ( xb . timestamps ) == 0 {
2022-10-01 19:05:43 +00:00
return nil
2021-12-17 08:56:03 +00:00
}
2022-10-01 19:05:43 +00:00
bb := sw . getBuffer ( workerID )
2021-12-17 08:56:03 +00:00
WriteExportCSVLine ( bb , xb , fieldNames )
2022-10-01 19:05:43 +00:00
return sw . maybeFlushBuffer ( bb )
2021-12-17 08:56:03 +00:00
}
doneCh := make ( chan error , 1 )
if ! reduceMemUsage {
// Unconditionally deny partial response for the exported data,
// since users usually expect that the exported data is full.
denyPartialResponse := true
2022-07-05 21:11:59 +00:00
rss , _ , err := netstorage . ProcessSearchQuery ( nil , denyPartialResponse , sq , cp . deadline )
2021-12-17 08:56:03 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
go func ( ) {
2022-05-31 23:31:40 +00:00
err := rss . RunParallel ( nil , func ( rs * netstorage . Result , workerID uint ) error {
2021-12-17 08:56:03 +00:00
if err := bw . Error ( ) ; err != nil {
return err
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
2022-10-01 19:05:43 +00:00
if err := writeCSVLine ( xb , workerID ) ; err != nil {
return err
}
2021-12-17 08:56:03 +00:00
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
doneCh <- err
} ( )
} else {
go func ( ) {
2022-10-01 19:05:43 +00:00
err := netstorage . ExportBlocks ( nil , sq , cp . deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange , workerID uint ) error {
2021-12-17 08:56:03 +00:00
if err := bw . Error ( ) ; err != nil {
return err
}
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
2022-10-01 19:05:43 +00:00
if err := writeCSVLine ( xb , workerID ) ; err != nil {
return err
}
2021-12-17 08:56:03 +00:00
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
doneCh <- err
} ( )
}
2020-10-12 17:01:51 +00:00
err = <- doneCh
if err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "error during sending the exported csv data to remote client: %w" , err )
2020-10-12 17:01:51 +00:00
}
2022-10-01 19:05:43 +00:00
return sw . flush ( )
2020-10-12 17:01:51 +00:00
}
// exportCSVDuration tracks the duration of /api/v1/export/csv requests.
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
2020-09-26 01:29:45 +00:00
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer exportNativeDuration . UpdateDuration ( startTime )
2022-06-09 17:13:04 +00:00
cp , err := getExportParams ( r , startTime )
2020-09-26 01:29:45 +00:00
if err != nil {
return err
}
2022-05-03 12:52:50 +00:00
2022-06-09 17:13:04 +00:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , * maxExportSeries )
2020-09-26 01:29:45 +00:00
w . Header ( ) . Set ( "Content-Type" , "VictoriaMetrics/native" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-10-01 19:05:43 +00:00
sw := newScalableWriter ( bw )
2020-09-26 01:29:45 +00:00
// Marshal tr
trBuf := make ( [ ] byte , 0 , 16 )
2022-06-09 17:13:04 +00:00
trBuf = encoding . MarshalInt64 ( trBuf , cp . start )
trBuf = encoding . MarshalInt64 ( trBuf , cp . end )
2020-09-29 08:36:12 +00:00
_ , _ = bw . Write ( trBuf )
2020-09-26 01:29:45 +00:00
2020-09-27 20:17:14 +00:00
// Marshal native blocks.
2022-10-01 19:05:43 +00:00
err = netstorage . ExportBlocks ( nil , sq , cp . deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange , workerID uint ) error {
2020-09-27 20:17:14 +00:00
if err := bw . Error ( ) ; err != nil {
return err
}
2022-10-01 19:05:43 +00:00
bb := sw . getBuffer ( workerID )
dst := bb . B
2020-09-26 01:29:45 +00:00
tmpBuf := bbPool . Get ( )
tmp := tmpBuf . B
// Marshal mn
2020-09-27 21:56:30 +00:00
tmp = mn . MarshalNoAccountIDProjectID ( tmp [ : 0 ] )
2020-09-26 01:29:45 +00:00
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
// Marshal b
tmp = b . MarshalPortable ( tmp [ : 0 ] )
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
tmpBuf . B = tmp
bbPool . Put ( tmpBuf )
2022-10-01 19:05:43 +00:00
bb . B = dst
return sw . maybeFlushBuffer ( bb )
2020-09-26 01:29:45 +00:00
} )
2020-09-27 20:17:14 +00:00
if err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "error during sending native data to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2022-10-01 19:05:43 +00:00
return sw . flush ( )
2020-09-26 01:29:45 +00:00
}
2020-09-27 20:17:14 +00:00
// exportNativeDuration tracks the duration of /api/v1/export/native requests.
var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`)
2020-09-26 01:29:45 +00:00
// bbPool is a pool of reusable byte buffers.
//
// ExportNativeHandler takes temporary buffers from it for marshaling
// metric names and blocks.
var bbPool bytesutil . ByteBufferPool
2019-05-22 21:16:55 +00:00
// ExportHandler exports data in raw format from /api/v1/export.
2020-02-04 14:13:59 +00:00
func ExportHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer exportDuration . UpdateDuration ( startTime )
2022-06-09 17:13:04 +00:00
cp , err := getExportParams ( r , startTime )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
format := r . FormValue ( "format" )
2020-03-10 19:45:15 +00:00
maxRowsPerLine := int ( fastfloat . ParseInt64BestEffort ( r . FormValue ( "max_rows_per_line" ) ) )
2020-09-26 01:29:45 +00:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2022-06-09 17:13:04 +00:00
if err := exportHandler ( nil , at , w , cp , format , maxRowsPerLine , reduceMemUsage ) ; err != nil {
return fmt . Errorf ( "error when exporting data on the time range (start=%d, end=%d): %w" , cp . start , cp . end , err )
2019-05-22 21:16:55 +00:00
}
return nil
}
// exportDuration tracks the duration of /api/v1/export requests.
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
2022-06-09 17:13:04 +00:00
func exportHandler ( qt * querytracer . Tracer , at * auth . Token , w http . ResponseWriter , cp * commonParams , format string , maxRowsPerLine int , reduceMemUsage bool ) error {
2022-10-01 19:05:43 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
sw := newScalableWriter ( bw )
writeLineFunc := func ( xb * exportBlock , workerID uint ) error {
bb := sw . getBuffer ( workerID )
2020-09-26 01:29:45 +00:00
WriteExportJSONLine ( bb , xb )
2022-10-01 19:05:43 +00:00
return sw . maybeFlushBuffer ( bb )
2020-08-10 17:57:18 +00:00
}
2020-11-13 08:25:39 +00:00
contentType := "application/stream+json; charset=utf-8"
2020-09-26 01:29:45 +00:00
if format == "prometheus" {
2020-11-13 08:25:39 +00:00
contentType = "text/plain; charset=utf-8"
2022-10-01 19:05:43 +00:00
writeLineFunc = func ( xb * exportBlock , workerID uint ) error {
bb := sw . getBuffer ( workerID )
2020-09-26 01:29:45 +00:00
WriteExportPrometheusLine ( bb , xb )
2022-10-01 19:05:43 +00:00
return sw . maybeFlushBuffer ( bb )
2020-09-26 01:29:45 +00:00
}
} else if format == "promapi" {
2022-10-01 19:05:43 +00:00
WriteExportPromAPIHeader ( bw )
firstLineOnce := uint32 ( 0 )
firstLineSent := uint32 ( 0 )
writeLineFunc = func ( xb * exportBlock , workerID uint ) error {
bb := sw . getBuffer ( workerID )
if atomic . CompareAndSwapUint32 ( & firstLineOnce , 0 , 1 ) {
// Send the first line to sw.bw
WriteExportPromAPILine ( bb , xb )
_ , err := sw . bw . Write ( bb . B )
bb . Reset ( )
atomic . StoreUint32 ( & firstLineSent , 1 )
return err
}
for atomic . LoadUint32 ( & firstLineSent ) == 0 {
// Busy wait until the first line is sent to sw.bw
runtime . Gosched ( )
}
bb . B = append ( bb . B , ',' )
2020-09-26 01:29:45 +00:00
WriteExportPromAPILine ( bb , xb )
2022-10-01 19:05:43 +00:00
return sw . maybeFlushBuffer ( bb )
2020-09-26 01:29:45 +00:00
}
}
2020-03-10 19:45:15 +00:00
if maxRowsPerLine > 0 {
2020-09-26 01:29:45 +00:00
writeLineFuncOrig := writeLineFunc
2022-10-01 19:05:43 +00:00
writeLineFunc = func ( xb * exportBlock , workerID uint ) error {
2020-09-26 01:29:45 +00:00
valuesOrig := xb . values
timestampsOrig := xb . timestamps
2020-03-10 19:45:15 +00:00
values := valuesOrig
timestamps := timestampsOrig
for len ( values ) > 0 {
var valuesChunk [ ] float64
var timestampsChunk [ ] int64
if len ( values ) > maxRowsPerLine {
valuesChunk = values [ : maxRowsPerLine ]
timestampsChunk = timestamps [ : maxRowsPerLine ]
values = values [ maxRowsPerLine : ]
timestamps = timestamps [ maxRowsPerLine : ]
} else {
valuesChunk = values
timestampsChunk = timestamps
values = nil
timestamps = nil
}
2020-09-26 01:29:45 +00:00
xb . values = valuesChunk
xb . timestamps = timestampsChunk
2022-10-01 19:05:43 +00:00
if err := writeLineFuncOrig ( xb , workerID ) ; err != nil {
return err
}
2020-03-10 19:45:15 +00:00
}
2020-09-26 01:29:45 +00:00
xb . values = valuesOrig
xb . timestamps = timestampsOrig
2022-10-01 19:05:43 +00:00
return nil
2020-08-10 17:57:18 +00:00
}
2019-05-22 21:16:55 +00:00
}
2022-06-09 17:13:04 +00:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , * maxExportSeries )
2020-09-27 20:17:14 +00:00
w . Header ( ) . Set ( "Content-Type" , contentType )
2021-12-17 08:56:03 +00:00
doneCh := make ( chan error , 1 )
2020-09-26 01:29:45 +00:00
if ! reduceMemUsage {
2021-03-25 16:38:49 +00:00
// Unconditionally deny partial response for the exported data,
// since users usually expect that the exported data is full.
2021-01-27 12:38:26 +00:00
denyPartialResponse := true
2022-07-05 21:11:59 +00:00
rss , _ , err := netstorage . ProcessSearchQuery ( qt , denyPartialResponse , sq , cp . deadline )
2020-09-26 01:29:45 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
2022-06-08 18:05:17 +00:00
qtChild := qt . NewChild ( "background export format=%s" , format )
2020-09-26 01:29:45 +00:00
go func ( ) {
2022-05-31 23:31:40 +00:00
err := rss . RunParallel ( qtChild , func ( rs * netstorage . Result , workerID uint ) error {
2020-09-27 20:17:14 +00:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 01:29:45 +00:00
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
2022-10-01 19:05:43 +00:00
if err := writeLineFunc ( xb , workerID ) ; err != nil {
return err
}
2020-09-26 01:29:45 +00:00
xb . reset ( )
exportBlockPool . Put ( xb )
2020-09-27 20:17:14 +00:00
return nil
2020-09-26 01:29:45 +00:00
} )
2022-06-08 18:05:17 +00:00
qtChild . Done ( )
2020-09-26 01:29:45 +00:00
doneCh <- err
} ( )
} else {
2022-06-08 18:05:17 +00:00
qtChild := qt . NewChild ( "background export format=%s" , format )
2020-09-26 01:29:45 +00:00
go func ( ) {
2022-10-01 19:05:43 +00:00
err := netstorage . ExportBlocks ( qtChild , sq , cp . deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange , workerID uint ) error {
2020-09-27 20:17:14 +00:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 01:29:45 +00:00
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
if len ( xb . timestamps ) > 0 {
2022-10-01 19:05:43 +00:00
if err := writeLineFunc ( xb , workerID ) ; err != nil {
return err
}
2020-09-26 01:29:45 +00:00
}
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
2022-06-08 18:05:17 +00:00
qtChild . Done ( )
2020-09-26 01:29:45 +00:00
doneCh <- err
} ( )
}
2022-05-03 12:52:50 +00:00
err := <- doneCh
2019-05-22 21:16:55 +00:00
if err != nil {
2022-10-01 19:05:43 +00:00
return fmt . Errorf ( "cannot send data to remote client: %w" , err )
}
if err := sw . flush ( ) ; err != nil {
return fmt . Errorf ( "cannot send data to remote client: %w" , err )
}
if format == "promapi" {
WriteExportPromAPIFooter ( bw , qt )
}
2022-12-15 00:01:33 +00:00
return bw . Flush ( )
2019-05-22 21:16:55 +00:00
}
2020-09-26 01:29:45 +00:00
// exportBlock holds a metric name together with its timestamps and values
// while a single export line is being written.
type exportBlock struct {
	mn         *storage.MetricName
	timestamps []int64
	values     []float64
}

// reset drops the metric name reference and truncates both slices to zero
// length, keeping their capacity for reuse.
func (xb *exportBlock) reset() {
	xb.timestamps, xb.values = xb.timestamps[:0], xb.values[:0]
	xb.mn = nil
}

// exportBlockPool is a pool of reusable exportBlock objects.
var exportBlockPool = &sync.Pool{
	New: func() interface{} {
		return new(exportBlock)
	},
}
2019-05-22 21:16:55 +00:00
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
2020-02-04 14:13:59 +00:00
func DeleteHandler ( startTime time . Time , at * auth . Token , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer deleteDuration . UpdateDuration ( startTime )
2022-06-09 17:13:04 +00:00
cp , err := getCommonParams ( r , startTime , true )
2019-05-22 21:16:55 +00:00
if err != nil {
return err
}
2022-06-09 17:13:04 +00:00
if ! cp . IsDefaultTimeRange ( ) {
return fmt . Errorf ( "start=%d and end=%d args aren't supported. Remove these args from the query in order to delete all the matching metrics" , cp . start , cp . end )
}
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , 0 )
2022-07-05 21:11:59 +00:00
deletedCount , err := netstorage . DeleteSeries ( nil , sq , cp . deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
2021-02-01 15:42:35 +00:00
return fmt . Errorf ( "cannot delete time series: %w" , err )
2019-05-22 21:16:55 +00:00
}
if deletedCount > 0 {
2019-05-22 21:23:23 +00:00
// Reset rollup result cache on all the vmselect nodes,
// since the cache may contain deleted data.
// TODO: reset only cache for (account, project)
resetRollupResultCaches ( )
2019-05-22 21:16:55 +00:00
}
return nil
}
// deleteDuration tracks the duration of /api/v1/admin/tsdb/delete_series requests.
var deleteDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/admin/tsdb/delete_series"}`)
2019-05-22 21:23:23 +00:00
func resetRollupResultCaches ( ) {
2020-12-24 07:01:40 +00:00
resetRollupResultCacheCalls . Inc ( )
// Reset local cache before checking whether selectNodes list is empty.
// This guarantees that at least local cache is reset if selectNodes list is empty.
promql . ResetRollupResultCache ( )
2019-06-18 07:26:44 +00:00
if len ( * selectNodes ) == 0 {
2020-12-24 07:01:40 +00:00
logger . Warnf ( "missing -selectNode flag, cache reset request wont be propagated to the other vmselect nodes." +
"This can be fixed by enumerating all the vmselect node addresses in `-selectNode` command line flag. " +
" For example: -selectNode=select-addr-1:8481,select-addr-2:8481" )
return
2019-05-22 21:23:23 +00:00
}
2019-06-18 07:26:44 +00:00
for _ , selectNode := range * selectNodes {
2021-09-15 15:04:28 +00:00
if _ , _ , err := net . SplitHostPort ( selectNode ) ; err != nil {
// Add missing port
selectNode += ":8481"
}
2019-05-22 21:23:23 +00:00
callURL := fmt . Sprintf ( "http://%s/internal/resetRollupResultCache" , selectNode )
resp , err := httpClient . Get ( callURL )
if err != nil {
logger . Errorf ( "error when accessing %q: %s" , callURL , err )
resetRollupResultCacheErrors . Inc ( )
continue
}
if resp . StatusCode != http . StatusOK {
_ = resp . Body . Close ( )
logger . Errorf ( "unexpected status code at %q; got %d; want %d" , callURL , resp . StatusCode , http . StatusOK )
resetRollupResultCacheErrors . Inc ( )
continue
}
_ = resp . Body . Close ( )
}
}
// Counters updated by resetRollupResultCaches.
var (
// resetRollupResultCacheErrors counts failed cache reset requests to the nodes listed in -selectNode.
resetRollupResultCacheErrors = metrics . NewCounter ( "vm_reset_rollup_result_cache_errors_total" )
// resetRollupResultCacheCalls counts calls to resetRollupResultCaches.
resetRollupResultCacheCalls = metrics . NewCounter ( "vm_reset_rollup_result_cache_calls_total" )
)
// httpClient is the client used by resetRollupResultCaches for contacting
// the other vmselect nodes. Every request is limited to 5 seconds.
var httpClient = &http.Client{Timeout: 5 * time.Second}
2022-11-25 18:32:45 +00:00
// Tenants processes /admin/tenants request.
//
// The optional `start` and `end` args select the time range to search tenants on;
// `start` defaults to 0 and `end` defaults to startTime in milliseconds.
//
// It returns an error if the args cannot be parsed, if tenants cannot be
// obtained or if the response cannot be sent to the client.
func Tenants(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, r *http.Request) error {
	deadline := searchutils.GetDeadlineForStatusRequest(r, startTime)
	start, err := searchutils.GetTime(r, "start", 0)
	if err != nil {
		return err
	}
	// ct is startTime converted to unix milliseconds.
	ct := startTime.UnixNano() / 1e6
	end, err := searchutils.GetTime(r, "end", ct)
	if err != nil {
		return err
	}
	tr := storage.TimeRange{
		MinTimestamp: start,
		MaxTimestamp: end,
	}
	tenants, err := netstorage.Tenants(qt, tr, deadline)
	if err != nil {
		return err
	}

	w.Header().Set("Content-Type", "application/json")
	bw := bufferedwriter.Get(w)
	defer bufferedwriter.Put(bw)
	WriteTenantsResponse(bw, tenants, qt)
	if err := bw.Flush(); err != nil {
		return fmt.Errorf("cannot flush tenants response to remote client: %w", err)
	}
	return nil
}
2019-05-22 21:16:55 +00:00
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
2022-05-31 23:31:40 +00:00
func LabelValuesHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , labelName string , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer labelValuesDuration . UpdateDuration ( startTime )
2022-09-05 08:53:15 +00:00
cp , err := getCommonParamsWithDefaultDuration ( r , startTime , false )
2021-02-01 15:42:35 +00:00
if err != nil {
return err
}
2022-06-10 06:50:30 +00:00
limit , err := searchutils . GetInt ( r , "limit" )
if err != nil {
return err
}
2020-11-14 10:36:21 +00:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2022-06-12 01:32:13 +00:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , * maxUniqueTimeseries )
2022-07-05 21:11:59 +00:00
labelValues , isPartial , err := netstorage . LabelValues ( qt , denyPartialResponse , labelName , sq , limit , cp . deadline )
2022-06-12 01:32:13 +00:00
if err != nil {
return fmt . Errorf ( "cannot obtain values for label %q: %w" , labelName , err )
2019-05-22 21:16:55 +00:00
}
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-08 18:05:17 +00:00
WriteLabelValuesResponse ( bw , isPartial , labelValues , qt )
2020-09-27 20:17:14 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "canot flush label values to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2019-05-22 21:16:55 +00:00
return nil
}
// labelValuesDuration tracks the duration of /api/v1/label/<labelName>/values requests.
var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`)
2020-04-22 16:57:36 +00:00
// secsPerDay is the number of seconds in a day.
//
// TSDBStatusHandler uses it for converting between unix seconds and day numbers.
const secsPerDay = 3600 * 24
// TSDBStatusHandler processes /api/v1/status/tsdb request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2021-05-12 12:18:45 +00:00
//
// It can accept `match[]` filters in order to narrow down the search.
2022-06-09 16:46:26 +00:00
func TSDBStatusHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer tsdbStatusDuration . UpdateDuration ( startTime )
2022-06-09 17:13:04 +00:00
cp , err := getCommonParams ( r , startTime , false )
2021-05-12 12:18:45 +00:00
if err != nil {
return err
}
2022-06-09 17:13:04 +00:00
cp . deadline = searchutils . GetDeadlineForStatusRequest ( r , startTime )
2021-05-12 12:18:45 +00:00
2020-05-14 19:01:51 +00:00
date := fasttime . UnixDate ( )
2020-04-22 16:57:36 +00:00
dateStr := r . FormValue ( "date" )
if len ( dateStr ) > 0 {
2022-06-14 14:46:16 +00:00
if dateStr == "0" {
date = 0
} else {
t , err := time . Parse ( "2006-01-02" , dateStr )
if err != nil {
return fmt . Errorf ( "cannot parse `date` arg %q: %w" , dateStr , err )
}
date = uint64 ( t . Unix ( ) ) / secsPerDay
2020-04-22 16:57:36 +00:00
}
}
2022-06-14 14:46:16 +00:00
focusLabel := r . FormValue ( "focusLabel" )
2020-04-22 16:57:36 +00:00
topN := 10
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
2020-04-22 16:57:36 +00:00
}
if n <= 0 {
n = 1
}
if n > 1000 {
n = 1000
}
topN = n
}
2020-11-14 10:36:21 +00:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2022-06-14 14:46:16 +00:00
start := int64 ( date * secsPerDay ) * 1000
end := int64 ( ( date + 1 ) * secsPerDay ) * 1000 - 1
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , cp . filterss , * maxTSDBStatusSeries )
2022-07-05 21:11:59 +00:00
status , isPartial , err := netstorage . TSDBStatus ( qt , denyPartialResponse , sq , focusLabel , topN , cp . deadline )
2022-06-14 14:46:16 +00:00
if err != nil {
return fmt . Errorf ( "cannot obtain tsdb stats: %w" , err )
2020-04-22 16:57:36 +00:00
}
2020-11-14 10:36:21 +00:00
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-09 16:46:26 +00:00
WriteTSDBStatusResponse ( bw , isPartial , status , qt )
2020-09-27 20:17:14 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "cannot send tsdb status response to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2020-04-22 16:57:36 +00:00
return nil
}
// tsdbStatusDuration tracks the duration of /api/v1/status/tsdb requests.
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
2019-05-22 21:16:55 +00:00
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
2022-05-31 23:31:40 +00:00
func LabelsHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer labelsDuration . UpdateDuration ( startTime )
2022-09-05 08:53:15 +00:00
cp , err := getCommonParamsWithDefaultDuration ( r , startTime , false )
2021-02-01 15:42:35 +00:00
if err != nil {
return err
}
2022-06-10 06:50:30 +00:00
limit , err := searchutils . GetInt ( r , "limit" )
if err != nil {
return err
}
2020-11-14 10:36:21 +00:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2022-06-12 01:32:13 +00:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , * maxUniqueTimeseries )
2022-07-05 21:11:59 +00:00
labels , isPartial , err := netstorage . LabelNames ( qt , denyPartialResponse , sq , limit , cp . deadline )
2022-06-12 01:32:13 +00:00
if err != nil {
return fmt . Errorf ( "cannot obtain labels: %w" , err )
2019-05-22 21:16:55 +00:00
}
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-08 18:05:17 +00:00
WriteLabelsResponse ( bw , isPartial , labels , qt )
2020-09-27 20:17:14 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "cannot send labels response to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2019-05-22 21:16:55 +00:00
return nil
}
// labelsDuration is the request-duration summary for /api/v1/labels; LabelsHandler updates it on return.
var labelsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels"} ` )
// SeriesCountHandler processes /api/v1/series/count request.
2020-02-04 14:13:59 +00:00
func SeriesCountHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer seriesCountDuration . UpdateDuration ( startTime )
2021-03-30 18:38:59 +00:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2020-11-14 10:36:21 +00:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2022-07-05 21:11:59 +00:00
n , isPartial , err := netstorage . SeriesCount ( nil , at . AccountID , at . ProjectID , denyPartialResponse , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot obtain series count: %w" , err )
2019-05-22 21:16:55 +00:00
}
2020-11-14 10:36:21 +00:00
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 10:36:21 +00:00
WriteSeriesCountResponse ( bw , isPartial , n )
2020-09-27 20:17:14 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "cannot send series count response to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2019-05-22 21:16:55 +00:00
return nil
}
// seriesCountDuration is the request-duration summary for /api/v1/series/count; SeriesCountHandler updates it on return.
var seriesCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series/count"} ` )
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
2022-05-31 23:31:40 +00:00
func SeriesHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer seriesDuration . UpdateDuration ( startTime )
2020-09-10 21:29:26 +00:00
// Do not set start to searchutils.minTimeMsecs by default as Prometheus does,
2019-08-04 16:42:36 +00:00
// since this leads to fetching and scanning all the data from the storage,
// which can take a lot of time for big storages.
// It is better setting start as end-defaultStep by default.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
2022-09-05 08:53:15 +00:00
cp , err := getCommonParamsWithDefaultDuration ( r , startTime , true )
if err != nil {
return err
2019-06-06 19:17:13 +00:00
}
2022-07-11 17:18:30 +00:00
limit , err := searchutils . GetInt ( r , "limit" )
if err != nil {
return err
}
2022-08-30 09:34:23 +00:00
minLimit := * maxSeriesLimit
if limit > 0 && limit < * maxSeriesLimit {
minLimit = limit
}
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , cp . start , cp . end , cp . filterss , minLimit )
2020-11-14 10:36:21 +00:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2022-07-05 21:11:59 +00:00
metricNames , isPartial , err := netstorage . SearchMetricNames ( qt , denyPartialResponse , sq , cp . deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
2022-06-28 09:55:20 +00:00
return fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
2019-05-22 21:16:55 +00:00
}
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-07-11 17:18:30 +00:00
if limit > 0 && limit < len ( metricNames ) {
metricNames = metricNames [ : limit ]
}
2022-06-28 09:55:20 +00:00
qtDone := func ( ) {
qt . Donef ( "start=%d, end=%d" , cp . start , cp . end )
2019-05-22 21:16:55 +00:00
}
2022-06-28 14:36:27 +00:00
WriteSeriesResponse ( bw , isPartial , metricNames , qt , qtDone )
2022-06-28 09:55:20 +00:00
if err := bw . Flush ( ) ; err != nil {
return err
2019-05-22 21:16:55 +00:00
}
return nil
}
// seriesDuration is the request-duration summary for /api/v1/series; SeriesHandler updates it on return.
var seriesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series"} ` )
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
2022-05-31 23:31:40 +00:00
func QueryHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer queryDuration . UpdateDuration ( startTime )
2020-07-21 15:34:59 +00:00
ct := startTime . UnixNano ( ) / 1e6
2022-06-09 10:09:15 +00:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2022-05-31 23:31:40 +00:00
mayCache := ! searchutils . GetBool ( r , "nocache" )
2019-05-22 21:16:55 +00:00
query := r . FormValue ( "query" )
2019-06-20 11:05:07 +00:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 21:29:26 +00:00
start , err := searchutils . GetTime ( r , "time" , ct )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2020-04-17 09:24:10 +00:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2020-09-10 21:29:26 +00:00
step , err := searchutils . GetDuration ( r , "step" , lookbackDelta )
2019-10-15 16:12:27 +00:00
if err != nil {
return err
}
2020-04-17 09:24:10 +00:00
if step <= 0 {
step = defaultStep
}
2019-05-22 21:16:55 +00:00
2022-12-15 03:26:24 +00:00
if len ( query ) > maxQueryLen . IntN ( ) {
2020-08-16 14:05:52 +00:00
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 21:16:55 +00:00
}
2021-12-06 15:07:06 +00:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 15:42:35 +00:00
if err != nil {
return err
}
2021-07-12 14:16:38 +00:00
if childQuery , windowExpr , offsetExpr := promql . IsMetricSelectorWithRollup ( query ) ; childQuery != "" {
window := windowExpr . Duration ( step )
offset := offsetExpr . Duration ( step )
2019-05-22 21:16:55 +00:00
start -= offset
end := start
start = end - window
2021-03-12 10:16:50 +00:00
// Do not include data point with a timestamp matching the lower boundary of the window as Prometheus does.
start ++
if end < start {
end = start
}
2022-05-03 12:52:50 +00:00
tagFilterss , err := getTagFilterssFromMatches ( [ ] string { childQuery } )
if err != nil {
return err
}
filterss := searchutils . JoinTagFilterss ( tagFilterss , etfs )
2022-06-09 17:13:04 +00:00
cp := & commonParams {
2022-05-03 12:52:50 +00:00
deadline : deadline ,
start : start ,
end : end ,
filterss : filterss ,
}
2022-06-09 17:13:04 +00:00
if err := exportHandler ( qt , at , w , cp , "promapi" , 0 , false ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "error when exporting data for query=%q on the time range (start=%d, end=%d): %w" , childQuery , start , end , err )
2019-05-22 21:16:55 +00:00
}
return nil
}
2021-07-12 14:16:38 +00:00
if childQuery , windowExpr , stepExpr , offsetExpr := promql . IsRollup ( query ) ; childQuery != "" {
newStep := stepExpr . Duration ( step )
2019-12-10 22:42:44 +00:00
if newStep > 0 {
step = newStep
}
2021-07-12 14:16:38 +00:00
window := windowExpr . Duration ( step )
offset := offsetExpr . Duration ( step )
2019-12-10 22:42:44 +00:00
start -= offset
end := start
start = end - window
2022-05-31 23:31:40 +00:00
if err := queryRangeHandler ( qt , startTime , at , w , childQuery , start , end , step , r , ct , etfs ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , childQuery , start , end , step , err )
2019-12-10 22:42:44 +00:00
}
return nil
}
2019-05-22 21:16:55 +00:00
2022-12-17 01:12:26 +00:00
queryOffset , err := getLatencyOffsetMilliseconds ( r )
if err != nil {
return err
}
2020-09-23 09:58:46 +00:00
if ! searchutils . GetBool ( r , "nocache" ) && ct - start < queryOffset && start - ct < queryOffset {
// Adjust start time only if `nocache` arg isn't set.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/241
startPrev := start
start = ct - queryOffset
queryOffset = startPrev - start
} else {
queryOffset = 0
}
2023-03-27 22:11:40 +00:00
qs := & promql . QueryStats { }
ec := & promql . EvalConfig {
2021-12-06 15:07:06 +00:00
AuthToken : at ,
Start : start ,
End : start ,
Step : step ,
2022-08-24 12:25:18 +00:00
MaxPointsPerSeries : * maxPointsPerTimeseries ,
2022-03-26 08:17:37 +00:00
MaxSeries : * maxUniqueTimeseries ,
2021-12-06 15:07:06 +00:00
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
2022-05-31 23:31:40 +00:00
MayCache : mayCache ,
2021-12-06 15:07:06 +00:00
LookbackDelta : lookbackDelta ,
RoundDigits : getRoundDigits ( r ) ,
EnforcedTagFilterss : etfs ,
2023-02-24 02:40:31 +00:00
GetRequestURI : func ( ) string {
return httpserver . GetRequestURI ( r )
} ,
2019-06-29 22:27:03 +00:00
2020-09-10 21:29:26 +00:00
DenyPartialResponse : searchutils . GetDenyPartialResponse ( r ) ,
2023-03-27 22:11:40 +00:00
QueryStats : qs ,
2019-05-22 21:16:55 +00:00
}
2023-03-27 22:11:40 +00:00
result , err := promql . Exec ( qt , ec , query , true )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "error when executing query=%q for (time=%d, step=%d): %w" , query , start , step , err )
2019-05-22 21:16:55 +00:00
}
2020-09-23 09:58:46 +00:00
if queryOffset > 0 {
for i := range result {
2023-01-11 08:11:07 +00:00
r := & result [ i ]
// Do not modify r.Timestamps, since they may be shared among multiple series.
// Make a copy instead.
timestamps := append ( [ ] int64 { } , r . Timestamps ... )
2020-09-23 09:58:46 +00:00
for j := range timestamps {
timestamps [ j ] += queryOffset
}
2023-01-11 08:11:07 +00:00
r . Timestamps = timestamps
2020-09-23 09:58:46 +00:00
}
}
2019-05-22 21:16:55 +00:00
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-05-31 23:31:40 +00:00
qtDone := func ( ) {
2022-06-08 18:05:17 +00:00
qt . Donef ( "query=%s, time=%d: series=%d" , query , start , len ( result ) )
2022-05-31 23:31:40 +00:00
}
2023-03-27 15:51:33 +00:00
2023-03-27 22:11:40 +00:00
WriteQueryResponse ( bw , ec . IsPartialResponse . Load ( ) , result , qt , qtDone , qs )
2020-09-27 20:17:14 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "cannot flush query response to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2019-05-22 21:16:55 +00:00
return nil
}
// queryDuration is the request-duration summary for /api/v1/query; QueryHandler updates it on return.
var queryDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query"} ` )
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2022-05-31 23:31:40 +00:00
func QueryRangeHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer queryRangeDuration . UpdateDuration ( startTime )
2020-07-21 15:34:59 +00:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 21:16:55 +00:00
query := r . FormValue ( "query" )
2019-06-20 11:05:07 +00:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 21:29:26 +00:00
start , err := searchutils . GetTime ( r , "start" , ct - defaultStep )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2020-09-10 21:29:26 +00:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2020-09-10 21:29:26 +00:00
step , err := searchutils . GetDuration ( r , "step" , defaultStep )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2021-12-06 15:07:06 +00:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 15:42:35 +00:00
if err != nil {
return err
}
2022-05-31 23:31:40 +00:00
if err := queryRangeHandler ( qt , startTime , at , w , query , start , end , step , r , ct , etfs ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , query , start , end , step , err )
2019-12-10 22:42:44 +00:00
}
return nil
}
2022-05-31 23:31:40 +00:00
func queryRangeHandler ( qt * querytracer . Tracer , startTime time . Time , at * auth . Token , w http . ResponseWriter , query string ,
start , end , step int64 , r * http . Request , ct int64 , etfs [ ] [ ] storage . TagFilter ) error {
2020-09-10 21:29:26 +00:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
mayCache := ! searchutils . GetBool ( r , "nocache" )
2019-10-15 16:12:27 +00:00
lookbackDelta , err := getMaxLookback ( r )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
// Validate input args.
2022-12-15 03:26:24 +00:00
if len ( query ) > maxQueryLen . IntN ( ) {
2020-08-16 14:05:52 +00:00
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 21:16:55 +00:00
}
if start > end {
2019-11-22 14:10:33 +00:00
end = start + defaultStep
2019-05-22 21:16:55 +00:00
}
2022-08-24 12:25:18 +00:00
if err := promql . ValidateMaxPointsPerSeries ( start , end , step , * maxPointsPerTimeseries ) ; err != nil {
2022-09-06 10:25:59 +00:00
return fmt . Errorf ( "%w; (see -search.maxPointsPerTimeseries command-line flag)" , err )
2019-05-22 21:16:55 +00:00
}
2019-09-04 10:09:20 +00:00
if mayCache {
start , end = promql . AdjustStartEnd ( start , end , step )
}
2019-05-22 21:16:55 +00:00
2023-03-27 22:11:40 +00:00
qs := & promql . QueryStats { }
ec := & promql . EvalConfig {
2021-12-06 15:07:06 +00:00
AuthToken : at ,
Start : start ,
End : end ,
Step : step ,
2022-08-24 12:25:18 +00:00
MaxPointsPerSeries : * maxPointsPerTimeseries ,
2022-03-26 08:17:37 +00:00
MaxSeries : * maxUniqueTimeseries ,
2021-12-06 15:07:06 +00:00
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
MayCache : mayCache ,
LookbackDelta : lookbackDelta ,
RoundDigits : getRoundDigits ( r ) ,
EnforcedTagFilterss : etfs ,
2023-02-24 02:40:31 +00:00
GetRequestURI : func ( ) string {
return httpserver . GetRequestURI ( r )
} ,
2019-06-29 22:27:03 +00:00
2020-09-10 21:29:26 +00:00
DenyPartialResponse : searchutils . GetDenyPartialResponse ( r ) ,
2023-03-27 22:11:40 +00:00
QueryStats : qs ,
2019-05-22 21:16:55 +00:00
}
2023-03-27 22:11:40 +00:00
result , err := promql . Exec ( qt , ec , query , false )
2019-05-22 21:16:55 +00:00
if err != nil {
2022-08-15 10:38:47 +00:00
return err
2019-05-22 21:16:55 +00:00
}
2021-01-19 20:55:46 +00:00
if step < maxStepForPointsAdjustment . Milliseconds ( ) {
2022-12-17 01:12:26 +00:00
queryOffset , err := getLatencyOffsetMilliseconds ( r )
if err != nil {
return err
}
2021-01-19 20:55:46 +00:00
if ct - queryOffset < end {
result = adjustLastPoints ( result , ct - queryOffset , ct + step )
}
2019-05-22 21:16:55 +00:00
}
2019-08-20 19:52:49 +00:00
// Remove NaN values as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/153
2020-07-20 12:28:36 +00:00
result = removeEmptyValuesAndTimeseries ( result )
2019-08-20 19:52:49 +00:00
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 20:17:14 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-05-31 23:31:40 +00:00
qtDone := func ( ) {
2022-06-08 18:05:17 +00:00
qt . Donef ( "start=%d, end=%d, step=%d, query=%q: series=%d" , start , end , step , query , len ( result ) )
2022-05-31 23:31:40 +00:00
}
2023-03-27 22:11:40 +00:00
WriteQueryRangeResponse ( bw , ec . IsPartialResponse . Load ( ) , result , qt , qtDone , qs )
2020-09-27 20:17:14 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "cannot send query range response to remote client: %w" , err )
2020-09-27 20:17:14 +00:00
}
2019-05-22 21:16:55 +00:00
return nil
}
2020-07-20 12:28:36 +00:00
func removeEmptyValuesAndTimeseries ( tss [ ] netstorage . Result ) [ ] netstorage . Result {
dst := tss [ : 0 ]
2019-08-20 19:52:49 +00:00
for i := range tss {
ts := & tss [ i ]
hasNaNs := false
for _ , v := range ts . Values {
if math . IsNaN ( v ) {
hasNaNs = true
break
}
}
if ! hasNaNs {
// Fast path: nothing to remove.
2020-07-20 12:28:36 +00:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 19:52:49 +00:00
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts . Timestamps
dstValues := ts . Values [ : 0 ]
2023-01-11 08:11:07 +00:00
// Do not re-use ts.Timestamps for dstTimestamps, since ts.Timestamps
// may be shared among multiple time series.
2023-01-11 09:05:31 +00:00
dstTimestamps := make ( [ ] int64 , 0 , len ( ts . Timestamps ) )
2019-08-20 19:52:49 +00:00
for j , v := range ts . Values {
if math . IsNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , srcTimestamps [ j ] )
}
ts . Values = dstValues
ts . Timestamps = dstTimestamps
2020-07-20 12:28:36 +00:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 19:52:49 +00:00
}
2020-07-20 12:28:36 +00:00
return dst
2019-08-20 19:52:49 +00:00
}
2019-05-22 21:16:55 +00:00
// queryRangeDuration is the request-duration summary for /api/v1/query_range; QueryRangeHandler updates it on return.
var queryRangeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query_range"} ` )
2020-07-14 09:45:42 +00:00
// nan is a pre-computed NaN value; adjustLastPoints uses it as the replacement value
// when a series has no earlier point to copy from.
var nan = math . NaN ( )
2020-07-05 15:17:02 +00:00
// adjustLastPoints substitutes the last point values on the time range (start..end]
2020-07-14 09:45:42 +00:00
// with the previous point values, since these points may contain incomplete values.
2020-07-05 15:17:02 +00:00
func adjustLastPoints ( tss [ ] netstorage . Result , start , end int64 ) [ ] netstorage . Result {
2019-05-22 21:16:55 +00:00
for i := range tss {
2020-07-05 15:17:02 +00:00
ts := & tss [ i ]
values := ts . Values
timestamps := ts . Timestamps
j := len ( timestamps ) - 1
2020-07-14 09:45:42 +00:00
if j >= 0 && timestamps [ j ] > end {
// It looks like the `offset` is used in the query, which shifts time range beyond the `end`.
// Leave such a time series as is, since it is unclear which points may be incomplete in it.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/625
continue
}
2020-07-05 15:17:02 +00:00
for j >= 0 && timestamps [ j ] > start {
2019-05-22 21:16:55 +00:00
j --
}
2020-07-05 15:17:02 +00:00
j ++
2020-07-14 09:45:42 +00:00
lastValue := nan
if j > 0 {
lastValue = values [ j - 1 ]
2020-07-05 14:56:54 +00:00
}
2020-07-05 15:17:02 +00:00
for j < len ( timestamps ) && timestamps [ j ] <= end {
2020-07-14 09:45:42 +00:00
values [ j ] = lastValue
2020-07-05 15:17:02 +00:00
j ++
2019-05-22 21:16:55 +00:00
}
}
2019-07-04 06:14:15 +00:00
return tss
2019-05-22 21:16:55 +00:00
}
2019-10-15 16:12:27 +00:00
func getMaxLookback ( r * http . Request ) ( int64 , error ) {
2020-03-29 18:50:10 +00:00
d := maxLookback . Milliseconds ( )
2020-04-20 16:25:32 +00:00
if d == 0 {
d = maxStalenessInterval . Milliseconds ( )
}
2022-06-22 11:17:02 +00:00
maxLookback , err := searchutils . GetDuration ( r , "max_lookback" , d )
if err != nil {
return 0 , err
}
d = maxLookback
if * setLookbackToStep {
step , err := searchutils . GetDuration ( r , "step" , d )
if err != nil {
return 0 , err
}
d = step
}
return d , nil
2019-05-22 21:16:55 +00:00
}
func getTagFilterssFromMatches ( matches [ ] string ) ( [ ] [ ] storage . TagFilter , error ) {
tagFilterss := make ( [ ] [ ] storage . TagFilter , 0 , len ( matches ) )
for _ , match := range matches {
2021-12-06 15:07:06 +00:00
tagFilters , err := searchutils . ParseMetricSelector ( match )
2019-05-22 21:16:55 +00:00
if err != nil {
2021-12-06 15:07:06 +00:00
return nil , fmt . Errorf ( "cannot parse matches[]=%s: %w" , match , err )
2019-05-22 21:16:55 +00:00
}
tagFilterss = append ( tagFilterss , tagFilters )
}
return tagFilterss , nil
}
2019-06-29 22:27:03 +00:00
2021-03-15 10:35:44 +00:00
// getRoundDigits returns the value of the `round_digits` query arg from r.
//
// 100 is returned when the arg is missing, empty or isn't a valid integer.
func getRoundDigits(r *http.Request) int {
	const defaultDigits = 100

	raw := r.FormValue("round_digits")
	if raw == "" {
		return defaultDigits
	}
	if digits, err := strconv.Atoi(raw); err == nil {
		return digits
	}
	return defaultDigits
}
2022-12-17 01:12:26 +00:00
func getLatencyOffsetMilliseconds ( r * http . Request ) ( int64 , error ) {
2020-03-29 18:50:10 +00:00
d := latencyOffset . Milliseconds ( )
2023-02-22 02:06:53 +00:00
if d < 0 {
// Zero latency offset may be useful for some use cases.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2061#issuecomment-1299109836
d = 0
2019-10-28 10:30:50 +00:00
}
2022-12-17 01:12:26 +00:00
return searchutils . GetDuration ( r , "latency_offset" , d )
2019-10-28 10:30:50 +00:00
}
2020-12-25 14:42:05 +00:00
2020-12-25 14:44:26 +00:00
// QueryStatsHandler returns query stats at `/api/v1/status/top_queries`
2020-12-27 10:53:50 +00:00
func QueryStatsHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 10:58:54 +00:00
defer queryStatsDuration . UpdateDuration ( startTime )
2020-12-25 14:44:26 +00:00
topN := 20
2020-12-25 14:42:05 +00:00
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
}
topN = n
}
2020-12-25 14:44:26 +00:00
maxLifetimeMsecs , err := searchutils . GetDuration ( r , "maxLifetime" , 10 * 60 * 1000 )
if err != nil {
return fmt . Errorf ( "cannot parse `maxLifetime` arg: %w" , err )
}
2020-12-27 10:53:50 +00:00
maxLifetime := time . Duration ( maxLifetimeMsecs ) * time . Millisecond
2021-11-09 16:03:50 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-12-25 14:42:05 +00:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-27 10:53:50 +00:00
if at == nil {
querystats . WriteJSONQueryStats ( bw , topN , maxLifetime )
} else {
querystats . WriteJSONQueryStatsForAccountProject ( bw , topN , at . AccountID , at . ProjectID , maxLifetime )
}
2020-12-25 14:42:05 +00:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 09:56:58 +00:00
return fmt . Errorf ( "cannot send query stats response to client: %w" , err )
2020-12-25 14:42:05 +00:00
}
return nil
}
2020-12-25 14:44:26 +00:00
// queryStatsDuration is the request-duration summary for /api/v1/status/top_queries; QueryStatsHandler updates it on return.
var queryStatsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/status/top_queries"} ` )
2022-05-03 12:52:50 +00:00
2022-06-09 17:13:04 +00:00
// commonParams contains common parameters for all /api/v1/* handlers
2022-05-03 12:52:50 +00:00
//
2022-06-09 17:13:04 +00:00
// timeout, start, end, match[], extra_label, extra_filters[]
type commonParams struct {
// deadline is the deadline for the request processing.
// getCommonParams fills it from searchutils.GetDeadlineForQuery; getExportParams overrides it.
deadline searchutils . Deadline
// start is the value of the `start` arg; getCommonParams leaves it at 0 when the arg is missing.
start int64
// end is the value of the `end` arg; it defaults to the request time in milliseconds
// and is clamped by getCommonParams to be not less than start.
end int64
// currentTimestamp is the request start time in milliseconds.
currentTimestamp int64
// filterss holds tag filters built from `match[]` / `match` args joined with the extra tag filters.
filterss [ ] [ ] storage . TagFilter
}
func ( cp * commonParams ) IsDefaultTimeRange ( ) bool {
return cp . start == 0 && cp . currentTimestamp - cp . end < 1000
2022-05-03 12:52:50 +00:00
}
2022-06-09 17:13:04 +00:00
// getExportParams obtains common params from r, which are used in /api/v1/export* handlers
//
//   - timeout
//   - start
//   - end
//   - match[]
//   - extra_label
//   - extra_filters[]
//
// At least one `match[]` arg is required. The deadline is taken from
// searchutils.GetDeadlineForExport instead of the query deadline.
func getExportParams(r *http.Request, startTime time.Time) (*commonParams, error) {
	params, err := getCommonParams(r, startTime, true)
	if err != nil {
		return nil, err
	}
	params.deadline = searchutils.GetDeadlineForExport(r, startTime)
	return params, nil
}
2022-09-05 08:53:15 +00:00
// getCommonParamsWithDefaultDuration works like getCommonParams, but replaces
// a zero start with end-defaultStep, so the time range never starts at zero.
func getCommonParamsWithDefaultDuration(r *http.Request, startTime time.Time, requireNonEmptyMatch bool) (*commonParams, error) {
	params, err := getCommonParams(r, startTime, requireNonEmptyMatch)
	switch {
	case err != nil:
		return nil, err
	case params.start == 0:
		params.start = params.end - defaultStep
	}
	return params, nil
}
2022-06-09 17:13:04 +00:00
// getCommonParams obtains common params from r, which are used in /api/v1/* handlers:
//
// - timeout
// - start
// - end
// - match[]
// - extra_label
// - extra_filters[]
func getCommonParams ( r * http . Request , startTime time . Time , requireNonEmptyMatch bool ) ( * commonParams , error ) {
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2022-09-05 08:53:15 +00:00
start , err := searchutils . GetTime ( r , "start" , 0 )
2022-05-03 12:52:50 +00:00
if err != nil {
return nil , err
}
2022-09-05 08:53:15 +00:00
ct := startTime . UnixNano ( ) / 1e6
2022-05-03 12:52:50 +00:00
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return nil , err
}
2022-06-16 17:46:31 +00:00
// Limit the `end` arg to the current time +2 days in the same way
// as it is limited during data ingestion.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/blob/ea06d2fd3ccbbb6aa4480ab3b04f7b671408be2a/lib/storage/table.go#L378
// This should fix possible timestamp overflow - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2669
maxTS := startTime . UnixNano ( ) / 1e6 + 2 * 24 * 3600 * 1000
if end > maxTS {
end = maxTS
}
2022-05-03 12:52:50 +00:00
if end < start {
end = start
}
2022-06-09 17:13:04 +00:00
matches := append ( [ ] string { } , r . Form [ "match[]" ] ... )
matches = append ( matches , r . Form [ "match" ] ... )
if requireNonEmptyMatch && len ( matches ) == 0 {
return nil , fmt . Errorf ( "missing `match[]` arg" )
2022-05-03 12:52:50 +00:00
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
etfs , err := searchutils . GetExtraTagFilters ( r )
if err != nil {
return nil , err
}
filterss := searchutils . JoinTagFilterss ( tagFilterss , etfs )
2022-06-09 17:13:04 +00:00
cp := & commonParams {
deadline : deadline ,
start : start ,
end : end ,
currentTimestamp : ct ,
filterss : filterss ,
}
return cp , nil
2022-05-03 12:52:50 +00:00
}
2022-10-01 19:05:43 +00:00
// scalableWriter accumulates output in per-worker buffers
// and writes them to the underlying buffered writer.
type scalableWriter struct {
	// bw is the destination for the buffered data.
	bw *bufferedwriter.Writer
	// m maps workerID (uint) to its *bytesutil.ByteBuffer.
	m sync.Map
}

// newScalableWriter returns a scalableWriter, which writes data to bw.
func newScalableWriter(bw *bufferedwriter.Writer) *scalableWriter {
	return &scalableWriter{
		bw: bw,
	}
}

// getBuffer returns the buffer associated with the given workerID,
// creating it on the first access.
//
// NOTE(review): Load followed by Store isn't atomic; this presumably relies on
// every workerID being used by a single goroutine - confirm against callers.
func (sw *scalableWriter) getBuffer(workerID uint) *bytesutil.ByteBuffer {
	v, ok := sw.m.Load(workerID)
	if !ok {
		v = &bytesutil.ByteBuffer{}
		sw.m.Store(workerID, v)
	}
	return v.(*bytesutil.ByteBuffer)
}

// maybeFlushBuffer writes bb to the underlying writer and resets it
// once bb holds at least 1MiB of data. Smaller buffers are left untouched.
func (sw *scalableWriter) maybeFlushBuffer(bb *bytesutil.ByteBuffer) error {
	if len(bb.B) < 1024*1024 {
		return nil
	}
	_, err := sw.bw.Write(bb.B)
	bb.Reset()
	return err
}

// flush writes the contents of all the per-worker buffers to the underlying writer
// and then flushes it.
//
// The first write error stops the iteration and is returned to the caller
// instead of being silently dropped.
func (sw *scalableWriter) flush() error {
	var writeErr error
	sw.m.Range(func(_, v interface{}) bool {
		bb := v.(*bytesutil.ByteBuffer)
		_, writeErr = sw.bw.Write(bb.B)
		return writeErr == nil
	})
	if writeErr != nil {
		return writeErr
	}
	return sw.bw.Flush()
}