diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go
index c357d3f87..cd21fd37d 100644
--- a/app/vlinsert/insertutils/common_params.go
+++ b/app/vlinsert/insertutils/common_params.go
@@ -144,6 +144,9 @@ type LogMessageProcessor interface {
 
 	// MustClose() must flush all the remaining fields and free up resources occupied by LogMessageProcessor.
 	MustClose()
+
+	// UpdateStreamFields reinits LogMessageProcessor with new stream fields
+	UpdateStreamFields(streamFields []logstorage.Field)
 }
 
 type logMessageProcessor struct {
@@ -214,6 +217,17 @@ func (lmp *logMessageProcessor) flushLocked() {
 	lmp.lr.ResetKeepSettings()
 }
 
+// flushResetStreamFields flushes rows and updates stream fields
+func (lmp *logMessageProcessor) flushResetStreamFields(streamFields []logstorage.Field) {
+	if !lmp.lr.StreamFieldsChanged(streamFields) {
+		return
+	}
+
+	lmp.lastFlushTime = time.Now()
+	vlstorage.MustAddRows(lmp.lr)
+	lmp.lr.ResetStreamFields(streamFields)
+}
+
 // MustClose flushes the remaining data to the underlying storage and closes lmp.
 func (lmp *logMessageProcessor) MustClose() {
 	close(lmp.stopCh)
@@ -224,6 +238,11 @@ func (lmp *logMessageProcessor) MustClose() {
 	lmp.lr = nil
 }
 
+// UpdateStreamFields reinits LogMessageProcessor with new stream fields
+func (lmp *logMessageProcessor) UpdateStreamFields(streamFields []logstorage.Field) {
+	lmp.flushResetStreamFields(streamFields)
+}
+
 // NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
 //
 // MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
diff --git a/app/vlinsert/insertutils/testutils.go b/app/vlinsert/insertutils/testutils.go
index 229b2c5bb..51c6e4203 100644
--- a/app/vlinsert/insertutils/testutils.go
+++ b/app/vlinsert/insertutils/testutils.go
@@ -24,6 +24,10 @@ func (tlp *TestLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.
 func (tlp *TestLogMessageProcessor) MustClose() {
 }
 
+// UpdateStreamFields updates streamFields in tlp.
+func (tlp *TestLogMessageProcessor) UpdateStreamFields(_ []logstorage.Field) {
+}
+
 // Verify verifies the number of rows, timestamps and results after AddRow calls.
 func (tlp *TestLogMessageProcessor) Verify(rowsExpected int, timestampsExpected []int64, resultExpected string) error {
 	result := strings.Join(tlp.rows, "\n")
@@ -51,3 +55,7 @@ func (blp *BenchmarkLogMessageProcessor) AddRow(_ int64, _ []logstorage.Field) {
 // MustClose implements LogMessageProcessor interface.
 func (blp *BenchmarkLogMessageProcessor) MustClose() {
 }
+
+// UpdateStreamFields implements LogMessageProcessor interface.
+func (blp *BenchmarkLogMessageProcessor) UpdateStreamFields(_ []logstorage.Field) {
+}
diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go
index 768691c11..173ca5631 100644
--- a/app/vlinsert/loki/loki_json.go
+++ b/app/vlinsert/loki/loki_json.go
@@ -54,7 +54,8 @@ func handleJSON(r *http.Request, w http.ResponseWriter) {
 		return
 	}
 	lmp := cp.NewLogMessageProcessor()
-	n, err := parseJSONRequest(data, lmp)
+	useDefaultStreamFields := len(cp.StreamFields) == 0
+	n, err := parseJSONRequest(data, lmp, useDefaultStreamFields)
 	lmp.MustClose()
 	if err != nil {
 		httpserver.Errorf(w, r, "cannot parse Loki json request: %s; data=%s", err, data)
@@ -75,7 +76,7 @@ var (
 	requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
 )
 
-func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
+func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) (int, error) {
 	p := parserPool.Get()
 	defer parserPool.Put(p)
 	v, err := p.ParseBytes(data)
@@ -122,6 +123,10 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
 			return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err)
 		}
 
+		if useDefaultStreamFields {
+			lmp.UpdateStreamFields(commonFields)
+		}
+
 		// populate messages from `values` array
 		linesV := stream.Get("values")
 		if linesV == nil {
diff --git a/app/vlinsert/loki/loki_json_test.go b/app/vlinsert/loki/loki_json_test.go
index 304186f5f..59c190581 100644
--- a/app/vlinsert/loki/loki_json_test.go
+++ b/app/vlinsert/loki/loki_json_test.go
@@ -11,7 +11,7 @@ func TestParseJSONRequest_Failure(t *testing.T) {
 		t.Helper()
 
 		tlp := &insertutils.TestLogMessageProcessor{}
-		n, err := parseJSONRequest([]byte(s), tlp)
+		n, err := parseJSONRequest([]byte(s), tlp, false)
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
 		}
@@ -66,7 +66,7 @@ func TestParseJSONRequest_Success(t *testing.T) {
 
 		tlp := &insertutils.TestLogMessageProcessor{}
 
-		n, err := parseJSONRequest([]byte(s), tlp)
+		n, err := parseJSONRequest([]byte(s), tlp, false)
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
diff --git a/app/vlinsert/loki/loki_json_timing_test.go b/app/vlinsert/loki/loki_json_timing_test.go
index d92006749..2691f09d3 100644
--- a/app/vlinsert/loki/loki_json_timing_test.go
+++ b/app/vlinsert/loki/loki_json_timing_test.go
@@ -28,7 +28,7 @@ func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		data := getJSONBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := parseJSONRequest(data, blp)
+			_, err := parseJSONRequest(data, blp, false)
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %w", err))
 			}
diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go
index 2c1ac6b39..b3025affe 100644
--- a/app/vlinsert/loki/loki_protobuf.go
+++ b/app/vlinsert/loki/loki_protobuf.go
@@ -45,7 +45,8 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
 		return
 	}
 	lmp := cp.NewLogMessageProcessor()
-	n, err := parseProtobufRequest(data, lmp)
+	useDefaultStreamFields := len(cp.StreamFields) == 0
+	n, err := parseProtobufRequest(data, lmp, useDefaultStreamFields)
 	lmp.MustClose()
 	if err != nil {
 		httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
@@ -66,7 +67,7 @@ var (
 	requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
 )
 
-func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
+func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) (int, error) {
 	bb := bytesBufPool.Get()
 	defer bytesBufPool.Put(bb)
 
@@ -99,6 +100,9 @@ func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int
 			return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err)
 		}
 		commonFieldsLen := len(fields.fields)
+		if useDefaultStreamFields {
+			lmp.UpdateStreamFields(fields.fields)
+		}
 
 		entries := stream.Entries
 		for j := range entries {
diff --git a/app/vlinsert/loki/loki_protobuf_test.go b/app/vlinsert/loki/loki_protobuf_test.go
index 14b3452e6..27c3850da 100644
--- a/app/vlinsert/loki/loki_protobuf_test.go
+++ b/app/vlinsert/loki/loki_protobuf_test.go
@@ -45,12 +45,15 @@ func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.
 func (tlp *testLogMessageProcessor) MustClose() {
 }
 
+func (tlp *testLogMessageProcessor) UpdateStreamFields(_ []logstorage.Field) {
+}
+
 func TestParseProtobufRequest_Success(t *testing.T) {
 	f := func(s string, timestampsExpected []int64, resultExpected string) {
 		t.Helper()
 
 		tlp := &testLogMessageProcessor{}
-		n, err := parseJSONRequest([]byte(s), tlp)
+		n, err := parseJSONRequest([]byte(s), tlp, false)
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
@@ -62,7 +65,7 @@ func TestParseProtobufRequest_Success(t *testing.T) {
 		encodedData := snappy.Encode(nil, data)
 
 		tlp2 := &insertutils.TestLogMessageProcessor{}
-		n, err = parseProtobufRequest(encodedData, tlp2)
+		n, err = parseProtobufRequest(encodedData, tlp2, false)
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
diff --git a/app/vlinsert/loki/loki_protobuf_timing_test.go b/app/vlinsert/loki/loki_protobuf_timing_test.go
index 1143c33c4..eff011c87 100644
--- a/app/vlinsert/loki/loki_protobuf_timing_test.go
+++ b/app/vlinsert/loki/loki_protobuf_timing_test.go
@@ -31,7 +31,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		body := getProtobufBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := parseProtobufRequest(body, blp)
+			_, err := parseProtobufRequest(body, blp, false)
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %w", err))
 			}
diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go
index b300500ca..f32ad3214 100644
--- a/app/vlinsert/opentelemetry/opentelemetry.go
+++ b/app/vlinsert/opentelemetry/opentelemetry.go
@@ -67,7 +67,8 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
 	}
 
 	lmp := cp.NewLogMessageProcessor()
-	n, err := pushProtobufRequest(data, lmp)
+	useDefaultStreamFields := len(cp.StreamFields) == 0
+	n, err := pushProtobufRequest(data, lmp, useDefaultStreamFields)
 	lmp.MustClose()
 	if err != nil {
 		httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err)
@@ -91,7 +92,7 @@ var (
 	requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
 )
 
-func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
+func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) (int, error) {
 	var req pb.ExportLogsServiceRequest
 	if err := req.UnmarshalProtobuf(data); err != nil {
 		errorsTotal.Inc()
@@ -108,6 +109,9 @@ func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int,
 			commonFields[i].Value = attr.Value.FormatString()
 		}
 		commonFieldsLen := len(commonFields)
+		if useDefaultStreamFields {
+			lmp.UpdateStreamFields(commonFields)
+		}
 		for _, sc := range rl.ScopeLogs {
 			var scopeIngested int
 			commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp)
diff --git a/app/vlinsert/opentelemetry/opentelemetry_test.go b/app/vlinsert/opentelemetry/opentelemetry_test.go
index 254d9f21b..ede04f60a 100644
--- a/app/vlinsert/opentelemetry/opentelemetry_test.go
+++ b/app/vlinsert/opentelemetry/opentelemetry_test.go
@@ -16,7 +16,7 @@ func TestPushProtoOk(t *testing.T) {
 		pData := lr.MarshalProtobuf(nil)
 
 		tlp := &insertutils.TestLogMessageProcessor{}
-		n, err := pushProtobufRequest(pData, tlp)
+		n, err := pushProtobufRequest(pData, tlp, false)
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
diff --git a/app/vlinsert/opentelemetry/opentemetry_timing_test.go b/app/vlinsert/opentelemetry/opentemetry_timing_test.go
index c5998cca0..a9722b4d8 100644
--- a/app/vlinsert/opentelemetry/opentemetry_timing_test.go
+++ b/app/vlinsert/opentelemetry/opentemetry_timing_test.go
@@ -27,7 +27,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		body := getProtobufBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := pushProtobufRequest(body, blp)
+			_, err := pushProtobufRequest(body, blp, false)
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %w", err))
 			}
diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 697e07825..e2fc52656 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 
 ## tip
 
+* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/): use Loki message stream fields and OpenTelemetry common attributes as VictoriaLogs stream fields if none were passed via HTTP headers or query args. This is useful for agents that do not support setting custom headers or query args.
+
 ## [v1.0.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.0.0-victorialogs)
 
 Released at 2024-11-12
diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go
index 2b642c199..49d29c4bb 100644
--- a/lib/logstorage/log_rows.go
+++ b/lib/logstorage/log_rows.go
@@ -126,6 +126,32 @@ func (lr *LogRows) ResetKeepSettings() {
 	lr.sf = nil
 }
 
+// StreamFieldsChanged checks if the passed stream fields differ from lr.streamFields
+func (lr *LogRows) StreamFieldsChanged(streamFields []Field) bool {
+	sfs := lr.streamFields
+	if len(sfs) != len(streamFields) {
+		return true
+	}
+	for _, f := range streamFields {
+		if _, ok := sfs[f.Name]; !ok {
+			return true
+		}
+	}
+	return false
+}
+
+// ResetStreamFields is the same as ResetKeepSettings, but additionally updates lr.streamFields
+func (lr *LogRows) ResetStreamFields(streamFields []Field) {
+	lr.ResetKeepSettings()
+	sfs := lr.streamFields
+	for k := range sfs {
+		delete(sfs, k)
+	}
+	for _, f := range streamFields {
+		sfs[f.Name] = struct{}{}
+	}
+}
+
 // NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
 func (lr *LogRows) NeedFlush() bool {
 	return len(lr.a.b) > (maxUncompressedBlockSize/8)*7