diff --git a/deployment/docker/docker-compose-victorialogs.yml b/deployment/docker/docker-compose-victorialogs.yml index 5f0cceb97..284169784 100644 --- a/deployment/docker/docker-compose-victorialogs.yml +++ b/deployment/docker/docker-compose-victorialogs.yml @@ -43,7 +43,7 @@ services: # storing logs and serving read queries. victorialogs: container_name: victorialogs - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs command: - "--storageDataPath=/vlogs" - "--httpListenAddr=:9428" diff --git a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml index a1f8d45cb..36e9d8ab3 100644 --- a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: - -beat.uri=http://filebeat-victorialogs:5066 victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - victorialogs-filebeat-docker-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml index 851de92ff..15169435b 100644 --- a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml @@ -13,7 +13,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - victorialogs-filebeat-syslog-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml index f6438fe0f..f2f46f561 100644 --- a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml @@ -11,7 +11,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - victorialogs-fluentbit-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/logstash/docker-compose.yml b/deployment/docker/victorialogs/logstash/docker-compose.yml index d4d3be206..eb5a966aa 100644 --- a/deployment/docker/victorialogs/logstash/docker-compose.yml +++ b/deployment/docker/victorialogs/logstash/docker-compose.yml @@ -14,7 +14,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - victorialogs-logstash-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index 455694c0e..909dfba21 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -12,7 +12,7 @@ services: - "5140:5140" vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/deployment/docker/victorialogs/vector-docker/docker-compose.yml 
b/deployment/docker/victorialogs/vector-docker/docker-compose.yml index 61253b954..440253d6c 100644 --- a/deployment/docker/victorialogs/vector-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/vector-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: condition: service_healthy victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - victorialogs-vector-docker-vl:/vlogs ports: diff --git a/deployment/logs-benchmark/docker-compose.yml b/deployment/logs-benchmark/docker-compose.yml index 0ec8d8b63..acb9f048f 100644 --- a/deployment/logs-benchmark/docker-compose.yml +++ b/deployment/logs-benchmark/docker-compose.yml @@ -3,7 +3,7 @@ version: '3' services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs volumes: - vlogs:/vlogs ports: diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 30dc2cc32..d1e2f2240 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +## [v0.14.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.14.0-victorialogs) + +Released at 2024-05-29 + +* FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax. + +* BUGFIX: properly apply `if (...)` filters to calculated results in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) when [grouping by fields](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields) is enabled. For example, `_time:5m | stats by (host) count() logs, count() if (error) errors` now properly calculates per-`host` `errors`. + ## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs) Released at 2024-05-28 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 2687664bd..a0f9c3c73 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1375,14 +1375,48 @@ So the following query is equivalent to the previous one: _time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" ``` +Add `keep_original_fields` to the end of `extract_regexp ...` when the original non-empty values of the fields mentioned in the pattern must be preserved +instead of overwriting them with the extracted values. For example, the following query extracts `ip` only if the original value for `ip` field is missing or is empty: + +```logsql +_time:5m | extract_regexp 'ip=(?P<ip>([0-9]+[.]){3}[0-9]+)' keep_original_fields +``` + +By default `extract_regexp` writes empty matching fields to the output, which may overwrite existing values. Add `skip_empty_results` to the end of `extract_regexp ...` +in order to prevent overwriting the existing values for the corresponding fields with empty values. +For example, the following query preserves the original `ip` field value if `foo` field doesn't contain the matching ip: + +```logsql +_time:5m | extract_regexp 'ip=(?P<ip>([0-9]+[.]){3}[0-9]+)' from foo skip_empty_results +``` + Performance tip: it is recommended to use [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
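For instance, a possible `extract`-based equivalent of the IPv4 query above is sketched below. It assumes the `ip=...` value is always followed by a space, since `extract` matches up to the next literal delimiter rather than by regexp shape; adjust the trailing delimiter to match the actual log format:

```logsql
_time:5m | extract "ip=<ip> " keep_original_fields
```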
See also: +- [Conditional `extract_regexp`](#conditional-extract_regexp) - [`extract` pipe](#extract-pipe) - [`replace_regexp` pipe](#replace_regexp-pipe) - [`unpack_json` pipe](#unpack_json-pipe) +#### Conditional extract_regexp +If some log entries must be skipped from [`extract_regexp` pipe](#extract_regexp-pipe), then add `if (<filters>)` filter after the `extract_regexp` word. +The `<filters>` can contain arbitrary [filters](#filters). For example, the following query extracts `ip` +from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) only +if the input [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) doesn't contain `ip` field or this field is empty: + +```logsql +_time:5m | extract_regexp if (ip:"") "ip=(?P<ip>([0-9]+[.]){3}[0-9]+)" +``` + +An alternative approach is to add `keep_original_fields` to the end of `extract_regexp`, in order to keep the original non-empty values for the extracted fields. +For example, the following query is equivalent to the previous one: + +```logsql +_time:5m | extract_regexp "ip=(?P<ip>([0-9]+[.]){3}[0-9]+)" keep_original_fields +``` + ### field_names pipe `| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) @@ -1639,6 +1673,13 @@ The following query is equivalent to the previous one: _time:5m | pack_json ``` +If only a subset of labels must be packed into JSON, then they must be listed inside `fields (...)` after `pack_json`. For example, the following query builds JSON with `foo` and `bar` fields +only and stores the result in `baz` field: + +```logsql +_time:5m | pack_json fields (foo, bar) as baz +``` + The `pack_json` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query leaves only the `foo` label with the original log fields packed into JSON: diff --git a/docs/VictoriaLogs/QuickStart.md b/docs/VictoriaLogs/QuickStart.md index 8063e10fe..41dc659f5 100644 --- a/docs/VictoriaLogs/QuickStart.md +++ b/docs/VictoriaLogs/QuickStart.md @@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: ```sh -curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.13.0-victorialogs/victoria-logs-linux-amd64-v0.13.0-victorialogs.tar.gz -tar xzf victoria-logs-linux-amd64-v0.13.0-victorialogs.tar.gz +curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.14.0-victorialogs/victoria-logs-linux-amd64-v0.14.0-victorialogs.tar.gz +tar xzf victoria-logs-linux-amd64-v0.14.0-victorialogs.tar.gz ./victoria-logs-prod ``` @@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container: ```sh docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ - docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs + docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs ``` See also: diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 0e8c7bf6a..1f1d312d4 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -128,25 +128,6 @@ func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) } } -// initFromFilterNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits at bm.
-// -// The br is valid until brSrc or bm is updated. -func (br *blockResult) initFromFilterNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) { - br.reset() - - srcTimestamps := brSrc.timestamps - dstTimestamps := br.timestamps[:0] - bm.forEachSetBitReadonly(func(idx int) { - dstTimestamps = append(dstTimestamps, srcTimestamps[idx]) - }) - br.timestamps = dstTimestamps - - for _, neededColumn := range neededColumns { - cSrc := brSrc.getColumnByName(neededColumn) - br.appendFilteredColumn(brSrc, cSrc, bm) - } -} - // appendFilteredColumn adds cSrc with the given bm filter to br. // // the br is valid until brSrc, cSrc or bm is updated. diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go index d320fb6fb..af44c8750 100644 --- a/lib/logstorage/pipe_pack_json.go +++ b/lib/logstorage/pipe_pack_json.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "slices" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -12,10 +13,15 @@ import ( // See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe type pipePackJSON struct { resultField string + + fields []string } func (pp *pipePackJSON) String() string { s := "pack_json" + if len(pp.fields) > 0 { + s += " fields (" + fieldsToString(pp.fields) + ")" + } if !isMsgFieldName(pp.resultField) { s += " as " + quoteTokenIfNeeded(pp.resultField) } @@ -25,11 +31,19 @@ func (pp *pipePackJSON) String() string { func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { if !unneededFields.contains(pp.resultField) { - unneededFields.reset() + if len(pp.fields) > 0 { + unneededFields.removeFields(pp.fields) + } else { + unneededFields.reset() + } } } else { if neededFields.contains(pp.resultField) { - neededFields.add("*") + if len(pp.fields) > 0 { + neededFields.addFields(pp.fields) + } else { + neededFields.add("*") + } } } } @@ -74,6 +88,8 @@ type pipePackJSONProcessorShardNopad struct { buf []byte fields []Field + + cs []*blockResultColumn } func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { @@ -85,7 +101,17 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { shard.rc.name = ppp.pp.resultField - cs := br.getColumns() + cs := shard.cs[:0] + if len(ppp.pp.fields) == 0 { + csAll := br.getColumns() + cs = append(cs, csAll...) 
+ } else { + for _, f := range ppp.pp.fields { + c := br.getColumnByName(f) + cs = append(cs, c) + } + } + shard.cs = cs buf := shard.buf[:0] fields := shard.fields @@ -122,10 +148,25 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) { } lex.nextToken() + var fields []string + if lex.isKeyword("fields") { + lex.nextToken() + fs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse fields: %w", err) + } + if slices.Contains(fs, "*") { + fs = nil + } + fields = fs + } + // parse optional 'as ...` part resultField := "_msg" if lex.isKeyword("as") { lex.nextToken() + } + if !lex.isKeyword("|", ")", "") { field, err := parseFieldName(lex) if err != nil { return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err) @@ -135,6 +176,7 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) { pp := &pipePackJSON{ resultField: resultField, + fields: fields, } return pp, nil diff --git a/lib/logstorage/pipe_pack_json_test.go b/lib/logstorage/pipe_pack_json_test.go index baf137447..b57f150c8 100644 --- a/lib/logstorage/pipe_pack_json_test.go +++ b/lib/logstorage/pipe_pack_json_test.go @@ -12,6 +12,8 @@ func TestParsePipePackJSONSuccess(t *testing.T) { f(`pack_json`) f(`pack_json as x`) + f(`pack_json fields (a, b)`) + f(`pack_json fields (a, b) as x`) } func TestParsePipePackJSONFailure(t *testing.T) { @@ -21,6 +23,7 @@ func TestParsePipePackJSONFailure(t *testing.T) { } f(`pack_json foo bar`) + f(`pack_json fields`) } func TestPipePackJSON(t *testing.T) { @@ -76,6 +79,30 @@ func TestPipePackJSON(t *testing.T) { {"c", "d"}, }, }) + + // pack only the needed fields + f(`pack_json fields (foo, baz) a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `{"foo":"abc","baz":""}`}, + }, + { + {"a", `{"foo":"","baz":""}`}, + {"c", "d"}, + }, + }) } func TestPipePackJSONUpdateNeededFields(t *testing.T) { diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index ab3852e95..31506bfa3 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -204,10 +204,9 @@ type pipeStatsProcessorShardNopad struct { m map[string]*pipeStatsGroup - // bms, brs and brsBuf are used for applying per-func filters. - bms []bitmap - brs []*blockResult - brsBuf []blockResult + // bms and brTmp are used for applying per-func filters. + bms []bitmap + brTmp blockResult columnValues [][]string keyBuf []byte @@ -225,22 +224,20 @@ func (shard *pipeStatsProcessorShard) init() { shard.m = make(map[string]*pipeStatsGroup) shard.bms = make([]bitmap, funcsLen) - shard.brs = make([]*blockResult, funcsLen) - shard.brsBuf = make([]blockResult, funcsLen) } func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { shard.init() byFields := shard.ps.byFields - // Apply per-function filters - brs := shard.applyPerFunctionFilters(br) + // Update shard.bms by applying per-function filters + shard.applyPerFunctionFilters(br) // Process stats for the defined functions if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. 
psg := shard.getPipeStatsGroup(nil) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) return } if len(byFields) == 1 { @@ -252,7 +249,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { v := br.getBucketedValue(c.valuesEncoded[0], bf) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) psg := shard.getPipeStatsGroup(shard.keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) return } @@ -261,7 +258,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // Fast path for column with constant values. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) psg := shard.getPipeStatsGroup(shard.keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) return } @@ -273,7 +270,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) psg = shard.getPipeStatsGroup(keyBuf) } - shard.stateSizeBudget -= psg.updateStatsForRow(brs, i) + shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i) } shard.keyBuf = keyBuf return @@ -303,7 +300,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) } psg := shard.getPipeStatsGroup(keyBuf) - shard.stateSizeBudget -= psg.updateStatsForAllRows(brs) + shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp) shard.keyBuf = keyBuf return } @@ -328,42 +325,23 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { } psg = shard.getPipeStatsGroup(keyBuf) } - shard.stateSizeBudget -= psg.updateStatsForRow(brs, i) + shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i) } shard.keyBuf = keyBuf } -func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult) []*blockResult { +func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) { funcs := shard.ps.funcs - brs := shard.brs for i := range funcs { - iff := funcs[i].iff - if iff == nil { - // Fast path - there are no per-function filters - brs[i] = brSrc - continue - } - bm := &shard.bms[i] - bm.init(len(brSrc.timestamps)) + bm.init(len(br.timestamps)) bm.setBits() - iff.f.applyToBlockResult(brSrc, bm) - if bm.areAllBitsSet() { - // Fast path - per-function filter doesn't filter out rows - brs[i] = brSrc - continue - } - // Store the remaining rows for the needed per-func fields to brDst - brDst := &shard.brsBuf[i] - if bm.isZero() { - brDst.reset() - } else { - brDst.initFromFilterNeededColumns(brSrc, bm, iff.neededFields) + iff := funcs[i].iff + if iff != nil { + iff.f.applyToBlockResult(br, bm) } - brs[i] = brDst } - return brs } func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup { @@ -379,7 +357,8 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr shard.stateSizeBudget -= stateSize } psg = &pipeStatsGroup{ - sfps: sfps, + funcs: shard.ps.funcs, + sfps: sfps, } shard.m[string(key)] = psg shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) @@ -388,21 +367,30 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) 
*pipeStatsGr } type pipeStatsGroup struct { - sfps []statsProcessor + funcs []pipeStatsFunc + sfps []statsProcessor } -func (psg *pipeStatsGroup) updateStatsForAllRows(brs []*blockResult) int { +func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int { n := 0 for i, sfp := range psg.sfps { - n += sfp.updateStatsForAllRows(brs[i]) + iff := psg.funcs[i].iff + if iff == nil { + n += sfp.updateStatsForAllRows(br) + } else { + brTmp.initFromFilterAllColumns(br, &bms[i]) + n += sfp.updateStatsForAllRows(brTmp) + } } return n } -func (psg *pipeStatsGroup) updateStatsForRow(brs []*blockResult, rowIdx int) int { +func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int { n := 0 for i, sfp := range psg.sfps { - n += sfp.updateStatsForRow(brs[i], rowIdx) + if bms[i].isSetBit(rowIdx) { + n += sfp.updateStatsForRow(br, rowIdx) + } } return n }
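Taken together, the new `pack_json fields (...)` syntax and the fixed per-function `if (...)` handling in grouped `stats` can be combined in a single query. The following is only an illustrative sketch (the `host` field and the `error` word filter are assumed to exist in the ingested logs):

```logsql
_time:5m | stats by (host) count() logs, count() if (error) errors | pack_json fields (host, logs, errors) as summary
```

Here `errors` is calculated per `host` thanks to the bugfix above, and only the three listed fields end up in the `summary` JSON.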