Aliaksandr Valialkin 2024-05-30 19:45:10 +02:00
parent b21c39a871
commit 68b3bd370e
5 changed files with 343 additions and 0 deletions

@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
## [v0.15.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.15.0-victorialogs)
Released at 2024-05-30

@@ -1155,6 +1155,7 @@ LogsQL supports the following pipes:
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`drop_empty_fields`](#drop_empty_fields-pipe) drops [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
- [`extract`](#extract-pipe) extracts the specified text into the given log fields.
- [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax).
- [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1219,6 +1220,22 @@ See also:
- [`rename` pipe](#rename-pipe)
- [`fields` pipe](#fields-pipe)
### drop_empty_fields pipe
`| drop_empty_fields` pipe drops [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. It also drops log entries without a single non-empty field.
For example, the following query drops the possibly empty `email` field generated by the [`extract` pipe](#extract-pipe) when the `foo` field doesn't contain an email address:
```logsql
_time:5m | extract 'email: <email>,' from foo | drop_empty_fields
```
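Since log entries consisting only of empty fields are dropped entirely, `drop_empty_fields` can also act as a filter. As an illustrative sketch (the `ip` and `user_agent` field names are assumed here, not taken from the docs), the following query returns only the entries that contain at least one of these fields with a non-empty value:

```logsql
_time:5m | fields ip, user_agent | drop_empty_fields
```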
See also:
- [`filter` pipe](#filter-pipe)
- [`extract` pipe](#extract-pipe)
### extract pipe
`| extract "pattern" from field_name` [pipe](#pipes) allows extracting arbitrary text into output fields according to the [`pattern`](#format-for-extract-pipe-pattern) from the given
@@ -1295,6 +1312,7 @@ the corresponding matching substring to.
Matching starts from the first occurrence of the `text1` in the input text. If the `pattern` starts with `<field1>` and doesn't contain `text1`,
then the matching starts from the beginning of the input text. Matching is performed sequentially according to the `pattern`. If some `textX` isn't found
in the remaining input text, then the remaining named placeholders receive empty string values and the matching finishes prematurely.
Such empty values can be dropped with the [`drop_empty_fields` pipe](#drop_empty_fields-pipe), as illustrated below.
Matching finishes successfully when `textN+1` is found in the input text.
If the `pattern` ends with `<fieldN>` and doesn't contain `textN+1`, then the `<fieldN>` matches the remaining input text.
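For example, suppose the `foo` field contains the text `ip=1.2.3.4` without the trailing ` user=...` part (the field and placeholder names here are purely illustrative). Then the placeholders that cannot be matched, such as `<user>`, receive empty values, and the trailing [`drop_empty_fields` pipe](#drop_empty_fields-pipe) removes the corresponding empty fields from the result:

```logsql
_time:5m | extract 'ip=<ip> user=<user>' from foo | drop_empty_fields
```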

@@ -106,6 +106,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
}
return pd, nil
case lex.isKeyword("drop_empty_fields"):
pd, err := parsePipeDropEmptyFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'drop_empty_fields' pipe: %w", err)
}
return pd, nil
case lex.isKeyword("extract"):
pe, err := parsePipeExtract(lex)
if err != nil {

@@ -0,0 +1,223 @@
package logstorage
import (
"fmt"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeDropEmptyFields processes '| drop_empty_fields ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe
type pipeDropEmptyFields struct {
}
func (pd *pipeDropEmptyFields) String() string {
return "drop_empty_fields"
}
func (pd *pipeDropEmptyFields) optimize() {
// nothing to do
}
func (pd *pipeDropEmptyFields) hasFilterInWithQuery() bool {
return false
}
func (pd *pipeDropEmptyFields) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pd, nil
}
func (pd *pipeDropEmptyFields) updateNeededFields(_, _ fieldsSet) {
// nothing to do
}
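// newPipeProcessor creates a processor with a separate shard per worker, so workers do not share mutable state.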
func (pd *pipeDropEmptyFields) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
return &pipeDropEmptyFieldsProcessor{
ppNext: ppNext,
shards: make([]pipeDropEmptyFieldsProcessorShard, workersCount),
}
}
type pipeDropEmptyFieldsProcessor struct {
ppNext pipeProcessor
shards []pipeDropEmptyFieldsProcessorShard
}
type pipeDropEmptyFieldsProcessorShard struct {
pipeDropEmptyFieldsProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeDropEmptyFieldsProcessorShardNopad{})%128]byte
}
type pipeDropEmptyFieldsProcessorShardNopad struct {
columnValues [][]string
fields []Field
wctx pipeDropEmptyFieldsWriteContext
}
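// writeBlock drops fields with empty values from the rows in br and passes the result to ppNext.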
func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pdp.shards[workerID]
cs := br.getColumns()
shard.columnValues = slicesutil.SetLength(shard.columnValues, len(cs))
columnValues := shard.columnValues
for i, c := range cs {
columnValues[i] = c.getValues(br)
}
if !hasEmptyValues(columnValues) {
// Fast path - just write br to ppNext, since it has no empty values.
pdp.ppNext.writeBlock(workerID, br)
return
}
// Slow path - drop fields with empty values
shard.wctx.init(workerID, pdp.ppNext)
fields := shard.fields
for rowIdx := range br.timestamps {
fields = fields[:0]
for i, values := range columnValues {
v := values[rowIdx]
if v == "" {
continue
}
fields = append(fields, Field{
Name: cs[i].name,
Value: v,
})
}
shard.wctx.writeRow(fields)
}
shard.fields = fields
shard.wctx.flush()
}
func (pdp *pipeDropEmptyFieldsProcessor) flush() error {
return nil
}
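// pipeDropEmptyFieldsWriteContext accumulates the rewritten rows into a block before sending it to ppNext.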
type pipeDropEmptyFieldsWriteContext struct {
workerID uint
ppNext pipeProcessor
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeDropEmptyFieldsWriteContext) reset() {
wctx.workerID = 0
wctx.ppNext = nil
rcs := wctx.rcs
for i := range rcs {
rcs[i].reset()
}
wctx.rcs = rcs[:0]
wctx.rowsCount = 0
wctx.valuesLen = 0
}
func (wctx *pipeDropEmptyFieldsWriteContext) init(workerID uint, ppNext pipeProcessor) {
wctx.reset()
wctx.workerID = workerID
wctx.ppNext = ppNext
}
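// writeRow adds the given fields as a single row to the current block; rows without non-empty fields are skipped.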
func (wctx *pipeDropEmptyFieldsWriteContext) writeRow(fields []Field) {
if len(fields) == 0 {
// skip rows without non-empty fields
return
}
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(fields)
if areEqualColumns {
for i, f := range fields {
if rcs[i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, f := range fields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
for i, f := range fields {
v := f.Value
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
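// Periodically flush the accumulated block in order to limit memory usage.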
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
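// flush sends the accumulated block to ppNext and resets the accumulated state.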
func (wctx *pipeDropEmptyFieldsWriteContext) flush() {
rcs := wctx.rcs
wctx.valuesLen = 0
// Flush rcs to ppNext
br := &wctx.br
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.ppNext.writeBlock(wctx.workerID, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
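// parsePipeDropEmptyFields parses the 'drop_empty_fields' keyword at the current lexer position.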
func parsePipeDropEmptyFields(lex *lexer) (*pipeDropEmptyFields, error) {
if !lex.isKeyword("drop_empty_fields") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "drop_empty_fields")
}
lex.nextToken()
pd := &pipeDropEmptyFields{}
return pd, nil
}
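// hasEmptyValues returns true if columnValues contains at least one empty string value.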
func hasEmptyValues(columnValues [][]string) bool {
for _, values := range columnValues {
for _, v := range values {
if v == "" {
return true
}
}
}
return false
}

@@ -0,0 +1,94 @@
package logstorage
import (
"testing"
)
func TestParsePipeDropEmptyFieldsSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`drop_empty_fields`)
}
func TestParsePipeDropEmptyFieldsFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`drop_empty_fields foo`)
}
func TestPipeDropEmptyFields(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f(`drop_empty_fields`, [][]Field{
{
{"a", "foo"},
{"b", "bar"},
{"c", "baz"},
},
}, [][]Field{
{
{"a", "foo"},
{"b", "bar"},
{"c", "baz"},
},
})
f(`drop_empty_fields`, [][]Field{
{
{"a", "foo"},
{"b", "bar"},
{"c", "baz"},
},
{
{"a", "foo1"},
{"b", ""},
{"c", "baz1"},
},
{
{"a", ""},
{"b", "bar2"},
},
{
{"a", ""},
{"b", ""},
{"c", ""},
},
}, [][]Field{
{
{"a", "foo"},
{"b", "bar"},
{"c", "baz"},
},
{
{"a", "foo1"},
{"c", "baz1"},
},
{
{"b", "bar2"},
},
})
}
func TestPipeDropEmptyFieldsUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f(`drop_empty_fields`, "*", "", "*", "")
// non-empty unneeded fields
f(`drop_empty_fields`, "*", "f1,f2", "*", "f1,f2")
// non-empty needed fields
f(`drop_empty_fields`, "x,y", "", "x,y", "")
}