This commit is contained in:
Aliaksandr Valialkin 2024-05-25 15:51:47 +02:00
parent 33610de341
commit 46bc1c3435
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 139 additions and 17 deletions

View file

@ -21,6 +21,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
* BUGFIX: do not return referenced fields if they weren't present in the original logs. For example, `_time:5m | format if (non_existing_field:"") "abc"` could return empty `non_exiting_field`, while it shuldn't be returned because it is missing in the original logs.
* BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly.
## [v0.11.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.11.0-victorialogs)

View file

@ -31,6 +31,9 @@ type blockResult struct {
// csBuf contains requested columns.
csBuf []blockResultColumn
// csEmpty contains non-existing columns, which were referenced via getColumnByName()
csEmpty []blockResultColumn
// cs contains cached pointers to requested columns returned from getColumns() if csInitialized=true.
cs []*blockResultColumn
@ -49,6 +52,9 @@ func (br *blockResult) reset() {
clear(br.csBuf)
br.csBuf = br.csBuf[:0]
clear(br.csEmpty)
br.csEmpty = br.csEmpty[:0]
clear(br.cs)
br.cs = br.cs[:0]
@ -88,6 +94,8 @@ func (br *blockResult) clone() *blockResult {
}
brNew.csBuf = csNew
// do not clone br.csEmpty - it will be populated by the caller via getColumnByName().
return brNew
}
@ -1325,8 +1333,21 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
return cs[idx]
}
br.addConstColumn(columnName, "")
return &br.csBuf[len(br.csBuf)-1]
// Search for empty column with the given name
csEmpty := br.csEmpty
for i := range csEmpty {
if csEmpty[i].name == columnName {
return &csEmpty[i]
}
}
// Create missing empty column
br.csEmpty = append(br.csEmpty, blockResultColumn{
name: br.a.copyString(columnName),
isConst: true,
valuesEncoded: getEmptyStrings(1),
})
return &br.csEmpty[len(br.csEmpty)-1]
}
func (br *blockResult) getColumns() []*blockResultColumn {

View file

@ -2,6 +2,9 @@ package logstorage
import (
"fmt"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeExtract processes '| extract ...' pipe.
@ -99,20 +102,121 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
}
func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
patterns := make([]*pattern, workersCount)
for i := range patterns {
patterns[i] = pe.ptn.clone()
return &pipeExtractProcessor{
pe: pe,
ppBase: ppBase,
shards: make([]pipeExtractProcessorShard, workersCount),
}
}
type pipeExtractProcessor struct {
pe *pipeExtract
ppBase pipeProcessor
shards []pipeExtractProcessorShard
}
type pipeExtractProcessorShard struct {
pipeExtractProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeExtractProcessorShardNopad{})%128]byte
}
type pipeExtractProcessorShardNopad struct {
bm bitmap
ptn *pattern
resultValues []string
rcs []resultColumn
a arena
}
func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
unpackFunc := func(uctx *fieldsUnpackerContext, s string) {
ptn := patterns[uctx.workerID]
ptn.apply(s)
for _, f := range ptn.fields {
uctx.addField(f.name, *f.value)
pe := pep.pe
shard := &pep.shards[workerID]
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
if iff := pe.iff; iff != nil {
iff.f.applyToBlockResult(br, bm)
if bm.isZero() {
pep.ppBase.writeBlock(workerID, br)
return
}
}
return newPipeUnpackProcessor(workersCount, unpackFunc, ppBase, pe.fromField, "", pe.keepOriginalFields, pe.skipEmptyResults, pe.iff)
if shard.ptn == nil {
shard.ptn = pe.ptn.clone()
}
ptn := shard.ptn
shard.rcs = slicesutil.SetLength(shard.rcs, len(ptn.fields))
rcs := shard.rcs
for i := range ptn.fields {
rcs[i].name = ptn.fields[i].name
}
c := br.getColumnByName(pe.fromField)
values := c.getValues(br)
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
resultValues := shard.resultValues
hadUpdates := false
vPrev := ""
for rowIdx, v := range values {
if bm.isSetBit(rowIdx) {
if !hadUpdates || vPrev != v {
vPrev = v
hadUpdates = true
ptn.apply(v)
for i, f := range ptn.fields {
v := *f.value
if v == "" && pe.skipEmptyResults || pe.keepOriginalFields {
c := br.getColumnByName(rcs[i].name)
if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" {
v = vOrig
}
} else {
v = shard.a.copyString(v)
}
resultValues[i] = v
}
}
} else {
for i := range rcs {
c := br.getColumnByName(rcs[i].name)
v := c.getValueAtRow(br, rowIdx)
resultValues[i] = v
}
}
for i, v := range resultValues {
rcs[i].addValue(v)
}
}
for i := range rcs {
br.addResultColumn(&rcs[i])
}
pep.ppBase.writeBlock(workerID, br)
for i := range rcs {
rcs[i].reset()
}
shard.a.reset()
}
func (pep *pipeExtractProcessor) flush() error {
return nil
}
func parsePipeExtract(lex *lexer) (*pipeExtract, error) {

View file

@ -166,7 +166,6 @@ func TestPipeUnpackJSON(t *testing.T) {
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"x", ""},
},
})
@ -313,7 +312,6 @@ func TestPipeUnpackJSON(t *testing.T) {
{"y", `abc`},
},
{
{"y", ""},
{"z", `foobar`},
{"x", `{"z":["bar",123]}`},
},

View file

@ -151,7 +151,6 @@ func TestPipeUnpackLogfmt(t *testing.T) {
},
}, [][]Field{
{
{"foo", ""},
{"_msg", `foo=bar baz="x y=z" a=b`},
},
})
@ -291,7 +290,6 @@ func TestPipeUnpackLogfmt(t *testing.T) {
{"y", `abc`},
},
{
{"y", ""},
{"z", `foobar`},
{"x", `z=bar`},
},

View file

@ -275,7 +275,7 @@ func TestStatsFieldsMax(t *testing.T) {
{
{"a", "1"},
{"b", ""},
{"x", `{"_msg":"def","a":"1","c":"foo","b":""}`},
{"x", `{"_msg":"def","a":"1","c":"foo"}`},
},
{
{"a", "3"},

View file

@ -274,7 +274,7 @@ func TestStatsFieldsMin(t *testing.T) {
{
{"a", "1"},
{"b", ""},
{"x", `{"_msg":"def","a":"1","c":"foo","b":""}`},
{"x", `{"_msg":"def","a":"1","c":"foo"}`},
},
{
{"a", "3"},