diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 6402685a4..eb8231f56 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -20,11 +20,13 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070). * FEATURE: add [`blocks_count` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe), which can be used for counting the number of matching blocks for the given query. For example, `_time:5m | blocks_count` returns the number of blocks with logs for the last 5 minutes. This pipe can be useful for debugging purposes. * FEATURE: support [ingesting logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/) with `_time` field, which doesn't contain timezone information. For example, `2024-09-20T10:20:30`. In this case the local timezone of the host where VictoriaLogs runs is used. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6721). +* FEATURE: reduce memory usage when [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe) is applied to [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with big number of messages. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730). * BUGFIX: fix Windows build, which has been broken in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6973). * BUGFIX: properly return logs from [`/select/logsql/tail` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) if the query contains [`_time:some_duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) like `_time:5m`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7028). The bug has been introduced in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs). * BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for identifying the root cause of the issue. * BUGFIX: support [ingesting logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/) with `_time` field containing whitespace delimiter between the date and time instead of `T` delimiter. For example, `2024-09-20 10:20:30`. This is valid [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) aka `SQL datetime` format, which sometimes is used in production. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6721). +* BUGFIX: return all the requested surrounding logs for [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). Previously only logs matching the [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) were returned. This is needed for [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7063). ## [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs) diff --git a/lib/contextutil/stop_chan_context.go b/lib/contextutil/stop_chan_context.go new file mode 100644 index 000000000..76e487c6d --- /dev/null +++ b/lib/contextutil/stop_chan_context.go @@ -0,0 +1,47 @@ +package contextutil + +import ( + "context" + "time" +) + +// NewStopChanContext returns new context for the given stopCh, together with cancel function. +// +// The returned context is canceled on the following events: +// +// - when stopCh is closed +// - when the returned CancelFunc is called +// +// The caller must call the returned CancelFunc when the context is no longer needed. +func NewStopChanContext(stopCh <-chan struct{}) (context.Context, context.CancelFunc) { + ctx := &stopChanContext{ + stopCh: stopCh, + } + return context.WithCancel(ctx) +} + +// stopChanContext implements context.Context for stopCh passed to newStopChanContext. +type stopChanContext struct { + stopCh <-chan struct{} +} + +func (ctx *stopChanContext) Deadline() (time.Time, bool) { + return time.Time{}, false +} + +func (ctx *stopChanContext) Done() <-chan struct{} { + return ctx.stopCh +} + +func (ctx *stopChanContext) Err() error { + select { + case <-ctx.stopCh: + return context.Canceled + default: + return nil + } +} + +func (ctx *stopChanContext) Value(key any) any { + return nil +} diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index 679e19386..9b4b9f535 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -1,15 +1,17 @@ package logstorage import ( - "context" + "container/heap" "fmt" "math" + "slices" "sort" "strings" "sync" "sync/atomic" "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -97,39 +99,64 @@ type pipeStreamContextProcessor struct { cancel func() ppNext pipeProcessor - shards []pipeStreamContextProcessorShard + s *Storage + neededColumnNames []string + unneededColumnNames []string - getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error) + shards []pipeStreamContextProcessorShard maxStateSize int64 stateSizeBudget atomic.Int64 } -func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) { - pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) { - return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget) - } +func (pcp *pipeStreamContextProcessor) init(s *Storage, neededColumnNames, unneededColumnNames []string) { + pcp.s = s + pcp.neededColumnNames = neededColumnNames + pcp.unneededColumnNames = unneededColumnNames } -func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) { +func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow, stateSizeBudget int) ([][]*streamContextRow, error) { tenantID, ok := getTenantIDFromStreamIDString(streamID) if !ok { logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID) } + // construct the query for selecting all the rows for the given streamID qStr := "_stream_id:" + streamID + if slices.Contains(pcp.neededColumnNames, "*") { + if len(pcp.unneededColumnNames) > 0 { + qStr += " | delete " + fieldNamesString(pcp.unneededColumnNames) + } + } else { + if len(pcp.neededColumnNames) > 0 { + qStr += " | fields " + fieldNamesString(pcp.neededColumnNames) + } + } q, err := ParseQuery(qStr) if err != nil { logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err) } - q.AddTimeFilter(minTimestamp, maxTimestamp) - ctxWithCancel, cancel := context.WithCancel(ctx) + // mu protects contextRows and stateSize inside writeBlock callback. + var mu sync.Mutex + + contextRows := make([]streamContextRows, len(neededRows)) + for i := range neededRows { + contextRows[i] = streamContextRows{ + neededTimestamp: neededRows[i].timestamp, + linesBefore: pcp.pc.linesBefore, + linesAfter: pcp.pc.linesAfter, + } + } + sort.Slice(contextRows, func(i, j int) bool { + return contextRows[i].neededTimestamp < contextRows[j].neededTimestamp + }) + + stateSize := 0 + + ctxWithCancel, cancel := contextutil.NewStopChanContext(pcp.stopCh) defer cancel() - var mu sync.Mutex - var rows []streamContextRow - stateSize := 0 writeBlock := func(_ uint, br *blockResult) { mu.Lock() defer mu.Unlock() @@ -138,38 +165,148 @@ func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestam cancel() } - cs := br.getColumns() timestamps := br.getTimestamps() for i, timestamp := range timestamps { - fields := make([]Field, len(cs)) - stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) - - for j, c := range cs { - v := c.getValueAtRow(br, i) - fields[j] = Field{ - Name: strings.Clone(c.name), - Value: strings.Clone(v), + if needStop(pcp.stopCh) { + break + } + for j := range contextRows { + if j > 0 && timestamp <= contextRows[j-1].neededTimestamp { + continue } - stateSize += len(c.name) + len(v) + if j+1 < len(contextRows) && timestamp >= contextRows[j+1].neededTimestamp { + continue + } + stateSize += contextRows[j].update(br, i, timestamp) } - - row := streamContextRow{ - timestamp: timestamp, - fields: fields, - } - stateSize += int(unsafe.Sizeof(row)) - rows = append(rows, row) } } - if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil { + if err := pcp.s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil { return nil, err } if stateSize > stateSizeBudget { - return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q) + return nil, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededRows)) } - return rows, nil + // return sorted results from contextRows + rowss := make([][]*streamContextRow, len(contextRows)) + for i, ctx := range contextRows { + rowss[i] = ctx.getSortedRows() + } + rowss = deduplicateStreamRowss(rowss) + return rowss, nil +} + +func deduplicateStreamRowss(streamRowss [][]*streamContextRow) [][]*streamContextRow { + var lastSeenRow *streamContextRow + for _, streamRows := range streamRowss { + if len(streamRows) > 0 { + lastSeenRow = streamRows[len(streamRows)-1] + break + } + } + if lastSeenRow == nil { + return nil + } + + resultRowss := streamRowss[:1] + for _, streamRows := range streamRowss[1:] { + i := 0 + for i < len(streamRows) && !lastSeenRow.less(streamRows[i]) { + i++ + } + streamRows = streamRows[i:] + if len(streamRows) == 0 { + continue + } + resultRowss = append(resultRowss, streamRows) + lastSeenRow = streamRows[len(streamRows)-1] + } + return resultRowss +} + +type streamContextRows struct { + neededTimestamp int64 + linesBefore int + linesAfter int + + rowsBefore streamContextRowsHeapMin + rowsAfter streamContextRowsHeapMax + rowsMatched []*streamContextRow +} + +func (ctx *streamContextRows) getSortedRows() []*streamContextRow { + var rows []*streamContextRow + rows = append(rows, ctx.rowsBefore...) + rows = append(rows, ctx.rowsMatched...) + rows = append(rows, ctx.rowsAfter...) + sort.Slice(rows, func(i, j int) bool { + return rows[i].less(rows[j]) + }) + return rows +} + +func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int { + if rowTimestamp < ctx.neededTimestamp { + if ctx.linesBefore <= 0 { + return 0 + } + if len(ctx.rowsBefore) < ctx.linesBefore { + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + heap.Push(&ctx.rowsBefore, r) + return r.sizeBytes() + } + if rowTimestamp <= ctx.rowsBefore[0].timestamp { + return 0 + } + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + stateSizeChange := r.sizeBytes() - ctx.rowsBefore[0].sizeBytes() + ctx.rowsBefore[0] = r + heap.Fix(&ctx.rowsBefore, 0) + return stateSizeChange + } + + if rowTimestamp > ctx.neededTimestamp { + if ctx.linesAfter <= 0 { + return 0 + } + if len(ctx.rowsAfter) < ctx.linesAfter { + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + heap.Push(&ctx.rowsAfter, r) + return r.sizeBytes() + } + if rowTimestamp >= ctx.rowsAfter[0].timestamp { + return 0 + } + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + stateSizeChange := r.sizeBytes() - ctx.rowsAfter[0].sizeBytes() + ctx.rowsAfter[0] = r + heap.Fix(&ctx.rowsAfter, 0) + return stateSizeChange + } + + // rowTimestamp == ctx.neededTimestamp + r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp) + ctx.rowsMatched = append(ctx.rowsMatched, r) + return r.sizeBytes() +} + +func (ctx *streamContextRows) copyRowAtIdx(br *blockResult, rowIdx int, rowTimestamp int64) *streamContextRow { + cs := br.getColumns() + + fields := make([]Field, len(cs)) + for i, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields[i] = Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + } + } + return &streamContextRow{ + timestamp: rowTimestamp, + fields: fields, + } } func getTenantIDFromStreamIDString(s string) (TenantID, bool) { @@ -192,6 +329,44 @@ type streamContextRow struct { fields []Field } +func (r *streamContextRow) sizeBytes() int { + n := 0 + fields := r.fields + for _, f := range fields { + n += len(f.Name) + len(f.Value) + int(unsafe.Sizeof(f)) + } + n += int(unsafe.Sizeof(*r) + unsafe.Sizeof(r)) + return n +} + +func (r *streamContextRow) less(other *streamContextRow) bool { + // compare timestamps at first + if r.timestamp != other.timestamp { + return r.timestamp < other.timestamp + } + + // compare fields then + i := 0 + aFields := r.fields + bFields := other.fields + for i < len(aFields) && i < len(bFields) { + af := &aFields[i] + bf := &bFields[i] + if af.Name != bf.Name { + return af.Name < bf.Name + } + if af.Value != bf.Value { + return af.Value < bf.Value + } + i++ + } + if len(aFields) != len(bFields) { + return len(aFields) < len(bFields) + } + + return false +} + type pipeStreamContextProcessorShardNopad struct { // pc points to the parent pipeStreamContext. pc *pipeStreamContext @@ -320,15 +495,24 @@ func (pcp *pipeStreamContextProcessor) flush() error { } for streamID, rows := range m { - streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget) + streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget) if err != nil { - return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err) + return err } if needStop(pcp.stopCh) { return nil } - if err := wctx.writeStreamContextRows(streamID, streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter); err != nil { - return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err) + + // Write streamRows to the output. + for _, streamRows := range streamRowss { + for _, streamRow := range streamRows { + wctx.writeRow(streamRow.fields) + } + if len(streamRowss) > 1 { + lastRow := streamRows[len(streamRows)-1] + fields := newDelimiterRowFields(lastRow, streamID) + wctx.writeRow(fields) + } } } @@ -337,103 +521,25 @@ func (pcp *pipeStreamContextProcessor) flush() error { return nil } -func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID string, streamRows, rows []streamContextRow, linesBefore, linesAfter int) error { - sortStreamContextRows(streamRows) - sortStreamContextRows(rows) - - idxNext := 0 - for i := range rows { - r := &rows[i] - idx := getStreamContextRowIdx(streamRows, r) - if idx < 0 { - // This error may happen when streamRows became out of sync with rows. - // For example, when some streamRows were deleted after obtaining rows. - return fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d; re-execute the query", r.timestamp, len(streamRows), len(rows)) - } - - idxStart := idx - linesBefore - if idxStart < idxNext { - idxStart = idxNext - } else if idxNext > 0 && idxStart > idxNext { - // Write delimiter row between multiple contexts in the same stream. - // This simplifies investigation of the returned logs. - fields := []Field{ - { - Name: "_time", - Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)), - }, - { - Name: "_stream_id", - Value: streamID, - }, - { - Name: "_stream", - Value: getFieldValue(r.fields, "_stream"), - }, - { - Name: "_msg", - Value: "---", - }, - } - wctx.writeRow(fields) - } - for idxStart < idx { - wctx.writeRow(streamRows[idxStart].fields) - idxStart++ - } - - if idx >= idxNext { - wctx.writeRow(streamRows[idx].fields) - idxNext = idx + 1 - } - - idxEnd := idx + 1 + linesAfter - for idxNext < idxEnd && idxNext < len(streamRows) { - wctx.writeRow(streamRows[idxNext].fields) - idxNext++ - } - - if idxNext >= len(streamRows) { - break - } +func newDelimiterRowFields(r *streamContextRow, streamID string) []Field { + return []Field{ + { + Name: "_time", + Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)), + }, + { + Name: "_stream_id", + Value: streamID, + }, + { + Name: "_stream", + Value: getFieldValue(r.fields, "_stream"), + }, + { + Name: "_msg", + Value: "---", + }, } - - return nil -} - -func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int { - n := sort.Search(len(rows), func(i int) bool { - return rows[i].timestamp >= r.timestamp - }) - if n == len(rows) { - return -1 - } - - equalFields := func(fields []Field) bool { - for _, f := range r.fields { - if f.Value != getFieldValue(fields, f.Name) { - return false - } - } - return true - } - - for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields) { - n++ - if n >= len(rows) { - return -1 - } - } - if rows[n].timestamp != r.timestamp { - return -1 - } - return n -} - -func sortStreamContextRows(rows []streamContextRow) { - sort.Slice(rows, func(i, j int) bool { - return rows[i].timestamp < rows[j].timestamp - }) } type pipeStreamContextWriteContext struct { @@ -554,3 +660,53 @@ func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) { } } } + +type streamContextRowsHeapMax []*streamContextRow + +func (h *streamContextRowsHeapMax) Len() int { + return len(*h) +} +func (h *streamContextRowsHeapMax) Less(i, j int) bool { + a := *h + return a[i].timestamp > a[j].timestamp +} +func (h *streamContextRowsHeapMax) Swap(i, j int) { + a := *h + a[i], a[j] = a[j], a[i] +} +func (h *streamContextRowsHeapMax) Push(v any) { + x := v.(*streamContextRow) + *h = append(*h, x) +} +func (h *streamContextRowsHeapMax) Pop() any { + a := *h + x := a[len(a)-1] + a[len(a)-1] = nil + *h = a[:len(a)-1] + return x +} + +type streamContextRowsHeapMin streamContextRowsHeapMax + +func (h *streamContextRowsHeapMin) Len() int { + return len(*h) +} +func (h *streamContextRowsHeapMin) Less(i, j int) bool { + a := *h + return a[i].timestamp < a[j].timestamp +} +func (h *streamContextRowsHeapMin) Swap(i, j int) { + a := *h + a[i], a[j] = a[j], a[i] +} +func (h *streamContextRowsHeapMin) Push(v any) { + x := v.(*streamContextRow) + *h = append(*h, x) +} +func (h *streamContextRowsHeapMin) Pop() any { + a := *h + x := a[len(a)-1] + a[len(a)-1] = nil + *h = a[:len(a)-1] + return x +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 58724fa58..941c195c8 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -148,7 +148,7 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, pcp, ok := pp.(*pipeStreamContextProcessor) if ok { - pcp.init(ctx, s, minTimestamp, maxTimestamp) + pcp.init(s, neededColumnNames, unneededColumnNames) if i > 0 { errPipe = fmt.Errorf("[%s] pipe must go after [%s] filter; now it goes after the [%s] pipe", p, q.f, q.pipes[i-1]) } diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 0dd9d4f6a..fb7709fe9 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -707,7 +707,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1000 | stats count() rows`, [][]Field{ { - {"rows", "825"}, + {"rows", "990"}, }, }) }) @@ -716,7 +716,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context after 1000 | stats count() rows`, [][]Field{ { - {"rows", "495"}, + {"rows", "660"}, }, }) }) @@ -725,7 +725,7 @@ func TestStorageRunQuery(t *testing.T) { | stream_context before 1000 after 1000 | stats count() rows`, [][]Field{ { - {"rows", "1155"}, + {"rows", "1320"}, }, }) })