diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 2cbfc8bde..d8652c5de 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -21,6 +21,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax. +* BUGFIX: properly apply `if (...)` filters to calculated results in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) when [grouping by fields](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields) is enabled. For example, `_time:5m | stats by (host) count() logs, count() if (error) errors` now properly calculates per-`host` `errors`. + ## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs) Released at 2024-05-28 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index b0702d671..a0f9c3c73 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1375,14 +1375,48 @@ So the following query is equivalent to the previous one: _time:5m | extract_regexp "(?P([0-9]+[.]){3}[0-9]+)" ``` +Add `keep_original_fields` to the end of `extract_regexp ...` when the original non-empty values of the fields mentioned in the pattern must be preserved +instead of overwriting it with the extracted values. For example, the following query extracts `` only if the original value for `ip` field is missing or is empty: + +```logsql +_time:5m | extract_regexp 'ip=(?P([0-9]+[.]){3}[0-9]+)' keep_original_fields +``` + +By default `extract_regexp` writes empty matching fields to the output, which may overwrite existing values. Add `skip_empty_results` to the end of `extract_regexp ...` +in order to prevent from overwriting the existing values for the corresponding fields with empty values. +For example, the following query preserves the original `ip` field value if `foo` field doesn't contain the matching ip: + +```logsql +_time:5m | extract_regexp 'ip=(?P([0-9]+[.]){3}[0-9]+)' from foo skip_empty_results +``` + Performance tip: it is recommended using [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance. See also: +- [Conditional `extract_regexp`](#conditional-extract_regexp) - [`extract` pipe](#extract-pipe) - [`replace_regexp` pipe](#replace_regexp-pipe) - [`unpack_json` pipe](#unpack_json-pipe) +#### Conditional extract_regexp + +If some log entries must be skipped from [`extract_regexp` pipe](#extract-pipe), then add `if ()` filter after the `extract` word. +The `` can contain arbitrary [filters](#filters). For example, the following query extracts `ip` +from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) only +if the input [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) doesn't contain `ip` field or this field is empty: + +```logsql +_time:5m | extract_regexp if (ip:"") "ip=(?P([0-9]+[.]){3}[0-9]+)" +``` + +An alternative approach is to add `keep_original_fields` to the end of `extract_regexp`, in order to keep the original non-empty values for the extracted fields. +For example, the following query is equivalent to the previous one: + +```logsql +_time:5m | extract_regexp "ip=(?P([0-9]+[.]){3}[0-9]+)" keep_original_fields +``` + ### field_names pipe `| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 0e8c7bf6a..1f1d312d4 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -128,25 +128,6 @@ func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) } } -// initFromFilterNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits at bm. -// -// The br is valid until brSrc or bm is updated. -func (br *blockResult) initFromFilterNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) { - br.reset() - - srcTimestamps := brSrc.timestamps - dstTimestamps := br.timestamps[:0] - bm.forEachSetBitReadonly(func(idx int) { - dstTimestamps = append(dstTimestamps, srcTimestamps[idx]) - }) - br.timestamps = dstTimestamps - - for _, neededColumn := range neededColumns { - cSrc := brSrc.getColumnByName(neededColumn) - br.appendFilteredColumn(brSrc, cSrc, bm) - } -} - // appendFilteredColumn adds cSrc with the given bm filter to br. // // the br is valid until brSrc, cSrc or bm is updated. diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index ab3852e95..31506bfa3 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -204,10 +204,9 @@ type pipeStatsProcessorShardNopad struct { m map[string]*pipeStatsGroup - // bms, brs and brsBuf are used for applying per-func filters. - bms []bitmap - brs []*blockResult - brsBuf []blockResult + // bms and brTmp are used for applying per-func filters. + bms []bitmap + brTmp blockResult columnValues [][]string keyBuf []byte @@ -225,22 +224,20 @@ func (shard *pipeStatsProcessorShard) init() { shard.m = make(map[string]*pipeStatsGroup) shard.bms = make([]bitmap, funcsLen) - shard.brs = make([]*blockResult, funcsLen) - shard.brsBuf = make([]blockResult, funcsLen) } func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { shard.init() byFields := shard.ps.byFields - // Apply per-function filters - brs := shard.applyPerFunctionFilters(br) + // Update shard.bms by applying per-function filters + shard.applyPerFunctionFilters(br) // Process stats for the defined functions if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. psg := shard.getPipeStatsGroup(nil) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) return } if len(byFields) == 1 { @@ -252,7 +249,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { v := br.getBucketedValue(c.valuesEncoded[0], bf) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) psg := shard.getPipeStatsGroup(shard.keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) return } @@ -261,7 +258,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // Fast path for column with constant values. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) psg := shard.getPipeStatsGroup(shard.keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) return } @@ -273,7 +270,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) psg = shard.getPipeStatsGroup(keyBuf) } - shard.stateSizeBudget -= psg.updateStatsForRow(brs, i) + shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i) } shard.keyBuf = keyBuf return @@ -303,7 +300,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) } psg := shard.getPipeStatsGroup(keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) shard.keyBuf = keyBuf return } @@ -328,42 +325,23 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { } psg = shard.getPipeStatsGroup(keyBuf) } - shard.stateSizeBudget -= psg.updateStatsForRow(brs, i) + shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i) } shard.keyBuf = keyBuf } -func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult) []*blockResult { +func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) { funcs := shard.ps.funcs - brs := shard.brs for i := range funcs { - iff := funcs[i].iff - if iff == nil { - // Fast path - there are no per-function filters - brs[i] = brSrc - continue - } - bm := &shard.bms[i] - bm.init(len(brSrc.timestamps)) + bm.init(len(br.timestamps)) bm.setBits() - iff.f.applyToBlockResult(brSrc, bm) - if bm.areAllBitsSet() { - // Fast path - per-function filter doesn't filter out rows - brs[i] = brSrc - continue - } - // Store the remaining rows for the needed per-func fields to brDst - brDst := &shard.brsBuf[i] - if bm.isZero() { - brDst.reset() - } else { - brDst.initFromFilterNeededColumns(brSrc, bm, iff.neededFields) + iff := funcs[i].iff + if iff != nil { + iff.f.applyToBlockResult(br, bm) } - brs[i] = brDst } - return brs } func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup { @@ -379,7 +357,8 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr shard.stateSizeBudget -= stateSize } psg = &pipeStatsGroup{ - sfps: sfps, + funcs: shard.ps.funcs, + sfps: sfps, } shard.m[string(key)] = psg shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) @@ -388,21 +367,30 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr } type pipeStatsGroup struct { - sfps []statsProcessor + funcs []pipeStatsFunc + sfps []statsProcessor } -func (psg *pipeStatsGroup) updateStatsForAllRows(brs []*blockResult) int { +func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int { n := 0 for i, sfp := range psg.sfps { - n += sfp.updateStatsForAllRows(brs[i]) + iff := psg.funcs[i].iff + if iff == nil { + n += sfp.updateStatsForAllRows(br) + } else { + brTmp.initFromFilterAllColumns(br, &bms[i]) + n += sfp.updateStatsForAllRows(brTmp) + } } return n } -func (psg *pipeStatsGroup) updateStatsForRow(brs []*blockResult, rowIdx int) int { +func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int { n := 0 for i, sfp := range psg.sfps { - n += sfp.updateStatsForRow(brs[i], rowIdx) + if bms[i].isSetBit(rowIdx) { + n += sfp.updateStatsForRow(br, rowIdx) + } } return n }