VictoriaMetrics/lib/logstorage/pipe_stream_context.go
Aliaksandr Valialkin 4599429f51
lib/logstorage: read timestamps column when it is really needed during query execution
Previously timestamps column was read unconditionally on every query.
This could significantly slow down queries, which do not need reading this column
like in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070 .
2024-09-25 19:17:47 +02:00

556 lines
13 KiB
Go

package logstorage
import (
"context"
"fmt"
"math"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeStreamContext processes '| stream_context ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe
type pipeStreamContext struct {
// linesBefore is the number of lines to return before the matching line
linesBefore int
// linesAfter is the number of lines to return after the matching line
linesAfter int
}
func (pc *pipeStreamContext) String() string {
s := "stream_context"
if pc.linesBefore > 0 {
s += fmt.Sprintf(" before %d", pc.linesBefore)
}
if pc.linesAfter > 0 {
s += fmt.Sprintf(" after %d", pc.linesAfter)
}
return s
}
func (pc *pipeStreamContext) canLiveTail() bool {
return false
}
var neededFieldsForStreamContext = []string{
"_time",
"_stream_id",
}
func (pc *pipeStreamContext) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.addFields(neededFieldsForStreamContext)
unneededFields.removeFields(neededFieldsForStreamContext)
}
func (pc *pipeStreamContext) optimize() {
// nothing to do
}
func (pc *pipeStreamContext) hasFilterInWithQuery() bool {
return false
}
func (pc *pipeStreamContext) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pc, nil
}
func (pc *pipeStreamContext) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
shards := make([]pipeStreamContextProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeStreamContextProcessorShard{
pipeStreamContextProcessorShardNopad: pipeStreamContextProcessorShardNopad{
pc: pc,
stateSizeBudget: stateSizeBudgetChunk,
},
}
maxStateSize -= stateSizeBudgetChunk
}
pcp := &pipeStreamContextProcessor{
pc: pc,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,
shards: shards,
maxStateSize: maxStateSize,
}
pcp.stateSizeBudget.Store(maxStateSize)
return pcp
}
type pipeStreamContextProcessor struct {
pc *pipeStreamContext
stopCh <-chan struct{}
cancel func()
ppNext pipeProcessor
shards []pipeStreamContextProcessorShard
getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error)
maxStateSize int64
stateSizeBudget atomic.Int64
}
func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) {
pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) {
return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget)
}
}
func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) {
tenantID, ok := getTenantIDFromStreamIDString(streamID)
if !ok {
logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID)
}
qStr := "_stream_id:" + streamID
q, err := ParseQuery(qStr)
if err != nil {
logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err)
}
q.AddTimeFilter(minTimestamp, maxTimestamp)
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()
var mu sync.Mutex
var rows []streamContextRow
stateSize := 0
writeBlock := func(_ uint, br *blockResult) {
mu.Lock()
defer mu.Unlock()
if stateSize > stateSizeBudget {
cancel()
}
cs := br.getColumns()
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
fields := make([]Field, len(cs))
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
for j, c := range cs {
v := c.getValueAtRow(br, i)
fields[j] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
stateSize += len(c.name) + len(v)
}
row := streamContextRow{
timestamp: timestamp,
fields: fields,
}
stateSize += int(unsafe.Sizeof(row))
rows = append(rows, row)
}
}
if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil {
return nil, err
}
if stateSize > stateSizeBudget {
return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q)
}
return rows, nil
}
func getTenantIDFromStreamIDString(s string) (TenantID, bool) {
var sid streamID
if !sid.tryUnmarshalFromString(s) {
return TenantID{}, false
}
return sid.tenantID, true
}
type pipeStreamContextProcessorShard struct {
pipeStreamContextProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeStreamContextProcessorShardNopad{})%128]byte
}
type streamContextRow struct {
timestamp int64
fields []Field
}
type pipeStreamContextProcessorShardNopad struct {
// pc points to the parent pipeStreamContext.
pc *pipeStreamContext
// m holds per-stream matching rows
m map[string][]streamContextRow
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor.
stateSizeBudget int
}
// writeBlock writes br to shard.
func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) {
m := shard.getM()
cs := br.getColumns()
cStreamID := br.getColumnByName("_stream_id")
stateSize := 0
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
fields := make([]Field, len(cs))
stateSize += int(unsafe.Sizeof(fields[0])) * len(fields)
for j, c := range cs {
v := c.getValueAtRow(br, i)
fields[j] = Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
}
stateSize += len(c.name) + len(v)
}
row := streamContextRow{
timestamp: timestamp,
fields: fields,
}
stateSize += int(unsafe.Sizeof(row))
streamID := cStreamID.getValueAtRow(br, i)
rows, ok := m[streamID]
if !ok {
stateSize += len(streamID)
}
rows = append(rows, row)
streamID = strings.Clone(streamID)
m[streamID] = rows
}
shard.stateSizeBudget -= stateSize
}
func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextRow {
if shard.m == nil {
shard.m = make(map[string][]streamContextRow)
}
return shard.m
}
func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
// Fast path - there is no need to fetch stream context.
pcp.ppNext.writeBlock(workerID, br)
return
}
shard := &pcp.shards[workerID]
for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
pcp.cancel()
}
return
}
shard.stateSizeBudget += stateSizeBudgetChunk
}
shard.writeBlock(br)
}
func (pcp *pipeStreamContextProcessor) flush() error {
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
// Fast path - nothing to do.
return nil
}
n := pcp.stateSizeBudget.Load()
if n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20))
}
if n > math.MaxInt {
logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n)
}
stateSizeBudget := int(n)
// merge state across shards
shards := pcp.shards
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
if needStop(pcp.stopCh) {
return nil
}
for streamID, rowsSrc := range shards[i].getM() {
rows, ok := m[streamID]
if !ok {
m[streamID] = rowsSrc
} else {
m[streamID] = append(rows, rowsSrc...)
}
}
}
// write result
wctx := &pipeStreamContextWriteContext{
pcp: pcp,
}
for streamID, rows := range m {
streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget)
if err != nil {
return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err)
}
if needStop(pcp.stopCh) {
return nil
}
if err := wctx.writeStreamContextRows(streamID, streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter); err != nil {
return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err)
}
}
wctx.flush()
return nil
}
func (wctx *pipeStreamContextWriteContext) writeStreamContextRows(streamID string, streamRows, rows []streamContextRow, linesBefore, linesAfter int) error {
sortStreamContextRows(streamRows)
sortStreamContextRows(rows)
idxNext := 0
for i := range rows {
r := &rows[i]
idx := getStreamContextRowIdx(streamRows, r)
if idx < 0 {
// This error may happen when streamRows became out of sync with rows.
// For example, when some streamRows were deleted after obtaining rows.
return fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d; re-execute the query", r.timestamp, len(streamRows), len(rows))
}
idxStart := idx - linesBefore
if idxStart < idxNext {
idxStart = idxNext
} else if idxNext > 0 && idxStart > idxNext {
// Write delimiter row between multiple contexts in the same stream.
// This simplifies investigation of the returned logs.
fields := []Field{
{
Name: "_time",
Value: string(marshalTimestampRFC3339NanoString(nil, r.timestamp+1)),
},
{
Name: "_stream_id",
Value: streamID,
},
{
Name: "_stream",
Value: getFieldValue(r.fields, "_stream"),
},
{
Name: "_msg",
Value: "---",
},
}
wctx.writeRow(fields)
}
for idxStart < idx {
wctx.writeRow(streamRows[idxStart].fields)
idxStart++
}
if idx >= idxNext {
wctx.writeRow(streamRows[idx].fields)
idxNext = idx + 1
}
idxEnd := idx + 1 + linesAfter
for idxNext < idxEnd && idxNext < len(streamRows) {
wctx.writeRow(streamRows[idxNext].fields)
idxNext++
}
if idxNext >= len(streamRows) {
break
}
}
return nil
}
func getStreamContextRowIdx(rows []streamContextRow, r *streamContextRow) int {
n := sort.Search(len(rows), func(i int) bool {
return rows[i].timestamp >= r.timestamp
})
if n == len(rows) {
return -1
}
equalFields := func(fields []Field) bool {
for _, f := range r.fields {
if f.Value != getFieldValue(fields, f.Name) {
return false
}
}
return true
}
for rows[n].timestamp == r.timestamp && !equalFields(rows[n].fields) {
n++
if n >= len(rows) {
return -1
}
}
if rows[n].timestamp != r.timestamp {
return -1
}
return n
}
func sortStreamContextRows(rows []streamContextRow) {
sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp
})
}
type pipeStreamContextWriteContext struct {
pcp *pipeStreamContextProcessor
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeStreamContextWriteContext) writeRow(rowFields []Field) {
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(rowFields)
if areEqualColumns {
for i, f := range rowFields {
if rcs[i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, f := range rowFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
for i, f := range rowFields {
v := f.Value
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeStreamContextWriteContext) flush() {
rcs := wctx.rcs
br := &wctx.br
wctx.valuesLen = 0
// Flush rcs to ppNext
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.pcp.ppNext.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) {
if !lex.isKeyword("stream_context") {
return nil, fmt.Errorf("expecting 'stream_context'; got %q", lex.token)
}
lex.nextToken()
linesBefore, linesAfter, err := parsePipeStreamContextBeforeAfter(lex)
if err != nil {
return nil, err
}
pc := &pipeStreamContext{
linesBefore: linesBefore,
linesAfter: linesAfter,
}
return pc, nil
}
func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) {
linesBefore := 0
linesAfter := 0
beforeSet := false
afterSet := false
for {
switch {
case lex.isKeyword("before"):
lex.nextToken()
f, s, err := parseNumber(lex)
if err != nil {
return 0, 0, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err)
}
if f < 0 {
return 0, 0, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s)
}
linesBefore = int(f)
beforeSet = true
case lex.isKeyword("after"):
lex.nextToken()
f, s, err := parseNumber(lex)
if err != nil {
return 0, 0, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err)
}
if f < 0 {
return 0, 0, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s)
}
linesAfter = int(f)
afterSet = true
default:
if !beforeSet && !afterSet {
return 0, 0, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'")
}
return linesBefore, linesAfter, nil
}
}
}