From 46bc1c3435c19383ae0f1a718c2dccff6d6b590e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 May 2024 15:51:47 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 1 + lib/logstorage/block_result.go | 25 ++++- lib/logstorage/pipe_extract.go | 122 ++++++++++++++++++++-- lib/logstorage/pipe_unpack_json_test.go | 2 - lib/logstorage/pipe_unpack_logfmt_test.go | 2 - lib/logstorage/stats_fields_max_test.go | 2 +- lib/logstorage/stats_fields_min_test.go | 2 +- 7 files changed, 139 insertions(+), 17 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 59ae6447f..0cd780394 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -21,6 +21,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. +* BUGFIX: do not return referenced fields if they weren't present in the original logs. For example, `_time:5m | format if (non_existing_field:"") "abc"` could return empty `non_existing_field`, while it shouldn't be returned because it is missing in the original logs. * BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly. 
## [v0.11.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.11.0-victorialogs) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 9069579d6..e88916cc6 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -31,6 +31,9 @@ type blockResult struct { // csBuf contains requested columns. csBuf []blockResultColumn + // csEmpty contains non-existing columns, which were referenced via getColumnByName() + csEmpty []blockResultColumn + // cs contains cached pointers to requested columns returned from getColumns() if csInitialized=true. cs []*blockResultColumn @@ -49,6 +52,9 @@ func (br *blockResult) reset() { clear(br.csBuf) br.csBuf = br.csBuf[:0] + clear(br.csEmpty) + br.csEmpty = br.csEmpty[:0] + clear(br.cs) br.cs = br.cs[:0] @@ -88,6 +94,8 @@ func (br *blockResult) clone() *blockResult { } brNew.csBuf = csNew + // do not clone br.csEmpty - it will be populated by the caller via getColumnByName(). + return brNew } @@ -1325,8 +1333,21 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn { return cs[idx] } - br.addConstColumn(columnName, "") - return &br.csBuf[len(br.csBuf)-1] + // Search for empty column with the given name + csEmpty := br.csEmpty + for i := range csEmpty { + if csEmpty[i].name == columnName { + return &csEmpty[i] + } + } + + // Create missing empty column + br.csEmpty = append(br.csEmpty, blockResultColumn{ + name: br.a.copyString(columnName), + isConst: true, + valuesEncoded: getEmptyStrings(1), + }) + return &br.csEmpty[len(br.csEmpty)-1] } func (br *blockResult) getColumns() []*blockResultColumn { diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 0171bf313..b0e16ae35 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -2,6 +2,9 @@ package logstorage import ( "fmt" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // pipeExtract processes '| extract ...' 
pipe. @@ -99,20 +102,130 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - patterns := make([]*pattern, workersCount) - for i := range patterns { - patterns[i] = pe.ptn.clone() + return &pipeExtractProcessor{ + pe: pe, + ppBase: ppBase, + + shards: make([]pipeExtractProcessorShard, workersCount), + } +} + +type pipeExtractProcessor struct { + pe *pipeExtract + ppBase pipeProcessor + + shards []pipeExtractProcessorShard +} + +type pipeExtractProcessorShard struct { + pipeExtractProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeExtractProcessorShardNopad{})%128]byte +} + +type pipeExtractProcessorShardNopad struct { + bm bitmap + ptn *pattern + + resultValues []string + rcs []resultColumn + a arena +} + +// writeBlock applies the extract pattern to the pe.fromField values in br rows matching the optional pe.iff filter, +// adds the extracted fields to br as result columns and passes br to ppBase. +// Rows that do not match the filter keep their original values for the extracted fields. +func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return } - unpackFunc := func(uctx *fieldsUnpackerContext, s string) { - ptn := patterns[uctx.workerID] - ptn.apply(s) - for _, f := range ptn.fields { - uctx.addField(f.name, *f.value) + pe := pep.pe + shard := &pep.shards[workerID] + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pe.iff; iff != nil { + iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + pep.ppBase.writeBlock(workerID, br) + return } } - return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.keepOriginalFields, pe.skipEmptyResults, pe.iff) + if shard.ptn == nil { + shard.ptn = pe.ptn.clone() + } + ptn := shard.ptn + + shard.rcs = slicesutil.SetLength(shard.rcs, len(ptn.fields)) + rcs := shard.rcs + for i := range ptn.fields { + rcs[i].name = ptn.fields[i].name + } + + c := br.getColumnByName(pe.fromField) + values := c.getValues(br) + + shard.resultValues = 
slicesutil.SetLength(shard.resultValues, len(rcs)) + resultValues := shard.resultValues + hadUpdates := false + vPrev := "" + // The values extracted for the previous row may be reused only when they do not depend on the row itself, + // i.e. when the original field values are never consulted. + canReuse := !pe.skipEmptyResults && !pe.keepOriginalFields + for rowIdx, v := range values { + if bm.isSetBit(rowIdx) { + if !canReuse || !hadUpdates || vPrev != v { + vPrev = v + hadUpdates = true + + ptn.apply(v) + + for i, f := range ptn.fields { + v := *f.value + if v == "" && pe.skipEmptyResults || pe.keepOriginalFields { + c := br.getColumnByName(rcs[i].name) + if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" { + v = vOrig + } + } else { + v = shard.a.copyString(v) + } + resultValues[i] = v + } + } + } else { + for i := range rcs { + c := br.getColumnByName(rcs[i].name) + v := c.getValueAtRow(br, rowIdx) + resultValues[i] = v + } + // resultValues hold the original values of this row now, + // so they must be recomputed for the next matching row. + hadUpdates = false + } + + for i, v := range resultValues { + rcs[i].addValue(v) + } + } + + for i := range rcs { + br.addResultColumn(&rcs[i]) + } + pep.ppBase.writeBlock(workerID, br) + + for i := range rcs { + rcs[i].reset() + } + shard.a.reset() +} + +func (pep *pipeExtractProcessor) flush() error { + return nil } func parsePipeExtract(lex *lexer) (*pipeExtract, error) { diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 2bc2e6564..889565734 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -166,7 +166,6 @@ func TestPipeUnpackJSON(t *testing.T) { }, [][]Field{ { {"_msg", `{"foo":"bar"}`}, - {"x", ""}, }, }) @@ -313,7 +312,6 @@ func TestPipeUnpackJSON(t *testing.T) { {"y", `abc`}, }, { - {"y", ""}, {"z", `foobar`}, {"x", `{"z":["bar",123]}`}, }, diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go index 1ef3db2e5..90720118d 100644 --- a/lib/logstorage/pipe_unpack_logfmt_test.go +++ b/lib/logstorage/pipe_unpack_logfmt_test.go @@ -151,7 +151,6 @@ func TestPipeUnpackLogfmt(t *testing.T) { }, }, [][]Field{ { - {"foo", ""}, {"_msg", `foo=bar baz="x y=z" a=b`}, }, }) @@ -291,7 +290,6 @@ func TestPipeUnpackLogfmt(t *testing.T) { {"y", `abc`}, }, { - {"y", 
""}, {"z", `foobar`}, {"x", `z=bar`}, }, diff --git a/lib/logstorage/stats_fields_max_test.go b/lib/logstorage/stats_fields_max_test.go index 6f1a59ce5..de4e6e5e5 100644 --- a/lib/logstorage/stats_fields_max_test.go +++ b/lib/logstorage/stats_fields_max_test.go @@ -275,7 +275,7 @@ func TestStatsFieldsMax(t *testing.T) { { {"a", "1"}, {"b", ""}, - {"x", `{"_msg":"def","a":"1","c":"foo","b":""}`}, + {"x", `{"_msg":"def","a":"1","c":"foo"}`}, }, { {"a", "3"}, diff --git a/lib/logstorage/stats_fields_min_test.go b/lib/logstorage/stats_fields_min_test.go index f45d3a139..dbbdb8d27 100644 --- a/lib/logstorage/stats_fields_min_test.go +++ b/lib/logstorage/stats_fields_min_test.go @@ -274,7 +274,7 @@ func TestStatsFieldsMin(t *testing.T) { { {"a", "1"}, {"b", ""}, - {"x", `{"_msg":"def","a":"1","c":"foo","b":""}`}, + {"x", `{"_msg":"def","a":"1","c":"foo"}`}, }, { {"a", "3"},