2020-02-23 11:35:47 +00:00
package promremotewrite
2019-05-22 21:16:55 +00:00
import (
2020-09-27 23:06:27 +00:00
"bufio"
2019-05-22 21:16:55 +00:00
"fmt"
2020-01-28 20:53:50 +00:00
"io"
2019-05-22 21:16:55 +00:00
"net/http"
"runtime"
"sync"
2020-01-28 20:53:50 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-08-16 14:05:52 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-09-28 01:11:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
2020-09-28 01:11:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/metrics"
2020-01-28 20:53:50 +00:00
"github.com/golang/snappy"
2019-05-22 21:16:55 +00:00
)
2020-08-16 14:05:52 +00:00
var maxInsertRequestSize = flagutil . NewBytes ( "maxInsertRequestSize" , 32 * 1024 * 1024 , "The maximum size in bytes of a single Prometheus remote_write API request" )
2020-01-28 20:53:50 +00:00
2020-02-23 11:35:47 +00:00
// ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
//
2020-11-01 21:12:13 +00:00
// The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called after ParseStream returns.
//
2020-09-28 01:11:55 +00:00
// callback shouldn't hold tss after returning.
func ParseStream ( req * http . Request , callback func ( tss [ ] prompb . TimeSeries ) error ) error {
2020-09-27 23:06:27 +00:00
ctx := getPushCtx ( req . Body )
2019-05-22 21:16:55 +00:00
defer putPushCtx ( ctx )
2020-09-27 23:06:27 +00:00
if err := ctx . Read ( ) ; err != nil {
2019-05-22 21:16:55 +00:00
return err
}
2020-09-28 01:11:55 +00:00
uw := getUnmarshalWork ( )
2020-11-13 08:58:33 +00:00
ctx . wg . Add ( 1 )
uw . callback = func ( tss [ ] prompb . TimeSeries ) error {
// Propagate the error to the caller of ParseStream, so it could properly return HTTP 503 status code on error.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
ctx . err = callback ( tss )
ctx . wg . Done ( )
// Do not return the error from callback in order to prevent from double logging.
return nil
}
2020-09-28 14:06:26 +00:00
uw . reqBuf , ctx . reqBuf . B = ctx . reqBuf . B , uw . reqBuf
2020-09-28 01:11:55 +00:00
common . ScheduleUnmarshalWork ( uw )
2020-11-13 08:58:33 +00:00
ctx . wg . Wait ( )
return ctx . err
2019-05-22 21:16:55 +00:00
}
type pushCtx struct {
2020-09-27 23:06:27 +00:00
br * bufio . Reader
2020-09-28 01:11:55 +00:00
reqBuf bytesutil . ByteBuffer
2020-11-13 08:58:33 +00:00
wg sync . WaitGroup
err error
2019-05-22 21:16:55 +00:00
}
func ( ctx * pushCtx ) reset ( ) {
2020-09-27 23:06:27 +00:00
ctx . br . Reset ( nil )
2020-09-28 01:11:55 +00:00
ctx . reqBuf . Reset ( )
2020-11-13 08:58:33 +00:00
ctx . err = nil
2019-05-22 21:16:55 +00:00
}
2020-09-27 23:06:27 +00:00
func ( ctx * pushCtx ) Read ( ) error {
2020-02-23 11:35:47 +00:00
readCalls . Inc ( )
2020-09-28 01:11:55 +00:00
lr := io . LimitReader ( ctx . br , int64 ( maxInsertRequestSize . N ) + 1 )
reqLen , err := ctx . reqBuf . ReadFrom ( lr )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-02-23 11:35:47 +00:00
readErrors . Inc ( )
2020-09-28 01:11:55 +00:00
return fmt . Errorf ( "cannot read compressed request: %w" , err )
2019-05-22 21:16:55 +00:00
}
2020-09-28 01:11:55 +00:00
if reqLen > int64 ( maxInsertRequestSize . N ) {
readErrors . Inc ( )
return fmt . Errorf ( "too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes" , maxInsertRequestSize . N )
2020-02-23 11:35:47 +00:00
}
2019-05-22 21:16:55 +00:00
return nil
}
var (
2020-02-28 18:19:35 +00:00
readCalls = metrics . NewCounter ( ` vm_protoparser_read_calls_total { type="promremotewrite"} ` )
readErrors = metrics . NewCounter ( ` vm_protoparser_read_errors_total { type="promremotewrite"} ` )
rowsRead = metrics . NewCounter ( ` vm_protoparser_rows_read_total { type="promremotewrite"} ` )
2020-07-08 11:18:41 +00:00
unmarshalErrors = metrics . NewCounter ( ` vm_protoparser_unmarshal_errors_total { type="promremotewrite"} ` )
2019-05-22 21:16:55 +00:00
)
2020-09-27 23:06:27 +00:00
func getPushCtx ( r io . Reader ) * pushCtx {
2019-05-22 21:16:55 +00:00
select {
case ctx := <- pushCtxPoolCh :
2020-09-27 23:06:27 +00:00
ctx . br . Reset ( r )
2019-05-22 21:16:55 +00:00
return ctx
default :
if v := pushCtxPool . Get ( ) ; v != nil {
2020-09-27 23:06:27 +00:00
ctx := v . ( * pushCtx )
ctx . br . Reset ( r )
return ctx
}
return & pushCtx {
br : bufio . NewReaderSize ( r , 64 * 1024 ) ,
2019-05-22 21:16:55 +00:00
}
}
}
func putPushCtx ( ctx * pushCtx ) {
ctx . reset ( )
select {
case pushCtxPoolCh <- ctx :
default :
pushCtxPool . Put ( ctx )
}
}
var pushCtxPool sync . Pool
var pushCtxPoolCh = make ( chan * pushCtx , runtime . GOMAXPROCS ( - 1 ) )
2020-01-28 20:53:50 +00:00
2020-09-28 01:11:55 +00:00
type unmarshalWork struct {
wr prompb . WriteRequest
callback func ( tss [ ] prompb . TimeSeries ) error
reqBuf [ ] byte
}
func ( uw * unmarshalWork ) reset ( ) {
uw . wr . Reset ( )
uw . callback = nil
uw . reqBuf = uw . reqBuf [ : 0 ]
}
// Unmarshal implements common.UnmarshalWork
func ( uw * unmarshalWork ) Unmarshal ( ) {
2020-01-28 20:53:50 +00:00
bb := bodyBufferPool . Get ( )
2020-09-28 01:11:55 +00:00
defer bodyBufferPool . Put ( bb )
var err error
bb . B , err = snappy . Decode ( bb . B [ : cap ( bb . B ) ] , uw . reqBuf )
2020-01-28 20:53:50 +00:00
if err != nil {
2020-09-28 01:11:55 +00:00
logger . Errorf ( "cannot decompress request with length %d: %s" , len ( uw . reqBuf ) , err )
return
2020-01-28 20:53:50 +00:00
}
2020-09-28 01:11:55 +00:00
if len ( bb . B ) > maxInsertRequestSize . N {
logger . Errorf ( "too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes" , maxInsertRequestSize . N , len ( bb . B ) )
return
2020-01-28 20:53:50 +00:00
}
2020-09-28 01:11:55 +00:00
if err := uw . wr . Unmarshal ( bb . B ) ; err != nil {
unmarshalErrors . Inc ( )
logger . Errorf ( "cannot unmarshal prompb.WriteRequest with size %d bytes: %s" , len ( bb . B ) , err )
return
2020-01-28 20:53:50 +00:00
}
2020-09-28 01:11:55 +00:00
rows := 0
tss := uw . wr . Timeseries
for i := range tss {
rows += len ( tss [ i ] . Samples )
2020-01-28 20:53:50 +00:00
}
2020-09-28 01:11:55 +00:00
rowsRead . Add ( rows )
if err := uw . callback ( tss ) ; err != nil {
logger . Errorf ( "error when processing imported data: %s" , err )
putUnmarshalWork ( uw )
return
2020-01-28 20:53:50 +00:00
}
2020-09-28 01:11:55 +00:00
putUnmarshalWork ( uw )
2020-01-28 20:53:50 +00:00
}
var bodyBufferPool bytesutil . ByteBufferPool
2020-09-28 01:11:55 +00:00
func getUnmarshalWork ( ) * unmarshalWork {
v := unmarshalWorkPool . Get ( )
if v == nil {
return & unmarshalWork { }
}
return v . ( * unmarshalWork )
}
func putUnmarshalWork ( uw * unmarshalWork ) {
uw . reset ( )
unmarshalWorkPool . Put ( uw )
}
var unmarshalWorkPool sync . Pool