lib/logstorage: properly return surrounding logs outside the selected time range by stream_context pipe

Previously only logs inside the selected time range could be returned by the stream_context pipe.
For example, the following query could return up to 10 surrounding logs only for the last 5 minutes,
while most users expect it to return up to 10 surrounding logs without restrictions on the time range.

    _time:5m panic | stream_context before 10

This enables implementing the stream context feature in the VictoriaLogs web UI: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7063 .

Reduce memory usage when returning stream context over big log streams with millions of entries.
The new logic scans over all the log messages for the selected log stream, while keeping in memory only
the given number of surrounding logs. Previously all the logs for the given log stream in the selected time range
were loaded into memory before selecting the needed surrounding logs.
This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730 .
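
A minimal, self-contained sketch of that idea (not the actual VictoriaLogs code): a size-bounded min-heap
keeps only the k timestamps closest before a matching log entry while the whole stream is scanned, so memory
stays proportional to k rather than to the stream size. All names below are illustrative.

    package main

    import (
    	"container/heap"
    	"fmt"
    )

    // tsHeap is a min-heap of timestamps: the smallest kept timestamp sits on top
    // and is evicted first, so the heap retains the k largest timestamps seen so far.
    type tsHeap []int64

    func (h tsHeap) Len() int           { return len(h) }
    func (h tsHeap) Less(i, j int) bool { return h[i] < h[j] }
    func (h tsHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *tsHeap) Push(v any)        { *h = append(*h, v.(int64)) }
    func (h *tsHeap) Pop() any {
    	old := *h
    	v := old[len(old)-1]
    	*h = old[:len(old)-1]
    	return v
    }

    // keepClosestBefore scans all the timestamps, but keeps in memory only
    // the k timestamps closest before target.
    func keepClosestBefore(timestamps []int64, target int64, k int) []int64 {
    	var h tsHeap
    	for _, ts := range timestamps {
    		if ts >= target {
    			continue
    		}
    		if h.Len() < k {
    			heap.Push(&h, ts)
    		} else if ts > h[0] {
    			// ts is closer to target than the farthest kept timestamp - replace it.
    			h[0] = ts
    			heap.Fix(&h, 0)
    		}
    	}
    	return h
    }

    func main() {
    	tss := []int64{10, 20, 30, 40, 50, 60}
    	fmt.Println(keepClosestBefore(tss, 55, 2)) // prints [40 50]
    }

The stream_context pipe applies the same trick symmetrically via the streamContextRowsHeapMin and
streamContextRowsHeapMax types in the diff below.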

Improve scan performance for big log streams by fetching only the requested fields. For example, the following
query should be executed much faster than before if logs contain many fields other than _stream, _msg and _time:

    panic | stream_context after 30 | fields _stream, _msg, _time
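
For this query, the per-stream context fetch is then narrowed down to an internal query of roughly the
following shape, where <streamID> stands for the actual stream ID (see the getStreamRowss changes below
for the exact construction):

    _stream_id:<streamID> | fields _stream, _msg, _time
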
Author: Aliaksandr Valialkin
Date: 2024-09-26 16:52:55 +02:00
parent 037652d5ae
commit 4b1611267f
5 changed files with 342 additions and 137 deletions


@@ -20,11 +20,13 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070).
* FEATURE: add [`blocks_count` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe), which can be used for counting the number of matching blocks for the given query. For example, `_time:5m | blocks_count` returns the number of blocks with logs for the last 5 minutes. This pipe can be useful for debugging purposes.
* FEATURE: support [ingesting logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/) with `_time` field, which doesn't contain timezone information. For example, `2024-09-20T10:20:30`. In this case the local timezone of the host where VictoriaLogs runs is used. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6721).
* FEATURE: reduce memory usage when [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe) is applied to [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with big number of messages. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730).
* BUGFIX: fix Windows build, which has been broken in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6973).
* BUGFIX: properly return logs from [`/select/logsql/tail` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) if the query contains [`_time:some_duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) like `_time:5m`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7028). The bug has been introduced in [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs).
* BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for identifying the root cause of the issue.
* BUGFIX: support [ingesting logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/) with `_time` field containing whitespace delimiter between the date and time instead of `T` delimiter. For example, `2024-09-20 10:20:30`. This is valid [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) aka `SQL datetime` format, which sometimes is used in production. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6721).
* BUGFIX: return all the requested surrounding logs for [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). Previously only logs matching the [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) were returned. This is needed for [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7063).
## [v0.29.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.29.0-victorialogs)


@@ -0,0 +1,47 @@
package contextutil

import (
	"context"
	"time"
)

// NewStopChanContext returns new context for the given stopCh, together with cancel function.
//
// The returned context is canceled on the following events:
//
//   - when stopCh is closed
//   - when the returned CancelFunc is called
//
// The caller must call the returned CancelFunc when the context is no longer needed.
func NewStopChanContext(stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx := &stopChanContext{
		stopCh: stopCh,
	}
	return context.WithCancel(ctx)
}

// stopChanContext implements context.Context for stopCh passed to NewStopChanContext.
type stopChanContext struct {
	stopCh <-chan struct{}
}

func (ctx *stopChanContext) Deadline() (time.Time, bool) {
	return time.Time{}, false
}

func (ctx *stopChanContext) Done() <-chan struct{} {
	return ctx.stopCh
}

func (ctx *stopChanContext) Err() error {
	select {
	case <-ctx.stopCh:
		return context.Canceled
	default:
		return nil
	}
}

func (ctx *stopChanContext) Value(key any) any {
	return nil
}
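
A hedged usage sketch for this helper (the setup below is illustrative and not part of the commit):

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil"
    )

    func main() {
    	stopCh := make(chan struct{})
    	ctx, cancel := contextutil.NewStopChanContext(stopCh)
    	defer cancel()

    	// Simulate an external shutdown signal.
    	go func() {
    		time.Sleep(100 * time.Millisecond)
    		close(stopCh)
    	}()

    	// The context is canceled either by calling cancel() or by closing stopCh,
    	// so it can be passed to any context-aware API.
    	<-ctx.Done()
    	fmt.Println(ctx.Err()) // context canceled
    }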


@@ -1,15 +1,17 @@
package logstorage
import (
"context"
"container/heap"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@@ -97,39 +99,64 @@ type pipeStreamContextProcessor struct {
cancel func()
ppNext pipeProcessor
shards []pipeStreamContextProcessorShard
s *Storage
neededColumnNames []string
unneededColumnNames []string
getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error)
shards []pipeStreamContextProcessorShard
maxStateSize int64
stateSizeBudget atomic.Int64
}
func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) {
pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) {
return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget)
}
func (pcp *pipeStreamContextProcessor) init(s *Storage, neededColumnNames, unneededColumnNames []string) {
pcp.s = s
pcp.neededColumnNames = neededColumnNames
pcp.unneededColumnNames = unneededColumnNames
}
func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) {
func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow, stateSizeBudget int) ([][]*streamContextRow, error) {
tenantID, ok := getTenantIDFromStreamIDString(streamID)
if !ok {
logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID)
}
// construct the query for selecting all the rows for the given streamID
qStr := "_stream_id:" + streamID
if slices.Contains(pcp.neededColumnNames, "*") {
if len(pcp.unneededColumnNames) > 0 {
qStr += " | delete " + fieldNamesString(pcp.unneededColumnNames)
}
} else {
if len(pcp.neededColumnNames) > 0 {
qStr += " | fields " + fieldNamesString(pcp.neededColumnNames)
}
}
q, err := ParseQuery(qStr)
if err != nil {
logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err)
}
q.AddTimeFilter(minTimestamp, maxTimestamp)
ctxWithCancel, cancel := context.WithCancel(ctx)
// mu protects contextRows and stateSize inside writeBlock callback.
var mu sync.Mutex
contextRows := make([]streamContextRows, len(neededRows))
for i := range neededRows {
contextRows[i] = streamContextRows{
neededTimestamp: neededRows[i].timestamp,
linesBefore: pcp.pc.linesBefore,
linesAfter: pcp.pc.linesAfter,
}
}
sort.Slice(contextRows, func(i, j int) bool {
return contextRows[i].neededTimestamp < contextRows[j].neededTimestamp
})
stateSize := 0
ctxWithCancel, cancel := contextutil.NewStopChanContext(pcp.stopCh)
defer cancel()
var mu sync.Mutex
var rows []streamContextRow
stateSize := 0
writeBlock := func(_ uint, br *blockResult) {
mu.Lock()
defer mu.Unlock()
@@ -138,38 +165,148 @@ func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestam
cancel()
}
cs := br.getColumns()
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
fields := make([]Field, len(cs))
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
for j, c := range cs {
v := c.getValueAtRow(br, i)
fields[j] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
if needStop(pcp.stopCh) {
break
}
for j := range contextRows {
if j > 0 && timestamp <= contextRows[j-1].neededTimestamp {
continue
}
stateSize += len(c.name) + len(v)
if j+1 < len(contextRows) && timestamp >= contextRows[j+1].neededTimestamp {
continue
}
stateSize += contextRows[j].update(br, i, timestamp)
}
row := streamContextRow{
timestamp: timestamp,
fields: fields,
}
stateSize += int(unsafe.Sizeof(row))
rows = append(rows, row)
}
}
if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil {
if err := pcp.s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil {
return nil, err
}
if stateSize > stateSizeBudget {
return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q)
return nil, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededRows))
}
return rows, nil
// return sorted results from contextRows
rowss := make([][]*streamContextRow, len(contextRows))
for i, ctx := range contextRows {
rowss[i] = ctx.getSortedRows()
}
rowss = deduplicateStreamRowss(rowss)
return rowss, nil
}
// deduplicateStreamRowss drops rows already covered by the preceding context windows,
// so overlapping contexts for nearby matching logs are emitted only once.
func deduplicateStreamRowss(streamRowss [][]*streamContextRow) [][]*streamContextRow {
var lastSeenRow *streamContextRow
for _, streamRows := range streamRowss {
if len(streamRows) > 0 {
lastSeenRow = streamRows[len(streamRows)-1]
break
}
}
if lastSeenRow == nil {
return nil
}
resultRowss := streamRowss[:1]
for _, streamRows := range streamRowss[1:] {
i := 0
for i < len(streamRows) && !lastSeenRow.less(streamRows[i]) {
i++
}
streamRows = streamRows[i:]
if len(streamRows) == 0 {
continue
}
resultRowss = append(resultRowss, streamRows)
lastSeenRow = streamRows[len(streamRows)-1]
}
return resultRowss
}
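// A self-contained illustration of the same merging idea on plain timestamps
// (a hedged sketch with illustrative names; deduplicateStreamRowss above compares
// full rows via less()). For example,
//
//	dedupSortedWindows([][]int64{{10, 20, 30}, {20, 30, 40}, {50}})
//
// returns [][]int64{{10, 20, 30}, {40}, {50}}, so each entry is emitted at most once.
func dedupSortedWindows(windows [][]int64) [][]int64 {
	var result [][]int64
	lastSeen := int64(0)
	seenAny := false
	for _, w := range windows {
		// Skip entries already covered by earlier windows.
		i := 0
		for seenAny && i < len(w) && w[i] <= lastSeen {
			i++
		}
		w = w[i:]
		if len(w) == 0 {
			continue
		}
		result = append(result, w)
		lastSeen = w[len(w)-1]
		seenAny = true
	}
	return result
}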
type streamContextRows struct {
neededTimestamp int64
linesBefore int
linesAfter int
rowsBefore streamContextRowsHeapMin
rowsAfter streamContextRowsHeapMax
rowsMatched []*streamContextRow
}
func (ctx *streamContextRows) getSortedRows() []*streamContextRow {
var rows []*streamContextRow
rows = append(rows, ctx.rowsBefore...)
rows = append(rows, ctx.rowsMatched...)
rows = append(rows, ctx.rowsAfter...)
sort.Slice(rows, func(i, j int) bool {
return rows[i].less(rows[j])
})
return rows
}
// update adds the row at rowIdx to the context window if it falls within
// linesBefore/linesAfter rows of neededTimestamp. It returns the resulting
// state size change in bytes.
func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int {
if rowTimestamp < ctx.neededTimestamp {
if ctx.linesBefore <= 0 {
return 0
}
if len(ctx.rowsBefore) < ctx.linesBefore {
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
heap.Push(&ctx.rowsBefore, r)
return r.sizeBytes()
}
if rowTimestamp <= ctx.rowsBefore[0].timestamp {
return 0
}
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
stateSizeChange := r.sizeBytes() - ctx.rowsBefore[0].sizeBytes()
ctx.rowsBefore[0] = r
heap.Fix(&ctx.rowsBefore, 0)
return stateSizeChange
}
if rowTimestamp > ctx.neededTimestamp {
if ctx.linesAfter <= 0 {
return 0
}
if len(ctx.rowsAfter) < ctx.linesAfter {
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
heap.Push(&ctx.rowsAfter, r)
return r.sizeBytes()
}
if rowTimestamp >= ctx.rowsAfter[0].timestamp {
return 0
}
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
stateSizeChange := r.sizeBytes() - ctx.rowsAfter[0].sizeBytes()
ctx.rowsAfter[0] = r
heap.Fix(&ctx.rowsAfter, 0)
return stateSizeChange
}
// rowTimestamp == ctx.neededTimestamp
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
ctx.rowsMatched = append(ctx.rowsMatched, r)
return r.sizeBytes()
}
func (ctx *streamContextRows) copyRowAtIdx(br *blockResult, rowIdx int, rowTimestamp int64) *streamContextRow {
cs := br.getColumns()
fields := make([]Field, len(cs))
for i, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields[i] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
}
return &streamContextRow{
timestamp: rowTimestamp,
fields: fields,
}
}
func getTenantIDFromStreamIDString(s string) (TenantID, bool) {
@@ -192,6 +329,44 @@ type streamContextRow struct {
fields []Field
}
func (r *streamContextRow) sizeBytes() int {
n := 0
fields := r.fields
for _, f := range fields {
n += len(f.Name) + len(f.Value) + int(unsafe.Sizeof(f))
}
n += int(unsafe.Sizeof(*r) + unsafe.Sizeof(r))
return n
}
func (r *streamContextRow) less(other *streamContextRow) bool {
// compare timestamps first
if r.timestamp != other.timestamp {
return r.timestamp < other.timestamp
}
// then compare fields
i := 0
aFields := r.fields
bFields := other.fields
for i < len(aFields) && i < len(bFields) {
af := &aFields[i]
bf := &bFields[i]
if af.Name != bf.Name {
return af.Name < bf.Name
}
if af.Value != bf.Value {
return af.Value < bf.Value
}
i++
}
if len(aFields) != len(bFields) {
return len(aFields) < len(bFields)
}
return false
}
type pipeStreamContextProcessorShardNopad struct {
// pc points to the parent pipeStreamContext.
pc *pipeStreamContext
@@ -320,15 +495,24 @@ func (pcp *pipeStreamContextProcessor) flush() error {
}
for streamID, rows := range m {
streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget)
streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget)
if err != nil {
return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err)
return err
}
if needStop(pcp.stopCh) {
return nil
}
if err := wctx.writeStreamContextRows(streamID, streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter); err != nil {
return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err)
// Write streamRows to the output.
for _, streamRows := range streamRowss {
for _, streamRow := range streamRows {
wctx.writeRow(streamRow.fields)
}
if len(streamRowss) > 1 {
lastRow := streamRows[len(streamRows)-1]
fields := newDelimiterRowFields(lastRow, streamID)
wctx.writeRow(fields)
}
}
}
@@ -337,103 +521,25 @@ func (pcp *pipeStreamContextProcessor) flush() error {
return nil
}
func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID string, streamRows, rows []streamContextRow, linesBefore, linesAfter int) error {
sortStreamContextRows(streamRows)
sortStreamContextRows(rows)
idxNext := 0
for i := range rows {
r := &rows[i]
idx := getStreamContextRowIdx(streamRows, r)
if idx < 0 {
// This error may happen when streamRows became out of sync with rows.
// For example, when some streamRows were deleted after obtaining rows.
return fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d; re-execute the query", r.timestamp, len(streamRows), len(rows))
}
idxStart := idx - linesBefore
if idxStart < idxNext {
idxStart = idxNext
} else if idxNext > 0 && idxStart > idxNext {
// Write delimiter row between multiple contexts in the same stream.
// This simplifies investigation of the returned logs.
fields := []Field{
{
Name: "_time",
Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)),
},
{
Name: "_stream_id",
Value: streamID,
},
{
Name: "_stream",
Value: getFieldValue(r.fields, "_stream"),
},
{
Name: "_msg",
Value: "---",
},
}
wctx.writeRow(fields)
}
for idxStart < idx {
wctx.writeRow(streamRows[idxStart].fields)
idxStart++
}
if idx >= idxNext {
wctx.writeRow(streamRows[idx].fields)
idxNext = idx + 1
}
idxEnd := idx + 1 + linesAfter
for idxNext < idxEnd && idxNext < len(streamRows) {
wctx.writeRow(streamRows[idxNext].fields)
idxNext++
}
if idxNext >= len(streamRows) {
break
}
// newDelimiterRowFields returns fields for a synthetic `---` delimiter row,
// which is written between multiple contexts in the same stream.
// This simplifies investigation of the returned logs.
func newDelimiterRowFields(r *streamContextRow, streamID string) []Field {
return []Field{
{
Name: "_time",
Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)),
},
{
Name: "_stream_id",
Value: streamID,
},
{
Name: "_stream",
Value: getFieldValue(r.fields, "_stream"),
},
{
Name: "_msg",
Value: "---",
},
}
return nil
}
func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int {
n := sort.Search(len(rows), func(i int) bool {
return rows[i].timestamp >= r.timestamp
})
if n == len(rows) {
return -1
}
equalFields := func(fields []Field) bool {
for _, f := range r.fields {
if f.Value != getFieldValue(fields, f.Name) {
return false
}
}
return true
}
for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields) {
n++
if n >= len(rows) {
return -1
}
}
if rows[n].timestamp != r.timestamp {
return -1
}
return n
}
func sortStreamContextRows(rows []streamContextRow) {
sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp
})
}
type pipeStreamContextWriteContext struct {
@@ -554,3 +660,53 @@ func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) {
}
}
}
// streamContextRowsHeapMax is a max-heap of rows ordered by timestamp.
// Its top is the newest kept row, which is evicted first once the heap is full,
// so the heap retains the rows closest after the matching timestamp.
type streamContextRowsHeapMax []*streamContextRow

func (h *streamContextRowsHeapMax) Len() int {
	return len(*h)
}

func (h *streamContextRowsHeapMax) Less(i, j int) bool {
	a := *h
	return a[i].timestamp > a[j].timestamp
}

func (h *streamContextRowsHeapMax) Swap(i, j int) {
	a := *h
	a[i], a[j] = a[j], a[i]
}

func (h *streamContextRowsHeapMax) Push(v any) {
	x := v.(*streamContextRow)
	*h = append(*h, x)
}

func (h *streamContextRowsHeapMax) Pop() any {
	a := *h
	x := a[len(a)-1]
	a[len(a)-1] = nil
	*h = a[:len(a)-1]
	return x
}
// streamContextRowsHeapMin is a min-heap of rows ordered by timestamp.
// Its top is the oldest kept row, which is evicted first once the heap is full,
// so the heap retains the rows closest before the matching timestamp.
type streamContextRowsHeapMin streamContextRowsHeapMax

func (h *streamContextRowsHeapMin) Len() int {
	return len(*h)
}

func (h *streamContextRowsHeapMin) Less(i, j int) bool {
	a := *h
	return a[i].timestamp < a[j].timestamp
}

func (h *streamContextRowsHeapMin) Swap(i, j int) {
	a := *h
	a[i], a[j] = a[j], a[i]
}

func (h *streamContextRowsHeapMin) Push(v any) {
	x := v.(*streamContextRow)
	*h = append(*h, x)
}

func (h *streamContextRowsHeapMin) Pop() any {
	a := *h
	x := a[len(a)-1]
	a[len(a)-1] = nil
	*h = a[:len(a)-1]
	return x
}


@@ -148,7 +148,7 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
pcp, ok := pp.(*pipeStreamContextProcessor)
if ok {
pcp.init(ctx, s, minTimestamp, maxTimestamp)
pcp.init(s, neededColumnNames, unneededColumnNames)
if i > 0 {
errPipe = fmt.Errorf("[%s] pipe must go after [%s] filter; now it goes after the [%s] pipe", p, q.f, q.pipes[i-1])
}


@@ -707,7 +707,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 1000
| stats count() rows`, [][]Field{
{
{"rows", "825"},
{"rows", "990"},
},
})
})
@@ -716,7 +716,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context after 1000
| stats count() rows`, [][]Field{
{
{"rows", "495"},
{"rows", "660"},
},
})
})
@@ -725,7 +725,7 @@ func TestStorageRunQuery(t *testing.T) {
| stream_context before 1000 after 1000
| stats count() rows`, [][]Field{
{
{"rows", "1155"},
{"rows", "1320"},
},
})
})