lib/logstorage: add delimiter between log chunks returned from | stream_context pipe

This commit is contained in:
Aliaksandr Valialkin 2024-07-01 00:49:31 +02:00
parent c9568d45dc
commit dc291d8980
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 63 additions and 21 deletions

View file

@ -19,11 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add delimiter log with `---` message between log chunks returned by [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This should simplify investigation of the returned logs.
* FEATURE: reduce memory usage when big number of context logs are requested from [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe).
## [v0.25.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.25.0-victorialogs)
Released at 2024-06-28
* FEATURE: add ability to select surrounding logs in front and after the selected logs via [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This functionality may be useful for investigating stacktraces, panics or some correlated log messages.
* FEATURE: add ability to select surrounding logs in front and after the selected logs via [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This functionality may be useful for investigating stacktraces, panics or some correlated log messages. This functionality is similar to `grep -A` and `grep -B`.
* FEATURE: add ability to return top `N` `"fields"` groups from [`/select/logsql/hits` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats), by specifying `fields_limit=N` query arg. This query arg is going to be used in [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545).
* BUGFIX: fix `runtime error: index out of range [0] with length 0` panic when empty lines are ingested via [Syslog format](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) by Cisco controllers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6548).

View file

@ -2304,7 +2304,10 @@ _time:5m | stats
### stream_context pipe
`| stream_context ...` [pipe](#pipes) allows selecting surrounding logs for the matching logs in [logs stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
`| stream_context ...` [pipe](#pipes) allows selecting surrounding logs for the matching logs in [logs stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
in the way similar to `grep -A` / `grep -B`. The returned log chunks are delimited with `---` [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
for easier investigation.
For example, the following query returns up to 10 additional logs after every log message with the `panic` [word](#word) across all the logs for the last 5 minutes:
```logsql

View file

@ -325,13 +325,9 @@ func (pcp *pipeStreamContextProcessor) flush() error {
if needStop(pcp.stopCh) {
return nil
}
resultRows, err := getStreamContextRows(streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter)
if err != nil {
if err := wctx.writeStreamContextRows(streamID, streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter); err != nil {
return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err)
}
for _, rowFields := range resultRows {
wctx.writeRow(rowFields)
}
}
wctx.flush()
@ -339,37 +335,55 @@ func (pcp *pipeStreamContextProcessor) flush() error {
return nil
}
func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, linesAfter int) ([][]Field, error) {
func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID string, streamRows, rows []streamContextRow, linesBefore, linesAfter int) error {
sortStreamContextRows(streamRows)
sortStreamContextRows(rows)
var resultRows [][]Field
idxNext := 0
for _, r := range rows {
idx := getStreamContextRowIdx(streamRows, r.timestamp)
for i := range rows {
r := &rows[i]
idx := getStreamContextRowIdx(streamRows, r)
if idx < 0 {
// This error may happen when streamRows became out of sync with rows.
// For example, when some streamRows were deleted after obtaining rows.
return nil, fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d", r.timestamp, len(streamRows), len(rows))
return fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d; re-execute the query", r.timestamp, len(streamRows), len(rows))
}
idxStart := idx - linesBefore
if idxStart < idxNext {
idxStart = idxNext
} else if idxNext > 0 && idxStart > idxNext {
// Write delimiter row between multiple contexts in the same stream.
// This simplifies investigation of the returned logs.
fields := []Field{
{
Name: "_time",
Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)),
},
{
Name: "_stream_id",
Value: streamID,
},
{
Name: "_msg",
Value: "---",
},
}
wctx.writeRow(fields)
}
for idxStart < idx {
resultRows = append(resultRows, streamRows[idxStart].fields)
wctx.writeRow(streamRows[idxStart].fields)
idxStart++
}
if idx >= idxNext {
resultRows = append(resultRows, streamRows[idx].fields)
wctx.writeRow(streamRows[idx].fields)
idxNext = idx + 1
}
idxEnd := idx + 1 + linesAfter
for idxNext < idxEnd && idxNext < len(streamRows) {
resultRows = append(resultRows, streamRows[idxNext].fields)
wctx.writeRow(streamRows[idxNext].fields)
idxNext++
}
@ -378,19 +392,25 @@ func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, line
}
}
return resultRows, nil
return nil
}
func getStreamContextRowIdx(rows []streamContextRow, timestamp int64) int {
func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int {
n := sort.Search(len(rows), func(i int) bool {
return rows[i].timestamp >= timestamp
return rows[i].timestamp >= r.timestamp
})
if n == len(rows) {
return -1
}
if rows[n].timestamp != timestamp {
if rows[n].timestamp != r.timestamp {
return -1
}
for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields, r.fields) {
n++
if n >= len(rows) {
return -1
}
}
return n
}

View file

@ -79,6 +79,22 @@ func (f *Field) marshalToLogfmt(dst []byte) []byte {
return dst
}
func equalFields(a, b []Field) bool {
if len(a) != len(b) {
return false
}
for i, x := range a {
y := b[i]
if x.Name != y.Name {
return false
}
if x.Value != y.Value {
return false
}
}
return true
}
func needLogfmtQuoting(s string) bool {
for _, c := range s {
if !isTokenRune(c) {

View file

@ -707,7 +707,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 1000
| stats count() rows`, [][]Field{
{
{"rows", "825"},
{"rows", "858"},
},
})
})
@ -716,7 +716,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context after 1000
| stats count() rows`, [][]Field{
{
{"rows", "495"},
{"rows", "462"},
},
})
})