diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 0f0350148..2925cea91 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. + ## [v0.15.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.15.0-victorialogs) Released at 2024-05-30 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 32725a5c0..edffeb18e 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1155,6 +1155,7 @@ LogsQL supports the following pipes: - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`drop_empty_fields`](#drop_empty_fields-pipe) drops [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. - [`extract`](#extract-pipe) extracts the specified text into the given log fields. - [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax). - [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1219,6 +1220,22 @@ See also: - [`rename` pipe](#rename-pipe) - [`fields` pipe](#fields-pipe) +### drop_empty_fields pipe + +`| drop_empty_fields` pipe drops [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. It also skips log entries with zero non-empty fields. 
+ +For example, the following query drops the possibly empty `email` field generated by [`extract` pipe](#extract-pipe) if the `foo` field doesn't contain an email: + +```logsql +_time:5m | extract 'email: <email>,' from foo | drop_empty_fields +``` + +See also: + +- [`filter` pipe](#filter-pipe) +- [`extract` pipe](#extract-pipe) + + ### extract pipe `| extract "pattern" from field_name` [pipe](#pipes) allows extracting arbitrary text into output fields according to the [`pattern`](#format-for-extract-pipe-pattern) from the given @@ -1295,6 +1312,7 @@ the corresponding matching substring to. Matching starts from the first occurrence of the `text1` in the input text. If the `pattern` starts with `<field1>` and doesn't contain `text1`, then the matching starts from the beginning of the input text. Matching is performed sequentially according to the `pattern`. If some `textX` isn't found in the remaining input text, then the remaining named placeholders receive empty string values and the matching finishes prematurely. +The empty string values can be dropped with [`drop_empty_fields` pipe](#drop_empty_fields-pipe). Matching finishes successfully when `textN+1` is found in the input text. If the `pattern` ends with `<fieldN>` and doesn't contain `textN+1`, then the `<fieldN>` matches the remaining input text. 
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 32a72d0dc..4f4000718 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -106,6 +106,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) } return pd, nil + case lex.isKeyword("drop_empty_fields"): + pd, err := parsePipeDropEmptyFields(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'drop_empty_fields' pipe: %w", err) + } + return pd, nil case lex.isKeyword("extract"): pe, err := parsePipeExtract(lex) if err != nil { diff --git a/lib/logstorage/pipe_drop_empty_fields.go b/lib/logstorage/pipe_drop_empty_fields.go new file mode 100644 index 000000000..c6b1594a9 --- /dev/null +++ b/lib/logstorage/pipe_drop_empty_fields.go @@ -0,0 +1,223 @@ +package logstorage + +import ( + "fmt" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeDropEmptyFields processes '| drop_empty_fields' pipe, which accepts no arguments. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe +type pipeDropEmptyFields struct { +} + +func (pd *pipeDropEmptyFields) String() string { + return "drop_empty_fields" +} + +func (pd *pipeDropEmptyFields) optimize() { + // nothing to do +} + +func (pd *pipeDropEmptyFields) hasFilterInWithQuery() bool { + return false +} + +func (pd *pipeDropEmptyFields) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pd, nil +} + +func (pd *pipeDropEmptyFields) updateNeededFields(_, _ fieldsSet) { + // nothing to do - both field sets are left unchanged +} + +func (pd *pipeDropEmptyFields) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return &pipeDropEmptyFieldsProcessor{ + ppNext: ppNext, + + shards: make([]pipeDropEmptyFieldsProcessorShard, workersCount), + } +} + +type pipeDropEmptyFieldsProcessor struct { + ppNext pipeProcessor + + shards []pipeDropEmptyFieldsProcessorShard +} + +type 
pipeDropEmptyFieldsProcessorShard struct { + pipeDropEmptyFieldsProcessorShardNopad + + // The padding rounds the shard size up to a multiple of 128 bytes, which prevents false sharing on widespread platforms with 128 mod (cache line size) = 0. + _ [128 - unsafe.Sizeof(pipeDropEmptyFieldsProcessorShardNopad{})%128]byte +} + +type pipeDropEmptyFieldsProcessorShardNopad struct { + columnValues [][]string + fields []Field + + wctx pipeDropEmptyFieldsWriteContext +} + +func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pdp.shards[workerID] + + cs := br.getColumns() + + shard.columnValues = slicesutil.SetLength(shard.columnValues, len(cs)) + columnValues := shard.columnValues + for i, c := range cs { + columnValues[i] = c.getValues(br) + } + + if !hasEmptyValues(columnValues) { + // Fast path - just write br to ppNext, since it has no empty values. + pdp.ppNext.writeBlock(workerID, br) + return + } + + // Slow path - rebuild every row from its non-empty fields and write the rows via wctx, which groups them into blocks for ppNext + shard.wctx.init(workerID, pdp.ppNext) + + fields := shard.fields + for rowIdx := range br.timestamps { + fields = fields[:0] + for i, values := range columnValues { + v := values[rowIdx] + if v == "" { + continue + } + fields = append(fields, Field{ + Name: cs[i].name, + Value: values[rowIdx], + }) + } + shard.wctx.writeRow(fields) + } + shard.fields = fields + + shard.wctx.flush() +} + +func (pdp *pipeDropEmptyFieldsProcessor) flush() error { + return nil +} + +type pipeDropEmptyFieldsWriteContext struct { + workerID uint + ppNext pipeProcessor + + rcs []resultColumn + br blockResult + + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length in bytes of the values in the current block; writeRow flushes the block once it reaches 1_000_000 + valuesLen int +} + +func (wctx *pipeDropEmptyFieldsWriteContext) reset() { + wctx.workerID = 0 + wctx.ppNext = nil + + rcs := wctx.rcs + for i := range rcs { + rcs[i].reset() + } + wctx.rcs = rcs[:0] + + wctx.rowsCount = 0 + wctx.valuesLen = 0 +} + +func (wctx 
*pipeDropEmptyFieldsWriteContext) init(workerID uint, ppNext pipeProcessor) { + wctx.reset() + + wctx.workerID = workerID + wctx.ppNext = ppNext +} + +func (wctx *pipeDropEmptyFieldsWriteContext) writeRow(fields []Field) { + if len(fields) == 0 { + // skip rows without non-empty fields + return + } + + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(fields) + if areEqualColumns { + for i, f := range fields { + if rcs[i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // the row has a different set of columns than the current block - send the current block to ppNext and construct a block with the new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, f := range fields { + rcs = appendResultColumnWithName(rcs, f.Name) + } + wctx.rcs = rcs + } + + for i, f := range fields { + v := f.Value + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + + wctx.rowsCount++ + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } +} + +func (wctx *pipeDropEmptyFieldsWriteContext) flush() { + rcs := wctx.rcs + + wctx.valuesLen = 0 + + // Flush rcs to ppNext + br := &wctx.br + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 + wctx.ppNext.writeBlock(wctx.workerID, br) + br.reset() + for i := range rcs { + rcs[i].resetValues() + } +} + +func parsePipeDropEmptyFields(lex *lexer) (*pipeDropEmptyFields, error) { + if !lex.isKeyword("drop_empty_fields") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "drop_empty_fields") + } + lex.nextToken() + + pd := &pipeDropEmptyFields{} + + return pd, nil +} + +func hasEmptyValues(columnValues [][]string) bool { + for _, values := range columnValues { + for _, v := range values { + if v == "" { + return true + } + } + } + return false +} diff --git a/lib/logstorage/pipe_drop_empty_fields_test.go b/lib/logstorage/pipe_drop_empty_fields_test.go new file mode 100644 index 000000000..fa2422f82 --- /dev/null +++ b/lib/logstorage/pipe_drop_empty_fields_test.go @@ -0,0 +1,94 @@ +package logstorage + +import ( + "testing" +) + +func 
TestParsePipeDropEmptyFieldsSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`drop_empty_fields`) +} + +func TestParsePipeDropEmptyFieldsFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`drop_empty_fields foo`) +} + +func TestPipeDropEmptyFields(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f(`drop_empty_fields`, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + }, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + }) + f(`drop_empty_fields`, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + { + {"a", "foo1"}, + {"b", ""}, + {"c", "baz1"}, + }, + { + {"a", ""}, + {"b", "bar2"}, + }, + { + {"a", ""}, + {"b", ""}, + {"c", ""}, + }, + }, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + { + {"a", "foo1"}, + {"c", "baz1"}, + }, + { + {"b", "bar2"}, + }, + }) +} + +func TestPipeDropEmptyFieldsUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`drop_empty_fields`, "*", "", "*", "") + + // non-empty unneeded fields + f(`drop_empty_fields`, "*", "f1,f2", "*", "f1,f2") + + // non-empty needed fields + f(`drop_empty_fields`, "x,y", "", "x,y", "") +}