diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 26c09f65f..ed752dd55 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), which returns unique values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: allow omitting `stats` prefix in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). For example, `_time:5m | count() rows` is a valid query now. It is equivalent to `_time:5m | stats count() as rows`. * FEATURE: allow omitting `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the filter doesn't clash with [pipe names](#https://docs.victoriametrics.com/victorialogs/logsql/#pipes). For example, `_time:5m | stats by (host) count() rows | rows:>1000` is a valid query now. It is equivalent to `_time:5m | stats by (host) count() rows | filter rows:>1000`. * FEATURE: allow [`head` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) without number. For example, `error | head`. In this case 10 last values are returned as `head` Unix command does by default. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 8a6e081fa..7dfe0d727 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1153,6 +1153,7 @@ LogsQL supports the following pipes: - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`extract`](#extract-pipe) extracts the sepcified text into the given log fields. - [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). 
+- [`field_values`](#field_values-pipe) returns all the values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`filter`](#filter-pipe) applies additional [filters](#filters) to results. - [`format`](#format-pipe) formats ouptut field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1365,8 +1366,33 @@ Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in or See also: +- [`field_values` pipe](#field_values-pipe) - [`uniq` pipe](#uniq-pipe) +### field_values pipe + +`| field_values field_name` [pipe](#pipe) returns all the values for the given [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +with the number of logs per each value. +For example, the following query returns all the values with the number of matching logs for the field `level` over logs for the last 5 minutes: + +```logsql +_time:5m | field_values level +``` + +It is possible to limit the number of returned values by adding `limit N` to the end of the `field_values ...`. For example, the following query returns +up to 10 values for the field `user_id` over logs for the last 5 minutes: + +```logsql +_time:5m | field_values user_id limit 10 +``` + +If the limit is reached, then the set of returned values is random. Also the number of matching logs per each returned value is zeroed for performance reasons. + +See also: + +- [`field_names` pipe](#field_names-pipe) +- [`uniq` pipe](#uniq-pipe) + ### fields pipe By default all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) are returned in the response.
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 464c4e5c2..ada7a0d20 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -118,6 +118,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err) } return pf, nil + case lex.isKeyword("field_values"): + pf, err := parsePipeFieldValues(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'field_values' pipe: %w", err) + } + return pf, nil case lex.isKeyword("fields", "keep"): pf, err := parsePipeFields(lex) if err != nil { diff --git a/lib/logstorage/pipe_field_values.go b/lib/logstorage/pipe_field_values.go new file mode 100644 index 000000000..d9c1f57ac --- /dev/null +++ b/lib/logstorage/pipe_field_values.go @@ -0,0 +1,93 @@ +package logstorage + +import ( + "fmt" +) + +// pipeFieldValues processes '| field_values ...' queries. + +// See https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe +type pipeFieldValues struct { + field string + + limit uint64 +} + +func (pf *pipeFieldValues) String() string { + s := "field_values " + quoteTokenIfNeeded(pf.field) + if pf.limit > 0 { + s += fmt.Sprintf(" limit %d", pf.limit) + } + return s +} + +func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + neededFields.reset() + if !unneededFields.contains(pf.field) { + neededFields.add(pf.field) + } + unneededFields.reset() + } else { + neededFieldsOrig := neededFields.clone() + neededFields.reset() + if neededFieldsOrig.contains(pf.field) { + neededFields.add(pf.field) + } + } +} + +func (pf *pipeFieldValues) optimize() { + // nothing to do +} + +func (pf *pipeFieldValues) hasFilterInWithQuery() bool { + return false +} + +func (pf *pipeFieldValues) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pf, nil +} + +func (pf *pipeFieldValues) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel
func(), ppNext pipeProcessor) pipeProcessor { + hitsFieldName := "hits" + if hitsFieldName == pf.field { + hitsFieldName = "hitss" + } + pu := &pipeUniq{ + byFields: []string{pf.field}, + hitsFieldName: hitsFieldName, + limit: pf.limit, + } + return pu.newPipeProcessor(workersCount, stopCh, cancel, ppNext) +} + +func parsePipeFieldValues(lex *lexer) (*pipeFieldValues, error) { + if !lex.isKeyword("field_values") { + return nil, fmt.Errorf("expecting 'field_values'; got %q", lex.token) + } + lex.nextToken() + + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse field name for 'field_values': %w", err) + } + + limit := uint64(0) + if lex.isKeyword("limit") { + lex.nextToken() + n, ok := tryParseUint64(lex.token) + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token) + } + lex.nextToken() + limit = n + } + + pf := &pipeFieldValues{ + field: field, + limit: limit, + } + + return pf, nil +} diff --git a/lib/logstorage/pipe_field_values_test.go b/lib/logstorage/pipe_field_values_test.go new file mode 100644 index 000000000..26b0419b6 --- /dev/null +++ b/lib/logstorage/pipe_field_values_test.go @@ -0,0 +1,148 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeFieldValuesSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`field_values x`) + f(`field_values x limit 10`) +} + +func TestParsePipeFieldValuesFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`field_values`) + f(`field_values a b`) + f(`field_values a limit`) + f(`field_values limit N`) +} + +func TestPipeFieldValues(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("field_values a", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", 
"d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"hits", "3"}, + }, + }) + + f("field_values b", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"hits", "2"}, + }, + { + {"b", "54"}, + {"hits", "1"}, + }, + }) + + f("field_values c", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"c", ""}, + {"hits", "2"}, + }, + { + {"c", "d"}, + {"hits", "1"}, + }, + }) + + f("field_values d", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"d", ""}, + {"hits", "3"}, + }, + }) +} + +func TestPipeFieldValuesUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("field_values x", "*", "", "x", "") + + // all the needed fields, unneeded fields do not intersect with src + f("field_values x", "*", "f1,f2", "x", "") + + // all the needed fields, unneeded fields intersect with src + f("field_values x", "*", "f1,x", "", "") + + // needed fields do not intersect with src + f("field_values x", "f1,f2", "", "", "") + + // needed fields intersect with src + f("field_values x", "f1,x", "", "x", "") +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index f7e43b89b..bf681fc0e 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -229,12 +229,12 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) 
([]ValueWithHits, error) { pipes := append([]pipe{}, q.pipes...) quotedFieldName := quoteTokenIfNeeded(fieldName) - pipeStr := fmt.Sprintf("uniq by (%s) with hits limit %d", quotedFieldName, limit) + pipeStr := fmt.Sprintf("field_values %s limit %d", quotedFieldName, limit) lex := newLexer(pipeStr) - pu, err := parsePipeUniq(lex) + pu, err := parsePipeFieldValues(lex) if err != nil { - logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err) + logger.Panicf("BUG: unexpected error when parsing 'field_values' pipe at [%s]: %s", pipeStr, err) } if !lex.isEnd() { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index f664a6f63..488c46c48 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -3,6 +3,7 @@ package logstorage import ( "context" "fmt" + "reflect" "strings" "sync" "sync/atomic" @@ -311,6 +312,157 @@ func TestStorageRunQuery(t *testing.T) { tenantIDs := []TenantID{tenantID} mustRunQuery(tenantIDs, q, writeBlock) }) + t.Run("field_names-all", func(t *testing.T) { + q := mustParseQuery("*") + names, err := s.GetFieldNames(context.Background(), allTenantIDs, q) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {"_msg", 1155}, + {"_stream", 1155}, + {"_time", 1155}, + {"instance", 1155}, + {"job", 1155}, + {"source-file", 1155}, + {"stream-id", 1155}, + {"tenant.id", 1155}, + } + if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) + t.Run("field_names-some", func(t *testing.T) { + q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`) + names, err := s.GetFieldNames(context.Background(), allTenantIDs, q) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {"_msg", 385}, + {"_stream", 385}, + {"_time", 385}, + {"instance", 385}, + {"job", 385}, + {"source-file", 385}, 
+ {"stream-id", 385}, + {"tenant.id", 385}, + } + if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) + t.Run("field_values-nolimit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-0:234",job="foobar"}`, 385}, + {`{instance="host-1:234",job="foobar"}`, 385}, + {`{instance="host-2:234",job="foobar"}`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("field_values-limit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-0:234",job="foobar"}`, 0}, + {`{instance="host-1:234",job="foobar"}`, 0}, + {`{instance="host-2:234",job="foobar"}`, 0}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("field_values-limit", func(t *testing.T) { + q := mustParseQuery("instance:='host-1:234'") + values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-1:234",job="foobar"}`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("stream_field_names", func(t *testing.T) { + q := mustParseQuery("*") + names, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := 
[]ValueWithHits{ + {"instance", 1155}, + {"job", 1155}, + } + if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) + t.Run("stream_field_values-nolimit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`host-0:234`, 385}, + {`host-1:234`, 385}, + {`host-2:234`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("stream_field_values-limit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`host-0:234`, 385}, + {`host-1:234`, 385}, + {`host-2:234`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("streams", func(t *testing.T) { + q := mustParseQuery("*") + names, err := s.GetStreams(context.Background(), allTenantIDs, q, 0) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-0:234",job="foobar"}`, 385}, + {`{instance="host-1:234",job="foobar"}`, 385}, + {`{instance="host-2:234",job="foobar"}`, 385}, + } + if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) // Run more complex tests f := func(t *testing.T, query string, rowsExpected [][]Field) {