app/vlinsert: support extracting default stream fields from request body for Loki and OpenTelemetry

This commit is contained in:
Andrii Chubatiuk 2024-11-16 12:59:23 +02:00
parent a335ed23c7
commit 342af8db1b
No known key found for this signature in database
GPG key ID: 96D776CC99880667
13 changed files with 85 additions and 14 deletions

View file

@ -144,6 +144,9 @@ type LogMessageProcessor interface {
// MustClose() must flush all the remaining fields and free up resources occupied by LogMessageProcessor. // MustClose() must flush all the remaining fields and free up resources occupied by LogMessageProcessor.
MustClose() MustClose()
// UpdateStreamFields reinits LogMessageProcessor with new stream fields
UpdateStreamFields(streamFields []logstorage.Field)
} }
type logMessageProcessor struct { type logMessageProcessor struct {
@ -214,6 +217,17 @@ func (lmp *logMessageProcessor) flushLocked() {
lmp.lr.ResetKeepSettings() lmp.lr.ResetKeepSettings()
} }
// flushResetStreamFields flushes rows and updates stream fields
func (lmp *logMessageProcessor) flushResetStreamFields(streamFields []logstorage.Field) {
if !lmp.lr.StreamFieldsChanged(streamFields) {
return
}
lmp.lastFlushTime = time.Now()
vlstorage.MustAddRows(lmp.lr)
lmp.lr.ResetStreamFields(streamFields)
}
// MustClose flushes the remaining data to the underlying storage and closes lmp. // MustClose flushes the remaining data to the underlying storage and closes lmp.
func (lmp *logMessageProcessor) MustClose() { func (lmp *logMessageProcessor) MustClose() {
close(lmp.stopCh) close(lmp.stopCh)
@ -224,6 +238,11 @@ func (lmp *logMessageProcessor) MustClose() {
lmp.lr = nil lmp.lr = nil
} }
// UpdateStreamFields reinits LogMessageProcessor with new stream fields
func (lmp *logMessageProcessor) UpdateStreamFields(streamFields []logstorage.Field) {
lmp.flushResetStreamFields(streamFields)
}
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp. // NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
// //
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed. // MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.

View file

@ -24,6 +24,10 @@ func (tlp *TestLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.
func (tlp *TestLogMessageProcessor) MustClose() { func (tlp *TestLogMessageProcessor) MustClose() {
} }
// UpdateStreamFields updates streamFields in tlp.
func (tlp *TestLogMessageProcessor) UpdateStreamFields(_ []logstorage.Field) {
}
// Verify verifies the number of rows, timestamps and results after AddRow calls. // Verify verifies the number of rows, timestamps and results after AddRow calls.
func (tlp *TestLogMessageProcessor) Verify(rowsExpected int, timestampsExpected []int64, resultExpected string) error { func (tlp *TestLogMessageProcessor) Verify(rowsExpected int, timestampsExpected []int64, resultExpected string) error {
result := strings.Join(tlp.rows, "\n") result := strings.Join(tlp.rows, "\n")
@ -51,3 +55,7 @@ func (blp *BenchmarkLogMessageProcessor) AddRow(_ int64, _ []logstorage.Field) {
// MustClose implements LogMessageProcessor interface. // MustClose implements LogMessageProcessor interface.
func (blp *BenchmarkLogMessageProcessor) MustClose() { func (blp *BenchmarkLogMessageProcessor) MustClose() {
} }
// UpdateStreamFields implements LogMessageProcessor interface.
func (blp *BenchmarkLogMessageProcessor) UpdateStreamFields(_ []logstorage.Field) {
}

View file

@ -54,7 +54,8 @@ func handleJSON(r *http.Request, w http.ResponseWriter) {
return return
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor()
n, err := parseJSONRequest(data, lmp) useDefaultStreamFields := len(cp.StreamFields) == 0
n, err := parseJSONRequest(data, lmp, useDefaultStreamFields)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse Loki json request: %s; data=%s", err, data) httpserver.Errorf(w, r, "cannot parse Loki json request: %s; data=%s", err, data)
@ -75,7 +76,7 @@ var (
requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
) )
func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) { func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) (int, error) {
p := parserPool.Get() p := parserPool.Get()
defer parserPool.Put(p) defer parserPool.Put(p)
v, err := p.ParseBytes(data) v, err := p.ParseBytes(data)
@ -122,6 +123,10 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err) return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err)
} }
if useDefaultStreamFields {
lmp.UpdateStreamFields(commonFields)
}
// populate messages from `values` array // populate messages from `values` array
linesV := stream.Get("values") linesV := stream.Get("values")
if linesV == nil { if linesV == nil {

View file

@ -11,7 +11,7 @@ func TestParseJSONRequest_Failure(t *testing.T) {
t.Helper() t.Helper()
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
n, err := parseJSONRequest([]byte(s), tlp) n, err := parseJSONRequest([]byte(s), tlp, false)
if err == nil { if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
@ -66,7 +66,7 @@ func TestParseJSONRequest_Success(t *testing.T) {
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
n, err := parseJSONRequest([]byte(s), tlp) n, err := parseJSONRequest([]byte(s), tlp, false)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }

View file

@ -28,7 +28,7 @@ func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
data := getJSONBody(streams, rows, labels) data := getJSONBody(streams, rows, labels)
for pb.Next() { for pb.Next() {
_, err := parseJSONRequest(data, blp) _, err := parseJSONRequest(data, blp, false)
if err != nil { if err != nil {
panic(fmt.Errorf("unexpected error: %w", err)) panic(fmt.Errorf("unexpected error: %w", err))
} }

View file

@ -45,7 +45,8 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
return return
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor()
n, err := parseProtobufRequest(data, lmp) useDefaultStreamFields := len(cp.StreamFields) == 0
n, err := parseProtobufRequest(data, lmp, useDefaultStreamFields)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err) httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
@ -66,7 +67,7 @@ var (
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
) )
func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) { func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) (int, error) {
bb := bytesBufPool.Get() bb := bytesBufPool.Get()
defer bytesBufPool.Put(bb) defer bytesBufPool.Put(bb)
@ -99,6 +100,9 @@ func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int
return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err) return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err)
} }
commonFieldsLen := len(fields.fields) commonFieldsLen := len(fields.fields)
if useDefaultStreamFields {
lmp.UpdateStreamFields(fields.fields)
}
entries := stream.Entries entries := stream.Entries
for j := range entries { for j := range entries {

View file

@ -45,12 +45,15 @@ func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.
func (tlp *testLogMessageProcessor) MustClose() { func (tlp *testLogMessageProcessor) MustClose() {
} }
func (tlp *testLogMessageProcessor) UpdateStreamFields(_ []logstorage.Field) {
}
func TestParseProtobufRequest_Success(t *testing.T) { func TestParseProtobufRequest_Success(t *testing.T) {
f := func(s string, timestampsExpected []int64, resultExpected string) { f := func(s string, timestampsExpected []int64, resultExpected string) {
t.Helper() t.Helper()
tlp := &testLogMessageProcessor{} tlp := &testLogMessageProcessor{}
n, err := parseJSONRequest([]byte(s), tlp) n, err := parseJSONRequest([]byte(s), tlp, false)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
@ -62,7 +65,7 @@ func TestParseProtobufRequest_Success(t *testing.T) {
encodedData := snappy.Encode(nil, data) encodedData := snappy.Encode(nil, data)
tlp2 := &insertutils.TestLogMessageProcessor{} tlp2 := &insertutils.TestLogMessageProcessor{}
n, err = parseProtobufRequest(encodedData, tlp2) n, err = parseProtobufRequest(encodedData, tlp2, false)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }

View file

@ -31,7 +31,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
body := getProtobufBody(streams, rows, labels) body := getProtobufBody(streams, rows, labels)
for pb.Next() { for pb.Next() {
_, err := parseProtobufRequest(body, blp) _, err := parseProtobufRequest(body, blp, false)
if err != nil { if err != nil {
panic(fmt.Errorf("unexpected error: %w", err)) panic(fmt.Errorf("unexpected error: %w", err))
} }

View file

@ -67,7 +67,8 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
} }
lmp := cp.NewLogMessageProcessor() lmp := cp.NewLogMessageProcessor()
n, err := pushProtobufRequest(data, lmp) useDefaultStreamFields := len(cp.StreamFields) == 0
n, err := pushProtobufRequest(data, lmp, useDefaultStreamFields)
lmp.MustClose() lmp.MustClose()
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err) httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err)
@ -91,7 +92,7 @@ var (
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
) )
func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) { func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) (int, error) {
var req pb.ExportLogsServiceRequest var req pb.ExportLogsServiceRequest
if err := req.UnmarshalProtobuf(data); err != nil { if err := req.UnmarshalProtobuf(data); err != nil {
errorsTotal.Inc() errorsTotal.Inc()
@ -108,6 +109,9 @@ func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int,
commonFields[i].Value = attr.Value.FormatString() commonFields[i].Value = attr.Value.FormatString()
} }
commonFieldsLen := len(commonFields) commonFieldsLen := len(commonFields)
if useDefaultStreamFields {
lmp.UpdateStreamFields(commonFields)
}
for _, sc := range rl.ScopeLogs { for _, sc := range rl.ScopeLogs {
var scopeIngested int var scopeIngested int
commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp) commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp)

View file

@ -16,7 +16,7 @@ func TestPushProtoOk(t *testing.T) {
pData := lr.MarshalProtobuf(nil) pData := lr.MarshalProtobuf(nil)
tlp := &insertutils.TestLogMessageProcessor{} tlp := &insertutils.TestLogMessageProcessor{}
n, err := pushProtobufRequest(pData, tlp) n, err := pushProtobufRequest(pData, tlp, false)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }

View file

@ -27,7 +27,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
body := getProtobufBody(streams, rows, labels) body := getProtobufBody(streams, rows, labels)
for pb.Next() { for pb.Next() {
_, err := pushProtobufRequest(body, blp) _, err := pushProtobufRequest(body, blp, false)
if err != nil { if err != nil {
panic(fmt.Errorf("unexpected error: %w", err)) panic(fmt.Errorf("unexpected error: %w", err))
} }

View file

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/): use Loki message stream fields and Opentelemetry common attributes as VictoriaLogs stream fields if none were passed via HTTP headers or query args. This is useful for agents, which do not support setting custom headers or query args.
## [v1.0.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.0.0-victorialogs) ## [v1.0.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.0.0-victorialogs)
Released at 2024-11-12 Released at 2024-11-12

View file

@ -126,6 +126,32 @@ func (lr *LogRows) ResetKeepSettings() {
lr.sf = nil lr.sf = nil
} }
// StreamFieldsChanged checks if passed stream fields differ from lr.streamFields
func (lr *LogRows) StreamFieldsChanged(streamFields []Field) bool {
sfs := lr.streamFields
if len(sfs) != len(streamFields) {
return true
}
for _, f := range streamFields {
if _, ok := sfs[f.Name]; !ok {
return true
}
}
return false
}
// ResetStreamFields same as ResetKeepSettings, but additionally updates lr.streamFields
func (lr *LogRows) ResetStreamFields(streamFields []Field) {
lr.ResetKeepSettings()
sfs := lr.streamFields
for k := range sfs {
delete(sfs, k)
}
for _, f := range streamFields {
sfs[f.Name] = struct{}{}
}
}
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage. // NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
func (lr *LogRows) NeedFlush() bool { func (lr *LogRows) NeedFlush() bool {
return len(lr.a.b) > (maxUncompressedBlockSize/8)*7 return len(lr.a.b) > (maxUncompressedBlockSize/8)*7