From e9950f63076eba22d34a8fb85913c5c21aad4496 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 25 Sep 2024 19:15:32 +0200 Subject: [PATCH] lib/logstorage: add `blocks_count` pipe This pipe is useful for debugging purposes when the number of processed blocks must be calculated for the given query: | blocks_count This helps detecting the root cause of query performance slowdown in cases like https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070 --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 5 + lib/logstorage/parser.go | 3 +- lib/logstorage/parser_test.go | 30 +++++ lib/logstorage/pipe.go | 7 ++ lib/logstorage/pipe_blocks_count.go | 136 +++++++++++++++++++++++ lib/logstorage/pipe_blocks_count_test.go | 48 ++++++++ lib/logstorage/pipe_field_names.go | 2 +- 8 files changed, 230 insertions(+), 2 deletions(-) create mode 100644 lib/logstorage/pipe_blocks_count.go create mode 100644 lib/logstorage/pipe_blocks_count_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 7c40abde5..4a88ee19e 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -17,6 +17,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add button for enabling auto refresh, similarly to VictoriaMetrics vmui. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7017). * FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070). +* FEATURE: add [`blocks_count` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe), which can be used for counting the number of matching blocks for the given query. For example, `_time:5m | blocks_count` returns the number of blocks with logs for the last 5 minutes. This pipe can be useful for debugging purposes. * BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for itentifying the root cause of the issue. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index a50329623..5e2ee15a8 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1281,6 +1281,7 @@ _time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs LogsQL supports the following pipes: +- [`blocks_count`](#blocks_count-pipe) counts the number of blocks with logs processed by the query. - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`drop_empty_fields`](#drop_empty_fields-pipe) drops [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. @@ -1310,6 +1311,10 @@ LogsQL supports the following pipes: - [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +### blocks_count + +` | blocks_count` [pipe](#pipes) counts the number of blocks with logs processed by ``. This pipe is needed mostly for debugging. + ### copy pipe If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be copied, then `| copy src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used. diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 58ce041b5..ea42a1688 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -350,7 +350,8 @@ func (q *Query) Clone() *Query { func (q *Query) CanReturnLastNResults() bool { for _, p := range q.pipes { switch p.(type) { - case *pipeFieldNames, + case *pipeBlocksCount, + *pipeFieldNames, *pipeFieldValues, *pipeLimit, *pipeOffset, diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index ca78dd1f5..352644632 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -967,6 +967,12 @@ func TestParseQuerySuccess(t *testing.T) { // field_names pipe f(`foo | field_names as x`, `foo | field_names as x`) f(`foo | field_names y`, `foo | field_names as y`) + f(`foo | field_names`, `foo | field_names`) + + // blocks_count pipe + f(`foo | blocks_count as x`, `foo | blocks_count as x`) + f(`foo | blocks_count y`, `foo | blocks_count as y`) + f(`foo | blocks_count`, `foo | blocks_count`) // copy and cp pipe f(`* | copy foo as bar`, `* | copy foo as bar`) @@ -1458,6 +1464,17 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | field_names x y`) f(`foo | field_names x, y`) + // invalid blocks_count + f(`foo | blocks_count |`) + f(`foo | blocks_count (`) + f(`foo | blocks_count )`) + f(`foo | blocks_count ,`) + f(`foo | blocks_count ()`) + f(`foo | blocks_count (x)`) + f(`foo | blocks_count (x,y)`) + f(`foo | blocks_count x y`) + f(`foo | blocks_count x, y`) + // invalid copy and cp pipe f(`foo | copy`) f(`foo | cp`) @@ -1820,6 +1837,14 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``) f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`) + f(`* | blocks_count as foo`, ``, ``) + f(`* | blocks_count foo | fields bar`, ``, ``) + f(`* | blocks_count foo | fields foo`, ``, ``) + f(`* | blocks_count foo | rm foo`, ``, ``) + f(`* | blocks_count foo | rm bar`, ``, ``) + f(`* | fields x,y | blocks_count as bar | fields baz`, ``, ``) + f(`* | rm x,y | blocks_count as bar | fields baz`, ``, ``) + f(`* | format "foo" as s1`, `*`, `s1`) f(`* | format "foo" as s1`, `*`, `s1`) f(`* | format "foo" as s1`, `*`, ``) @@ -1932,6 +1957,8 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | extract_regexp if (q:w p:a) "(?P.*)bar" from x | count() r1`, `p,q`, ``) f(`* | field_names | count() r1`, `*`, `_time`) f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``) + f(`* | blocks_count | count() r1`, ``, ``) + f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``) f(`* | fields a, b | count() r1`, ``, ``) f(`* | field_values a | count() r1`, `a`, ``) f(`* | limit 10 | filter a:b c:d | count() r1`, `a,c`, ``) @@ -2030,6 +2057,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) { f("* | limit 10", false) f("* | offset 10", false) f("* | uniq (x)", false) + f("* | blocks_count", false) f("* | field_names", false) f("* | field_values x", false) f("* | top 5 by (x)", false) @@ -2056,6 +2084,7 @@ func TestQueryCanLiveTail(t *testing.T) { f("* | drop_empty_fields", true) f("* | extract 'foobaz'", true) f("* | extract_regexp 'foo(?Pbaz)'", true) + f("* | blocks_count a", false) f("* | field_names a", false) f("* | fields a, b", true) f("* | field_values a", false) @@ -2224,6 +2253,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) { f(`foo | count() | drop_empty_fields`) f(`foo | count() | extract "foobaz"`) f(`foo | count() | extract_regexp "(?P([0-9]+[.]){3}[0-9]+)"`) + f(`foo | count() | blocks_count`) f(`foo | count() | field_names`) f(`foo | count() | field_values abc`) f(`foo | by (x) count() | fields a, b`) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index d119fd89f..6279d98de 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -99,6 +99,12 @@ func parsePipes(lex *lexer) ([]pipe, error) { func parsePipe(lex *lexer) (pipe, error) { switch { + case lex.isKeyword("blocks_count"): + pc, err := parsePipeBlocksCount(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'blocks_count' pipe: %w", err) + } + return pc, nil case lex.isKeyword("copy", "cp"): pc, err := parsePipeCopy(lex) if err != nil { @@ -284,6 +290,7 @@ func parsePipe(lex *lexer) (pipe, error) { var pipeNames = func() map[string]struct{} { a := []string{ + "blocks_count", "copy", "cp", "delete", "del", "rm", "drop", "drop_empty_fields", diff --git a/lib/logstorage/pipe_blocks_count.go b/lib/logstorage/pipe_blocks_count.go new file mode 100644 index 000000000..f1381b241 --- /dev/null +++ b/lib/logstorage/pipe_blocks_count.go @@ -0,0 +1,136 @@ +package logstorage + +import ( + "fmt" + "unsafe" +) + +// pipeBlocksCount processes '| blocks_count' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe +type pipeBlocksCount struct { + // resultName is an optional name of the column to write results to. + // By default results are written into 'blocks_count' column. + resultName string +} + +func (pc *pipeBlocksCount) String() string { + s := "blocks_count" + if pc.resultName != "blocks_count" { + s += " as " + quoteTokenIfNeeded(pc.resultName) + } + return s +} + +func (pc *pipeBlocksCount) canLiveTail() bool { + return false +} + +func (pc *pipeBlocksCount) updateNeededFields(neededFields, unneededFields fieldsSet) { + neededFields.reset() + unneededFields.reset() +} + +func (pc *pipeBlocksCount) optimize() { + // nothing to do +} + +func (pc *pipeBlocksCount) hasFilterInWithQuery() bool { + return false +} + +func (pc *pipeBlocksCount) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pc, nil +} + +func (pc *pipeBlocksCount) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + shards := make([]pipeBlocksCountProcessorShard, workersCount) + + pcp := &pipeBlocksCountProcessor{ + pc: pc, + stopCh: stopCh, + ppNext: ppNext, + + shards: shards, + } + return pcp +} + +type pipeBlocksCountProcessor struct { + pc *pipeBlocksCount + stopCh <-chan struct{} + ppNext pipeProcessor + + shards []pipeBlocksCountProcessorShard +} + +type pipeBlocksCountProcessorShard struct { + pipeBlocksCountProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeBlocksCountProcessorShardNopad{})%128]byte +} + +type pipeBlocksCountProcessorShardNopad struct { + blocksCount uint64 +} + +func (pcp *pipeBlocksCountProcessor) writeBlock(workerID uint, _ *blockResult) { + shard := &pcp.shards[workerID] + shard.blocksCount++ +} + +func (pcp *pipeBlocksCountProcessor) flush() error { + if needStop(pcp.stopCh) { + return nil + } + + // merge state across shards + shards := pcp.shards + blocksCount := shards[0].blocksCount + shards = shards[1:] + for i := range shards { + blocksCount += shards[i].blocksCount + } + + // write result + rowsCountStr := string(marshalUint64String(nil, blocksCount)) + + rcs := [1]resultColumn{} + rcs[0].name = pcp.pc.resultName + rcs[0].addValue(rowsCountStr) + + var br blockResult + br.setResultColumns(rcs[:], 1) + pcp.ppNext.writeBlock(0, &br) + + return nil +} + +func parsePipeBlocksCount(lex *lexer) (*pipeBlocksCount, error) { + if !lex.isKeyword("blocks_count") { + return nil, fmt.Errorf("expecting 'blocks_count'; got %q", lex.token) + } + lex.nextToken() + + resultName := "blocks_count" + if lex.isKeyword("as") { + lex.nextToken() + name, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err) + } + resultName = name + } else if !lex.isKeyword("", "|") { + name, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err) + } + resultName = name + } + + pc := &pipeBlocksCount{ + resultName: resultName, + } + return pc, nil +} diff --git a/lib/logstorage/pipe_blocks_count_test.go b/lib/logstorage/pipe_blocks_count_test.go new file mode 100644 index 000000000..8907636ca --- /dev/null +++ b/lib/logstorage/pipe_blocks_count_test.go @@ -0,0 +1,48 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeBlocksCountSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`blocks_count`) + f(`blocks_count as x`) +} + +func TestParsePipeBlocksCountFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`blocks_count(foo)`) + f(`blocks_count a b`) + f(`blocks_count as`) +} + +func TestPipeBlocksCountUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("blocks_count as f1", "*", "", "", "") + + // all the needed fields, unneeded fields do not intersect with src + f("blocks_count as f3", "*", "f1,f2", "", "") + + // all the needed fields, unneeded fields intersect with src + f("blocks_count as f1", "*", "s1,f1,f2", "", "") + + // needed fields do not intersect with src + f("blocks_count as f3", "f1,f2", "", "", "") + + // needed fields intersect with src + f("blocks_count as f1", "s1,f1,f2", "", "", "") +} diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 0ef824ed7..1cc1b79cf 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -8,7 +8,7 @@ import ( // pipeFieldNames processes '| field_names' pipe. // -// See https://docs.victoriametrics.com/victorialogs/logsql/#field-names-pipe +// See https://docs.victoriametrics.com/victorialogs/logsql/#field_names-pipe type pipeFieldNames struct { // resultName is an optional name of the column to write results to. // By default results are written into 'name' column.