lib/logstorage: properly cache replace() and replace_regexp() results for identical adjacent field values

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7162
This commit is contained in:
Aliaksandr Valialkin 2024-10-30 22:14:18 +01:00
parent 6a6d08d03d
commit 2e635a42d8
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 31 additions and 2 deletions

View file

@ -17,6 +17,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add support for extra filters across all the [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api). See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#extra-filters) for details. This is needed for implementing quick filtering on field values at [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7365). * FEATURE: add support for extra filters across all the [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api). See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#extra-filters) for details. This is needed for implementing quick filtering on field values at [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7365).
* BUGFIX: properly apply [`replace`](https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe) and [`replace_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe) pipes to identical values in adjacent log entries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7162).
## [v0.39.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.39.0-victorialogs) ## [v0.39.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.39.0-victorialogs)
Released at 2024-10-30 Released at 2024-10-30

View file

@ -70,6 +70,12 @@ func TestPipeReplaceRegexp(t *testing.T) {
{"_msg", `a_bc_d/ef`}, {"_msg", `a_bc_d/ef`},
{"bar", `cde`}, {"bar", `cde`},
}, },
{
{"_msg", `a_bc_d/ef`},
},
{
{"_msg", `1234`},
},
{ {
{"_msg", `1234`}, {"_msg", `1234`},
}, },
@ -78,6 +84,12 @@ func TestPipeReplaceRegexp(t *testing.T) {
{"_msg", `a-bc-d-ef`}, {"_msg", `a-bc-d-ef`},
{"bar", `cde`}, {"bar", `cde`},
}, },
{
{"_msg", `a-bc-d-ef`},
},
{
{"_msg", `1234`},
},
{ {
{"_msg", `1234`}, {"_msg", `1234`},
}, },

View file

@ -48,6 +48,12 @@ func TestPipeReplace(t *testing.T) {
{"_msg", `a_bc_def`}, {"_msg", `a_bc_def`},
{"bar", `cde`}, {"bar", `cde`},
}, },
{
{"_msg", `a_bc_def`},
},
{
{"_msg", `1234`},
},
{ {
{"_msg", `1234`}, {"_msg", `1234`},
}, },
@ -56,6 +62,12 @@ func TestPipeReplace(t *testing.T) {
{"_msg", `a-bc-def`}, {"_msg", `a-bc-def`},
{"bar", `cde`}, {"bar", `cde`},
}, },
{
{"_msg", `a-bc-def`},
},
{
{"_msg", `1234`},
},
{ {
{"_msg", `1234`}, {"_msg", `1234`},
}, },

View file

@ -86,16 +86,19 @@ func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) {
hadUpdates := false hadUpdates := false
vPrev := "" vPrev := ""
vNew := ""
for rowIdx, v := range values { for rowIdx, v := range values {
if bm.isSetBit(rowIdx) { if bm.isSetBit(rowIdx) {
if !hadUpdates || vPrev != v { if !hadUpdates || vPrev != v {
vPrev = v vPrev = v
hadUpdates = true hadUpdates = true
v = pup.updateFunc(&shard.a, v) vNew = pup.updateFunc(&shard.a, v)
} }
shard.rc.addValue(vNew)
} else {
shard.rc.addValue(v)
} }
shard.rc.addValue(v)
} }
br.addResultColumn(&shard.rc) br.addResultColumn(&shard.rc)