From f5dfe1cacd8106c83edd1c43e433558ce409b656 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 26 Sep 2024 16:52:55 +0200 Subject: [PATCH] lib/logstorage: properly return surrounding logs outside the selected time range by stream_context pipe Previously only logs inside the selected time range could be returned by stream_context pipe. For example, the following query could return up to 10 surrounding logs only for the last 5 minutes, while most users expect this query should return up to 10 surrounding logs without restrictions on the time range. _time:5m panic | stream_context before 10 This enables the ability to implement stream context feature at VictoriaLogs web UI: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7063 . Reduce memory usage when returning stream context over big log streams with millions of entries. The new logic scans over all the log messages for the selected log stream, while keeping in memory only the given number of surrounding logs. Previously all the logs for the given log stream on the selected time range were loaded in memory before selecting the needed surrounding logs. This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730 . Reduce the scan performance for big log streams by fetching only the requested fields. For example, the following query should be executed much faster than before if logs contain many fields other than _stream, _msg and _time: panic | stream_context after 30 | fields _stream, _msg, _time --- docs/VictoriaLogs/CHANGELOG.md | 2 + lib/contextutil/stop_chan_context.go | 47 +++ lib/logstorage/pipe_stream_context.go | 422 ++++++++++++++++++-------- lib/logstorage/storage_search.go | 2 +- lib/logstorage/storage_search_test.go | 6 +- 5 files changed, 342 insertions(+), 137 deletions(-) create mode 100644 lib/contextutil/stop_chan_context.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 6402685a4..eb8231f56 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -20,11 +20,13 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070). * FEATURE: add [`blocks_count` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe), which can be used for counting the number of matching blocks for the given query. For example, `_time:5m | blocks_count` returns the number of blocks with logs for the last 5 minutes. This pipe can be useful for debugging purposes. * FEATURE: support [ingesting logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/) with `_time` field, which doesn't contain timezone information. For example, `2024-09-20T10:20:30`. In this case the local timezone of the host where VictoriaLogs runs is used. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6721). +* FEATURE: reduce memory usage when [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe) is applied to [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with big number of messages. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730). * BUGFIX: fix Windows build, which has been broken in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6973). * BUGFIX: properly return logs from [`/select/logsql/tail` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) if the query contains [`_time:some_duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) like `_time:5m`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7028). The bug has been introduced in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs). * BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for identifying the root cause of the issue. * BUGFIX: support [ingesting logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/) with `_time` field containing whitespace delimiter between the date and time instead of `T` delimiter. For example, `2024-09-20 10:20:30`. This is valid [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) aka `SQL datetime` format, which sometimes is used in production. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6721). +* BUGFIX: return all the requested surrounding logs for [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). Previously only logs matching the [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) were returned. This is needed for [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7063). ## [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs) diff --git a/lib/contextutil/stop_chan_context.go b/lib/contextutil/stop_chan_context.go new file mode 100644 index 000000000..76e487c6d --- /dev/null +++ b/lib/contextutil/stop_chan_context.go @@ -0,0 +1,47 @@ +package contextutil + +import ( + "context" + "time" +) + +// NewStopChanContext returns new context for the given stopCh, together with cancel function. +// +// The returned context is canceled on the following events: +// +// - when stopCh is closed +// - when the returned CancelFunc is called +// +// The caller must call the returned CancelFunc when the context is no longer needed. +func NewStopChanContext(stopCh <-chan struct{}) (context.Context, context.CancelFunc) { + ctx := &stopChanContext{ + stopCh: stopCh, + } + return context.WithCancel(ctx) +} + +// stopChanContext implements context.Context for stopCh passed to newStopChanContext. +type stopChanContext struct { + stopCh <-chan struct{} +} + +func (ctx *stopChanContext) Deadline() (time.Time, bool) { + return time.Time{}, false +} + +func (ctx *stopChanContext) Done() <-chan struct{} { + return ctx.stopCh +} + +func (ctx *stopChanContext) Err() error { + select { + case <-ctx.stopCh: + return context.Canceled + default: + return nil + } +} + +func (ctx *stopChanContext) Value(key any) any { + return nil +} diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index 679e19386..9b4b9f535 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -1,15 +1,17 @@ package logstorage import ( - "context" + "container/heap" "fmt" "math" + "slices" "sort" "strings" "sync" "sync/atomic" "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -97,39 +99,64 @@ type pipeStreamContextProcessor struct { cancel func() ppNext pipeProcessor - shards []pipeStreamContextProcessorShard + s *Storage + neededColumnNames []string + unneededColumnNames []string - getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error) + shards []pipeStreamContextProcessorShard maxStateSize int64 stateSizeBudget atomic.Int64 } -func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) { - pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) { - return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget) - } +func (pcp *pipeStreamContextProcessor) init(s *Storage, neededColumnNames, unneededColumnNames []string) { + pcp.s = s + pcp.neededColumnNames = neededColumnNames + pcp.unneededColumnNames = unneededColumnNames } -func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) { +func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow, stateSizeBudget int) ([][]*streamContextRow, error) { tenantID, ok := getTenantIDFromStreamIDString(streamID) if !ok { logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID) } + // construct the query for selecting all the rows for the given streamID qStr := "_stream_id:" + streamID + if slices.Contains(pcp.neededColumnNames, "*") { + if len(pcp.unneededColumnNames) > 0 { + qStr += " | delete " + fieldNamesString(pcp.unneededColumnNames) + } + } else { + if len(pcp.neededColumnNames) > 0 { + qStr += " | fields " + fieldNamesString(pcp.neededColumnNames) + } + } q, err := ParseQuery(qStr) if err != nil { logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err) } - q.AddTimeFilter(minTimestamp, maxTimestamp) - ctxWithCancel, cancel := context.WithCancel(ctx) + // mu protects contextRows and stateSize inside writeBlock callback. + var mu sync.Mutex + + contextRows := make([]streamContextRows, len(neededRows)) + for i := range neededRows { + contextRows[i] = streamContextRows{ + neededTimestamp: neededRows[i].timestamp, + linesBefore: pcp.pc.linesBefore, + linesAfter: pcp.pc.linesAfter, + } + } + sort.Slice(contextRows, func(i, j int) bool { + return contextRows[i].neededTimestamp < contextRows[j].neededTimestamp + }) + + stateSize := 0 + + ctxWithCancel, cancel := contextutil.NewStopChanContext(pcp.stopCh) defer cancel() - var mu sync.Mutex - var rows []streamContextRow - stateSize := 0 writeBlock := func(_ uint, br *blockResult) { mu.Lock() defer mu.Unlock() @@ -138,38 +165,148 @@ func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestam cancel() } - cs := br.getColumns() timestamps := br.getTimestamps() for i, timestamp := range timestamps { - fields := make([]Field, len(cs)) - stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) - - for j, c := range cs { - v := c.getValueAtRow(br, i) - fields[j] = Field{ - Name: strings.Clone(c.name), - Value: strings.Clone(v), + if needStop(pcp.stopCh) { + break + } + for j := range contextRows { + if j > 0 && timestamp <= contextRows[j-1].neededTimestamp { + continue } - stateSize += len(c.name) + len(v) + if j+1 < len(contextRows) && timestamp >= contextRows[j+1].neededTimestamp { + continue + } + stateSize += contextRows[j].update(br, i, timestamp) } - - row := streamContextRow{ - timestamp: timestamp, - fields: fields, - } - stateSize += int(unsafe.Sizeof(row)) - rows = append(rows, row) } } - if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil { + if err := pcp.s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil { return nil, err } if stateSize > stateSizeBudget { - return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q) + return nil, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededRows)) } - return rows, nil + // return sorted results from contextRows + rowss := make([][]*streamContextRow, len(contextRows)) + for i, ctx := range contextRows { + rowss[i] = ctx.getSortedRows() + } + rowss = deduplicateStreamRowss(rowss) + return rowss, nil +} + +func deduplicateStreamRowss(streamRowss [][]*streamContextRow) [][]*streamContextRow { + var lastSeenRow *streamContextRow + for _, streamRows := range streamRowss { + if len(streamRows) > 0 { + lastSeenRow = streamRows[len(streamRows)-1] + break + } + } + if lastSeenRow == nil { + return nil + } + + resultRowss := streamRowss[:1] + for _, streamRows := range streamRowss[1:] { + i := 0 + for i < len(streamRows) && !lastSeenRow.less(streamRows[i]) { + i++ + } + streamRows = streamRows[i:] + if len(streamRows) == 0 { + continue + } + resultRowss = append(resultRowss, streamRows) + lastSeenRow = streamRows[len(streamRows)-1] + } + return resultRowss +} + +type streamContextRows struct { + neededTimestamp int64 + linesBefore int + linesAfter int + + rowsBefore streamContextRowsHeapMin + rowsAfter streamContextRowsHeapMax + rowsMatched []*streamContextRow +} + +func (ctx *streamContextRows) getSortedRows() []*streamContextRow { + var rows []*streamContextRow + rows = append(rows, ctx.rowsBefore...) + rows = append(rows, ctx.rowsMatched...) + rows = append(rows, ctx.rowsAfter...) + sort.Slice(rows, func(i, j int) bool { + return rows[i].less(rows[j]) + }) + return rows +} + +func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int { + if rowTimestamp < ctx.neededTimestamp { + if ctx.linesBefore <= 0 { + return 0 + } + if len(ctx.rowsBefore) < ctx.linesBefore { + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + heap.Push(&ctx.rowsBefore, r) + return r.sizeBytes() + } + if rowTimestamp <= ctx.rowsBefore[0].timestamp { + return 0 + } + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + stateSizeChange := r.sizeBytes() - ctx.rowsBefore[0].sizeBytes() + ctx.rowsBefore[0] = r + heap.Fix(&ctx.rowsBefore, 0) + return stateSizeChange + } + + if rowTimestamp > ctx.neededTimestamp { + if ctx.linesAfter <= 0 { + return 0 + } + if len(ctx.rowsAfter) < ctx.linesAfter { + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + heap.Push(&ctx.rowsAfter, r) + return r.sizeBytes() + } + if rowTimestamp >= ctx.rowsAfter[0].timestamp { + return 0 + } + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + stateSizeChange := r.sizeBytes() - ctx.rowsAfter[0].sizeBytes() + ctx.rowsAfter[0] = r + heap.Fix(&ctx.rowsAfter, 0) + return stateSizeChange + } + + // rowTimestamp == ctx.neededTimestamp + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + ctx.rowsMatched = append(ctx.rowsMatched, r) + return r.sizeBytes() +} + +func (ctx *streamContextRows) copyRowAtIdx(br *blockResult, rowIdx int, rowTimestamp int64) *streamContextRow { + cs := br.getColumns() + + fields := make([]Field, len(cs)) + for i, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields[i] = Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + } + } + return &streamContextRow{ + timestamp: rowTimestamp, + fields: fields, + } } func getTenantIDFromStreamIDString(s string) (TenantID, bool) { @@ -192,6 +329,44 @@ type streamContextRow struct { fields []Field } +func (r *streamContextRow) sizeBytes() int { + n := 0 + fields := r.fields + for _, f := range fields { + n += len(f.Name) + len(f.Value) + int(unsafe.Sizeof(f)) + } + n += int(unsafe.Sizeof(*r) + unsafe.Sizeof(r)) + return n +} + +func (r *streamContextRow) less(other *streamContextRow) bool { + // compare timestamps at first + if r.timestamp != other.timestamp { + return r.timestamp < other.timestamp + } + + // compare fields then + i := 0 + aFields := r.fields + bFields := other.fields + for i < len(aFields) && i < len(bFields) { + af := &aFields[i] + bf := &bFields[i] + if af.Name != bf.Name { + return af.Name < bf.Name + } + if af.Value != bf.Value { + return af.Value < bf.Value + } + i++ + } + if len(aFields) != len(bFields) { + return len(aFields) < len(bFields) + } + + return false +} + type pipeStreamContextProcessorShardNopad struct { // pc points to the parent pipeStreamContext. pc *pipeStreamContext @@ -320,15 +495,24 @@ func (pcp *pipeStreamContextProcessor) flush() error { } for streamID, rows := range m { - streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget) + streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget) if err != nil { - return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err) + return err } if needStop(pcp.stopCh) { return nil } - if err := wctx.writeStreamContextRows(streamID, streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter); err != nil { - return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err) + + // Write streamRows to the output. + for _, streamRows := range streamRowss { + for _, streamRow := range streamRows { + wctx.writeRow(streamRow.fields) + } + if len(streamRowss) > 1 { + lastRow := streamRows[len(streamRows)-1] + fields := newDelimiterRowFields(lastRow, streamID) + wctx.writeRow(fields) + } } } @@ -337,103 +521,25 @@ func (pcp *pipeStreamContextProcessor) flush() error { return nil } -func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID string, streamRows, rows []streamContextRow, linesBefore, linesAfter int) error { - sortStreamContextRows(streamRows) - sortStreamContextRows(rows) - - idxNext := 0 - for i := range rows { - r := &rows[i] - idx := getStreamContextRowIdx(streamRows, r) - if idx < 0 { - // This error may happen when streamRows became out of sync with rows. - // For example, when some streamRows were deleted after obtaining rows. - return fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d; re-execute the query", r.timestamp, len(streamRows), len(rows)) - } - - idxStart := idx - linesBefore - if idxStart < idxNext { - idxStart = idxNext - } else if idxNext > 0 && idxStart > idxNext { - // Write delimiter row between multiple contexts in the same stream. - // This simplifies investigation of the returned logs. - fields := []Field{ - { - Name: "_time", - Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)), - }, - { - Name: "_stream_id", - Value: streamID, - }, - { - Name: "_stream", - Value: getFieldValue(r.fields, "_stream"), - }, - { - Name: "_msg", - Value: "---", - }, - } - wctx.writeRow(fields) - } - for idxStart < idx { - wctx.writeRow(streamRows[idxStart].fields) - idxStart++ - } - - if idx >= idxNext { - wctx.writeRow(streamRows[idx].fields) - idxNext = idx + 1 - } - - idxEnd := idx + 1 + linesAfter - for idxNext < idxEnd && idxNext < len(streamRows) { - wctx.writeRow(streamRows[idxNext].fields) - idxNext++ - } - - if idxNext >= len(streamRows) { - break - } +func newDelimiterRowFields(r *streamContextRow, streamID string) []Field { + return []Field{ + { + Name: "_time", + Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)), + }, + { + Name: "_stream_id", + Value: streamID, + }, + { + Name: "_stream", + Value: getFieldValue(r.fields, "_stream"), + }, + { + Name: "_msg", + Value: "---", + }, } - - return nil -} - -func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int { - n := sort.Search(len(rows), func(i int) bool { - return rows[i].timestamp >= r.timestamp - }) - if n == len(rows) { - return -1 - } - - equalFields := func(fields []Field) bool { - for _, f := range r.fields { - if f.Value != getFieldValue(fields, f.Name) { - return false - } - } - return true - } - - for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields) { - n++ - if n >= len(rows) { - return -1 - } - } - if rows[n].timestamp != r.timestamp { - return -1 - } - return n -} - -func sortStreamContextRows(rows []streamContextRow) { - sort.Slice(rows, func(i, j int) bool { - return rows[i].timestamp < rows[j].timestamp - }) } type pipeStreamContextWriteContext struct { @@ -554,3 +660,53 @@ func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) { } } } + +type streamContextRowsHeapMax []*streamContextRow + +func (h *streamContextRowsHeapMax) Len() int { + return len(*h) +} +func (h *streamContextRowsHeapMax) Less(i, j int) bool { + a := *h + return a[i].timestamp > a[j].timestamp +} +func (h *streamContextRowsHeapMax) Swap(i, j int) { + a := *h + a[i], a[j] = a[j], a[i] +} +func (h *streamContextRowsHeapMax) Push(v any) { + x := v.(*streamContextRow) + *h = append(*h, x) +} +func (h *streamContextRowsHeapMax) Pop() any { + a := *h + x := a[len(a)-1] + a[len(a)-1] = nil + *h = a[:len(a)-1] + return x +} + +type streamContextRowsHeapMin streamContextRowsHeapMax + +func (h *streamContextRowsHeapMin) Len() int { + return len(*h) +} +func (h *streamContextRowsHeapMin) Less(i, j int) bool { + a := *h + return a[i].timestamp < a[j].timestamp +} +func (h *streamContextRowsHeapMin) Swap(i, j int) { + a := *h + a[i], a[j] = a[j], a[i] +} +func (h *streamContextRowsHeapMin) Push(v any) { + x := v.(*streamContextRow) + *h = append(*h, x) +} +func (h *streamContextRowsHeapMin) Pop() any { + a := *h + x := a[len(a)-1] + a[len(a)-1] = nil + *h = a[:len(a)-1] + return x +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 58724fa58..941c195c8 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -148,7 +148,7 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, pcp, ok := pp.(*pipeStreamContextProcessor) if ok { - pcp.init(ctx, s, minTimestamp, maxTimestamp) + pcp.init(s, neededColumnNames, unneededColumnNames) if i > 0 { errPipe = fmt.Errorf("[%s] pipe must go after [%s] filter; now it goes after the [%s] pipe", p, q.f, q.pipes[i-1]) } diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 0dd9d4f6a..fb7709fe9 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -707,7 +707,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1000 | stats count() rows`, [][]Field{ { - {"rows", "825"}, + {"rows", "990"}, }, }) }) @@ -716,7 +716,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context after 1000 | stats count() rows`, [][]Field{ { - {"rows", "495"}, + {"rows", "660"}, }, }) }) @@ -725,7 +725,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1000 after 1000 | stats count() rows`, [][]Field{ { - {"rows", "1155"}, + {"rows", "1320"}, }, }) })