lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-05-29 01:52:13 +02:00
parent bf33e7eda7
commit 1de187bcb7
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
15 changed files with 164 additions and 77 deletions

View file

@ -43,7 +43,7 @@ services:
# storing logs and serving read queries.
victorialogs:
container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
command:
- "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- victorialogs-filebeat-docker-vl:/vlogs
ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- victorialogs-filebeat-syslog-vl:/vlogs
ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- victorialogs-fluentbit-vl:/vlogs
ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- victorialogs-logstash-vl:/vlogs
ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140"
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- victorialogs-promtail-docker:/vlogs
ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- victorialogs-vector-docker-vl:/vlogs
ports:

View file

@ -3,7 +3,7 @@ version: '3'
services:
# Run `make package-victoria-logs` to build victoria-logs image
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
volumes:
- vlogs:/vlogs
ports:

View file

@ -19,6 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.14.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.14.0-victorialogs)
Released at 2024-05-29
* FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax.
* BUGFIX: properly apply `if (...)` filters to calculated results in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) when [grouping by fields](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields) is enabled. For example, `_time:5m | stats by (host) count() logs, count() if (error) errors` now properly calculates per-`host` `errors`.
## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs)
Released at 2024-05-28

View file

@ -1375,14 +1375,48 @@ So the following query is equivalent to the previous one:
_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"
```
Add `keep_original_fields` to the end of `extract_regexp ...` when the original non-empty values of the fields mentioned in the pattern must be preserved
instead of overwriting it with the extracted values. For example, the following query extracts `<ip>` only if the original value for `ip` field is missing or is empty:
```logsql
_time:5m | extract_regexp 'ip=(?P<ip>([0-9]+[.]){3}[0-9]+)' keep_original_fields
```
By default `extract_regexp` writes empty matching fields to the output, which may overwrite existing values. Add `skip_empty_results` to the end of `extract_regexp ...`
in order to prevent from overwriting the existing values for the corresponding fields with empty values.
For example, the following query preserves the original `ip` field value if `foo` field doesn't contain the matching ip:
```logsql
_time:5m | extract_regexp 'ip=(?P<ip>([0-9]+[.]){3}[0-9]+)' from foo skip_empty_results
```
Performance tip: it is recommended using [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
See also:
- [Conditional `extract_regexp`](#conditional-extract_regexp)
- [`extract` pipe](#extract-pipe)
- [`replace_regexp` pipe](#replace_regexp-pipe)
- [`unpack_json` pipe](#unpack_json-pipe)
#### Conditional extract_regexp
If some log entries must be skipped from [`extract_regexp` pipe](#extract-pipe), then add `if (<filters>)` filter after the `extract` word.
The `<filters>` can contain arbitrary [filters](#filters). For example, the following query extracts `ip`
from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) only
if the input [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) doesn't contain `ip` field or this field is empty:
```logsql
_time:5m | extract_regexp if (ip:"") "ip=(?P<ip>([0-9]+[.]){3}[0-9]+)"
```
An alternative approach is to add `keep_original_fields` to the end of `extract_regexp`, in order to keep the original non-empty values for the extracted fields.
For example, the following query is equivalent to the previous one:
```logsql
_time:5m | extract_regexp "ip=(?P<ip>([0-9]+[.]){3}[0-9]+)" keep_original_fields
```
### field_names pipe
`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@ -1639,6 +1673,13 @@ The following query is equivalent to the previous one:
_time:5m | pack_json
```
If only a subset of labels must be packed into JSON, then it must be listed inside `fields (...)` after `pack_json`. For example, the following query builds JSON with `foo` and `bar` fields
only and stores the result in `baz` field:
```logsql
_time:5m | pack_json fields (foo, bar) as baz
```
The `pack_json` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query
leaves only the `foo` label with the original log fields packed into JSON:

View file

@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.13.0-victorialogs/victoria-logs-linux-amd64-v0.13.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.13.0-victorialogs.tar.gz
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.14.0-victorialogs/victoria-logs-linux-amd64-v0.14.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.14.0-victorialogs.tar.gz
./victoria-logs-prod
```
@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.13.0-victorialogs
docker.io/victoriametrics/victoria-logs:v0.14.0-victorialogs
```
See also:

View file

@ -128,25 +128,6 @@ func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap)
}
}
// initFromFilterNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits at bm.
//
// The br is valid until brSrc or bm is updated.
func (br *blockResult) initFromFilterNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) {
br.reset()
srcTimestamps := brSrc.timestamps
dstTimestamps := br.timestamps[:0]
bm.forEachSetBitReadonly(func(idx int) {
dstTimestamps = append(dstTimestamps, srcTimestamps[idx])
})
br.timestamps = dstTimestamps
for _, neededColumn := range neededColumns {
cSrc := brSrc.getColumnByName(neededColumn)
br.appendFilteredColumn(brSrc, cSrc, bm)
}
}
// appendFilteredColumn adds cSrc with the given bm filter to br.
//
// the br is valid until brSrc, cSrc or bm is updated.

View file

@ -2,6 +2,7 @@ package logstorage
import (
"fmt"
"slices"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -12,10 +13,15 @@ import (
// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe
type pipePackJSON struct {
resultField string
fields []string
}
func (pp *pipePackJSON) String() string {
s := "pack_json"
if len(pp.fields) > 0 {
s += " fields (" + fieldsToString(pp.fields) + ")"
}
if !isMsgFieldName(pp.resultField) {
s += " as " + quoteTokenIfNeeded(pp.resultField)
}
@ -25,11 +31,19 @@ func (pp *pipePackJSON) String() string {
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
if !unneededFields.contains(pp.resultField) {
unneededFields.reset()
if len(pp.fields) > 0 {
unneededFields.removeFields(pp.fields)
} else {
unneededFields.reset()
}
}
} else {
if neededFields.contains(pp.resultField) {
neededFields.add("*")
if len(pp.fields) > 0 {
neededFields.addFields(pp.fields)
} else {
neededFields.add("*")
}
}
}
}
@ -74,6 +88,8 @@ type pipePackJSONProcessorShardNopad struct {
buf []byte
fields []Field
cs []*blockResultColumn
}
func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
@ -85,7 +101,17 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
shard.rc.name = ppp.pp.resultField
cs := br.getColumns()
cs := shard.cs[:0]
if len(ppp.pp.fields) == 0 {
csAll := br.getColumns()
cs = append(cs, csAll...)
} else {
for _, f := range ppp.pp.fields {
c := br.getColumnByName(f)
cs = append(cs, c)
}
}
shard.cs = cs
buf := shard.buf[:0]
fields := shard.fields
@ -122,10 +148,25 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
}
lex.nextToken()
var fields []string
if lex.isKeyword("fields") {
lex.nextToken()
fs, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse fields: %w", err)
}
if slices.Contains(fs, "*") {
fs = nil
}
fields = fs
}
// parse optional 'as ...` part
resultField := "_msg"
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isKeyword("|", ")", "") {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)
@ -135,6 +176,7 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
pp := &pipePackJSON{
resultField: resultField,
fields: fields,
}
return pp, nil

View file

@ -12,6 +12,8 @@ func TestParsePipePackJSONSuccess(t *testing.T) {
f(`pack_json`)
f(`pack_json as x`)
f(`pack_json fields (a, b)`)
f(`pack_json fields (a, b) as x`)
}
func TestParsePipePackJSONFailure(t *testing.T) {
@ -21,6 +23,7 @@ func TestParsePipePackJSONFailure(t *testing.T) {
}
f(`pack_json foo bar`)
f(`pack_json fields`)
}
func TestPipePackJSON(t *testing.T) {
@ -76,6 +79,30 @@ func TestPipePackJSON(t *testing.T) {
{"c", "d"},
},
})
// pack only the needed fields
f(`pack_json fields (foo, baz) a`, [][]Field{
{
{"_msg", "x"},
{"foo", `abc`},
{"bar", `cde`},
},
{
{"a", "b"},
{"c", "d"},
},
}, [][]Field{
{
{"_msg", `x`},
{"foo", `abc`},
{"bar", `cde`},
{"a", `{"foo":"abc","baz":""}`},
},
{
{"a", `{"foo":"","baz":""}`},
{"c", "d"},
},
})
}
func TestPipePackJSONUpdateNeededFields(t *testing.T) {

View file

@ -204,10 +204,9 @@ type pipeStatsProcessorShardNopad struct {
m map[string]*pipeStatsGroup
// bms, brs and brsBuf are used for applying per-func filters.
bms []bitmap
brs []*blockResult
brsBuf []blockResult
// bms and brTmp are used for applying per-func filters.
bms []bitmap
brTmp blockResult
columnValues [][]string
keyBuf []byte
@ -225,22 +224,20 @@ func (shard *pipeStatsProcessorShard) init() {
shard.m = make(map[string]*pipeStatsGroup)
shard.bms = make([]bitmap, funcsLen)
shard.brs = make([]*blockResult, funcsLen)
shard.brsBuf = make([]blockResult, funcsLen)
}
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
shard.init()
byFields := shard.ps.byFields
// Apply per-function filters
brs := shard.applyPerFunctionFilters(br)
// Update shard.bms by applying per-function filters
shard.applyPerFunctionFilters(br)
// Process stats for the defined functions
if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key.
psg := shard.getPipeStatsGroup(nil)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
return
}
if len(byFields) == 1 {
@ -252,7 +249,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
v := br.getBucketedValue(c.valuesEncoded[0], bf)
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
return
}
@ -261,7 +258,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
// Fast path for column with constant values.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
return
}
@ -273,7 +270,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
psg = shard.getPipeStatsGroup(keyBuf)
}
shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
shard.keyBuf = keyBuf
return
@ -303,7 +300,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
}
psg := shard.getPipeStatsGroup(keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(brs)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
shard.keyBuf = keyBuf
return
}
@ -328,42 +325,23 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
}
psg = shard.getPipeStatsGroup(keyBuf)
}
shard.stateSizeBudget -= psg.updateStatsForRow(brs, i)
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
shard.keyBuf = keyBuf
}
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult) []*blockResult {
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
funcs := shard.ps.funcs
brs := shard.brs
for i := range funcs {
iff := funcs[i].iff
if iff == nil {
// Fast path - there are no per-function filters
brs[i] = brSrc
continue
}
bm := &shard.bms[i]
bm.init(len(brSrc.timestamps))
bm.init(len(br.timestamps))
bm.setBits()
iff.f.applyToBlockResult(brSrc, bm)
if bm.areAllBitsSet() {
// Fast path - per-function filter doesn't filter out rows
brs[i] = brSrc
continue
}
// Store the remaining rows for the needed per-func fields to brDst
brDst := &shard.brsBuf[i]
if bm.isZero() {
brDst.reset()
} else {
brDst.initFromFilterNeededColumns(brSrc, bm, iff.neededFields)
iff := funcs[i].iff
if iff != nil {
iff.f.applyToBlockResult(br, bm)
}
brs[i] = brDst
}
return brs
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
@ -379,7 +357,8 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr
shard.stateSizeBudget -= stateSize
}
psg = &pipeStatsGroup{
sfps: sfps,
funcs: shard.ps.funcs,
sfps: sfps,
}
shard.m[string(key)] = psg
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
@ -388,21 +367,30 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr
}
type pipeStatsGroup struct {
sfps []statsProcessor
funcs []pipeStatsFunc
sfps []statsProcessor
}
func (psg *pipeStatsGroup) updateStatsForAllRows(brs []*blockResult) int {
func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int {
n := 0
for i, sfp := range psg.sfps {
n += sfp.updateStatsForAllRows(brs[i])
iff := psg.funcs[i].iff
if iff == nil {
n += sfp.updateStatsForAllRows(br)
} else {
brTmp.initFromFilterAllColumns(br, &bms[i])
n += sfp.updateStatsForAllRows(brTmp)
}
}
return n
}
func (psg *pipeStatsGroup) updateStatsForRow(brs []*blockResult, rowIdx int) int {
func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int {
n := 0
for i, sfp := range psg.sfps {
n += sfp.updateStatsForRow(brs[i], rowIdx)
if bms[i].isSetBit(rowIdx) {
n += sfp.updateStatsForRow(br, rowIdx)
}
}
return n
}