2023-02-13 17:51:35 +00:00
package stream
2021-09-28 19:47:45 +00:00
import (
"bufio"
2022-09-26 10:57:20 +00:00
"flag"
2021-09-28 19:47:45 +00:00
"fmt"
"io"
2022-09-26 10:57:20 +00:00
"regexp"
2021-09-28 19:47:45 +00:00
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2023-02-13 17:51:35 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
2023-01-07 02:59:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2021-09-28 19:47:45 +00:00
"github.com/VictoriaMetrics/metrics"
)
2022-09-26 10:57:20 +00:00
var (
// The maximum request size is defined at https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
2023-12-05 00:26:22 +00:00
maxInsertRequestSize = flagutil . NewBytes ( "datadog.maxInsertRequestSize" , 64 * 1024 * 1024 , "The maximum size in bytes of a single DataDog POST request to /api/v1/series" )
2022-09-26 10:57:20 +00:00
// If all metrics in Datadog have the same naming schema as custom metrics, then the following rules apply:
// https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
// But there's some hidden behaviour. In addition to what it states in the docs, the following is also done:
// - Consecutive underscores are replaced with just one underscore
// - Underscore immediately before or after a dot are removed
sanitizeMetricName = flag . Bool ( "datadog.sanitizeMetricName" , true , "Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at " +
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics" )
)
2021-09-28 19:47:45 +00:00
2023-12-05 00:26:22 +00:00
// Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
2021-09-28 19:47:45 +00:00
//
// callback shouldn't hold series after returning.
2023-12-05 00:26:22 +00:00
func Parse ( r io . Reader , contentEncoding string , callback func ( series [ ] datadog . Series ) error ) error {
wcr := writeconcurrencylimiter . GetReader ( r )
2023-01-07 02:59:39 +00:00
defer writeconcurrencylimiter . PutReader ( wcr )
r = wcr
2021-09-28 19:47:45 +00:00
switch contentEncoding {
case "gzip" :
zr , err := common . GetGzipReader ( r )
if err != nil {
return fmt . Errorf ( "cannot read gzipped DataDog data: %w" , err )
}
defer common . PutGzipReader ( zr )
r = zr
case "deflate" :
zlr , err := common . GetZlibReader ( r )
if err != nil {
return fmt . Errorf ( "cannot read deflated DataDog data: %w" , err )
}
defer common . PutZlibReader ( zlr )
r = zlr
}
ctx := getPushCtx ( r )
defer putPushCtx ( ctx )
if err := ctx . Read ( ) ; err != nil {
return err
}
2023-12-05 00:26:22 +00:00
req := getRequest ( )
defer putRequest ( req )
if err := req . Unmarshal ( ctx . reqBuf . B ) ; err != nil {
2021-09-28 19:47:45 +00:00
unmarshalErrors . Inc ( )
2023-10-25 19:24:01 +00:00
return fmt . Errorf ( "cannot unmarshal DataDog POST request with size %d bytes: %w" , len ( ctx . reqBuf . B ) , err )
2021-09-28 19:47:45 +00:00
}
2023-12-05 00:26:22 +00:00
rows := 0
series := req . Series
for i := range series {
rows += len ( series [ i ] . Points )
if * sanitizeMetricName {
series [ i ] . Metric = sanitizeName ( series [ i ] . Metric )
}
2021-09-28 19:47:45 +00:00
}
2023-12-05 00:26:22 +00:00
rowsRead . Add ( rows )
2021-09-28 19:47:45 +00:00
2023-12-05 00:26:22 +00:00
if err := callback ( series ) ; err != nil {
2021-09-28 19:47:45 +00:00
return fmt . Errorf ( "error when processing imported data: %w" , err )
}
return nil
}
// pushCtx bundles the per-request reading state: a buffered reader over the request body
// and the buffer the body is read into.
type pushCtx struct {
	// br wraps the request body reader; it is re-targeted via Reset when the ctx is reused.
	br *bufio.Reader

	// reqBuf accumulates the request body read by Read.
	reqBuf bytesutil.ByteBuffer
}
// reset clears the accumulated request body and detaches the buffered reader from its
// underlying source, so the ctx can be reused for another request.
func (ctx *pushCtx) reset() {
	ctx.reqBuf.Reset()
	ctx.br.Reset(nil)
}
func ( ctx * pushCtx ) Read ( ) error {
readCalls . Inc ( )
lr := io . LimitReader ( ctx . br , int64 ( maxInsertRequestSize . N ) + 1 )
startTime := fasttime . UnixTimestamp ( )
reqLen , err := ctx . reqBuf . ReadFrom ( lr )
if err != nil {
readErrors . Inc ( )
2023-10-15 22:25:23 +00:00
return fmt . Errorf ( "cannot read request in %d seconds: %w" , fasttime . UnixTimestamp ( ) - startTime , err )
2021-09-28 19:47:45 +00:00
}
if reqLen > int64 ( maxInsertRequestSize . N ) {
readErrors . Inc ( )
2023-10-15 22:25:23 +00:00
return fmt . Errorf ( "too big request; mustn't exceed -datadog.maxInsertRequestSize=%d bytes" , maxInsertRequestSize . N )
2021-09-28 19:47:45 +00:00
}
return nil
}
// Counters for the DataDog protocol parser.
//
// Metric names must not contain stray whitespace: the name part has to be a plain
// identifier immediately followed by the `{...}` label set, otherwise the metrics
// library rejects the name when the counter is registered.
var (
	// readCalls counts invocations of pushCtx.Read.
	readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="datadog"}`)

	// readErrors counts failed or oversized request reads.
	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="datadog"}`)

	// rowsRead counts the points seen in successfully unmarshaled requests.
	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="datadog"}`)

	// unmarshalErrors counts request bodies that failed to unmarshal.
	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="datadog"}`)
)
// getPushCtx returns a pushCtx whose buffered reader reads from r.
//
// Lookup order: the pushCtxPoolCh channel first, then pushCtxPool, and only when both are
// empty a new pushCtx with a 64KiB read buffer is allocated.
func getPushCtx(r io.Reader) *pushCtx {
	const readBufSize = 64 * 1024

	var ctx *pushCtx
	select {
	case ctx = <-pushCtxPoolCh:
	default:
		pooled := pushCtxPool.Get()
		if pooled == nil {
			return &pushCtx{
				br: bufio.NewReaderSize(r, readBufSize),
			}
		}
		ctx = pooled.(*pushCtx)
	}

	// A reused ctx still points at its previous (reset) source; re-target it at r.
	ctx.br.Reset(r)
	return ctx
}
// putPushCtx resets ctx and returns it for reuse.
//
// The ctx goes into pushCtxPoolCh when the channel has room; otherwise it falls back to
// pushCtxPool. ctx mustn't be used by the caller after this call.
func putPushCtx(ctx *pushCtx) {
	ctx.reset()

	select {
	case pushCtxPoolCh <- ctx:
		return
	default:
	}
	pushCtxPool.Put(ctx)
}
var (
	// pushCtxPool is the overflow pool used when pushCtxPoolCh is empty (on get) or full (on put).
	pushCtxPool sync.Pool

	// pushCtxPoolCh is the primary pushCtx cache; its capacity equals the number of available CPUs.
	pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
)
2023-12-05 00:26:22 +00:00
func getRequest ( ) * datadog . Request {
2023-12-05 00:19:29 +00:00
v := requestPool . Get ( )
2021-09-28 19:47:45 +00:00
if v == nil {
2023-12-05 00:26:22 +00:00
return & datadog . Request { }
2021-09-28 19:47:45 +00:00
}
2023-12-05 00:26:22 +00:00
return v . ( * datadog . Request )
2021-09-28 19:47:45 +00:00
}
2023-12-05 00:26:22 +00:00
func putRequest ( req * datadog . Request ) {
2023-12-05 00:19:29 +00:00
requestPool . Put ( req )
2021-09-28 19:47:45 +00:00
}
2023-12-05 00:19:29 +00:00
// requestPool holds *datadog.Request objects for reuse between Parse calls.
// It has no New function, so Get returns nil when the pool is empty.
var requestPool sync.Pool
2022-09-26 10:57:20 +00:00
2023-02-13 12:27:13 +00:00
// sanitizeName performs DataDog-compatible sanitizing for metric names
2022-09-26 10:57:20 +00:00
//
// See https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics
2023-12-05 00:26:22 +00:00
func sanitizeName ( name string ) string {
return namesSanitizer . Transform ( name )
2022-09-26 10:57:20 +00:00
}
2022-09-28 07:39:01 +00:00
// namesSanitizer applies the DataDog name sanitizing steps in order:
//  1. every run of unsupported chars becomes a single underscore
//  2. every run of underscores collapses into one underscore
//  3. a dot with an optional underscore on either side becomes a bare dot
//
// The steps are wrapped in a bytesutil FastStringTransformer (presumably a caching
// wrapper around the function — confirm in lib/bytesutil).
var namesSanitizer = bytesutil.NewFastStringTransformer(func(name string) string {
	replaced := unsupportedDatadogChars.ReplaceAllString(name, "_")
	collapsed := multiUnderscores.ReplaceAllString(replaced, "_")
	return underscoresWithDots.ReplaceAllString(collapsed, ".")
})
2022-09-28 07:05:54 +00:00
2022-09-28 07:39:01 +00:00
// Patterns used for DataDog-compatible metric name sanitizing.
//
// The patterns must not contain padding spaces: inside a raw string literal a space is a
// literal character, so a padded pattern would match only text surrounded by spaces.
var (
	// unsupportedDatadogChars matches runs of chars other than ASCII letters, digits,
	// underscores and dots.
	unsupportedDatadogChars = regexp.MustCompile(`[^0-9a-zA-Z_\.]+`)

	// multiUnderscores matches runs of one or more underscores.
	multiUnderscores = regexp.MustCompile(`_+`)

	// underscoresWithDots matches a dot together with an optional underscore directly
	// before and/or after it.
	underscoresWithDots = regexp.MustCompile(`_?\._?`)
)