mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: properly reset cached output fields for extract and extract_regexp pipes after the log entry matches if(...) condition
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7162
This commit is contained in:
parent
2e635a42d8
commit
c5d08d317c
5 changed files with 20 additions and 9 deletions
|
@ -18,6 +18,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
* FEATURE: add support for extra filters across all the [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api). See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#extra-filters) for details. This is needed for implementing quick filtering on field values at [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7365).
|
* FEATURE: add support for extra filters across all the [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api). See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#extra-filters) for details. This is needed for implementing quick filtering on field values at [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7365).
|
||||||
|
|
||||||
* BUGFIX: properly apply [`replace`](https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe) and [`replace_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe) pipes to identical values in adjacent log entries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7162).
|
* BUGFIX: properly apply [`replace`](https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe) and [`replace_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe) pipes to identical values in adjacent log entries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7162).
|
||||||
|
* BUGFIX: properly apply [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) and [`extract_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) pipe with additional `if (...)` filter (aka [conditional extract](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract) and [conditional extract_regexp](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract_regexp)).
|
||||||
|
|
||||||
## [v0.39.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.39.0-victorialogs)
|
## [v0.39.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.39.0-victorialogs)
|
||||||
|
|
||||||
|
|
|
@ -192,13 +192,13 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
|
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
|
||||||
resultValues := shard.resultValues
|
resultValues := shard.resultValues
|
||||||
|
|
||||||
hadUpdates := false
|
needUpdates := true
|
||||||
vPrev := ""
|
vPrev := ""
|
||||||
for rowIdx, v := range values {
|
for rowIdx, v := range values {
|
||||||
if bm.isSetBit(rowIdx) {
|
if bm.isSetBit(rowIdx) {
|
||||||
if !hadUpdates || vPrev != v {
|
if needUpdates || vPrev != v {
|
||||||
vPrev = v
|
vPrev = v
|
||||||
hadUpdates = true
|
needUpdates = false
|
||||||
|
|
||||||
ptn.apply(v)
|
ptn.apply(v)
|
||||||
|
|
||||||
|
@ -219,6 +219,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
for i, c := range resultColumns {
|
for i, c := range resultColumns {
|
||||||
resultValues[i] = c.getValueAtRow(br, rowIdx)
|
resultValues[i] = c.getValueAtRow(br, rowIdx)
|
||||||
}
|
}
|
||||||
|
needUpdates = true
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, v := range resultValues {
|
for i, v := range resultValues {
|
||||||
|
|
|
@ -215,13 +215,13 @@ func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult
|
||||||
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
|
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
|
||||||
resultValues := shard.resultValues
|
resultValues := shard.resultValues
|
||||||
|
|
||||||
hadUpdates := false
|
needUpdates := true
|
||||||
vPrev := ""
|
vPrev := ""
|
||||||
for rowIdx, v := range values {
|
for rowIdx, v := range values {
|
||||||
if bm.isSetBit(rowIdx) {
|
if bm.isSetBit(rowIdx) {
|
||||||
if !hadUpdates || vPrev != v {
|
if needUpdates || vPrev != v {
|
||||||
vPrev = v
|
vPrev = v
|
||||||
hadUpdates = true
|
needUpdates = false
|
||||||
|
|
||||||
shard.apply(pe.re, v)
|
shard.apply(pe.re, v)
|
||||||
|
|
||||||
|
@ -246,6 +246,7 @@ func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult
|
||||||
resultValues[i] = c.getValueAtRow(br, rowIdx)
|
resultValues[i] = c.getValueAtRow(br, rowIdx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
needUpdates = true
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, v := range resultValues {
|
for i, v := range resultValues {
|
||||||
|
|
|
@ -211,12 +211,20 @@ func TestPipeExtract(t *testing.T) {
|
||||||
{"x", `a foo=cc baz=aa b`},
|
{"x", `a foo=cc baz=aa b`},
|
||||||
{"bar", "abc"},
|
{"bar", "abc"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
{"x", `a foo=cc baz=aa b`},
|
||||||
|
},
|
||||||
}, [][]Field{
|
}, [][]Field{
|
||||||
{
|
{
|
||||||
{"x", `a foo=cc baz=aa b`},
|
{"x", `a foo=cc baz=aa b`},
|
||||||
{"bar", `cc`},
|
{"bar", `cc`},
|
||||||
{"xx", `aa b`},
|
{"xx", `aa b`},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
{"x", `a foo=cc baz=aa b`},
|
||||||
|
{"bar", `cc`},
|
||||||
|
{"xx", `aa b`},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
// single row, if mismatch
|
// single row, if mismatch
|
||||||
|
|
|
@ -84,14 +84,14 @@ func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
c := br.getColumnByName(pup.field)
|
c := br.getColumnByName(pup.field)
|
||||||
values := c.getValues(br)
|
values := c.getValues(br)
|
||||||
|
|
||||||
hadUpdates := false
|
needUpdates := true
|
||||||
vPrev := ""
|
vPrev := ""
|
||||||
vNew := ""
|
vNew := ""
|
||||||
for rowIdx, v := range values {
|
for rowIdx, v := range values {
|
||||||
if bm.isSetBit(rowIdx) {
|
if bm.isSetBit(rowIdx) {
|
||||||
if !hadUpdates || vPrev != v {
|
if needUpdates || vPrev != v {
|
||||||
vPrev = v
|
vPrev = v
|
||||||
hadUpdates = true
|
needUpdates = false
|
||||||
|
|
||||||
vNew = pup.updateFunc(&shard.a, v)
|
vNew = pup.updateFunc(&shard.a, v)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue