diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index d94adf417..fe1f4bee4 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -18,7 +18,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 * FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins.
 * FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`.
 * FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
-* BUGFIX: Properly parse structured metadata when ingesting logs with Loki ingestion protocol. An issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
+* FEATURE: add [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe) for returning various per-block stats. This pipe is useful for debugging.
+
+* BUGFIX: properly parse structured metadata when ingesting logs with [Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). The issue was introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
 
 ## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)
 
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 8658c6d40..024676aa6 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1293,6 +1293,7 @@ _time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs
 
 LogsQL supports the following pipes:
 
+- [`block_stats`](#block_stats-pipe) returns various stats for the selected blocks with logs.
 - [`blocks_count`](#blocks_count-pipe) counts the number of blocks with logs processed by the query.
 - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1325,10 +1326,34 @@ LogsQL supports the following pipes:
 - [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 
+### block_stats pipe
+
+`<q> | block_stats` [pipe](#pipes) returns the following stats for each block processed by `<q>`. This pipe is needed mostly for debugging.
+
+The returned per-block stats:
+
+- `field` - [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) name
+- `rows` - the number of rows for the given `field`
+- `type` - internal storage type for the given `field`
+- `values_bytes` - on-disk size of the data for the given `field`
+- `bloom_bytes` - on-disk size of bloom filter data for the given `field`
+- `dict_bytes` - on-disk size of the dictionary data for the given `field`
+- `dict_items` - the number of unique values in the dictionary for the given `field`
+
+See also:
+
+- [`blocks_count` pipe](#blocks_count-pipe)
+- [`len` pipe](#len-pipe)
+
 ### blocks_count pipe
 
 `<q> | blocks_count` [pipe](#pipes) counts the number of blocks with logs processed by `<q>`. This pipe is needed mostly for debugging.
 
+See also:
+
+- [`block_stats` pipe](#block_stats-pipe)
+- [`len` pipe](#len-pipe)
+
 ### copy pipe
 
 If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be copied, then `| copy src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.
@@ -1820,6 +1845,7 @@ See also:
 - [`sum_len` stats function](#sum_len-stats)
 - [`sort` pipe](#sort-pipe)
 - [`limit` pipe](#limit-pipe)
+- [`block_stats` pipe](#block_stats-pipe)
 
 ### limit pipe
 
diff --git a/lib/logstorage/encoding.go b/lib/logstorage/encoding.go
index 083519957..ebca8f3df 100644
--- a/lib/logstorage/encoding.go
+++ b/lib/logstorage/encoding.go
@@ -243,14 +243,25 @@ func marshalBytesBlock(dst, src []byte) []byte {
 
 	// Compress the block
 	dst = append(dst, marshalBytesTypeZSTD)
+	compressLevel := getCompressLevel(len(src))
 	bb := bbPool.Get()
-	bb.B = encoding.CompressZSTDLevel(bb.B[:0], src, 1)
+	bb.B = encoding.CompressZSTDLevel(bb.B[:0], src, compressLevel)
 	dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
 	dst = append(dst, bb.B...)
 	bbPool.Put(bb)
 	return dst
 }
 
+func getCompressLevel(dataLen int) int {
+	if dataLen <= 512 {
+		return 1
+	}
+	if dataLen <= 4*1024 {
+		return 2
+	}
+	return 3
+}
+
 func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
 	if len(src) < 1 {
 		return dst, src, fmt.Errorf("cannot unmarshal block type from empty src")
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 563878674..b183bdd46 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -349,9 +349,11 @@ func (q *Query) Clone(timestamp int64) *Query {
 func (q *Query) CanReturnLastNResults() bool {
 	for _, p := range q.pipes {
 		switch p.(type) {
-		case *pipeBlocksCount,
+		case *pipeBlockStats,
+			*pipeBlocksCount,
 			*pipeFieldNames,
 			*pipeFieldValues,
+			*pipeJoin,
 			*pipeLimit,
 			*pipeOffset,
 			*pipeTop,
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index efc99246d..615a80a40 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -982,6 +982,9 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | field_values x`, `* | field_values x`)
 	f(`* | field_values (x)`, `* | field_values x`)
 
+	// block_stats pipe
+	f(`foo | block_stats`, `foo | block_stats`)
+
 	// blocks_count pipe
 	f(`foo | blocks_count as x`, `foo | blocks_count as x`)
 	f(`foo | blocks_count y`, `foo | blocks_count as y`)
@@ -1222,6 +1225,11 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | unpack_logfmt from x`, `* | unpack_logfmt from x`)
 	f(`* | unpack_logfmt from x result_prefix y`, `* | unpack_logfmt from x result_prefix y`)
 
+	// join pipe
+	f(`* | join by (x) (foo:bar)`, `* | join by (x) (foo:bar)`)
+	f(`* | join on (x, y) (foo:bar)`, `* | join by (x, y) (foo:bar)`)
+	f(`* | join (x, y) (foo:bar)`, `* | join by (x, y) (foo:bar)`)
+
 	// multiple different pipes
 	f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
 	f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@@ -1504,6 +1512,11 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | field_names x y`)
 	f(`foo | field_names x, y`)
 
+	// invalid block_stats
+	f(`foo | block_stats foo`)
+	f(`foo | block_stats ()`)
+	f(`foo | block_stats (foo)`)
+
 	// invalid blocks_count
 	f(`foo | blocks_count |`)
 	f(`foo | blocks_count (`)
@@ -1894,6 +1907,10 @@ func TestQueryGetNeededColumns(t *testing.T) {
 	f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
 	f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
 
+	f(`* | block_stats`, `*`, ``)
+	f(`* | block_stats | fields foo`, `*`, ``)
+	f(`* | block_stats | rm foo`, `*`, ``)
+
 	f(`* | blocks_count as foo`, ``, ``)
 	f(`* | blocks_count foo | fields bar`, ``, ``)
 	f(`* | blocks_count foo | fields foo`, ``, ``)
@@ -2014,6 +2031,7 @@ func TestQueryGetNeededColumns(t *testing.T) {
 	f(`* | extract_regexp if (q:w p:a) "(?P.*)bar" from x | count() r1`, `p,q`, ``)
 	f(`* | field_names | count() r1`, ``, ``)
 	f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``)
+	f(`* | block_stats | count() r1`, `*`, ``)
 	f(`* | blocks_count | count() r1`, ``, ``)
 	f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``)
 	f(`* | fields a, b | count() r1`, ``, ``)
@@ -2117,10 +2135,12 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
 	f("* | limit 10", false)
 	f("* | offset 10", false)
 	f("* | uniq (x)", false)
+	f("* | block_stats", false)
 	f("* | blocks_count", false)
 	f("* | field_names", false)
 	f("* | field_values x", false)
 	f("* | top 5 by (x)", false)
+	f("* | join by (x) (foo)", false)
 }
 
@@ -2144,6 +2164,7 @@ func TestQueryCanLiveTail(t *testing.T) {
 	f("* | drop_empty_fields", true)
 	f("* | extract 'foobaz'", true)
 	f("* | extract_regexp 'foo(?Pbaz)'", true)
+	f("* | block_stats", false)
 	f("* | blocks_count a", false)
 	f("* | field_names a", false)
 	f("* | fields a, b", true)
@@ -2341,6 +2362,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
 	f(`foo | count() | drop_empty_fields`)
 	f(`foo | count() | extract "foobaz"`)
 	f(`foo | count() | extract_regexp "(?P([0-9]+[.]){3}[0-9]+)"`)
+	f(`foo | count() | block_stats`)
 	f(`foo | count() | blocks_count`)
 	f(`foo | count() | field_names`)
 	f(`foo | count() | field_values abc`)
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index d324f0cd8..00ac68e42 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -99,6 +99,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 
 func parsePipe(lex *lexer) (pipe, error) {
 	switch {
+	case lex.isKeyword("block_stats"):
+		ps, err := parsePipeBlockStats(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'block_stats' pipe: %w", err)
+		}
+		return ps, nil
 	case lex.isKeyword("blocks_count"):
 		pc, err := parsePipeBlocksCount(lex)
 		if err != nil {
@@ -302,6 +308,7 @@ func parsePipe(lex *lexer) (pipe, error) {
 
 var pipeNames = func() map[string]struct{} {
 	a := []string{
+		"block_stats",
 		"blocks_count",
 		"copy", "cp",
 		"delete", "del", "rm", "drop",
diff --git a/lib/logstorage/pipe_block_stats.go b/lib/logstorage/pipe_block_stats.go
new file mode 100644
index 000000000..8e6fc0b47
--- /dev/null
+++ b/lib/logstorage/pipe_block_stats.go
@@ -0,0 +1,218 @@
+package logstorage
+
+import (
+	"fmt"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
+)
+
+// pipeBlockStats processes '| block_stats ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe
+type pipeBlockStats struct {
+}
+
+func (ps *pipeBlockStats) String() string {
+	return "block_stats"
+}
+
+func (ps *pipeBlockStats) canLiveTail() bool {
+	return false
+}
+
+func (ps *pipeBlockStats) optimize() {
+	// nothing to do
+}
+
+func (ps *pipeBlockStats) hasFilterInWithQuery() bool {
+	return false
+}
+
+func (ps *pipeBlockStats) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
+	return ps, nil
+}
+
+func (ps *pipeBlockStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	unneededFields.reset()
+	neededFields.add("*")
+}
+
+func (ps *pipeBlockStats) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
+	return &pipeBlockStatsProcessor{
+		ppNext: ppNext,
+
+		shards: make([]pipeBlockStatsProcessorShard, workersCount),
+	}
+}
+
+type pipeBlockStatsProcessor struct {
+	ppNext pipeProcessor
+
+	shards []pipeBlockStatsProcessorShard
+}
+
+type pipeBlockStatsProcessorShard struct {
+	pipeBlockStatsProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(pipeBlockStatsProcessorShardNopad{})%128]byte
+}
+
+type pipeBlockStatsProcessorShardNopad struct {
+	wctx pipeBlockStatsWriteContext
+}
+
+func (psp *pipeBlockStatsProcessor) writeBlock(workerID uint, br *blockResult) {
+	if br.rowsLen == 0 {
+		return
+	}
+
+	shard := &psp.shards[workerID]
+	shard.wctx.init(workerID, psp.ppNext, br.rowsLen)
+
+	cs := br.getColumns()
+	for _, c := range cs {
+		if c.isConst {
+			shard.wctx.writeRow(c.name, "const", uint64(len(c.valuesEncoded[0])), 0, 0, 0)
+			continue
+		}
+		if c.isTime {
+			var blockSize uint64
+			if br.bs != nil {
+				blockSize = br.bs.bsw.bh.timestampsHeader.blockSize
+			}
+			shard.wctx.writeRow(c.name, "time", blockSize, 0, 0, 0)
+			continue
+		}
+		if br.bs == nil {
+			shard.wctx.writeRow(c.name, "inmemory", 0, 0, 0, 0)
+			continue
+		}
+
+		typ := c.valueType.String()
+		ch := br.bs.getColumnHeader(c.name)
+		dictSize := 0
+		dictItemsCount := len(ch.valuesDict.values)
+		if c.valueType == valueTypeDict {
+			for _, v := range ch.valuesDict.values {
+				dictSize += len(v)
+			}
+		}
+		shard.wctx.writeRow(c.name, typ, ch.valuesSize, ch.bloomFilterSize, uint64(dictItemsCount), uint64(dictSize))
+	}
+
+	shard.wctx.flush()
+	shard.wctx.reset()
+}
+
+func (psp *pipeBlockStatsProcessor) flush() error {
+	return nil
+}
+
+func parsePipeBlockStats(lex *lexer) (*pipeBlockStats, error) {
+	if !lex.isKeyword("block_stats") {
+		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "block_stats")
+	}
+	lex.nextToken()
+
+	ps := &pipeBlockStats{}
+
+	return ps, nil
+}
+
+type pipeBlockStatsWriteContext struct {
+	workerID uint
+	ppNext   pipeProcessor
+
+	a       arena
+	rowsLen int
+	tmpBuf  []byte
+
+	rcs []resultColumn
+	br  blockResult
+
+	// rowsCount is the number of rows in the current block
+	rowsCount int
+}
+
+func (wctx *pipeBlockStatsWriteContext) reset() {
+	wctx.workerID = 0
+	wctx.ppNext = nil
+
+	wctx.a.reset()
+	wctx.rowsLen = 0
+	wctx.tmpBuf = wctx.tmpBuf[:0]
+
+	rcs := wctx.rcs
+	for i := range rcs {
+		rcs[i].reset()
+	}
+	wctx.rcs = rcs[:0]
+
+	wctx.rowsCount = 0
+}
+
+func (wctx *pipeBlockStatsWriteContext) init(workerID uint, ppNext pipeProcessor, rowsLen int) {
+	wctx.reset()
+
+	wctx.workerID = workerID
+	wctx.ppNext = ppNext
+
+	wctx.rowsLen = rowsLen
+}
+
+func (wctx *pipeBlockStatsWriteContext) writeRow(columnName, columnType string, valuesSize, bloomSize, dictItems, dictSize uint64) {
+	rcs := wctx.rcs
+	if len(rcs) == 0 {
+		wctx.rcs = slicesutil.SetLength(wctx.rcs, 7)
+		rcs = wctx.rcs
+
+		rcs[0].name = "field"
+		rcs[1].name = "type"
+		rcs[2].name = "values_bytes"
+		rcs[3].name = "bloom_bytes"
+		rcs[4].name = "dict_items"
+		rcs[5].name = "dict_bytes"
+		rcs[6].name = "rows"
+	}
+
+	wctx.addValue(&rcs[0], columnName)
+	wctx.addValue(&rcs[1], columnType)
+	wctx.addUint64Value(&rcs[2], valuesSize)
+	wctx.addUint64Value(&rcs[3], bloomSize)
+	wctx.addUint64Value(&rcs[4], dictItems)
+	wctx.addUint64Value(&rcs[5], dictSize)
+	wctx.addUint64Value(&rcs[6], uint64(wctx.rowsLen))
+
+	wctx.rowsCount++
+	if len(wctx.a.b) >= 1_000_000 {
+		wctx.flush()
+	}
+}
+
+func (wctx *pipeBlockStatsWriteContext) addUint64Value(rc *resultColumn, n uint64) {
+	wctx.tmpBuf = marshalUint64String(wctx.tmpBuf[:0], n)
+	wctx.addValue(rc, bytesutil.ToUnsafeString(wctx.tmpBuf))
+}
+
+func (wctx *pipeBlockStatsWriteContext) addValue(rc *resultColumn, v string) {
+	vCopy := wctx.a.copyString(v)
+	rc.addValue(vCopy)
+}
+
+func (wctx *pipeBlockStatsWriteContext) flush() {
+	rcs := wctx.rcs
+
+	// Flush rcs to ppNext
+	br := &wctx.br
+	br.setResultColumns(rcs, wctx.rowsCount)
+	wctx.rowsCount = 0
+	wctx.ppNext.writeBlock(wctx.workerID, br)
+	br.reset()
+	for i := range rcs {
+		rcs[i].resetValues()
+	}
+	wctx.a.reset()
+}
diff --git a/lib/logstorage/pipe_block_stats_test.go b/lib/logstorage/pipe_block_stats_test.go
new file mode 100644
index 000000000..837f23c68
--- /dev/null
+++ b/lib/logstorage/pipe_block_stats_test.go
@@ -0,0 +1,41 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestParsePipeBlockStatsSuccess(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeSuccess(t, pipeStr)
+	}
+
+	f(`block_stats`)
+}
+
+func TestParsePipeBlockStatsFailure(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeFailure(t, pipeStr)
+	}
+
+	f(`block_stats foo`)
+	f(`block_stats ()`)
+	f(`block_stats (foo)`)
+}
+
+func TestPipeBlockStatsUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("block_stats", "*", "", "*", "")
+
+	// all the needed fields, plus unneeded fields
+	f("block_stats", "*", "f1,f2", "*", "")
+
+	// needed fields
+	f("block_stats", "f1,f2", "", "*", "")
+}
diff --git a/lib/logstorage/pipe_join.go b/lib/logstorage/pipe_join.go
index b4b73ab15..82a584053 100644
--- a/lib/logstorage/pipe_join.go
+++ b/lib/logstorage/pipe_join.go
@@ -38,7 +38,7 @@ func (pj *pipeJoin) hasFilterInWithQuery() bool {
 	return false
 }
 
-func (pj *pipeJoin) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
+func (pj *pipeJoin) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
 	return pj, nil
 }
 
@@ -122,6 +122,9 @@ func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) {
 			continue
 		}
 		for _, extraFields := range matchingRows {
+			if needStop(pjp.stopCh) {
+				return
+			}
 			shard.wctx.writeRow(rowIdx, extraFields)
 		}
 	}
diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go
index edeb3d5d2..fb9202caa 100644
--- a/lib/logstorage/pipe_unroll.go
+++ b/lib/logstorage/pipe_unroll.go
@@ -134,10 +134,10 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
 
 	fields := shard.fields
 	for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
+		if needStop(pup.stopCh) {
+			return
+		}
 		if bm.isSetBit(rowIdx) {
-			if needStop(pup.stopCh) {
-				return
-			}
 			shard.writeUnrolledFields(pu.fields, columnValues, rowIdx)
 		} else {
 			fields = fields[:0]
diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go
index 778e4fda8..ef5b91afa 100644
--- a/lib/logstorage/values_encoder.go
+++ b/lib/logstorage/values_encoder.go
@@ -55,6 +55,33 @@ const (
 	valueTypeTimestampISO8601 = valueType(9)
 )
 
+func (t valueType) String() string {
+	switch t {
+	case valueTypeUnknown:
+		return "unknown"
+	case valueTypeString:
+		return "string"
+	case valueTypeDict:
+		return "dict"
+	case valueTypeUint8:
+		return "uint8"
+	case valueTypeUint16:
+		return "uint16"
+	case valueTypeUint32:
+		return "uint32"
+	case valueTypeUint64:
+		return "uint64"
+	case valueTypeFloat64:
+		return "float64"
+	case valueTypeIPv4:
+		return "ipv4"
+	case valueTypeTimestampISO8601:
+		return "iso8601"
+	default:
+		return fmt.Sprintf("unknown valueType=%d", t)
+	}
+}
+
 type valuesEncoder struct {
 	// buf contains data for values.
 	buf []byte
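Reviewer note (not part of the patch): the new compression-level selection in `lib/logstorage/encoding.go` is easy to pin down with a table-driven test next to the unexported `getCompressLevel` helper. The test name and cases below are only a sketch based on the thresholds in the diff:

```go
package logstorage

import "testing"

// Sketch: exercises the getCompressLevel thresholds added in encoding.go.
// The test name and the concrete cases are illustrative, not part of the patch.
func TestGetCompressLevel(t *testing.T) {
	f := func(dataLen, levelExpected int) {
		t.Helper()
		if level := getCompressLevel(dataLen); level != levelExpected {
			t.Fatalf("unexpected compress level for dataLen=%d; got %d; want %d", dataLen, level, levelExpected)
		}
	}
	f(0, 1)
	f(512, 1) // blocks up to 512 bytes keep the cheapest zstd level
	f(513, 2)
	f(4*1024, 2) // blocks up to 4KiB spend a bit more CPU for a better ratio
	f(4*1024+1, 3)
	f(1024*1024, 3) // bigger blocks get the highest level used here
}
```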
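Reviewer note (not part of the patch): the parser changes also flip query capabilities for `block_stats` and `join`, as the updated `TestQueryCanReturnLastNResults` and `TestQueryCanLiveTail` cases show. A hedged sketch of how this surfaces through the exported `logstorage` API, assuming `ParseQuery`, `Query.CanLiveTail` and `Query.CanReturnLastNResults` keep their current exported signatures:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

// Sketch: capability checks for a query ending with the block_stats pipe.
// Assumes ParseQuery, CanLiveTail and CanReturnLastNResults stay exported
// with these signatures.
func main() {
	q, err := logstorage.ParseQuery(`_time:5m error | block_stats`)
	if err != nil {
		panic(err)
	}
	fmt.Println(q.CanLiveTail())           // false, per the parser_test expectations above
	fmt.Println(q.CanReturnLastNResults()) // false, per the parser_test expectations above
}
```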
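Reviewer note (not part of the patch): the padding field in `pipeBlockStatsProcessorShard` follows the false-sharing idiom used throughout the package. A self-contained illustration, with hypothetical type names, of how the anonymous array rounds the shard up to a 128-byte multiple:

```go
package main

import (
	"fmt"
	"unsafe"
)

// shardNopad stands in for pipeBlockStatsProcessorShardNopad; the names are illustrative.
type shardNopad struct {
	counter uint64
}

type shard struct {
	shardNopad

	// Pad to a multiple of 128 bytes so per-worker shards never share a cache line.
	// Written as in the patch: a size that is already a multiple of 128 gets a full
	// extra 128 bytes of padding rather than zero.
	_ [128 - unsafe.Sizeof(shardNopad{})%128]byte
}

func main() {
	fmt.Println(unsafe.Sizeof(shard{})) // prints 128
}
```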