From dc291d898084fa680a3ef0291fdab4a13df9b3cb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 1 Jul 2024 00:49:31 +0200 Subject: [PATCH] lib/logstorage: add delimiter between log chunks returned from `| stream_context` pipe --- docs/VictoriaLogs/CHANGELOG.md | 5 ++- docs/VictoriaLogs/LogsQL.md | 5 ++- lib/logstorage/pipe_stream_context.go | 54 ++++++++++++++++++--------- lib/logstorage/rows.go | 16 ++++++++ lib/logstorage/storage_search_test.go | 4 +- 5 files changed, 63 insertions(+), 21 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d58a57ec7..9966b3020 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,11 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add delimiter log with `---` message between log chunks returned by [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This should simplify investigation of the returned logs. +* FEATURE: reduce memory usage when big number of context logs are requested from [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). + ## [v0.25.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.25.0-victorialogs) Released at 2024-06-28 -* FEATURE: add ability to select surrounding logs in front and after the selected logs via [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This functionality may be useful for investigating stacktraces, panics or some correlated log messages. +* FEATURE: add ability to select surrounding logs in front and after the selected logs via [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This functionality may be useful for investigating stacktraces, panics or some correlated log messages. 
This functionality is similar to `grep -A` and `grep -B`. * FEATURE: add ability to return top `N` `"fields"` groups from [`/select/logsql/hits` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats), by specifying `fields_limit=N` query arg. This query arg is going to be used in [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545). * BUGFIX: fix `runtime error: index out of range [0] with length 0` panic when empty lines are ingested via [Syslog format](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) by Cisco controllers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6548). diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index afdf16c64..b6e73cfd0 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -2304,7 +2304,10 @@ _time:5m | stats ### stream_context pipe -`| stream_context ...` [pipe](#pipes) allows selecting surrounding logs for the matching logs in [logs stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). +`| stream_context ...` [pipe](#pipes) allows selecting surrounding logs for the matching logs in [logs stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) +in a way similar to `grep -A` / `grep -B`. The returned log chunks are delimited with a `---` [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +for easier investigation. 
+ For example, the following query returns up to 10 additional logs after every log message with the `panic` [word](#word) across all the logs for the last 5 minutes: ```logsql diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index 42748b798..f5e9cb94d 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -325,13 +325,9 @@ func (pcp *pipeStreamContextProcessor) flush() error { if needStop(pcp.stopCh) { return nil } - resultRows, err := getStreamContextRows(streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter) - if err != nil { + if err := wctx.writeStreamContextRows(streamID, streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter); err != nil { return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err) } - for _, rowFields := range resultRows { - wctx.writeRow(rowFields) - } } wctx.flush() @@ -339,37 +335,55 @@ func (pcp *pipeStreamContextProcessor) flush() error { return nil } -func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, linesAfter int) ([][]Field, error) { +func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID string, streamRows, rows []streamContextRow, linesBefore, linesAfter int) error { sortStreamContextRows(streamRows) sortStreamContextRows(rows) - var resultRows [][]Field idxNext := 0 - for _, r := range rows { - idx := getStreamContextRowIdx(streamRows, r.timestamp) + for i := range rows { + r := &rows[i] + idx := getStreamContextRowIdx(streamRows, r) if idx < 0 { // This error may happen when streamRows became out of sync with rows. // For example, when some streamRows were deleted after obtaining rows. 
- return nil, fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d", r.timestamp, len(streamRows), len(rows)) + return fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d; re-execute the query", r.timestamp, len(streamRows), len(rows)) } idxStart := idx - linesBefore if idxStart < idxNext { idxStart = idxNext + } else if idxNext > 0 && idxStart > idxNext { + // Write delimiter row between multiple contexts in the same stream. + // This simplifies investigation of the returned logs. + fields := []Field{ + { + Name: "_time", + Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)), + }, + { + Name: "_stream_id", + Value: streamID, + }, + { + Name: "_msg", + Value: "---", + }, + } + wctx.writeRow(fields) } for idxStart < idx { - resultRows = append(resultRows, streamRows[idxStart].fields) + wctx.writeRow(streamRows[idxStart].fields) idxStart++ } if idx >= idxNext { - resultRows = append(resultRows, streamRows[idx].fields) + wctx.writeRow(streamRows[idx].fields) idxNext = idx + 1 } idxEnd := idx + 1 + linesAfter for idxNext < idxEnd && idxNext < len(streamRows) { - resultRows = append(resultRows, streamRows[idxNext].fields) + wctx.writeRow(streamRows[idxNext].fields) idxNext++ } @@ -378,19 +392,25 @@ func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, line } } - return resultRows, nil + return nil } -func getStreamContextRowIdx(rows []streamContextRow, timestamp int64) int { +func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int { n := sort.Search(len(rows), func(i int) bool { - return rows[i].timestamp >= timestamp + return rows[i].timestamp >= r.timestamp }) if n == len(rows) { return -1 } - if rows[n].timestamp != timestamp { + if rows[n].timestamp != r.timestamp { return -1 } + for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields, r.fields) { + n++ + if n >= len(rows) { + return -1 + } + } return n } diff --git a/lib/logstorage/rows.go 
b/lib/logstorage/rows.go index c165641a3..f17b65c6f 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -79,6 +79,22 @@ func (f *Field) marshalToLogfmt(dst []byte) []byte { return dst } +func equalFields(a, b []Field) bool { + if len(a) != len(b) { + return false + } + for i, x := range a { + y := b[i] + if x.Name != y.Name { + return false + } + if x.Value != y.Value { + return false + } + } + return true +} + func needLogfmtQuoting(s string) bool { for _, c := range s { if !isTokenRune(c) { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index be39d2d05..352d7fe26 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -707,7 +707,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1000 | stats count() rows`, [][]Field{ { - {"rows", "825"}, + {"rows", "858"}, }, }) }) @@ -716,7 +716,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context after 1000 | stats count() rows`, [][]Field{ { - {"rows", "495"}, + {"rows", "462"}, }, }) })