This commit is contained in:
Aliaksandr Valialkin 2024-05-25 18:26:07 +02:00
parent bbb9887ccf
commit b83024330f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 576 additions and 0 deletions

View file

@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: add [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe), which can be used for unrolling JSON arrays stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. * FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
* FEATURE: improve performance for [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) and [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) pipes. * FEATURE: improve performance for [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) and [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) pipes.

View file

@ -1088,6 +1088,7 @@ LogsQL supports the following pipes:
- [`uniq`](#uniq-pipe) returns unique log entries. - [`uniq`](#uniq-pipe) returns unique log entries.
- [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
### copy pipe ### copy pipe
@ -1880,6 +1881,7 @@ See also:
- [Conditional `unpack_json`](#conditional-unpack_json) - [Conditional `unpack_json`](#conditional-unpack_json)
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe) - [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
- [`extract` pipe](#extract-pipe) - [`extract` pipe](#extract-pipe)
- [`unroll` pipe](#unroll-pipe)
#### Conditional unpack_json #### Conditional unpack_json
@ -1974,6 +1976,24 @@ only if `ip` field in the current log entry isn't set or empty:
_time:5m | unpack_logfmt if (ip:"") from foo _time:5m | unpack_logfmt if (ip:"") from foo
``` ```
### unroll pipe
`| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, ..., `fieldN`
[log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into separate rows.
For example, the following query unrolls `timestamp` and `value` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) from logs for the last 5 minutes:
```logsql
_time:5m | unroll (timestamp, value)
```
See also:
- [`unpack_json` pipe](#unpack_json-pipe)
- [`extract` pipe](#extract-pipe)
- [`uniq_values` stats function](#uniq_values-stats)
- [`values` stats function](#values-stats)
## stats pipe functions ## stats pipe functions
LogsQL supports the following functions for [`stats` pipe](#stats-pipe): LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
@ -2278,6 +2298,8 @@ over logs for the last 5 minutes:
_time:5m | stats uniq_values(ip) unique_ips _time:5m | stats uniq_values(ip) unique_ips
``` ```
The returned unique ip addresses can be unrolled into distinct log entries with [`unroll` pipe](#unroll-pipe).
Every unique value is stored in memory during query execution. Big number of unique values may require a lot of memory. Sometimes it is enough to return Every unique value is stored in memory during query execution. Big number of unique values may require a lot of memory. Sometimes it is enough to return
only a subset of unique values. In this case add `limit N` after `uniq_values(...)` in order to limit the number of returned unique values to `N`, only a subset of unique values. In this case add `limit N` after `uniq_values(...)` in order to limit the number of returned unique values to `N`,
while limiting the maximum memory usage. while limiting the maximum memory usage.
@ -2310,6 +2332,8 @@ over logs for the last 5 minutes:
_time:5m | stats values(ip) ips _time:5m | stats values(ip) ips
``` ```
The returned ip addresses can be unrolled into distinct log entries with [`unroll` pipe](#unroll-pipe).
See also: See also:
- [`uniq_values`](#uniq_values-stats) - [`uniq_values`](#uniq_values-stats)

View file

@ -194,6 +194,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
} }
return pu, nil return pu, nil
case lex.isKeyword("unroll"):
pu, err := parsePipeUnroll(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'unroll' pipe: %w", err)
}
return pu, nil
default: default:
return nil, fmt.Errorf("unexpected pipe %q", lex.token) return nil, fmt.Errorf("unexpected pipe %q", lex.token)
} }

View file

@ -0,0 +1,284 @@
package logstorage
import (
"fmt"
"slices"
"unsafe"
"github.com/valyala/fastjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeUnroll processes '| unroll ...' pipe.
//
// It unrolls JSON arrays stored in the given fields into separate rows.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe
type pipeUnroll struct {
	// fields to unroll
	fields []string

	// iff is an optional filter for skipping the unroll
	iff *ifFilter
}
// String returns the canonical LogsQL representation of the pipe,
// e.g. `unroll if (...) by (f1, f2)`.
func (pu *pipeUnroll) String() string {
	result := "unroll"
	if iff := pu.iff; iff != nil {
		result += " " + iff.String()
	}
	return result + " by (" + fieldNamesString(pu.fields) + ")"
}
// optimize optimizes `in(...)` filters inside the optional `if (...)` filter.
//
// NOTE(review): pu.iff may be nil here — assumes optimizeFilterIn handles a nil receiver; confirm.
func (pu *pipeUnroll) optimize() {
	pu.iff.optimizeFilterIn()
}
// hasFilterInWithQuery returns whether the optional `if (...)` filter contains `in(...)` filters with subqueries.
//
// NOTE(review): pu.iff may be nil here — assumes hasFilterInWithQuery handles a nil receiver; confirm.
func (pu *pipeUnroll) hasFilterInWithQuery() bool {
	return pu.iff.hasFilterInWithQuery()
}
// initFilterInValues resolves `in(...)` subquery filters inside the optional `if (...)` filter
// and returns a shallow copy of the pipe with the resolved filter.
func (pu *pipeUnroll) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
	resolvedIff, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc)
	if err != nil {
		return nil, err
	}
	puCopy := *pu
	puCopy.iff = resolvedIff
	return &puCopy, nil
}
// updateNeededFields propagates the field requirements of this pipe to neededFields / unneededFields.
//
// Fix: the search for a needed unrolled field now stops at the first match (missing `break`).
func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) {
	if neededFields.contains("*") {
		// Count how many of the unrolled fields are explicitly unneeded.
		unneededFieldsCount := 0
		for _, f := range pu.fields {
			if unneededFields.contains(f) {
				unneededFieldsCount++
			}
		}
		// If at least one unrolled field remains needed, the `if (...)` filter must be evaluated,
		// so the fields it reads cannot be dropped.
		if unneededFieldsCount < len(pu.fields) && pu.iff != nil {
			unneededFields.removeFields(pu.iff.neededFields)
		}
	} else {
		// Determine whether any of the unrolled fields is needed downstream.
		needIfFields := false
		for _, f := range pu.fields {
			if neededFields.contains(f) {
				needIfFields = true
				break
			}
		}
		if needIfFields && pu.iff != nil {
			neededFields.addFields(pu.iff.neededFields)
		}
	}
}
// newPipeProcessor creates a pipeProcessor for the unroll pipe with a dedicated shard per worker.
func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
	shards := make([]pipeUnrollProcessorShard, workersCount)
	return &pipeUnrollProcessor{
		pu:     pu,
		ppBase: ppBase,
		shards: shards,
	}
}
// pipeUnrollProcessor processes blocks for the '| unroll ...' pipe.
type pipeUnrollProcessor struct {
	pu     *pipeUnroll
	ppBase pipeProcessor

	// shards hold per-worker state, so workers do not need synchronization.
	shards []pipeUnrollProcessorShard
}

type pipeUnrollProcessorShard struct {
	pipeUnrollProcessorShardNopad

	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof(pipeUnrollProcessorShardNopad{})%128]byte
}

type pipeUnrollProcessorShardNopad struct {
	// bm marks the rows matching the optional `if (...)` filter in the current block.
	bm bitmap

	// wctx is used for writing result rows to the next pipe.
	wctx pipeUnpackWriteContext

	// a holds the bytes for unrolled values of the current block.
	a arena

	// Reusable scratch buffers, reset per block.
	columnValues   [][]string
	unrolledValues [][]string
	valuesBuf      []string
	fields         []Field
}
// writeBlock unrolls the rows of br matching the optional `if (...)` filter
// and passes the resulting rows to the next pipe.
func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		return
	}

	pu := pup.pu
	shard := &pup.shards[workerID]
	shard.wctx.init(workerID, pup.ppBase, false, false, br)

	// Mark the rows to unroll. By default all the rows are unrolled.
	bm := &shard.bm
	bm.init(len(br.timestamps))
	bm.setBits()
	if iff := pu.iff; iff != nil {
		iff.f.applyToBlockResult(br, bm)
		if bm.isZero() {
			// No row matches the `if (...)` filter - pass the block to the next pipe as is.
			pup.ppBase.writeBlock(workerID, br)
			return
		}
	}

	// Fetch the values for the unrolled fields.
	shard.columnValues = slicesutil.SetLength(shard.columnValues, len(pu.fields))
	columnValues := shard.columnValues
	for i, f := range pu.fields {
		c := br.getColumnByName(f)
		columnValues[i] = c.getValues(br)
	}

	fields := shard.fields
	for rowIdx := range br.timestamps {
		if bm.isSetBit(rowIdx) {
			// The row matches the filter - unroll it.
			shard.writeUnrolledFields(br, pu.fields, columnValues, rowIdx)
		} else {
			// The row doesn't match the filter - write it with its original field values.
			fields = fields[:0]
			for i, f := range pu.fields {
				v := columnValues[i][rowIdx]
				fields = append(fields, Field{
					Name:  f,
					Value: v,
				})
			}
			shard.wctx.writeRow(rowIdx, fields)
		}
	}

	shard.wctx.flush()
	shard.wctx.reset()
	shard.a.reset()
}
// writeUnrolledFields unrolls JSON arrays stored in fieldNames at the rowIdx row
// and writes the resulting rows to the next pipe.
//
// Fields with fewer array items than the longest unrolled array are padded with empty values.
func (shard *pipeUnrollProcessorShard) writeUnrolledFields(br *blockResult, fieldNames []string, columnValues [][]string, rowIdx int) {
	// unroll values at rowIdx row
	shard.unrolledValues = slicesutil.SetLength(shard.unrolledValues, len(columnValues))
	unrolledValues := shard.unrolledValues

	valuesBuf := shard.valuesBuf[:0]
	for i, values := range columnValues {
		v := values[rowIdx]
		valuesBufLen := len(valuesBuf)
		// Non-array values produce zero unrolled values.
		valuesBuf = unpackJSONArray(valuesBuf, &shard.a, v)
		unrolledValues[i] = valuesBuf[valuesBufLen:]
	}
	shard.valuesBuf = valuesBuf

	// find the maximum number of rows across unrolled values
	rows := len(unrolledValues[0])
	for _, values := range unrolledValues[1:] {
		if len(values) > rows {
			rows = len(values)
		}
	}
	if rows == 0 {
		// Unroll to a single row with empty unrolled values.
		rows = 1
	}

	// write unrolled values to the next pipe.
	fields := shard.fields
	for unrollIdx := 0; unrollIdx < rows; unrollIdx++ {
		fields = fields[:0]
		for i, values := range unrolledValues {
			v := ""
			if unrollIdx < len(values) {
				v = values[unrollIdx]
			}
			fields = append(fields, Field{
				Name:  fieldNames[i],
				Value: v,
			})
		}
		shard.wctx.writeRow(rowIdx, fields)
	}
}
// flush implements the pipeProcessor interface; the unroll pipe keeps no buffered state to flush.
func (pup *pipeUnrollProcessor) flush() error {
	return nil
}
// parsePipeUnroll parses `unroll [if (...)] [by] (field1, ..., fieldN)` from lex.
func parsePipeUnroll(lex *lexer) (*pipeUnroll, error) {
	if !lex.isKeyword("unroll") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unroll")
	}
	lex.nextToken()

	// Parse the optional `if (...)` filter.
	var iff *ifFilter
	if lex.isKeyword("if") {
		parsedIff, err := parseIfFilter(lex)
		if err != nil {
			return nil, err
		}
		iff = parsedIff
	}

	// The `by` keyword before `(...)` is optional.
	if lex.isKeyword("by") {
		lex.nextToken()
	}

	fields, err := parseFieldNamesInParens(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'by(...)' at 'unroll': %w", err)
	}
	switch {
	case len(fields) == 0:
		return nil, fmt.Errorf("'by(...)' at 'unroll' must contain at least a single field")
	case slices.Contains(fields, "*"):
		return nil, fmt.Errorf("unroll by '*' isn't supported")
	}

	return &pipeUnroll{
		fields: fields,
		iff:    iff,
	}, nil
}
// unpackJSONArray appends the items of the JSON array s to dst and returns the result.
//
// Item strings are backed by a. dst is returned unchanged if s isn't a valid JSON array.
func unpackJSONArray(dst []string, a *arena, s string) []string {
	// Fast path - skip values which cannot be JSON arrays.
	if len(s) == 0 || s[0] != '[' {
		return dst
	}

	p := jspp.Get()
	defer jspp.Put(p)

	parsed, err := p.Parse(s)
	if err != nil {
		// Ignore values containing invalid JSON.
		return dst
	}
	items, err := parsed.Array()
	if err != nil {
		return dst
	}
	for _, item := range items {
		if item.Type() != fastjson.TypeString {
			// Store the raw JSON representation of the item in a.
			bufLen := len(a.b)
			a.b = item.MarshalTo(a.b)
			dst = append(dst, bytesutil.ToUnsafeString(a.b[bufLen:]))
			continue
		}
		sb, err := item.StringBytes()
		if err != nil {
			logger.Panicf("BUG: unexpected error returned from StringBytes(): %s", err)
		}
		dst = append(dst, a.copyBytesToString(sb))
	}
	return dst
}
// jspp is a pool of JSON parsers reused by unpackJSONArray across goroutines.
var jspp fastjson.ParserPool

View file

@ -0,0 +1,261 @@
package logstorage
import (
"reflect"
"testing"
)
// TestParsePipeUnrollSuccess verifies that valid 'unroll' pipe expressions are parsed successfully.
func TestParsePipeUnrollSuccess(t *testing.T) {
	assertOK := func(pipeStr string) {
		t.Helper()
		expectParsePipeSuccess(t, pipeStr)
	}

	assertOK(`unroll by (foo)`)
	assertOK(`unroll if (x:y) by (foo, bar)`)
}
// TestParsePipeUnrollFailure verifies that invalid 'unroll' pipe expressions are rejected by the parser.
//
// Fix: the function name previously contained a typo ("Uroll").
func TestParsePipeUnrollFailure(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParsePipeFailure(t, pipeStr)
	}

	f(`unroll`)
	f(`unroll by ()`)
	f(`unroll by (*)`)
	f(`unroll by (f, *)`)
	f(`unroll by`)
	f(`unroll (`)
	f(`unroll by (foo) bar`)
	f(`unroll by (x) if (a:b)`)
}
// TestPipeUnroll verifies end-to-end behavior of the 'unroll' pipe on concrete rows.
func TestPipeUnroll(t *testing.T) {
	f := func(pipeStr string, rows, rowsExpected [][]Field) {
		t.Helper()
		expectPipeResults(t, pipeStr, rows, rowsExpected)
	}

	// unroll by missing field - the field is added with an empty value
	f("unroll (x)", [][]Field{
		{
			{"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`},
			{"q", "w"},
		},
	}, [][]Field{
		{
			{"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`},
			{"q", "w"},
			{"x", ""},
		},
	})

	// unroll by field without JSON array - the value is replaced with an empty one
	f("unroll (q)", [][]Field{
		{
			{"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`},
			{"q", "w"},
		},
	}, [][]Field{
		{
			{"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`},
			{"q", ""},
		},
	})

	// unroll by a single field - each array item becomes a separate row
	f("unroll (a)", [][]Field{
		{
			{"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`},
			{"q", "w"},
		},
		{
			{"a", "b"},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "foo"},
			{"q", "w"},
		},
		{
			{"a", "1"},
			{"q", "w"},
		},
		{
			{"a", `{"baz":"x"}`},
			{"q", "w"},
		},
		{
			{"a", "[1,2]"},
			{"q", "w"},
		},
		{
			{"a", "null"},
			{"q", "w"},
		},
		{
			{"a", "NaN"},
			{"q", "w"},
		},
		{
			{"a", ""},
			{"c", "d"},
		},
	})

	// unroll by multiple fields - shorter arrays are padded with empty values
	f("unroll by (timestamp, value)", [][]Field{
		{
			{"timestamp", "[1,2,3]"},
			{"value", `["foo","bar","baz"]`},
			{"other", "abc"},
			{"x", "y"},
		},
		{
			{"timestamp", "[1]"},
			{"value", `["foo","bar"]`},
		},
		{
			{"timestamp", "[1]"},
			{"value", `bar`},
			{"q", "w"},
		},
	}, [][]Field{
		{
			{"timestamp", "1"},
			{"value", "foo"},
			{"other", "abc"},
			{"x", "y"},
		},
		{
			{"timestamp", "2"},
			{"value", "bar"},
			{"other", "abc"},
			{"x", "y"},
		},
		{
			{"timestamp", "3"},
			{"value", "baz"},
			{"other", "abc"},
			{"x", "y"},
		},
		{
			{"timestamp", "1"},
			{"value", "foo"},
		},
		{
			{"timestamp", ""},
			{"value", "bar"},
		},
		{
			{"timestamp", "1"},
			{"value", ""},
			{"q", "w"},
		},
	})

	// conditional unroll - rows not matching the `if (...)` filter are passed through as is
	f("unroll if (q:abc) (a)", [][]Field{
		{
			{"a", `asd`},
			{"q", "w"},
		},
		{
			{"a", `["foo",123]`},
			{"q", "abc"},
		},
	}, [][]Field{
		{
			{"a", `asd`},
			{"q", "w"},
		},
		{
			{"a", "foo"},
			{"q", "abc"},
		},
		{
			{"a", "123"},
			{"q", "abc"},
		},
	})

	// unroll by a field holding non-array values in some rows - such values become empty
	f("unroll (a)", [][]Field{
		{
			{"a", `asd`},
			{"q", "w"},
		},
		{
			{"a", `["foo",123]`},
			{"q", "abc"},
		},
	}, [][]Field{
		{
			{"a", ``},
			{"q", "w"},
		},
		{
			{"a", "foo"},
			{"q", "abc"},
		},
		{
			{"a", "123"},
			{"q", "abc"},
		},
	})
}
// TestPipeUnrollUpdateNeededFields verifies needed/unneeded field propagation for the 'unroll' pipe.
func TestPipeUnrollUpdateNeededFields(t *testing.T) {
	check := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
		t.Helper()
		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
	}

	// all the needed fields
	check("unroll (x)", "*", "", "*", "")
	check("unroll (x, y)", "*", "", "*", "")
	check("unroll if (y:z) (a, b)", "*", "", "*", "")

	// all the needed fields, unneeded fields do not intersect with src
	check("unroll (x)", "*", "f1,f2", "*", "f1,f2")
	check("unroll if (a:b) (x)", "*", "f1,f2", "*", "f1,f2")
	check("unroll if (f1:b) (x)", "*", "f1,f2", "*", "f2")

	// all the needed fields, unneeded fields intersect with src
	check("unroll (x)", "*", "f2,x", "*", "f2,x")
	check("unroll if (a:b) (x)", "*", "f2,x", "*", "f2,x")
	check("unroll if (f2:b) (x)", "*", "f2,x", "*", "f2,x")

	// needed fields do not intersect with src
	check("unroll (x)", "f1,f2", "", "f1,f2", "")
	check("unroll if (a:b) (x)", "f1,f2", "", "f1,f2", "")

	// needed fields intersect with src
	check("unroll (x)", "f2,x", "", "f2,x", "")
	check("unroll if (a:b) (x)", "f2,x", "", "a,f2,x", "")
}
// TestUnpackJSONArray verifies unpackJSONArray on valid arrays, scalars and malformed input.
func TestUnpackJSONArray(t *testing.T) {
	check := func(s string, resultExpected []string) {
		t.Helper()
		var a arena
		result := unpackJSONArray(nil, &a, s)
		if !reflect.DeepEqual(result, resultExpected) {
			t.Fatalf("unexpected result for unpackJSONArray(%q)\ngot\n%q\nwant\n%q", s, result, resultExpected)
		}
	}

	// Non-array inputs produce no values.
	check("", nil)
	check("123", nil)
	check("foo", nil)
	check(`"foo"`, nil)
	check(`{"foo":"bar"}`, nil)
	check(`[foo`, nil)
	check(`[]`, nil)

	// Array items are returned as strings; non-string items keep their raw JSON form.
	check(`[1]`, []string{"1"})
	check(`[1,"foo",["bar",12],{"baz":"x"},NaN,null]`, []string{"1", "foo", `["bar",12]`, `{"baz":"x"}`, "NaN", "null"})
}