2023-02-13 17:58:48 +00:00
package stream
2020-02-23 11:35:47 +00:00
import (
2020-09-27 23:06:27 +00:00
"bufio"
2020-04-10 09:43:51 +00:00
"flag"
2020-02-23 11:35:47 +00:00
"fmt"
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-10-05 12:18:50 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2023-02-13 17:58:48 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
2023-01-07 02:59:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/metrics"
)
2020-04-10 09:43:51 +00:00
var (
2024-10-15 09:48:40 +00:00
maxLineSize = flagutil . NewBytes ( "influx.maxLineSize" , 256 * 1024 , "The maximum size in bytes for a single InfluxDB line during parsing. Applicable for stream mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf" )
maxRequestSize = flagutil . NewBytes ( "influx.maxRequestSize" , 64 * 1024 * 1024 , "The maximum size in bytes of a single InfluxDB request. Applicable for batch mode only. See https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf" )
trimTimestamp = flag . Duration ( "influxTrimTimestamp" , time . Millisecond , "Trim timestamps for InfluxDB line protocol data to this duration. " +
2020-04-10 09:43:51 +00:00
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data" )
)
2023-02-13 17:58:48 +00:00
// Parse parses r with the given args and calls callback for the parsed rows.
2020-02-23 11:35:47 +00:00
//
2020-11-01 21:12:13 +00:00
// The callback can be called concurrently multiple times for streamed data from r.
2020-02-23 11:35:47 +00:00
//
// callback shouldn't hold rows after returning.
2024-10-15 09:48:40 +00:00
func Parse ( r io . Reader , isStreamMode , isGzipped bool , precision , db string , callback func ( db string , rows [ ] influx . Row ) error ) error {
2023-01-07 02:59:39 +00:00
wcr := writeconcurrencylimiter . GetReader ( r )
defer writeconcurrencylimiter . PutReader ( wcr )
r = wcr
2020-02-25 17:09:46 +00:00
if isGzipped {
2020-02-23 11:35:47 +00:00
zr , err := common . GetGzipReader ( r )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read gzipped influx line protocol data: %w" , err )
2020-02-23 11:35:47 +00:00
}
defer common . PutGzipReader ( zr )
r = zr
}
2021-10-28 09:46:27 +00:00
tsMultiplier := int64 ( 0 )
2020-02-25 17:09:46 +00:00
switch precision {
2020-02-23 11:35:47 +00:00
case "ns" :
tsMultiplier = 1e6
2020-08-10 17:23:09 +00:00
case "u" , "us" , "µ" :
2020-02-23 11:35:47 +00:00
tsMultiplier = 1e3
case "ms" :
tsMultiplier = 1
case "s" :
tsMultiplier = - 1e3
case "m" :
tsMultiplier = - 1e3 * 60
case "h" :
tsMultiplier = - 1e3 * 3600
}
2024-10-15 09:48:40 +00:00
// processing payload altogether
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090
if ! isStreamMode {
ctx := getBatchContext ( r )
defer putBatchContext ( ctx )
err := ctx . Read ( )
if err != nil {
return err
}
err = unmarshal ( & ctx . rows , ctx . reqBuf . B , tsMultiplier )
if err != nil {
return fmt . Errorf ( "cannot parse influx line protocol data: %s; To skip invalid lines switch to stream mode by passing Stream-Mode: \"1\" header with each request" , err )
}
return callback ( db , ctx . rows . Rows )
}
// processing in a streaming fashion, line-by-line
// invalid lines are skipped
2020-09-27 23:06:27 +00:00
ctx := getStreamContext ( r )
2020-02-23 11:35:47 +00:00
defer putStreamContext ( ctx )
2020-09-28 01:11:55 +00:00
for ctx . Read ( ) {
uw := getUnmarshalWork ( )
2022-04-06 11:00:08 +00:00
uw . ctx = ctx
uw . callback = callback
2020-09-28 01:11:55 +00:00
uw . db = db
uw . tsMultiplier = tsMultiplier
2020-09-28 14:06:26 +00:00
uw . reqBuf , ctx . reqBuf = ctx . reqBuf , uw . reqBuf
2020-11-13 11:03:54 +00:00
ctx . wg . Add ( 1 )
2020-09-28 01:11:55 +00:00
common . ScheduleUnmarshalWork ( uw )
2023-01-07 02:59:39 +00:00
wcr . DecConcurrency ( )
2020-02-23 11:35:47 +00:00
}
2020-11-13 11:03:54 +00:00
ctx . wg . Wait ( )
if err := ctx . Error ( ) ; err != nil {
return err
}
return ctx . callbackErr
2020-02-23 11:35:47 +00:00
}
var (
2020-02-28 18:19:35 +00:00
readCalls = metrics . NewCounter ( ` vm_protoparser_read_calls_total { type="influx"} ` )
readErrors = metrics . NewCounter ( ` vm_protoparser_read_errors_total { type="influx"} ` )
rowsRead = metrics . NewCounter ( ` vm_protoparser_rows_read_total { type="influx"} ` )
2020-02-23 11:35:47 +00:00
)
2024-10-15 09:48:40 +00:00
// batchContext holds the state for parsing a whole request in batch mode.
type batchContext struct {
// br is the buffered reader the request payload is read from.
br * bufio . Reader
// reqBuf accumulates the request payload read by Read.
reqBuf bytesutil . ByteBuffer
// rows holds the rows parsed from reqBuf.
rows influx . Rows
}
// Read reads the whole request payload from ctx.br into ctx.reqBuf.
//
// It returns an error if reading fails or if the payload exceeds
// -influx.maxRequestSize bytes.
func (ctx *batchContext) Read() error {
	readCalls.Inc()
	maxSize := int64(maxRequestSize.IntN())
	// Allow reading one byte past the limit. Otherwise the reader would stop
	// at exactly maxSize bytes, the size check below could never trigger
	// and oversized requests would be silently truncated.
	lr := io.LimitReader(ctx.br, maxSize+1)
	reqLen, err := ctx.reqBuf.ReadFrom(lr)
	if err != nil {
		readErrors.Inc()
		return err
	}
	if reqLen > maxSize {
		readErrors.Inc()
		return fmt.Errorf("too big request; mustn't exceed -influx.maxRequestSize=%d bytes", maxRequestSize.N)
	}
	return nil
}
// reset clears the parsed rows and the request buffer and detaches br
// from the underlying reader, so ctx can be reused for another request.
func (ctx *batchContext) reset() {
	ctx.rows.Reset()
	ctx.reqBuf.Reset()
	ctx.br.Reset(nil)
}
// getBatchContext returns a batchContext reading from r.
//
// A pooled context is reused when available; otherwise a new one
// with a 64KiB read buffer is created.
func getBatchContext(r io.Reader) *batchContext {
	pooled := batchContextPool.Get()
	if pooled == nil {
		return &batchContext{
			br: bufio.NewReaderSize(r, 64*1024),
		}
	}
	bc := pooled.(*batchContext)
	bc.br.Reset(r)
	return bc
}
// putBatchContext resets ctx and returns it to the pool.
//
// ctx mustn't be used after the call.
func putBatchContext(ctx *batchContext) {
	ctx.reset()
	batchContextPool.Put(ctx)
}
// batchContextPool holds batchContext objects for reuse between getBatchContext and putBatchContext calls.
var batchContextPool sync . Pool
2020-02-23 11:35:47 +00:00
// streamContext holds the state for parsing a single request in stream mode.
type streamContext struct {
	// br is the buffered reader the request data is read from.
	br *bufio.Reader
	// reqBuf holds the block of lines returned by the last successful Read call.
	reqBuf []byte
	// tailBuf is passed between consecutive block reads together with reqBuf.
	tailBuf []byte
	// err is the error of the last read; it may be io.EOF.
	err error

	// wg is used for waiting until all the scheduled unmarshalWork items are processed.
	wg sync.WaitGroup

	// callbackErrLock protects callbackErr from concurrent access by unmarshal workers.
	callbackErrLock sync.Mutex
	// callbackErr is the first error returned by the callback.
	callbackErr error
}
2024-10-15 09:48:40 +00:00
// Read reads the next block of lines into ctx.reqBuf.
//
// It returns false when there is nothing more to process: a previous read
// failed, the callback returned an error, the data ended or the current
// read failed. Use Error for obtaining the read error, if any.
func (ctx *streamContext) Read() bool {
	readCalls.Inc()
	if ctx.err != nil || ctx.hasCallbackError() {
		return false
	}
	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN())
	switch ctx.err {
	case nil:
		return true
	case io.EOF:
		// The end of data isn't counted as a read error.
		return false
	default:
		readErrors.Inc()
		ctx.err = fmt.Errorf("cannot read influx line protocol data: %w", ctx.err)
		return false
	}
}
2020-02-23 11:35:47 +00:00
// Error returns the error that stopped reading, or nil if reading
// stopped because the data ended (io.EOF) or no error occurred.
func (ctx *streamContext) Error() error {
	if readErr := ctx.err; readErr != io.EOF {
		return readErr
	}
	return nil
}
2021-06-14 12:18:46 +00:00
// hasCallbackError reports whether a callback error has been recorded in ctx.
//
// It takes callbackErrLock, since callbackErr is set by concurrently running workers.
func (ctx *streamContext) hasCallbackError() bool {
	ctx.callbackErrLock.Lock()
	defer ctx.callbackErrLock.Unlock()
	return ctx.callbackErr != nil
}
2020-02-23 11:35:47 +00:00
func ( ctx * streamContext ) reset ( ) {
2020-09-27 23:06:27 +00:00
ctx . br . Reset ( nil )
2020-02-23 11:35:47 +00:00
ctx . reqBuf = ctx . reqBuf [ : 0 ]
ctx . tailBuf = ctx . tailBuf [ : 0 ]
ctx . err = nil
2020-11-13 11:03:54 +00:00
ctx . callbackErr = nil
2020-02-23 11:35:47 +00:00
}
2020-09-27 23:06:27 +00:00
func getStreamContext ( r io . Reader ) * streamContext {
2024-04-20 19:59:49 +00:00
if v := streamContextPool . Get ( ) ; v != nil {
ctx := v . ( * streamContext )
2020-09-27 23:06:27 +00:00
ctx . br . Reset ( r )
2020-02-23 11:35:47 +00:00
return ctx
2024-04-20 19:59:49 +00:00
}
return & streamContext {
br : bufio . NewReaderSize ( r , 64 * 1024 ) ,
2020-02-23 11:35:47 +00:00
}
}
func putStreamContext ( ctx * streamContext ) {
ctx . reset ( )
2024-04-20 19:59:49 +00:00
streamContextPool . Put ( ctx )
2020-02-23 11:35:47 +00:00
}
// streamContextPool holds streamContext objects for reuse between getStreamContext and putStreamContext calls.
var streamContextPool sync . Pool
2020-09-28 01:11:55 +00:00
type unmarshalWork struct {
2023-02-13 17:58:48 +00:00
rows influx . Rows
2022-04-06 11:00:08 +00:00
ctx * streamContext
2023-02-13 17:58:48 +00:00
callback func ( db string , rows [ ] influx . Row ) error
2020-09-28 01:11:55 +00:00
db string
tsMultiplier int64
reqBuf [ ] byte
}
func ( uw * unmarshalWork ) reset ( ) {
uw . rows . Reset ( )
2022-04-06 11:00:08 +00:00
uw . ctx = nil
2020-09-28 01:11:55 +00:00
uw . callback = nil
uw . db = ""
uw . tsMultiplier = 0
uw . reqBuf = uw . reqBuf [ : 0 ]
}
2024-10-15 09:48:40 +00:00
func ( uw * unmarshalWork ) runCallback ( ) {
2022-04-06 11:00:08 +00:00
ctx := uw . ctx
2024-10-15 09:48:40 +00:00
if err := uw . callback ( uw . db , uw . rows . Rows ) ; err != nil {
err = fmt . Errorf ( "error when processing imported data: %w" , err )
2022-04-06 11:00:08 +00:00
ctx . callbackErrLock . Lock ( )
if ctx . callbackErr == nil {
2024-10-15 09:48:40 +00:00
ctx . callbackErr = err
2022-04-06 11:00:08 +00:00
}
ctx . callbackErrLock . Unlock ( )
}
ctx . wg . Done ( )
}
2020-09-28 01:11:55 +00:00
// Unmarshal implements common.UnmarshalWork
func ( uw * unmarshalWork ) Unmarshal ( ) {
2024-10-15 09:48:40 +00:00
_ = unmarshal ( & uw . rows , uw . reqBuf , uw . tsMultiplier )
uw . runCallback ( )
putUnmarshalWork ( uw )
}
// getUnmarshalWork returns an unmarshalWork from the pool, or a new one if the pool is empty.
//
// The returned work has rows.IgnoreErrs set.
func getUnmarshalWork() *unmarshalWork {
	work, ok := unmarshalWorkPool.Get().(*unmarshalWork)
	if !ok {
		// The pool is empty.
		work = &unmarshalWork{}
	}
	work.rows.IgnoreErrs = true
	return work
}
// putUnmarshalWork resets uw and returns it to the pool.
//
// uw mustn't be used after the call.
func putUnmarshalWork(uw *unmarshalWork) {
	uw.reset()
	unmarshalWorkPool.Put(uw)
}
// unmarshalWorkPool holds unmarshalWork objects for reuse between getUnmarshalWork and putUnmarshalWork calls.
var unmarshalWorkPool sync . Pool
// detectTimestamp converts ts to milliseconds, guessing its unit from its magnitude.
//
// Zero ts is substituted with currentTs. Values starting from 1e17 are treated
// as nanoseconds, from 1e14 as microseconds and from 1e11 as milliseconds.
// All the other values are treated as seconds.
func detectTimestamp(ts, currentTs int64) int64 {
	switch {
	case ts == 0:
		return currentTs
	case ts >= 1e17:
		// nanoseconds -> milliseconds
		return ts / 1e6
	case ts >= 1e14:
		// microseconds -> milliseconds
		return ts / 1e3
	case ts >= 1e11:
		// already in milliseconds
		return ts
	default:
		// seconds -> milliseconds
		return ts * 1e3
	}
}
func unmarshal ( rs * influx . Rows , reqBuf [ ] byte , tsMultiplier int64 ) error {
// do not return error immediately because rs.Rows could contain
// successfully parsed rows that needs to be processed below
err := rs . Unmarshal ( bytesutil . ToUnsafeString ( reqBuf ) )
rows := rs . Rows
2020-09-28 01:11:55 +00:00
rowsRead . Add ( len ( rows ) )
// Adjust timestamps according to uw.tsMultiplier
currentTs := time . Now ( ) . UnixNano ( ) / 1e6
2021-10-28 09:46:27 +00:00
if tsMultiplier == 0 {
// Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
// But it can be in ns, us, ms or s depending on the number of digits in practice.
for i := range rows {
tsPtr := & rows [ i ] . Timestamp
* tsPtr = detectTimestamp ( * tsPtr , currentTs )
}
} else if tsMultiplier >= 1 {
2020-09-28 01:11:55 +00:00
for i := range rows {
row := & rows [ i ]
if row . Timestamp == 0 {
row . Timestamp = currentTs
} else {
row . Timestamp /= tsMultiplier
}
}
} else if tsMultiplier < 0 {
tsMultiplier = - tsMultiplier
currentTs -= currentTs % tsMultiplier
for i := range rows {
row := & rows [ i ]
if row . Timestamp == 0 {
row . Timestamp = currentTs
} else {
row . Timestamp *= tsMultiplier
}
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp . Milliseconds ( ) ; tsTrim > 1 {
for i := range rows {
row := & rows [ i ]
row . Timestamp -= row . Timestamp % tsTrim
}
}
2024-10-15 09:48:40 +00:00
return err
2021-10-28 09:46:27 +00:00
}