From d4ca651547ab42396262419dab4093fa56a615ca Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 28 Jun 2024 19:14:29 +0200
Subject: [PATCH] lib/logstorage: add `stream_context` pipe, which allows selecting surrounding logs for the matching logs

---
 app/vlselect/logsql/logsql.go              |   2 +-
 docs/VictoriaLogs/CHANGELOG.md             |   1 +
 docs/VictoriaLogs/LogsQL.md                |  31 +-
 docs/VictoriaLogs/README.md                |   1 +
 docs/VictoriaLogs/Roadmap.md               |   2 -
 docs/VictoriaLogs/keyConcepts.md           |   2 +-
 docs/VictoriaLogs/logsql-examples.md       |  10 +
 lib/logstorage/filter_test.go              |   6 +-
 lib/logstorage/parser.go                   |  21 +-
 lib/logstorage/pipe.go                     |   7 +
 lib/logstorage/pipe_stream_context.go      | 511 +++++++++++++++++++++
 lib/logstorage/pipe_stream_context_test.go | 116 +++++
 lib/logstorage/storage_search.go           |  72 +--
 lib/logstorage/storage_search_test.go      | 151 +++---
 14 files changed, 827 insertions(+), 106 deletions(-)
 create mode 100644 lib/logstorage/pipe_stream_context.go
 create mode 100644 lib/logstorage/pipe_stream_context_test.go

diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index b64c5755e..b4920e3b3 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -450,7 +450,7 @@ type logRow struct {
 }
 
 func sortLogRows(rows []logRow) {
-	sort.Slice(rows, func(i, j int) bool {
+	sort.SliceStable(rows, func(i, j int) bool {
 		return rows[i].timestamp < rows[j].timestamp
 	})
 }
diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 7b66f16e2..91a2758ff 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 
 ## tip
 
+* FEATURE: add ability to select surrounding logs before and after the selected logs via [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This functionality may be useful for investigating stacktraces, panics or correlated log messages.
 * FEATURE: add ability to return top `N` `"fields"` groups from [`/select/logsql/hits` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats), by specifying `fields_limit=N` query arg. This query arg is going to be used in [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545).
 * BUGFIX: fix `runtime error: index out of range [0] with length 0` panic when empty lines are ingested via [Syslog format](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) by Cisco controllers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6548).
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 3a2a5f499..5820e8827 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1305,6 +1305,8 @@ LogsQL supports the following pipes:
 - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
 - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`stats`](#stats-pipe) calculates various stats over the selected logs.
+- [`stream_context`](#stream_context-pipe) allows selecting surrounding logs before and after the matching logs
+  per each [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
 - [`top`](#top-pipe) returns top `N` field sets with the maximum number of matching logs.
 - [`uniq`](#uniq-pipe) returns unique log entires.
 - [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -2300,6 +2302,30 @@ _time:5m | stats count() total
 ```
 
+### stream_context pipe
+
+`| stream_context ...` [pipe](#pipes) allows selecting surrounding logs for the matching logs in a [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
+For example, the following query returns up to 10 additional logs after every log message with the `panic` [word](#word) across all the logs for the last 5 minutes:
+
+```logsql
+_time:5m panic | stream_context after 10
+```
+
+The following query returns up to 5 additional logs in front of every log message with the `stacktrace` [word](#word) across all the logs for the last 5 minutes:
+
+```logsql
+_time:5m stacktrace | stream_context before 5
+```
+
+The following query returns up to 2 logs in front of the log message with the `error` [word](#word) and up to 5 logs after this log message
+across all the logs for the last 5 minutes:
+
+```logsql
+_time:5m error | stream_context before 2 after 5
+```
+
+The `| stream_context` [pipe](#pipes) must be the first pipe, placed immediately after the [filters](#filters).
+
 ### top pipe
 
 `| top N by (field1, ..., fieldN)` [pipe](#pipes) returns top `N` sets for `(field1, ..., fieldN)` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@@ -3042,10 +3068,7 @@ See also:
 
 ## Stream context
 
-LogsQL will support the ability to select the given number of surrounding log lines for the selected log lines
-on a [per-stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) basis.
-
-See the [Roadmap](https://docs.victoriametrics.com/victorialogs/roadmap/) for details.
+See [`stream_context` pipe](#stream_context-pipe).
 
 ## Transformations
diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md
index b876af621..c3c3f6597 100644
--- a/docs/VictoriaLogs/README.md
+++ b/docs/VictoriaLogs/README.md
@@ -29,6 +29,7 @@ VictoriaLogs provides the following features:
 - VictoriaLogs supports multitenancy - see [these docs](#multitenancy).
 - VictoriaLogs supports out-of-order logs' ingestion aka backfilling.
 - VictoriaLogs supports live tailing for newly ingested logs. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing).
+- VictoriaLogs supports selecting surrounding logs before and after the selected logs. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe).
 - VictoriaLogs provides web UI for querying logs - see [these docs](https://docs.victoriametrics.com/victorialogs/querying/#web-ui).
 
 VictoriaLogs is at the Preview stage now. It is ready for evaluation in production and verifying the claims given above.
diff --git a/docs/VictoriaLogs/Roadmap.md b/docs/VictoriaLogs/Roadmap.md
index 98921042b..4355f691f 100644
--- a/docs/VictoriaLogs/Roadmap.md
+++ b/docs/VictoriaLogs/Roadmap.md
@@ -34,8 +34,6 @@ The following functionality is planned in the future versions of VictoriaLogs:
   - Fluentd
   - Journald (systemd)
   - Datadog protocol for logs
-- Add missing functionality to [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/):
-  - [Stream context](https://docs.victoriametrics.com/victorialogs/logsql/#stream-context).
 - Integration with Grafana ([partially done](https://github.com/VictoriaMetrics/victorialogs-datasource)).
- Ability to make instant snapshots and backups in the way [similar to VictoriaMetrics](https://docs.victoriametrics.com/#how-to-work-with-snapshots). - Cluster version of VictoriaLogs. diff --git a/docs/VictoriaLogs/keyConcepts.md b/docs/VictoriaLogs/keyConcepts.md index 6fa0f7efe..a5160568b 100644 --- a/docs/VictoriaLogs/keyConcepts.md +++ b/docs/VictoriaLogs/keyConcepts.md @@ -247,7 +247,7 @@ This can help narrowing down and eliminating high-cardinality fields from [log s ### Other fields Every ingested log entry may contain arbitrary number of [fields](#data-model) additionally to [`_msg`](#message-field) and [`_time`](#time-field). -For example, `level`, `ip`, `user_id`, `trace_id`, etc. Such fields can be used for simplifying and optimizing [search queries](#https://docs.victoriametrics.com/victorialogs/logsql/). +For example, `level`, `ip`, `user_id`, `trace_id`, etc. Such fields can be used for simplifying and optimizing [search queries](https://docs.victoriametrics.com/victorialogs/logsql/). It is usually faster to search over a dedicated `trace_id` field instead of searching for the `trace_id` inside long [log message](#message-field). E.g. the `trace_id:="XXXX-YYYY-ZZZZ"` query usually works faster than the `_msg:"trace_id=XXXX-YYYY-ZZZZ"` query. diff --git a/docs/VictoriaLogs/logsql-examples.md b/docs/VictoriaLogs/logsql-examples.md index 971d05646..a28eec5f5 100644 --- a/docs/VictoriaLogs/logsql-examples.md +++ b/docs/VictoriaLogs/logsql-examples.md @@ -472,3 +472,13 @@ returns logs with the `cannot open file` phrase over the last 5 minutes: ```logsql _time:5m "cannot open file" ``` + +## How to select all the logs for a particular stacktrace or panic? + +Use [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe) for selecting surrounding logs for the given log. +For example, the following query selects up to 10 logs in front of every log message containing the `stacktrace` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), +plus up to 100 logs after the given log message: + +```logsql +_time:5m stacktrace | stream_context before 10 after 100 +``` diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index dc3358b64..67cc5764a 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -196,11 +196,7 @@ func testFilterMatchForColumns(t *testing.T, columns []column, f filter, neededC func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f filter, neededColumnName string, expectedValues []string, expectedTimestamps []int64) { t.Helper() - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{neededColumnName, "_time"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{neededColumnName, "_time"}) type result struct { value string diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 3ff75b5e2..7fffea336 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -350,7 +350,26 @@ func (q *Query) CanReturnLastNResults() bool { // GetFilterTimeRange returns filter time range for the given q. 
func (q *Query) GetFilterTimeRange() (int64, int64) { - return getFilterTimeRange(q.f) + switch t := q.f.(type) { + case *filterAnd: + minTimestamp := int64(math.MinInt64) + maxTimestamp := int64(math.MaxInt64) + for _, filter := range t.filters { + ft, ok := filter.(*filterTime) + if ok { + if ft.minTimestamp > minTimestamp { + minTimestamp = ft.minTimestamp + } + if ft.maxTimestamp < maxTimestamp { + maxTimestamp = ft.maxTimestamp + } + } + } + return minTimestamp, maxTimestamp + case *filterTime: + return t.minTimestamp, t.maxTimestamp + } + return math.MinInt64, math.MaxInt64 } // AddTimeFilter adds global filter _time:[start ... end] to q. diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index f07b4b008..d119fd89f 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -219,6 +219,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } return ps, nil + case lex.isKeyword("stream_context"): + pc, err := parsePipeStreamContext(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'stream_context' pipe: %w", err) + } + return pc, nil case lex.isKeyword("top"): pt, err := parsePipeTop(lex) if err != nil { @@ -298,6 +304,7 @@ var pipeNames = func() map[string]struct{} { "replace_regexp", "sort", "stats", + "stream_context", "top", "uniq", "unpack_json", diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go new file mode 100644 index 000000000..42748b798 --- /dev/null +++ b/lib/logstorage/pipe_stream_context.go @@ -0,0 +1,511 @@ +package logstorage + +import ( + "context" + "fmt" + "math" + "sort" + "strings" + "sync" + "sync/atomic" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" +) + +// pipeStreamContext processes '| stream_context ...' queries. 
+// +// See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe +type pipeStreamContext struct { + // linesBefore is the number of lines to return before the matching line + linesBefore int + + // linesAfter is the number of lines to return after the matching line + linesAfter int +} + +func (pc *pipeStreamContext) String() string { + s := "stream_context" + if pc.linesBefore > 0 { + s += fmt.Sprintf(" before %d", pc.linesBefore) + } + if pc.linesAfter > 0 { + s += fmt.Sprintf(" after %d", pc.linesAfter) + } + return s +} + +func (pc *pipeStreamContext) canLiveTail() bool { + return false +} + +var neededFieldsForStreamContext = []string{ + "_time", + "_stream_id", +} + +func (pc *pipeStreamContext) updateNeededFields(neededFields, unneededFields fieldsSet) { + neededFields.addFields(neededFieldsForStreamContext) + unneededFields.removeFields(neededFieldsForStreamContext) +} + +func (pc *pipeStreamContext) optimize() { + // nothing to do +} + +func (pc *pipeStreamContext) hasFilterInWithQuery() bool { + return false +} + +func (pc *pipeStreamContext) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pc, nil +} + +func (pc *pipeStreamContext) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { + maxStateSize := int64(float64(memory.Allowed()) * 0.2) + + shards := make([]pipeStreamContextProcessorShard, workersCount) + for i := range shards { + shards[i] = pipeStreamContextProcessorShard{ + pipeStreamContextProcessorShardNopad: pipeStreamContextProcessorShardNopad{ + pc: pc, + stateSizeBudget: stateSizeBudgetChunk, + }, + } + maxStateSize -= stateSizeBudgetChunk + } + + pcp := &pipeStreamContextProcessor{ + pc: pc, + stopCh: stopCh, + cancel: cancel, + ppNext: ppNext, + + shards: shards, + + maxStateSize: maxStateSize, + } + pcp.stateSizeBudget.Store(maxStateSize) + + return pcp +} + +type pipeStreamContextProcessor struct { + pc *pipeStreamContext + stopCh <-chan struct{} + cancel func() + ppNext pipeProcessor + + shards []pipeStreamContextProcessorShard + + getStreamRows func(streamID string, stateSizeBudget int) ([]streamContextRow, error) + + maxStateSize int64 + stateSizeBudget atomic.Int64 +} + +func (pcp *pipeStreamContextProcessor) init(ctx context.Context, s *Storage, minTimestamp, maxTimestamp int64) { + pcp.getStreamRows = func(streamID string, stateSizeBudget int) ([]streamContextRow, error) { + return getStreamRows(ctx, s, streamID, minTimestamp, maxTimestamp, stateSizeBudget) + } +} + +func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestamp, maxTimestamp int64, stateSizeBudget int) ([]streamContextRow, error) { + tenantID, ok := getTenantIDFromStreamIDString(streamID) + if !ok { + logger.Panicf("BUG: cannot obtain tenantID from streamID %q", streamID) + } + + qStr := "_stream_id:" + streamID + q, err := ParseQuery(qStr) + if err != nil { + logger.Panicf("BUG: cannot parse query [%s]: %s", qStr, err) + } + q.AddTimeFilter(minTimestamp, maxTimestamp) + + ctxWithCancel, cancel := context.WithCancel(ctx) + defer cancel() + + var mu sync.Mutex + var rows []streamContextRow + stateSize := 0 + writeBlock := func(_ uint, br *blockResult) { + mu.Lock() + defer mu.Unlock() + + if stateSize > stateSizeBudget { + cancel() + } + + cs := br.getColumns() + for i, timestamp := range br.timestamps { + fields := make([]Field, len(cs)) + stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) + + for j, c := range cs { + v := c.getValueAtRow(br, i) + 
fields[j] = Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + } + stateSize += len(c.name) + len(v) + } + + row := streamContextRow{ + timestamp: timestamp, + fields: fields, + } + stateSize += int(unsafe.Sizeof(row)) + rows = append(rows, row) + } + } + + if err := s.runQuery(ctxWithCancel, []TenantID{tenantID}, q, writeBlock); err != nil { + return nil, err + } + if stateSize > stateSizeBudget { + return nil, fmt.Errorf("more than %dMB of memory is needed for query [%s]", stateSizeBudget/(1<<20), q) + } + + return rows, nil +} + +func getTenantIDFromStreamIDString(s string) (TenantID, bool) { + var sid streamID + if !sid.tryUnmarshalFromString(s) { + return TenantID{}, false + } + return sid.tenantID, true +} + +type pipeStreamContextProcessorShard struct { + pipeStreamContextProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeStreamContextProcessorShardNopad{})%128]byte +} + +type streamContextRow struct { + timestamp int64 + fields []Field +} + +type pipeStreamContextProcessorShardNopad struct { + // pc points to the parent pipeStreamContext. + pc *pipeStreamContext + + // m holds per-stream matching rows + m map[string][]streamContextRow + + // stateSizeBudget is the remaining budget for the whole state size for the shard. + // The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor. + stateSizeBudget int +} + +// writeBlock writes br to shard. +func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) { + m := shard.getM() + + cs := br.getColumns() + cStreamID := br.getColumnByName("_stream_id") + stateSize := 0 + for i, timestamp := range br.timestamps { + fields := make([]Field, len(cs)) + stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) + + for j, c := range cs { + v := c.getValueAtRow(br, i) + fields[j] = Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + } + stateSize += len(c.name) + len(v) + } + + row := streamContextRow{ + timestamp: timestamp, + fields: fields, + } + stateSize += int(unsafe.Sizeof(row)) + + streamID := cStreamID.getValueAtRow(br, i) + rows, ok := m[streamID] + if !ok { + stateSize += len(streamID) + } + rows = append(rows, row) + streamID = strings.Clone(streamID) + m[streamID] = rows + } + + shard.stateSizeBudget -= stateSize +} + +func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextRow { + if shard.m == nil { + shard.m = make(map[string][]streamContextRow) + } + return shard.m +} + +func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { + // Fast path - there is no need to fetch stream context. + pcp.ppNext.writeBlock(workerID, br) + return + } + + shard := &pcp.shards[workerID] + + for shard.stateSizeBudget < 0 { + // steal some budget for the state size from the global budget. + remaining := pcp.stateSizeBudget.Add(-stateSizeBudgetChunk) + if remaining < 0 { + // The state size is too big. Stop processing data in order to avoid OOM crash. + if remaining+stateSizeBudgetChunk >= 0 { + // Notify worker goroutines to stop calling writeBlock() in order to save CPU time. 
+ pcp.cancel() + } + return + } + shard.stateSizeBudget += stateSizeBudgetChunk + } + + shard.writeBlock(br) +} + +func (pcp *pipeStreamContextProcessor) flush() error { + if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { + // Fast path - nothing to do. + return nil + } + + n := pcp.stateSizeBudget.Load() + if n <= 0 { + return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20)) + } + if n > math.MaxInt { + logger.Panicf("BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d", math.MaxInt, n) + } + stateSizeBudget := int(n) + + // merge state across shards + shards := pcp.shards + m := shards[0].getM() + shards = shards[1:] + for i := range shards { + if needStop(pcp.stopCh) { + return nil + } + + for streamID, rowsSrc := range shards[i].getM() { + rows, ok := m[streamID] + if !ok { + m[streamID] = rowsSrc + } else { + m[streamID] = append(rows, rowsSrc...) + } + } + } + + // write result + wctx := &pipeStreamContextWriteContext{ + pcp: pcp, + } + + for streamID, rows := range m { + streamRows, err := pcp.getStreamRows(streamID, stateSizeBudget) + if err != nil { + return fmt.Errorf("cannot read rows for _stream_id=%q: %w", streamID, err) + } + if needStop(pcp.stopCh) { + return nil + } + resultRows, err := getStreamContextRows(streamRows, rows, pcp.pc.linesBefore, pcp.pc.linesAfter) + if err != nil { + return fmt.Errorf("cannot obtain context rows for _stream_id=%q: %w", streamID, err) + } + for _, rowFields := range resultRows { + wctx.writeRow(rowFields) + } + } + + wctx.flush() + + return nil +} + +func getStreamContextRows(streamRows, rows []streamContextRow, linesBefore, linesAfter int) ([][]Field, error) { + sortStreamContextRows(streamRows) + sortStreamContextRows(rows) + + var resultRows [][]Field + idxNext := 0 + for _, r := range rows { + idx := getStreamContextRowIdx(streamRows, r.timestamp) + if idx < 0 { + // This error may happen when streamRows became out of sync with rows. + // For example, when some streamRows were deleted after obtaining rows. 
+ return nil, fmt.Errorf("missing row for timestamp=%d; len(streamRows)=%d, len(rows)=%d", r.timestamp, len(streamRows), len(rows)) + } + + idxStart := idx - linesBefore + if idxStart < idxNext { + idxStart = idxNext + } + for idxStart < idx { + resultRows = append(resultRows, streamRows[idxStart].fields) + idxStart++ + } + + if idx >= idxNext { + resultRows = append(resultRows, streamRows[idx].fields) + idxNext = idx + 1 + } + + idxEnd := idx + 1 + linesAfter + for idxNext < idxEnd && idxNext < len(streamRows) { + resultRows = append(resultRows, streamRows[idxNext].fields) + idxNext++ + } + + if idxNext >= len(streamRows) { + break + } + } + + return resultRows, nil +} + +func getStreamContextRowIdx(rows []streamContextRow, timestamp int64) int { + n := sort.Search(len(rows), func(i int) bool { + return rows[i].timestamp >= timestamp + }) + if n == len(rows) { + return -1 + } + if rows[n].timestamp != timestamp { + return -1 + } + return n +} + +func sortStreamContextRows(rows []streamContextRow) { + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].timestamp < rows[j].timestamp + }) +} + +type pipeStreamContextWriteContext struct { + pcp *pipeStreamContextProcessor + rcs []resultColumn + br blockResult + + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block + valuesLen int +} + +func (wctx *pipeStreamContextWriteContext) writeRow(rowFields []Field) { + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(rowFields) + if areEqualColumns { + for i, f := range rowFields { + if rcs[i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to ppNext and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, f := range rowFields { + rcs = appendResultColumnWithName(rcs, f.Name) + } + wctx.rcs = rcs + } + + for i, f := range rowFields { + v := f.Value + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + + wctx.rowsCount++ + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } +} + +func (wctx *pipeStreamContextWriteContext) flush() { + rcs := wctx.rcs + br := &wctx.br + + wctx.valuesLen = 0 + + // Flush rcs to ppNext + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 + wctx.pcp.ppNext.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetValues() + } +} + +func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) { + if !lex.isKeyword("stream_context") { + return nil, fmt.Errorf("expecting 'stream_context'; got %q", lex.token) + } + lex.nextToken() + + linesBefore := 0 + beforeSet := false + if lex.isKeyword("before") { + lex.nextToken() + f, s, err := parseNumber(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err) + } + if f < 0 { + return nil, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s) + } + linesBefore = int(f) + beforeSet = true + } + + linesAfter := 0 + afterSet := false + if lex.isKeyword("after") { + lex.nextToken() + f, s, err := parseNumber(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err) + } + if f < 0 { + return nil, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s) + } + linesAfter = int(f) + afterSet = true + } + + if !beforeSet && !afterSet { + return nil, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'") + } + + pc := &pipeStreamContext{ + linesBefore: linesBefore, + linesAfter: 
linesAfter, + } + return pc, nil +} diff --git a/lib/logstorage/pipe_stream_context_test.go b/lib/logstorage/pipe_stream_context_test.go new file mode 100644 index 000000000..12943f85f --- /dev/null +++ b/lib/logstorage/pipe_stream_context_test.go @@ -0,0 +1,116 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeStreamContextSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`stream_context before 5`) + f(`stream_context after 10`) + f(`stream_context before 10 after 20`) +} + +func TestParsePipeStreamContextFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`stream_context`) + f(`stream_context before`) + f(`stream_context after`) + f(`stream_context before after`) + f(`stream_context after before`) + f(`stream_context before -4`) + f(`stream_context after -4`) +} + +func TestPipeStreamContext(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("stream_context before 0", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }) + + f("stream_context after 0", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }) +} + +func TestPipeStreamContextUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("stream_context before 10", "*", "", "*", "") + + // plus unneeded fields + f("stream_context after 4", "*", "f1,f2", "*", "f1,f2") + f("stream_context after 4", "*", "_time,f1,_stream_id", "*", "f1") + + // needed fields + f("stream_context before 3", "f1,f2", "", "_stream_id,_time,f1,f2", "") + f("stream_context before 3", "_time,f1,_stream_id", "", "_stream_id,_time,f1", "") +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 4dcaff65e..ba82ccd0d 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -22,6 +22,12 @@ type genericSearchOptions struct { // If it is empty, then the search is performed by tenantIDs streamIDs []streamID + // minTimestamp is the minimum timestamp for the search + minTimestamp int64 + + // maxTimestamp is the maximum timestamp for the search + maxTimestamp int64 + // filter is the filter to use for the search filter filter @@ -110,10 +116,14 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, return streamIDs[i].less(&streamIDs[j]) }) + minTimestamp, maxTimestamp := q.GetFilterTimeRange() + neededColumnNames, unneededColumnNames := q.getNeededColumns() so := &genericSearchOptions{ tenantIDs: tenantIDs, streamIDs: streamIDs, + minTimestamp: minTimestamp, + maxTimestamp: maxTimestamp, filter: q.f, neededColumnNames: neededColumnNames, unneededColumnNames: unneededColumnNames, @@ -127,10 +137,21 @@ func (s *Storage) runQuery(ctx context.Context, 
tenantIDs []TenantID, q *Query, stopCh := ctx.Done() cancels := make([]func(), len(q.pipes)) pps := make([]pipeProcessor, len(q.pipes)) + + var errPipe error for i := len(q.pipes) - 1; i >= 0; i-- { p := q.pipes[i] ctxChild, cancel := context.WithCancel(ctx) pp = p.newPipeProcessor(workersCount, stopCh, cancel, pp) + + pcp, ok := pp.(*pipeStreamContextProcessor) + if ok { + pcp.init(ctx, s, minTimestamp, maxTimestamp) + if i > 0 { + errPipe = fmt.Errorf("[%s] pipe must go after [%s] filter; now it goes after the [%s] pipe", p, q.f, q.pipes[i-1]) + } + } + stopCh = ctxChild.Done() ctx = ctxChild @@ -138,7 +159,9 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, pps[i] = pp } - s.search(workersCount, so, stopCh, pp.writeBlock) + if errPipe == nil { + s.search(workersCount, so, stopCh, pp.writeBlock) + } var errFlush error for i, pp := range pps { @@ -151,6 +174,11 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, if err := ppMain.flush(); err != nil && errFlush == nil { errFlush = err } + + if errPipe != nil { + return errPipe + } + return errFlush } @@ -642,19 +670,15 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch }(uint(i)) } - // Obtain time range from so.filter - f := so.filter - minTimestamp, maxTimestamp := getFilterTimeRange(f) - // Select partitions according to the selected time range s.partitionsLock.Lock() ptws := s.partitions - minDay := minTimestamp / nsecPerDay + minDay := so.minTimestamp / nsecPerDay n := sort.Search(len(ptws), func(i int) bool { return ptws[i].day >= minDay }) ptws = ptws[n:] - maxDay := maxTimestamp / nsecPerDay + maxDay := so.maxTimestamp / nsecPerDay n = sort.Search(len(ptws), func(i int) bool { return ptws[i].day > maxDay }) @@ -665,8 +689,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch s.partitionsLock.Unlock() // Obtain common filterStream from f - var sf *StreamFilter - sf, f = getCommonStreamFilter(f) + sf, f := getCommonStreamFilter(so.filter) // Schedule concurrent search across matching partitions. psfs := make([]partitionSearchFinalizer, len(ptws)) @@ -675,7 +698,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch partitionSearchConcurrencyLimitCh <- struct{}{} wgSearchers.Add(1) go func(idx int, pt *partition) { - psfs[idx] = pt.search(minTimestamp, maxTimestamp, sf, f, so, workCh, stopCh) + psfs[idx] = pt.search(sf, f, so, workCh, stopCh) wgSearchers.Done() <-partitionSearchConcurrencyLimitCh }(i, ptw.pt) @@ -704,7 +727,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() -func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { +func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { if needStop(stopCh) { // Do not spend CPU time on search, since it is already stopped. 
return func() {} @@ -728,8 +751,8 @@ func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter, soInternal := &searchOptions{ tenantIDs: tenantIDs, streamIDs: streamIDs, - minTimestamp: minTimestamp, - maxTimestamp: maxTimestamp, + minTimestamp: so.minTimestamp, + maxTimestamp: so.maxTimestamp, filter: f, neededColumnNames: so.neededColumnNames, unneededColumnNames: so.unneededColumnNames, @@ -1095,29 +1118,6 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) { return nil, f } -func getFilterTimeRange(f filter) (int64, int64) { - switch t := f.(type) { - case *filterAnd: - minTimestamp := int64(math.MinInt64) - maxTimestamp := int64(math.MaxInt64) - for _, filter := range t.filters { - ft, ok := filter.(*filterTime) - if ok { - if ft.minTimestamp > minTimestamp { - minTimestamp = ft.minTimestamp - } - if ft.maxTimestamp < maxTimestamp { - maxTimestamp = ft.maxTimestamp - } - } - } - return minTimestamp, maxTimestamp - case *filterTime: - return t.minTimestamp, t.maxTimestamp - } - return math.MinInt64, math.MaxInt64 -} - func forEachStreamField(streams []ValueWithHits, f func(f Field, hits uint64)) { var fields []Field for i := range streams { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 49fc59956..be39d2d05 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -3,6 +3,7 @@ package logstorage import ( "context" "fmt" + "math" "reflect" "sort" "strings" @@ -87,7 +88,7 @@ func TestStorageRunQuery(t *testing.T) { t.Helper() err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock) if err != nil { - t.Fatalf("unexpected error returned from the query %s: %s", q, err) + t.Fatalf("unexpected error returned from the query [%s]: %s", q, err) } } @@ -656,6 +657,78 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) + t.Run("stream_context-noop-1", func(t *testing.T) { + f(t, `"message 3 at block 1" + | stream_context before 0 + | stats count() rows`, [][]Field{ + { + {"rows", "33"}, + }, + }) + }) + t.Run("stream_context-noop-2", func(t *testing.T) { + f(t, `"message 3 at block 1" + | stream_context before 0 after 0 + | stats count() rows`, [][]Field{ + { + {"rows", "33"}, + }, + }) + }) + t.Run("stream_context-before-1", func(t *testing.T) { + f(t, `"message 3 at block 1" + | stream_context before 1 + | stats count() rows`, [][]Field{ + { + {"rows", "66"}, + }, + }) + }) + t.Run("stream_context-after-1", func(t *testing.T) { + f(t, `"message 3 at block 1" + | stream_context after 1 + | stats count() rows`, [][]Field{ + { + {"rows", "66"}, + }, + }) + }) + t.Run("stream_context-before-after-1", func(t *testing.T) { + f(t, `"message 3 at block 1" + | stream_context before 1 after 1 + | stats count() rows`, [][]Field{ + { + {"rows", "99"}, + }, + }) + }) + t.Run("stream_context-before-1000", func(t *testing.T) { + f(t, `"message 4" + | stream_context before 1000 + | stats count() rows`, [][]Field{ + { + {"rows", "825"}, + }, + }) + }) + t.Run("stream_context-after-1000", func(t *testing.T) { + f(t, `"message 4" + | stream_context after 1000 + | stats count() rows`, [][]Field{ + { + {"rows", "495"}, + }, + }) + }) + t.Run("stream_context-before-after-1000", func(t *testing.T) { + f(t, `"message 4" + | stream_context before 1000 after 1000 + | stats count() rows`, [][]Field{ + { + {"rows", "1155"}, + }, + }) + }) // Close the storage and delete its data s.MustClose() @@ -757,11 +830,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp 
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, nil) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) processBlock := func(_ uint, _ *blockResult) { panic(fmt.Errorf("unexpected match")) } @@ -775,11 +844,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, nil) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) processBlock := func(_ uint, _ *blockResult) { panic(fmt.Errorf("unexpected match")) } @@ -793,11 +858,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, nil) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) processBlock := func(_ uint, _ *blockResult) { panic(fmt.Errorf("unexpected match")) } @@ -812,11 +873,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, nil) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { rowsCountTotal.Add(uint32(len(br.timestamps))) @@ -833,11 +890,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, nil) - so := &genericSearchOptions{ - tenantIDs: allTenantIDs, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { rowsCountTotal.Add(uint32(len(br.timestamps))) @@ -854,11 +907,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, sf) - so := &genericSearchOptions{ - tenantIDs: allTenantIDs, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"}) processBlock := func(_ uint, _ *blockResult) { panic(fmt.Errorf("unexpected match")) } @@ -874,11 +923,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, sf) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { rowsCountTotal.Add(uint32(len(br.timestamps))) @@ -900,11 +945,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := 
baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream f := getBaseFilter(minTimestamp, maxTimestamp, sf) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { rowsCountTotal.Add(uint32(len(br.timestamps))) @@ -934,11 +975,7 @@ func TestStorageSearch(t *testing.T) { }, }, } - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { rowsCountTotal.Add(uint32(len(br.timestamps))) @@ -959,11 +996,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp + (rowsPerBlock-2)*1e9 maxTimestamp := baseTimestamp + (rowsPerBlock-1)*1e9 - 1 f := getBaseFilter(minTimestamp, maxTimestamp, sf) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { rowsCountTotal.Add(uint32(len(br.timestamps))) @@ -984,11 +1017,7 @@ func TestStorageSearch(t *testing.T) { minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9 f := getBaseFilter(minTimestamp, maxTimestamp, sf) - so := &genericSearchOptions{ - tenantIDs: []TenantID{tenantID}, - filter: f, - neededColumnNames: []string{"_msg"}, - } + so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) processBlock := func(_ uint, _ *blockResult) { panic(fmt.Errorf("unexpected match")) } @@ -1020,3 +1049,13 @@ func TestParseStreamFieldsSuccess(t *testing.T) { f(`{a="b",c="d"}`, `{"a":"b","c":"d"}`) f(`{a="a=,b\"c}",b="d"}`, `{"a":"a=,b\"c}","b":"d"}`) } + +func newTestGenericSearchOptions(tenantIDs []TenantID, f filter, neededColumns []string) *genericSearchOptions { + return &genericSearchOptions{ + tenantIDs: tenantIDs, + minTimestamp: math.MinInt64, + maxTimestamp: math.MaxInt64, + filter: f, + neededColumnNames: neededColumns, + } +}
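
The core of the new pipe is the context-stitching loop in `getStreamContextRows` above: matches and full stream rows are sorted by timestamp, each match is located in the stream via binary search, and a single forward cursor (`idxNext`) guarantees that overlapping `before`/`after` windows never emit the same row twice. The following is a minimal, self-contained sketch of that logic; the `row` type, `contextRows` helper and sample data are simplifications for illustration and are not part of the patch (the patch carries full field sets and returns an error for missing matches):

```go
package main

import (
	"fmt"
	"sort"
)

// row is a reduced stand-in for the patch's streamContextRow:
// a timestamp plus its fields, shrunk here to one message string.
type row struct {
	ts  int64
	msg string
}

// contextRows mirrors the merge logic of getStreamContextRows: walk the
// matches in timestamp order and emit up to `before` rows ahead of each
// match and up to `after` rows behind it. The idxNext cursor ensures that
// overlapping context windows never duplicate a stream row.
func contextRows(streamRows, matches []row, before, after int) []row {
	var out []row
	idxNext := 0
	for _, m := range matches {
		idx := sort.Search(len(streamRows), func(i int) bool {
			return streamRows[i].ts >= m.ts
		})
		if idx == len(streamRows) || streamRows[idx].ts != m.ts {
			continue // match missing from the stream; the patch returns an error here
		}
		start := idx - before
		if start < idxNext {
			start = idxNext
		}
		for ; start < idx; start++ {
			out = append(out, streamRows[start])
		}
		if idx >= idxNext {
			out = append(out, streamRows[idx])
			idxNext = idx + 1
		}
		end := idx + 1 + after
		for ; idxNext < end && idxNext < len(streamRows); idxNext++ {
			out = append(out, streamRows[idxNext])
		}
		if idxNext >= len(streamRows) {
			break
		}
	}
	return out
}

func main() {
	stream := []row{
		{1, "starting"}, {2, "opening file"}, {3, "panic"},
		{4, "stacktrace line 1"}, {5, "stacktrace line 2"}, {6, "recovered"},
	}
	matches := []row{{3, "panic"}}
	for _, r := range contextRows(stream, matches, 1, 2) {
		fmt.Println(r.ts, r.msg) // prints rows 2..5: one row before the match, two after
	}
}
```

This matches the test expectations above: with `before 1 after 1` each of the 33 matching rows contributes its own row plus one neighbor on each side, giving 99 rows when the windows do not overlap.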
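
The memory accounting in `pipeStreamContextProcessor.writeBlock` follows the same chunked-budget pattern used by other stateful pipes: each shard keeps a local budget and refills it in `stateSizeBudgetChunk` steps from a shared atomic counter, so the shared counter is touched once per chunk rather than once per row, and the query is cancelled once the global budget goes negative. A standalone sketch of that pattern, assuming simplified types (`shard`, a 1 KiB `budgetChunk`) that are not in the patch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const budgetChunk = 1 << 10 // the patch uses stateSizeBudgetChunk; 1 KiB for this demo

// shard keeps a worker-local budget, refilled in chunks from the shared counter.
type shard struct {
	local int
}

// write mirrors pipeStreamContextProcessor.writeBlock: refill the local budget
// whenever it has gone negative, then charge the cost of the current block.
func (s *shard) write(global *atomic.Int64, cost int) bool {
	for s.local < 0 {
		if global.Add(-budgetChunk) < 0 {
			return false // shared budget exhausted; the patch stops work and cancels the query
		}
		s.local += budgetChunk
	}
	s.local -= cost
	return true
}

func main() {
	var global atomic.Int64
	global.Store(4 * budgetChunk)

	var wg sync.WaitGroup
	shards := make([]shard, 4)
	for i := range shards {
		wg.Add(1)
		go func(s *shard) {
			defer wg.Done()
			for s.write(&global, 100) {
			} // consume blocks until the shared budget runs out
		}(&shards[i])
	}
	wg.Wait()
	fmt.Println("remaining global budget:", global.Load()) // negative: the last refill overshot
}
```

The same budget is what bounds `getStreamRows` when the pipe re-reads whole streams at flush time, which is why `flush()` converts the leftover budget into the per-stream `stateSizeBudget` limit.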