diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index a04f69eca..099f8e762 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* BUGFIX: return the proper surrounding logs for [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe) when additional [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) are put after the `stream_context` pipe. This has been broken in [v0.26.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.26.0-victorialogs). + ## [v0.26.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.26.0-victorialogs) Released at 2024-07-01 diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index f5e9cb94d..51add0f3a 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -364,6 +364,10 @@ func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID strin Name: "_stream_id", Value: streamID, }, + { + Name: "_stream", + Value: getFieldValue(r.fields, "_stream"), + }, { Name: "_msg", Value: "---", @@ -402,20 +406,30 @@ func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int { if n == len(rows) { return -1 } - if rows[n].timestamp != r.timestamp { - return -1 + + equalFields := func(fields []Field) bool { + for _, f := range r.fields { + if f.Value != getFieldValue(fields, f.Name) { + return false + } + } + return true } - for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields, r.fields) { + + for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields) { n++ if n >= len(rows) { return -1 } } + if rows[n].timestamp != r.timestamp { + return -1 + } return n } func sortStreamContextRows(rows []streamContextRow) { - sort.SliceStable(rows, func(i, j int) bool { + sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp }) } diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index f17b65c6f..8b444a768 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -79,20 +79,13 @@ func (f *Field) marshalToLogfmt(dst []byte) []byte { return dst } -func equalFields(a, b []Field) bool { - if len(a) != len(b) { - return false - } - for i, x := range a { - y := b[i] - if x.Name != y.Name { - return false - } - if x.Value != y.Value { - return false +func getFieldValue(fields []Field, name string) string { + for _, f := range fields { + if f.Name == name { + return f.Value } } - return true + return "" } func needLogfmtQuoting(s string) bool { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 352d7fe26..be39d2d05 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -707,7 +707,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1000 | stats count() rows`, [][]Field{ { - {"rows", "858"}, + {"rows", "825"}, }, }) }) @@ -716,7 +716,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context after 1000 | stats count() rows`, [][]Field{ { - {"rows", "462"}, + {"rows", "495"}, }, }) })