lib/logstorage: add blocks_count pipe

This pipe is useful for debugging purposes when the number of processed blocks must be calculated for the given query:

    <query> | blocks_count

This helps detect the root cause of query performance slowdowns in cases like https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070
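For example, the following query (an illustrative query; the resulting count depends on the data being queried) counts the blocks that must be read for error logs over the last 5 minutes, writing the result into a column named `blocks`:

    _time:5m error | blocks_count as blocks

By default the result is written into the `blocks_count` column; the optional `as <name>` clause renames it.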
Aliaksandr Valialkin 2024-09-25 19:15:32 +02:00
parent 65b93b17b1
commit e9950f6307
8 changed files with 230 additions and 2 deletions


@@ -17,6 +17,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add button for enabling auto refresh, similarly to VictoriaMetrics vmui. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7017).
* FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070).
* FEATURE: add [`blocks_count` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe), which can be used for counting the number of matching blocks for the given query. For example, `_time:5m | blocks_count` returns the number of blocks with logs for the last 5 minutes. This pipe can be useful for debugging purposes.
* BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for identifying the root cause of the issue.


@@ -1281,6 +1281,7 @@ _time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs
LogsQL supports the following pipes:

- [`blocks_count`](#blocks_count-pipe) counts the number of blocks with logs processed by the query.
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`drop_empty_fields`](#drop_empty_fields-pipe) drops [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
@@ -1310,6 +1311,10 @@ LogsQL supports the following pipes:
- [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).

### blocks_count pipe

`<q> | blocks_count` [pipe](#pipes) counts the number of blocks with logs processed by `<q>`. This pipe is needed mostly for debugging.
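For example, the following query (an illustrative query, mirroring the changelog example above rather than the published docs) returns the number of blocks with logs for the last 5 minutes, renaming the result column to `blocks` via the optional `as` clause:

    _time:5m | blocks_count as blocks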
### copy pipe

If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be copied, then `| copy src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.


@@ -350,7 +350,8 @@ func (q *Query) Clone() *Query {
func (q *Query) CanReturnLastNResults() bool {
    for _, p := range q.pipes {
        switch p.(type) {
        case *pipeBlocksCount,
            *pipeFieldNames,
            *pipeFieldValues,
            *pipeLimit,
            *pipeOffset,


@@ -967,6 +967,12 @@ func TestParseQuerySuccess(t *testing.T) {
    // field_names pipe
    f(`foo | field_names as x`, `foo | field_names as x`)
    f(`foo | field_names y`, `foo | field_names as y`)
    f(`foo | field_names`, `foo | field_names`)

    // blocks_count pipe
    f(`foo | blocks_count as x`, `foo | blocks_count as x`)
    f(`foo | blocks_count y`, `foo | blocks_count as y`)
    f(`foo | blocks_count`, `foo | blocks_count`)

    // copy and cp pipe
    f(`* | copy foo as bar`, `* | copy foo as bar`)
@@ -1458,6 +1464,17 @@ func TestParseQueryFailure(t *testing.T) {
    f(`foo | field_names x y`)
    f(`foo | field_names x, y`)

    // invalid blocks_count
    f(`foo | blocks_count |`)
    f(`foo | blocks_count (`)
    f(`foo | blocks_count )`)
    f(`foo | blocks_count ,`)
    f(`foo | blocks_count ()`)
    f(`foo | blocks_count (x)`)
    f(`foo | blocks_count (x,y)`)
    f(`foo | blocks_count x y`)
    f(`foo | blocks_count x, y`)

    // invalid copy and cp pipe
    f(`foo | copy`)
    f(`foo | cp`)
@@ -1820,6 +1837,14 @@ func TestQueryGetNeededColumns(t *testing.T) {
    f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
    f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)

    f(`* | blocks_count as foo`, ``, ``)
    f(`* | blocks_count foo | fields bar`, ``, ``)
    f(`* | blocks_count foo | fields foo`, ``, ``)
    f(`* | blocks_count foo | rm foo`, ``, ``)
    f(`* | blocks_count foo | rm bar`, ``, ``)
    f(`* | fields x,y | blocks_count as bar | fields baz`, ``, ``)
    f(`* | rm x,y | blocks_count as bar | fields baz`, ``, ``)

    f(`* | format "foo" as s1`, `*`, `s1`)
    f(`* | format "foo<f1>" as s1`, `*`, `s1`)
    f(`* | format "foo<s1>" as s1`, `*`, ``)
@@ -1932,6 +1957,8 @@ func TestQueryGetNeededColumns(t *testing.T) {
    f(`* | extract_regexp if (q:w p:a) "(?P<f1>.*)bar" from x | count() r1`, `p,q`, ``)
    f(`* | field_names | count() r1`, `*`, `_time`)
    f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``)
    f(`* | blocks_count | count() r1`, ``, ``)
    f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``)
    f(`* | fields a, b | count() r1`, ``, ``)
    f(`* | field_values a | count() r1`, `a`, ``)
    f(`* | limit 10 | filter a:b c:d | count() r1`, `a,c`, ``)
@@ -2030,6 +2057,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
    f("* | limit 10", false)
    f("* | offset 10", false)
    f("* | uniq (x)", false)
    f("* | blocks_count", false)
    f("* | field_names", false)
    f("* | field_values x", false)
    f("* | top 5 by (x)", false)
@@ -2056,6 +2084,7 @@ func TestQueryCanLiveTail(t *testing.T) {
    f("* | drop_empty_fields", true)
    f("* | extract 'foo<bar>baz'", true)
    f("* | extract_regexp 'foo(?P<bar>baz)'", true)
    f("* | blocks_count a", false)
    f("* | field_names a", false)
    f("* | fields a, b", true)
    f("* | field_values a", false)
@@ -2224,6 +2253,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
    f(`foo | count() | drop_empty_fields`)
    f(`foo | count() | extract "foo<bar>baz"`)
    f(`foo | count() | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"`)
    f(`foo | count() | blocks_count`)
    f(`foo | count() | field_names`)
    f(`foo | count() | field_values abc`)
    f(`foo | by (x) count() | fields a, b`)


@@ -99,6 +99,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
func parsePipe(lex *lexer) (pipe, error) {
    switch {
    case lex.isKeyword("blocks_count"):
        pc, err := parsePipeBlocksCount(lex)
        if err != nil {
            return nil, fmt.Errorf("cannot parse 'blocks_count' pipe: %w", err)
        }
        return pc, nil
    case lex.isKeyword("copy", "cp"):
        pc, err := parsePipeCopy(lex)
        if err != nil {
@@ -284,6 +290,7 @@ func parsePipe(lex *lexer) (pipe, error) {
var pipeNames = func() map[string]struct{} {
    a := []string{
        "blocks_count",
        "copy", "cp",
        "delete", "del", "rm", "drop",
        "drop_empty_fields",


@@ -0,0 +1,136 @@
package logstorage

import (
    "fmt"
    "unsafe"
)

// pipeBlocksCount processes '| blocks_count' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#blocks_count-pipe
type pipeBlocksCount struct {
    // resultName is an optional name of the column to write results to.
    // By default results are written into 'blocks_count' column.
    resultName string
}

func (pc *pipeBlocksCount) String() string {
    s := "blocks_count"
    if pc.resultName != "blocks_count" {
        s += " as " + quoteTokenIfNeeded(pc.resultName)
    }
    return s
}

func (pc *pipeBlocksCount) canLiveTail() bool {
    return false
}

func (pc *pipeBlocksCount) updateNeededFields(neededFields, unneededFields fieldsSet) {
    neededFields.reset()
    unneededFields.reset()
}

func (pc *pipeBlocksCount) optimize() {
    // nothing to do
}

func (pc *pipeBlocksCount) hasFilterInWithQuery() bool {
    return false
}

func (pc *pipeBlocksCount) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
    return pc, nil
}

func (pc *pipeBlocksCount) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
    shards := make([]pipeBlocksCountProcessorShard, workersCount)

    pcp := &pipeBlocksCountProcessor{
        pc:     pc,
        stopCh: stopCh,
        ppNext: ppNext,

        shards: shards,
    }
    return pcp
}

type pipeBlocksCountProcessor struct {
    pc     *pipeBlocksCount
    stopCh <-chan struct{}
    ppNext pipeProcessor

    shards []pipeBlocksCountProcessorShard
}

type pipeBlocksCountProcessorShard struct {
    pipeBlocksCountProcessorShardNopad

    // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
    _ [128 - unsafe.Sizeof(pipeBlocksCountProcessorShardNopad{})%128]byte
}

type pipeBlocksCountProcessorShardNopad struct {
    blocksCount uint64
}

func (pcp *pipeBlocksCountProcessor) writeBlock(workerID uint, _ *blockResult) {
    shard := &pcp.shards[workerID]
    shard.blocksCount++
}

func (pcp *pipeBlocksCountProcessor) flush() error {
    if needStop(pcp.stopCh) {
        return nil
    }

    // merge state across shards
    shards := pcp.shards
    blocksCount := shards[0].blocksCount
    shards = shards[1:]
    for i := range shards {
        blocksCount += shards[i].blocksCount
    }

    // write result
    rowsCountStr := string(marshalUint64String(nil, blocksCount))

    rcs := [1]resultColumn{}
    rcs[0].name = pcp.pc.resultName
    rcs[0].addValue(rowsCountStr)

    var br blockResult
    br.setResultColumns(rcs[:], 1)
    pcp.ppNext.writeBlock(0, &br)

    return nil
}

func parsePipeBlocksCount(lex *lexer) (*pipeBlocksCount, error) {
    if !lex.isKeyword("blocks_count") {
        return nil, fmt.Errorf("expecting 'blocks_count'; got %q", lex.token)
    }
    lex.nextToken()

    resultName := "blocks_count"
    if lex.isKeyword("as") {
        lex.nextToken()
        name, err := parseFieldName(lex)
        if err != nil {
            return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
        }
        resultName = name
    } else if !lex.isKeyword("", "|") {
        name, err := parseFieldName(lex)
        if err != nil {
            return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
        }
        resultName = name
    }

    pc := &pipeBlocksCount{
        resultName: resultName,
    }
    return pc, nil
}


@@ -0,0 +1,48 @@
package logstorage

import (
    "testing"
)

func TestParsePipeBlocksCountSuccess(t *testing.T) {
    f := func(pipeStr string) {
        t.Helper()
        expectParsePipeSuccess(t, pipeStr)
    }

    f(`blocks_count`)
    f(`blocks_count as x`)
}

func TestParsePipeBlocksCountFailure(t *testing.T) {
    f := func(pipeStr string) {
        t.Helper()
        expectParsePipeFailure(t, pipeStr)
    }

    f(`blocks_count(foo)`)
    f(`blocks_count a b`)
    f(`blocks_count as`)
}

func TestPipeBlocksCountUpdateNeededFields(t *testing.T) {
    f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
        t.Helper()
        expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
    }

    // all the needed fields
    f("blocks_count as f1", "*", "", "", "")

    // all the needed fields, unneeded fields do not intersect with src
    f("blocks_count as f3", "*", "f1,f2", "", "")

    // all the needed fields, unneeded fields intersect with src
    f("blocks_count as f1", "*", "s1,f1,f2", "", "")

    // needed fields do not intersect with src
    f("blocks_count as f3", "f1,f2", "", "", "")

    // needed fields intersect with src
    f("blocks_count as f1", "s1,f1,f2", "", "", "")
}


@@ -8,7 +8,7 @@ import (
// pipeFieldNames processes '| field_names' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field_names-pipe
type pipeFieldNames struct {
    // resultName is an optional name of the column to write results to.
    // By default results are written into 'name' column.