This commit is contained in:
Aliaksandr Valialkin 2024-05-27 18:50:22 +02:00
parent 66e294edfb
commit 741fb6b038
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 429 additions and 3 deletions

@@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), which returns unique values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: allow omitting `stats` prefix in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). For example, `_time:5m | count() rows` is a valid query now. It is equivalent to `_time:5m | stats count() as rows`.
* FEATURE: allow omitting `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the filter doesn't clash with [pipe names](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). For example, `_time:5m | stats by (host) count() rows | rows:>1000` is a valid query now. It is equivalent to `_time:5m | stats by (host) count() rows | filter rows:>1000`.
* FEATURE: allow [`head` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) without a number. For example, `error | head`. In this case up to 10 values are returned, as the `head` Unix command does by default.

@@ -1153,6 +1153,7 @@ LogsQL supports the following pipes:
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`extract`](#extract-pipe) extracts the specified text into the given log fields.
- [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`field_values`](#field_values-pipe) returns all the values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`filter`](#filter-pipe) applies additional [filters](#filters) to results.
- [`format`](#format-pipe) formats output field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1365,8 +1366,33 @@ Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in or
See also:
- [`field_values` pipe](#field_values-pipe)
- [`uniq` pipe](#uniq-pipe)
### field_values pipe
`| field_values field_name` [pipe](#pipe) returns all the values for the given [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
together with the number of logs per each value.
For example, the following query returns all the values of the `level` field, together with the number of matching logs per value, over logs for the last 5 minutes:
```logsql
_time:5m | field_values level
```
It is possible to limit the number of returned values by adding `limit N` to the end of `field_values ...`. For example, the following query returns
up to 10 values for the field `user_id` over logs for the last 5 minutes:
```logsql
_time:5m | field_values user_id limit 10
```
If the limit is reached, then the set of returned values is random. Also, the number of matching logs per each returned value is zeroed for performance reasons.
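If exact per-value hit counts are needed while the output is still bounded, one option (a sketch relying on the [`sort` pipe](#sort-pipe), not taken from this commit) is to sort by `hits` and apply the limit afterwards:

```logsql
_time:5m | field_values user_id | sort by (hits desc) limit 10
```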
See also:
- [`field_names` pipe](#field_names-pipe)
- [`uniq` pipe](#uniq-pipe)
### fields pipe
By default all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) are returned in the response.

@@ -118,6 +118,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("field_values"):
pf, err := parsePipeFieldValues(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'field_values' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("fields", "keep"):
pf, err := parsePipeFields(lex)
if err != nil {

@@ -0,0 +1,93 @@
package logstorage
import (
"fmt"
)
// pipeFieldValues processes '| field_values ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe
type pipeFieldValues struct {
field string
limit uint64
}
func (pf *pipeFieldValues) String() string {
s := "field_values " + quoteTokenIfNeeded(pf.field)
if pf.limit > 0 {
s += fmt.Sprintf(" limit %d", pf.limit)
}
return s
}
func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
neededFields.reset()
if !unneededFields.contains(pf.field) {
neededFields.add(pf.field)
}
unneededFields.reset()
} else {
neededFieldsOrig := neededFields.clone()
neededFields.reset()
if neededFieldsOrig.contains(pf.field) {
neededFields.add(pf.field)
}
}
}
func (pf *pipeFieldValues) optimize() {
// nothing to do
}
func (pf *pipeFieldValues) hasFilterInWithQuery() bool {
return false
}
func (pf *pipeFieldValues) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pf, nil
}
func (pf *pipeFieldValues) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
hitsFieldName := "hits"
if hitsFieldName == pf.field {
// Use an alternative hits column name if the selected field is already named `hits`.
hitsFieldName = "hitss"
}
// field_values is implemented on top of the `uniq` pipe with per-value hits counting.
pu := &pipeUniq{
byFields: []string{pf.field},
hitsFieldName: hitsFieldName,
limit: pf.limit,
}
return pu.newPipeProcessor(workersCount, stopCh, cancel, ppNext)
}
func parsePipeFieldValues(lex *lexer) (*pipeFieldValues, error) {
if !lex.isKeyword("field_values") {
return nil, fmt.Errorf("expecting 'field_values'; got %q", lex.token)
}
lex.nextToken()
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name for 'field_values': %w", err)
}
limit := uint64(0)
if lex.isKeyword("limit") {
lex.nextToken()
n, ok := tryParseUint64(lex.token)
if !ok {
return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token)
}
lex.nextToken()
limit = n
}
pf := &pipeFieldValues{
field: field,
limit: limit,
}
return pf, nil
}
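For illustration, a minimal parsing sketch (not part of this commit; it uses the unexported `newLexer` and `parsePipeFieldValues` helpers, so it only compiles inside the `logstorage` package):

```go
package logstorage

import "fmt"

// examplePipeFieldValuesRoundTrip shows how a `field_values` pipe fragment is
// parsed and then serialized back to its canonical string form.
func examplePipeFieldValuesRoundTrip() {
	lex := newLexer(`field_values user_id limit 10`)
	pf, err := parsePipeFieldValues(lex)
	if err != nil {
		panic(err)
	}
	fmt.Println(pf.String()) // Output: field_values user_id limit 10
}
```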

@@ -0,0 +1,148 @@
package logstorage
import (
"testing"
)
func TestParsePipeFieldValuesSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`field_values x`)
f(`field_values x limit 10`)
}
func TestParsePipeFieldValuesFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`field_values`)
f(`field_values a b`)
f(`field_values a limit`)
f(`field_values limit N`)
}
func TestPipeFieldValues(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("field_values a", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"hits", "3"},
},
})
f("field_values b", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"b", "3"},
{"hits", "2"},
},
{
{"b", "54"},
{"hits", "1"},
},
})
f("field_values c", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"c", ""},
{"hits", "2"},
},
{
{"c", "d"},
{"hits", "1"},
},
})
f("field_values d", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"d", ""},
{"hits", "3"},
},
})
}
func TestPipeFieldValuesUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("field_values x", "*", "", "x", "")
// all the needed fields, unneeded fields do not intersect with src
f("field_values x", "*", "f1,f2", "x", "")
// all the needed fields, unneeded fields intersect with src
f("field_values x", "*", "f1,x", "", "")
// needed fields do not intersect with src
f("field_values x", "f1,f2", "", "", "")
// needed fields intersect with src
f("field_values x", "f1,x", "", "x", "")
}

@@ -229,12 +229,12 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) {
pipes := append([]pipe{}, q.pipes...)
quotedFieldName := quoteTokenIfNeeded(fieldName)
pipeStr := fmt.Sprintf("uniq by (%s) with hits limit %d", quotedFieldName, limit)
pipeStr := fmt.Sprintf("field_values %s limit %d", quotedFieldName, limit)
lex := newLexer(pipeStr)
pu, err := parsePipeUniq(lex)
pu, err := parsePipeFieldValues(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
logger.Panicf("BUG: unexpected error when parsing 'field_values' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
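For context, a hedged usage sketch of `Storage.GetFieldValues` after this change (the helper below is illustrative, and the `ValueWithHits` field names `Value` and `Hits` are assumptions inferred from the struct literals in the tests that follow):

```go
package logstorage

import (
	"context"
	"fmt"
)

// listLevelValues is an illustrative helper, not part of this commit. It prints
// up to 10 unique values of the `level` field for logs matching q, with hit counts.
func listLevelValues(ctx context.Context, s *Storage, tenantIDs []TenantID, q *Query) error {
	values, err := s.GetFieldValues(ctx, tenantIDs, q, "level", 10)
	if err != nil {
		return err
	}
	for _, v := range values {
		// ValueWithHits field names (Value, Hits) are assumed here.
		fmt.Printf("%s: %d\n", v.Value, v.Hits)
	}
	return nil
}
```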

@@ -3,6 +3,7 @@ package logstorage
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
@@ -311,6 +312,157 @@ func TestStorageRunQuery(t *testing.T) {
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
})
t.Run("field_names-all", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{"_msg", 1155},
{"_stream", 1155},
{"_time", 1155},
{"instance", 1155},
{"job", 1155},
{"source-file", 1155},
{"stream-id", 1155},
{"tenant.id", 1155},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
t.Run("field_names-some", func(t *testing.T) {
q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`)
names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{"_msg", 385},
{"_stream", 385},
{"_time", 385},
{"instance", 385},
{"job", 385},
{"source-file", 385},
{"stream-id", 385},
{"tenant.id", 385},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
t.Run("field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 0},
{`{instance="host-1:234",job="foobar"}`, 0},
{`{instance="host-2:234",job="foobar"}`, 0},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("instance:='host-1:234'")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-1:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("stream_field_names", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{"instance", 1155},
{"job", 1155},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
t.Run("stream_field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("stream_field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("streams", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
// Run more complex tests
f := func(t *testing.T, query string, rowsExpected [][]Field) {