mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
e0930687f1
This commit adds the following changes: - Added support for pushing Datadog logs, with examples of how to ingest data using Vector and Fluent Bit. - Updated the VictoriaLogs examples directory structure so that there is a single container image for VictoriaLogs and for each agent (Fluent Bit, Vector, etc.), with multiple configurations for the different protocols. Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6632
185 lines
5.1 KiB
Go
185 lines
5.1 KiB
Go
package datadog
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/valyala/fastjson"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
)
|
|
|
|
// parserPool holds reusable fastjson parsers; readLogsRequest takes one with Get
// for the duration of a call and hands it back with Put.
var parserPool fastjson.ParserPool
|
|
|
|
// RequestHandler processes Datadog insert requests
|
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
|
switch path {
|
|
case "/api/v1/validate":
|
|
fmt.Fprintf(w, `{}`)
|
|
return true
|
|
case "/api/v2/logs":
|
|
return datadogLogsIngestion(w, r)
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool {
|
|
w.Header().Add("Content-Type", "application/json")
|
|
startTime := time.Now()
|
|
v2LogsRequestsTotal.Inc()
|
|
reader := r.Body
|
|
|
|
var ts int64
|
|
if tsValue := r.Header.Get("dd-message-timestamp"); tsValue != "" && tsValue != "0" {
|
|
var err error
|
|
ts, err = strconv.ParseInt(tsValue, 10, 64)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "could not parse dd-message-timestamp header value: %s", err)
|
|
return true
|
|
}
|
|
ts *= 1e6
|
|
} else {
|
|
ts = startTime.UnixNano()
|
|
}
|
|
|
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
|
zr, err := common.GetGzipReader(reader)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot read gzipped logs request: %s", err)
|
|
return true
|
|
}
|
|
defer common.PutGzipReader(zr)
|
|
reader = zr
|
|
}
|
|
|
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
|
data, err := io.ReadAll(wcr)
|
|
writeconcurrencylimiter.PutReader(wcr)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
|
return true
|
|
}
|
|
|
|
cp, err := insertutils.GetCommonParams(r)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return true
|
|
}
|
|
|
|
lmp := cp.NewLogMessageProcessor()
|
|
n, err := readLogsRequest(ts, data, lmp.AddRow)
|
|
lmp.MustClose()
|
|
if n > 0 {
|
|
rowsIngestedTotal.Add(n)
|
|
}
|
|
if err != nil {
|
|
logger.Warnf("cannot decode log message in /api/v2/logs request: %s, stream fields: %s", err, cp.StreamFields)
|
|
return true
|
|
}
|
|
|
|
// update v2LogsRequestDuration only for successfully parsed requests
|
|
// There is no need in updating v2LogsRequestDuration for request errors,
|
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
|
v2LogsRequestDuration.UpdateDuration(startTime)
|
|
fmt.Fprintf(w, `{}`)
|
|
return true
|
|
}
|
|
|
|
var (
|
|
v2LogsRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/datadog/api/v2/logs"}`)
|
|
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="datadog"}`)
|
|
v2LogsRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/datadog/api/v2/logs"}`)
|
|
)
|
|
|
|
// readLogsRequest parses data according to DataDog logs format
|
|
// https://docs.datadoghq.com/api/latest/logs/#send-logs
|
|
func readLogsRequest(ts int64, data []byte, processLogMessage func(int64, []logstorage.Field)) (int, error) {
|
|
p := parserPool.Get()
|
|
defer parserPool.Put(p)
|
|
v, err := p.ParseBytes(data)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse JSON request body: %w", err)
|
|
}
|
|
records, err := v.Array()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot extract array from parsed JSON: %w", err)
|
|
}
|
|
|
|
var fields []logstorage.Field
|
|
for m, r := range records {
|
|
o, err := r.Object()
|
|
if err != nil {
|
|
return m + 1, fmt.Errorf("could not extract log record: %w", err)
|
|
}
|
|
o.Visit(func(k []byte, v *fastjson.Value) {
|
|
if err != nil {
|
|
return
|
|
}
|
|
val, e := v.StringBytes()
|
|
if e != nil {
|
|
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
|
|
return
|
|
}
|
|
switch string(k) {
|
|
case "message":
|
|
fields = append(fields, logstorage.Field{
|
|
Name: "_msg",
|
|
Value: bytesutil.ToUnsafeString(val),
|
|
})
|
|
case "ddtags":
|
|
// https://docs.datadoghq.com/getting_started/tagging/
|
|
var pair []byte
|
|
idx := 0
|
|
for idx >= 0 {
|
|
idx = bytes.IndexByte(val, ',')
|
|
if idx < 0 {
|
|
pair = val
|
|
} else {
|
|
pair = val[:idx]
|
|
val = val[idx+1:]
|
|
}
|
|
if len(pair) > 0 {
|
|
n := bytes.IndexByte(pair, ':')
|
|
if n < 0 {
|
|
// No tag value.
|
|
fields = append(fields, logstorage.Field{
|
|
Name: bytesutil.ToUnsafeString(pair),
|
|
Value: "no_label_value",
|
|
})
|
|
}
|
|
fields = append(fields, logstorage.Field{
|
|
Name: bytesutil.ToUnsafeString(pair[:n]),
|
|
Value: bytesutil.ToUnsafeString(pair[n+1:]),
|
|
})
|
|
}
|
|
}
|
|
default:
|
|
fields = append(fields, logstorage.Field{
|
|
Name: bytesutil.ToUnsafeString(k),
|
|
Value: bytesutil.ToUnsafeString(val),
|
|
})
|
|
}
|
|
})
|
|
processLogMessage(ts, fields)
|
|
fields = fields[:0]
|
|
}
|
|
return len(records), nil
|
|
}
|