VictoriaMetrics/lib/logstorage/pipe_stream_context.go
Aliaksandr Valialkin 66b2987f49
lib/logstorage: optimize query imeediately after its parsing
This eliminates possible bugs related to forgotten Query.Optimize() calls.

This also allows removing optimize() function from pipe interface.

While at it, drop filterNoop inside filterAnd.
2024-11-08 16:43:54 +01:00

773 lines
18 KiB
Go

package logstorage
import (
"container/heap"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeStreamContext processes '| stream_context ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe
type pipeStreamContext struct {
// linesBefore is the number of lines to return before the matching line
linesBefore int
// linesAfter is the number of lines to return after the matching line
linesAfter int
}
func (pc *pipeStreamContext) String() string {
s := "stream_context"
if pc.linesBefore > 0 {
s += fmt.Sprintf(" before %d", pc.linesBefore)
}
if pc.linesAfter > 0 {
s += fmt.Sprintf(" after %d", pc.linesAfter)
}
if pc.linesBefore <= 0 && pc.linesAfter <= 0 {
s += " after 0"
}
return s
}
func (pc *pipeStreamContext) canLiveTail() bool {
return false
}
var neededFieldsForStreamContext = []string{
"_time",
"_stream_id",
}
func (pc *pipeStreamContext) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.addFields(neededFieldsForStreamContext)
unneededFields.removeFields(neededFieldsForStreamContext)
}
func (pc *pipeStreamContext) hasFilterInWithQuery() bool {
return false
}
func (pc *pipeStreamContext) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pc, nil
}
func (pc *pipeStreamContext) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
shards := make([]pipeStreamContextProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeStreamContextProcessorShard{
pipeStreamContextProcessorShardNopad: pipeStreamContextProcessorShardNopad{
pc: pc,
},
}
}
pcp := &pipeStreamContextProcessor{
pc: pc,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,
shards: shards,
maxStateSize: maxStateSize,
}
pcp.stateSizeBudget.Store(maxStateSize)
return pcp
}
type pipeStreamContextProcessor struct {
pc *pipeStreamContext
stopCh <-chan struct{}
cancel func()
ppNext pipeProcessor
s *Storage
neededColumnNames []string
unneededColumnNames []string
shards []pipeStreamContextProcessorShard
maxStateSize int64
stateSizeBudget atomic.Int64
}
func (pcp *pipeStreamContextProcessor) init(s *Storage, neededColumnNames, unneededColumnNames []string) {
pcp.s = s
pcp.neededColumnNames = neededColumnNames
pcp.unneededColumnNames = unneededColumnNames
}
func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRows []streamContextRow, stateSizeBudget int) ([][]*streamContextRow, error) {
tenantID, ok := getTenantIDFromStreamIDString(streamID)
if !ok {
logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID)
}
// construct the query for selecting all the rows for the given streamID
qStr := "_stream_id:" + streamID
if slices.Contains(pcp.neededColumnNames, "*") {
if len(pcp.unneededColumnNames) > 0 {
qStr += " | delete " + fieldNamesString(pcp.unneededColumnNames)
}
} else {
if len(pcp.neededColumnNames) > 0 {
qStr += " | fields " + fieldNamesString(pcp.neededColumnNames)
}
}
q, err := ParseQuery(qStr)
if err != nil {
logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err)
}
// mu protects contextRows and stateSize inside writeBlock callback.
var mu sync.Mutex
contextRows := make([]streamContextRows, len(neededRows))
for i := range neededRows {
contextRows[i] = streamContextRows{
neededTimestamp: neededRows[i].timestamp,
linesBefore: pcp.pc.linesBefore,
linesAfter: pcp.pc.linesAfter,
}
}
sort.Slice(contextRows, func(i, j int) bool {
return contextRows[i].neededTimestamp < contextRows[j].neededTimestamp
})
stateSize := 0
ctxWithCancel, cancel := contextutil.NewStopChanContext(pcp.stopCh)
defer cancel()
writeBlock := func(_ uint, br *blockResult) {
mu.Lock()
defer mu.Unlock()
if stateSize > stateSizeBudget {
cancel()
return
}
for i := range contextRows {
if needStop(pcp.stopCh) {
break
}
if !contextRows[i].canUpdate(br) {
// Fast path - skip reading block timestamps for the given ctx.
continue
}
timestamps := br.getTimestamps()
for j, timestamp := range timestamps {
if i > 0 && timestamp <= contextRows[i-1].neededTimestamp {
continue
}
if i+1 < len(contextRows) && timestamp >= contextRows[i+1].neededTimestamp {
continue
}
stateSize += contextRows[i].update(br, j, timestamp)
}
}
}
if err := pcp.s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil {
return nil, err
}
if stateSize > stateSizeBudget {
return nil, fmt.Errorf("more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs", stateSizeBudget/(1<<20), len(neededRows))
}
// return sorted results from contextRows
rowss := make([][]*streamContextRow, len(contextRows))
for i, ctx := range contextRows {
rowss[i] = ctx.getSortedRows()
}
rowss = deduplicateStreamRowss(rowss)
return rowss, nil
}
func deduplicateStreamRowss(streamRowss [][]*streamContextRow) [][]*streamContextRow {
var lastSeenRow *streamContextRow
for _, streamRows := range streamRowss {
if len(streamRows) > 0 {
lastSeenRow = streamRows[len(streamRows)-1]
break
}
}
if lastSeenRow == nil {
return nil
}
resultRowss := streamRowss[:1]
for _, streamRows := range streamRowss[1:] {
i := 0
for i < len(streamRows) && !lastSeenRow.less(streamRows[i]) {
i++
}
streamRows = streamRows[i:]
if len(streamRows) == 0 {
continue
}
resultRowss = append(resultRowss, streamRows)
lastSeenRow = streamRows[len(streamRows)-1]
}
return resultRowss
}
type streamContextRows struct {
neededTimestamp int64
linesBefore int
linesAfter int
rowsBefore streamContextRowsHeapMin
rowsAfter streamContextRowsHeapMax
rowsMatched []*streamContextRow
}
func (ctx *streamContextRows) getSortedRows() []*streamContextRow {
var rows []*streamContextRow
rows = append(rows, ctx.rowsBefore...)
rows = append(rows, ctx.rowsMatched...)
rows = append(rows, ctx.rowsAfter...)
sort.Slice(rows, func(i, j int) bool {
return rows[i].less(rows[j])
})
return rows
}
func (ctx *streamContextRows) canUpdate(br *blockResult) bool {
if ctx.linesBefore > 0 {
if len(ctx.rowsBefore) < ctx.linesBefore {
return true
}
minTimestamp := ctx.rowsBefore[0].timestamp - 1
maxTimestamp := ctx.neededTimestamp
if br.intersectsTimeRange(minTimestamp, maxTimestamp) {
return true
}
}
if ctx.linesAfter > 0 {
if len(ctx.rowsAfter) < ctx.linesAfter {
return true
}
minTimestamp := ctx.neededTimestamp
maxTimestamp := ctx.rowsAfter[0].timestamp + 1
if br.intersectsTimeRange(minTimestamp, maxTimestamp) {
return true
}
}
if ctx.linesBefore <= 0 && ctx.linesAfter <= 0 {
if len(ctx.rowsMatched) == 0 {
return true
}
timestamp := ctx.rowsMatched[0].timestamp
if br.intersectsTimeRange(timestamp-1, timestamp+1) {
return true
}
}
return false
}
func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int {
if rowTimestamp < ctx.neededTimestamp {
if ctx.linesBefore <= 0 {
return 0
}
if len(ctx.rowsBefore) < ctx.linesBefore {
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
heap.Push(&ctx.rowsBefore, r)
return r.sizeBytes()
}
if rowTimestamp <= ctx.rowsBefore[0].timestamp {
return 0
}
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
stateSizeChange := r.sizeBytes() - ctx.rowsBefore[0].sizeBytes()
ctx.rowsBefore[0] = r
heap.Fix(&ctx.rowsBefore, 0)
return stateSizeChange
}
if rowTimestamp > ctx.neededTimestamp {
if ctx.linesAfter <= 0 {
return 0
}
if len(ctx.rowsAfter) < ctx.linesAfter {
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
heap.Push(&ctx.rowsAfter, r)
return r.sizeBytes()
}
if rowTimestamp >= ctx.rowsAfter[0].timestamp {
return 0
}
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
stateSizeChange := r.sizeBytes() - ctx.rowsAfter[0].sizeBytes()
ctx.rowsAfter[0] = r
heap.Fix(&ctx.rowsAfter, 0)
return stateSizeChange
}
// rowTimestamp == ctx.neededTimestamp
r := ctx.copyRowAtIdx(br, rowIdx, rowTimestamp)
ctx.rowsMatched = append(ctx.rowsMatched, r)
return r.sizeBytes()
}
func (ctx *streamContextRows) copyRowAtIdx(br *blockResult, rowIdx int, rowTimestamp int64) *streamContextRow {
cs := br.getColumns()
fields := make([]Field, len(cs))
for i, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields[i] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
}
return &streamContextRow{
timestamp: rowTimestamp,
fields: fields,
}
}
func getTenantIDFromStreamIDString(s string) (TenantID, bool) {
var sid streamID
if !sid.tryUnmarshalFromString(s) {
return TenantID{}, false
}
return sid.tenantID, true
}
type pipeStreamContextProcessorShard struct {
pipeStreamContextProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeStreamContextProcessorShardNopad{})%128]byte
}
type streamContextRow struct {
timestamp int64
fields []Field
}
func (r *streamContextRow) sizeBytes() int {
n := 0
fields := r.fields
for _, f := range fields {
n += len(f.Name) + len(f.Value) + int(unsafe.Sizeof(f))
}
n += int(unsafe.Sizeof(*r) + unsafe.Sizeof(r))
return n
}
func (r *streamContextRow) less(other *streamContextRow) bool {
// compare timestamps at first
if r.timestamp != other.timestamp {
return r.timestamp < other.timestamp
}
// compare fields then
i := 0
aFields := r.fields
bFields := other.fields
for i < len(aFields) && i < len(bFields) {
af := &aFields[i]
bf := &bFields[i]
if af.Name != bf.Name {
return af.Name < bf.Name
}
if af.Value != bf.Value {
return af.Value < bf.Value
}
i++
}
if len(aFields) != len(bFields) {
return len(aFields) < len(bFields)
}
return false
}
type pipeStreamContextProcessorShardNopad struct {
// pc points to the parent pipeStreamContext.
pc *pipeStreamContext
// m holds per-stream matching rows
m map[string][]streamContextRow
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor.
stateSizeBudget int
}
// writeBlock writes br to shard.
func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) {
m := shard.getM()
cs := br.getColumns()
cStreamID := br.getColumnByName("_stream_id")
stateSize := 0
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
fields := make([]Field, len(cs))
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
for j, c := range cs {
v := c.getValueAtRow(br, i)
fields[j] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
stateSize += len(c.name) + len(v)
}
row := streamContextRow{
timestamp: timestamp,
fields: fields,
}
stateSize += int(unsafe.Sizeof(row))
streamID := cStreamID.getValueAtRow(br, i)
rows, ok := m[streamID]
if !ok {
stateSize += len(streamID)
}
rows = append(rows, row)
streamID = strings.Clone(streamID)
m[streamID] = rows
}
shard.stateSizeBudget -= stateSize
}
func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextRow {
if shard.m == nil {
shard.m = make(map[string][]streamContextRow)
}
return shard.m
}
func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
shard := &pcp.shards[workerID]
for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
pcp.cancel()
}
return
}
shard.stateSizeBudget += stateSizeBudgetChunk
}
shard.writeBlock(br)
}
func (pcp *pipeStreamContextProcessor) flush() error {
n := pcp.stateSizeBudget.Load()
if n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20))
}
if n > math.MaxInt {
logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n)
}
stateSizeBudget := int(n)
// merge state across shards
shards := pcp.shards
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
if needStop(pcp.stopCh) {
return nil
}
for streamID, rowsSrc := range shards[i].getM() {
rows, ok := m[streamID]
if !ok {
m[streamID] = rowsSrc
} else {
m[streamID] = append(rows, rowsSrc...)
}
}
}
// write result
wctx := &pipeStreamContextWriteContext{
pcp: pcp,
}
// write output contexts in the ascending order of rows
streamIDs := getStreamIDsSortedByMinRowTimestamp(m)
for _, streamID := range streamIDs {
rows := m[streamID]
streamRowss, err := pcp.getStreamRowss(streamID, rows, stateSizeBudget)
if err != nil {
return err
}
if needStop(pcp.stopCh) {
return nil
}
// Write streamRows to the output.
for _, streamRows := range streamRowss {
for _, streamRow := range streamRows {
wctx.writeRow(streamRow.fields)
}
if len(m) > 1 || len(streamRowss) > 1 {
lastRow := streamRows[len(streamRows)-1]
fields := newDelimiterRowFields(lastRow, streamID)
wctx.writeRow(fields)
}
}
}
wctx.flush()
return nil
}
func getStreamIDsSortedByMinRowTimestamp(m map[string][]streamContextRow) []string {
type streamTimestamp struct {
streamID string
timestamp int64
}
streamTimestamps := make([]streamTimestamp, 0, len(m))
for streamID, rows := range m {
minTimestamp := rows[0].timestamp
for _, r := range rows[1:] {
if r.timestamp < minTimestamp {
minTimestamp = r.timestamp
}
}
streamTimestamps = append(streamTimestamps, streamTimestamp{
streamID: streamID,
timestamp: minTimestamp,
})
}
sort.Slice(streamTimestamps, func(i, j int) bool {
return streamTimestamps[i].timestamp < streamTimestamps[j].timestamp
})
streamIDs := make([]string, len(streamTimestamps))
for i := range streamIDs {
streamIDs[i] = streamTimestamps[i].streamID
}
return streamIDs
}
func newDelimiterRowFields(r *streamContextRow, streamID string) []Field {
return []Field{
{
Name: "_time",
Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)),
},
{
Name: "_stream_id",
Value: streamID,
},
{
Name: "_stream",
Value: getFieldValue(r.fields, "_stream"),
},
{
Name: "_msg",
Value: "---",
},
}
}
type pipeStreamContextWriteContext struct {
pcp *pipeStreamContextProcessor
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeStreamContextWriteContext) writeRow(rowFields []Field) {
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(rowFields)
if areEqualColumns {
for i, f := range rowFields {
if rcs[i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, f := range rowFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
for i, f := range rowFields {
v := f.Value
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeStreamContextWriteContext) flush() {
rcs := wctx.rcs
br := &wctx.br
wctx.valuesLen = 0
// Flush rcs to ppNext
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.pcp.ppNext.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) {
if !lex.isKeyword("stream_context") {
return nil, fmt.Errorf("expecting 'stream_context'; got %q", lex.token)
}
lex.nextToken()
linesBefore, linesAfter, err := parsePipeStreamContextBeforeAfter(lex)
if err != nil {
return nil, err
}
pc := &pipeStreamContext{
linesBefore: linesBefore,
linesAfter: linesAfter,
}
return pc, nil
}
func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) {
linesBefore := 0
linesAfter := 0
beforeSet := false
afterSet := false
for {
switch {
case lex.isKeyword("before"):
lex.nextToken()
f, s, err := parseNumber(lex)
if err != nil {
return 0, 0, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err)
}
if f < 0 {
return 0, 0, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s)
}
linesBefore = int(f)
beforeSet = true
case lex.isKeyword("after"):
lex.nextToken()
f, s, err := parseNumber(lex)
if err != nil {
return 0, 0, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err)
}
if f < 0 {
return 0, 0, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s)
}
linesAfter = int(f)
afterSet = true
default:
if !beforeSet && !afterSet {
return 0, 0, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'")
}
return linesBefore, linesAfter, nil
}
}
}
type streamContextRowsHeapMax []*streamContextRow
func (h *streamContextRowsHeapMax) Len() int {
return len(*h)
}
func (h *streamContextRowsHeapMax) Less(i, j int) bool {
a := *h
return a[i].timestamp > a[j].timestamp
}
func (h *streamContextRowsHeapMax) Swap(i, j int) {
a := *h
a[i], a[j] = a[j], a[i]
}
func (h *streamContextRowsHeapMax) Push(v any) {
x := v.(*streamContextRow)
*h = append(*h, x)
}
func (h *streamContextRowsHeapMax) Pop() any {
a := *h
x := a[len(a)-1]
a[len(a)-1] = nil
*h = a[:len(a)-1]
return x
}
type streamContextRowsHeapMin streamContextRowsHeapMax
func (h *streamContextRowsHeapMin) Len() int {
return len(*h)
}
func (h *streamContextRowsHeapMin) Less(i, j int) bool {
a := *h
return a[i].timestamp < a[j].timestamp
}
func (h *streamContextRowsHeapMin) Swap(i, j int) {
a := *h
a[i], a[j] = a[j], a[i]
}
func (h *streamContextRowsHeapMin) Push(v any) {
x := v.(*streamContextRow)
*h = append(*h, x)
}
func (h *streamContextRowsHeapMin) Pop() any {
a := *h
x := a[len(a)-1]
a[len(a)-1] = nil
*h = a[:len(a)-1]
return x
}