mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
7865f52c4a
commit
29ebcb9d4c
4 changed files with 68 additions and 63 deletions
|
@ -21,6 +21,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
* FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax.
|
* FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax.
|
||||||
|
|
||||||
|
* BUGFIX: properly apply `if (...)` filters to calculated results in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) when [grouping by fields](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields) is enabled. For example, `_time:5m | stats by (host) count() logs, count() if (error) errors` now properly calculates per-`host` `errors`.
|
||||||
|
|
||||||
## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs)
|
## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs)
|
||||||
|
|
||||||
Released at 2024-05-28
|
Released at 2024-05-28
|
||||||
|
|
|
@ -1375,14 +1375,48 @@ So the following query is equivalent to the previous one:
|
||||||
_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"
|
_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Add `keep_original_fields` to the end of `extract_regexp ...` when the original non-empty values of the fields mentioned in the pattern must be preserved
|
||||||
|
instead of overwriting it with the extracted values. For example, the following query extracts `<ip>` only if the original value for `ip` field is missing or is empty:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | extract_regexp 'ip=(?P<ip>([0-9]+[.]){3}[0-9]+)' keep_original_fields
|
||||||
|
```
|
||||||
|
|
||||||
|
By default `extract_regexp` writes empty matching fields to the output, which may overwrite existing values. Add `skip_empty_results` to the end of `extract_regexp ...`
|
||||||
|
in order to prevent from overwriting the existing values for the corresponding fields with empty values.
|
||||||
|
For example, the following query preserves the original `ip` field value if `foo` field doesn't contain the matching ip:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | extract_regexp 'ip=(?P<ip>([0-9]+[.]){3}[0-9]+)' from foo skip_empty_results
|
||||||
|
```
|
||||||
|
|
||||||
Performance tip: it is recommended using [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
|
Performance tip: it is recommended using [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
|
||||||
|
|
||||||
See also:
|
See also:
|
||||||
|
|
||||||
|
- [Conditional `extract_regexp`](#conditional-extract_regexp)
|
||||||
- [`extract` pipe](#extract-pipe)
|
- [`extract` pipe](#extract-pipe)
|
||||||
- [`replace_regexp` pipe](#replace_regexp-pipe)
|
- [`replace_regexp` pipe](#replace_regexp-pipe)
|
||||||
- [`unpack_json` pipe](#unpack_json-pipe)
|
- [`unpack_json` pipe](#unpack_json-pipe)
|
||||||
|
|
||||||
|
#### Conditional extract_regexp
|
||||||
|
|
||||||
|
If some log entries must be skipped from [`extract_regexp` pipe](#extract-pipe), then add `if (<filters>)` filter after the `extract` word.
|
||||||
|
The `<filters>` can contain arbitrary [filters](#filters). For example, the following query extracts `ip`
|
||||||
|
from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) only
|
||||||
|
if the input [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) doesn't contain `ip` field or this field is empty:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | extract_regexp if (ip:"") "ip=(?P<ip>([0-9]+[.]){3}[0-9]+)"
|
||||||
|
```
|
||||||
|
|
||||||
|
An alternative approach is to add `keep_original_fields` to the end of `extract_regexp`, in order to keep the original non-empty values for the extracted fields.
|
||||||
|
For example, the following query is equivalent to the previous one:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | extract_regexp "ip=(?P<ip>([0-9]+[.]){3}[0-9]+)" keep_original_fields
|
||||||
|
```
|
||||||
|
|
||||||
### field_names pipe
|
### field_names pipe
|
||||||
|
|
||||||
`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||||
|
|
|
@ -128,25 +128,6 @@ func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initFromFilterNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits at bm.
|
|
||||||
//
|
|
||||||
// The br is valid until brSrc or bm is updated.
|
|
||||||
func (br *blockResult) initFromFilterNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) {
|
|
||||||
br.reset()
|
|
||||||
|
|
||||||
srcTimestamps := brSrc.timestamps
|
|
||||||
dstTimestamps := br.timestamps[:0]
|
|
||||||
bm.forEachSetBitReadonly(func(idx int) {
|
|
||||||
dstTimestamps = append(dstTimestamps, srcTimestamps[idx])
|
|
||||||
})
|
|
||||||
br.timestamps = dstTimestamps
|
|
||||||
|
|
||||||
for _, neededColumn := range neededColumns {
|
|
||||||
cSrc := brSrc.getColumnByName(neededColumn)
|
|
||||||
br.appendFilteredColumn(brSrc, cSrc, bm)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// appendFilteredColumn adds cSrc with the given bm filter to br.
|
// appendFilteredColumn adds cSrc with the given bm filter to br.
|
||||||
//
|
//
|
||||||
// the br is valid until brSrc, cSrc or bm is updated.
|
// the br is valid until brSrc, cSrc or bm is updated.
|
||||||
|
|
|
@ -204,10 +204,9 @@ type pipeStatsProcessorShardNopad struct {
|
||||||
|
|
||||||
m map[string]*pipeStatsGroup
|
m map[string]*pipeStatsGroup
|
||||||
|
|
||||||
// bms, brs and brsBuf are used for applying per-func filters.
|
// bms and brTmp are used for applying per-func filters.
|
||||||
bms []bitmap
|
bms []bitmap
|
||||||
brs []*blockResult
|
brTmp blockResult
|
||||||
brsBuf []blockResult
|
|
||||||
|
|
||||||
columnValues [][]string
|
columnValues [][]string
|
||||||
keyBuf []byte
|
keyBuf []byte
|
||||||
|
@ -225,22 +224,20 @@ func (shard *pipeStatsProcessorShard) init() {
|
||||||
|
|
||||||
shard.m = make(map[string]*pipeStatsGroup)
|
shard.m = make(map[string]*pipeStatsGroup)
|
||||||
shard.bms = make([]bitmap, funcsLen)
|
shard.bms = make([]bitmap, funcsLen)
|
||||||
shard.brs = make([]*blockResult, funcsLen)
|
|
||||||
shard.brsBuf = make([]blockResult, funcsLen)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
shard.init()
|
shard.init()
|
||||||
byFields := shard.ps.byFields
|
byFields := shard.ps.byFields
|
||||||
|
|
||||||
// Apply per-function filters
|
// Update shard.bms by applying per-function filters
|
||||||
brs := shard.applyPerFunctionFilters(br)
|
shard.applyPerFunctionFilters(br)
|
||||||
|
|
||||||
// Process stats for the defined functions
|
// Process stats for the defined functions
|
||||||
if len(byFields) == 0 {
|
if len(byFields) == 0 {
|
||||||
// Fast path - pass all the rows to a single group with empty key.
|
// Fast path - pass all the rows to a single group with empty key.
|
||||||
psg := shard.getPipeStatsGroup(nil)
|
psg := shard.getPipeStatsGroup(nil)
|
||||||
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
|
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(byFields) == 1 {
|
if len(byFields) == 1 {
|
||||||
|
@ -252,7 +249,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
v := br.getBucketedValue(c.valuesEncoded[0], bf)
|
v := br.getBucketedValue(c.valuesEncoded[0], bf)
|
||||||
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
|
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
|
||||||
psg := shard.getPipeStatsGroup(shard.keyBuf)
|
psg := shard.getPipeStatsGroup(shard.keyBuf)
|
||||||
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
|
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,7 +258,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
// Fast path for column with constant values.
|
// Fast path for column with constant values.
|
||||||
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
|
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
|
||||||
psg := shard.getPipeStatsGroup(shard.keyBuf)
|
psg := shard.getPipeStatsGroup(shard.keyBuf)
|
||||||
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
|
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +270,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
|
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
|
||||||
psg = shard.getPipeStatsGroup(keyBuf)
|
psg = shard.getPipeStatsGroup(keyBuf)
|
||||||
}
|
}
|
||||||
shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
|
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
|
||||||
}
|
}
|
||||||
shard.keyBuf = keyBuf
|
shard.keyBuf = keyBuf
|
||||||
return
|
return
|
||||||
|
@ -303,7 +300,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
|
||||||
}
|
}
|
||||||
psg := shard.getPipeStatsGroup(keyBuf)
|
psg := shard.getPipeStatsGroup(keyBuf)
|
||||||
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
|
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
|
||||||
shard.keyBuf = keyBuf
|
shard.keyBuf = keyBuf
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -328,42 +325,23 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
}
|
}
|
||||||
psg = shard.getPipeStatsGroup(keyBuf)
|
psg = shard.getPipeStatsGroup(keyBuf)
|
||||||
}
|
}
|
||||||
shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
|
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
|
||||||
}
|
}
|
||||||
shard.keyBuf = keyBuf
|
shard.keyBuf = keyBuf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult) []*blockResult {
|
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
|
||||||
funcs := shard.ps.funcs
|
funcs := shard.ps.funcs
|
||||||
brs := shard.brs
|
|
||||||
for i := range funcs {
|
for i := range funcs {
|
||||||
iff := funcs[i].iff
|
|
||||||
if iff == nil {
|
|
||||||
// Fast path - there are no per-function filters
|
|
||||||
brs[i] = brSrc
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
bm := &shard.bms[i]
|
bm := &shard.bms[i]
|
||||||
bm.init(len(brSrc.timestamps))
|
bm.init(len(br.timestamps))
|
||||||
bm.setBits()
|
bm.setBits()
|
||||||
iff.f.applyToBlockResult(brSrc, bm)
|
|
||||||
if bm.areAllBitsSet() {
|
|
||||||
// Fast path - per-function filter doesn't filter out rows
|
|
||||||
brs[i] = brSrc
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store the remaining rows for the needed per-func fields to brDst
|
iff := funcs[i].iff
|
||||||
brDst := &shard.brsBuf[i]
|
if iff != nil {
|
||||||
if bm.isZero() {
|
iff.f.applyToBlockResult(br, bm)
|
||||||
brDst.reset()
|
|
||||||
} else {
|
|
||||||
brDst.initFromFilterNeededColumns(brSrc, bm, iff.neededFields)
|
|
||||||
}
|
}
|
||||||
brs[i] = brDst
|
|
||||||
}
|
}
|
||||||
return brs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
|
func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
|
||||||
|
@ -379,7 +357,8 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr
|
||||||
shard.stateSizeBudget -= stateSize
|
shard.stateSizeBudget -= stateSize
|
||||||
}
|
}
|
||||||
psg = &pipeStatsGroup{
|
psg = &pipeStatsGroup{
|
||||||
sfps: sfps,
|
funcs: shard.ps.funcs,
|
||||||
|
sfps: sfps,
|
||||||
}
|
}
|
||||||
shard.m[string(key)] = psg
|
shard.m[string(key)] = psg
|
||||||
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
|
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
|
||||||
|
@ -388,21 +367,30 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr
|
||||||
}
|
}
|
||||||
|
|
||||||
type pipeStatsGroup struct {
|
type pipeStatsGroup struct {
|
||||||
sfps []statsProcessor
|
funcs []pipeStatsFunc
|
||||||
|
sfps []statsProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psg *pipeStatsGroup) updateStatsForAllRows(brs []*blockResult) int {
|
func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int {
|
||||||
n := 0
|
n := 0
|
||||||
for i, sfp := range psg.sfps {
|
for i, sfp := range psg.sfps {
|
||||||
n += sfp.updateStatsForAllRows(brs[i])
|
iff := psg.funcs[i].iff
|
||||||
|
if iff == nil {
|
||||||
|
n += sfp.updateStatsForAllRows(br)
|
||||||
|
} else {
|
||||||
|
brTmp.initFromFilterAllColumns(br, &bms[i])
|
||||||
|
n += sfp.updateStatsForAllRows(brTmp)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psg *pipeStatsGroup) updateStatsForRow(brs []*blockResult, rowIdx int) int {
|
func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int {
|
||||||
n := 0
|
n := 0
|
||||||
for i, sfp := range psg.sfps {
|
for i, sfp := range psg.sfps {
|
||||||
n += sfp.updateStatsForRow(brs[i], rowIdx)
|
if bms[i].isSetBit(rowIdx) {
|
||||||
|
n += sfp.updateStatsForRow(br, rowIdx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue