From 508e498ae3f277f8ac8652fe6c9c161ad3cb0968 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 16 Oct 2024 19:43:50 +0200 Subject: [PATCH] lib/logstorage: follow-up for 72941eac367eaac955b5ec2db2297dbc2738644f - Allow dropping metrics if the query result contains at least a single metric. - Allow copying by(...) fields. - Disallow overriding by(...) fields via `math` pipe. - Allow using `format` pipe in stats query. This is useful for constructing some labels from the existing by(...) fields. - Add more tests. - Remove the check for time range in the query filter according to https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7254/files#r1803405826 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7254 --- docs/VictoriaLogs/CHANGELOG.md | 1 + lib/logstorage/parser.go | 115 +++++++++++++++++++++---------- lib/logstorage/parser_test.go | 55 ++++++++++++++- lib/logstorage/storage_search.go | 3 - 4 files changed, 132 insertions(+), 42 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d870ac6d4..332271262 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -18,6 +18,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: optimize [LogsQL queries](https://docs.victoriametrics.com/victorialogs/logsql/), which need to scan big number of logs with big number of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). The performance for such queries is improved by 10x and more depending on the number of log fields in the scanned logs. The performance improvement is visible when querying logs ingested after the upgrade to this release. * FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge). * FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). This should reduce the level of confusion for end users when they see empty log fields. +* FEATURE: allow using [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) for creating output labels from existing log fields at [`/select/logsql/query_stats`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and [`/select/logsql/query_range_stats`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) endpoints. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to cancel running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7097). * BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries. diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 4e3eda51c..46ed4cc4f 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -478,15 +478,19 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error) ps.byFields = addByTimeField(ps.byFields, step) // extract by(...) field names from stats pipe - byFields := ps.byFields - fields := make([]string, len(byFields)) - for i, f := range byFields { - fields[i] = f.name + byFields := make([]string, len(ps.byFields)) + for i, f := range ps.byFields { + byFields[i] = f.name } - resultNames := make([]string, len(ps.funcs)) - for i, f := range ps.funcs { - resultNames[i] = f.resultName + // extract metric fields from stats pipe + metricFields := make(map[string]struct{}, len(ps.funcs)) + for i := range ps.funcs { + f := &ps.funcs[i] + if slices.Contains(byFields, f.resultName) { + return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", f.resultName, ps, q) + } + metricFields[f.resultName] = struct{}{} } // verify that all the pipes after the idx do not add new fields @@ -496,60 +500,98 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error) case *pipeSort, *pipeOffset, *pipeLimit, *pipeFilter: // These pipes do not change the set of fields. case *pipeMath: - // Allow pipeMath, since it adds additional metrics to the given set of fields. - for _, f := range t.entries { - resultNames = append(resultNames, f.resultField) + // Allow `| math ...` pipe, since it adds additional metrics to the given set of fields. + // Verify that the result fields at math pipe do not override byFields. + for _, me := range t.entries { + if slices.Contains(byFields, me.resultField) { + return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", me.resultField, t, q) + } + metricFields[me.resultField] = struct{}{} } case *pipeFields: // `| fields ...` pipe must contain all the by(...) fields, otherwise it breaks output. - for _, f := range fields { + for _, f := range byFields { if !slices.Contains(t.fields, f) { return nil, fmt.Errorf("missing %q field at %q pipe in the query [%s]", f, p, q) } } - // field in `| fields ...` pipe must exist, otherwise it breaks output. + + remainingMetricFields := make(map[string]struct{}) for _, f := range t.fields { - if !slices.Contains(fields, f) && !slices.Contains(resultNames, f) { - return nil, fmt.Errorf("unknown %q field at %q pipe in the query [%s]", f, p, q) + if _, ok := metricFields[f]; ok { + remainingMetricFields[f] = struct{}{} } } + metricFields = remainingMetricFields case *pipeDelete: // Disallow deleting by(...) fields, since this breaks output. for _, f := range t.fields { - if slices.Contains(fields, f) { + if slices.Contains(byFields, f) { return nil, fmt.Errorf("the %q field cannot be deleted via %q in the query [%s]", f, p, q) } - for i := range resultNames { - if resultNames[i] == f { - resultNames = append(resultNames[:i], resultNames[i+1:]...) - break - } - } + delete(metricFields, f) } case *pipeCopy: - // Disallow copying by(...) fields, since this breaks output. - for _, f := range t.srcFields { - if slices.Contains(fields, f) { - return nil, fmt.Errorf("the %q field cannot be copied via %q in the query [%s]", f, p, q) + // Add copied fields to by(...) fields list. + for i := range t.srcFields { + fSrc := t.srcFields[i] + fDst := t.dstFields[i] + + if slices.Contains(byFields, fDst) { + return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", fDst, t, q) + } + if _, ok := metricFields[fDst]; ok { + if _, ok := metricFields[fSrc]; !ok { + delete(metricFields, fDst) + } + } + + if slices.Contains(byFields, fSrc) { + if !slices.Contains(byFields, fDst) { + byFields = append(byFields, fDst) + } + } + if _, ok := metricFields[fSrc]; ok { + metricFields[fDst] = struct{}{} } } - resultNames = append(resultNames, t.dstFields...) case *pipeRename: // Update by(...) fields with dst fields - for i, f := range t.srcFields { - if n := slices.Index(fields, f); n >= 0 { - fields[n] = t.dstFields[i] + for i := range t.srcFields { + fSrc := t.srcFields[i] + fDst := t.dstFields[i] + + if slices.Contains(byFields, fDst) { + return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", fDst, t, q) } - if n := slices.Index(resultNames, f); n >= 0 { - resultNames[n] = t.dstFields[i] + delete(metricFields, fDst) + + if n := slices.Index(byFields, fSrc); n >= 0 { + byFields[n] = fDst + } + if _, ok := metricFields[fSrc]; ok { + delete(metricFields, fSrc) + metricFields[fDst] = struct{}{} } } + case *pipeFormat: + // Assume that `| format ...` pipe generates an additional by(...) label + if slices.Contains(byFields, t.resultField) { + return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", t.resultField, t, q) + } else { + byFields = append(byFields, t.resultField) + } + delete(metricFields, t.resultField) default: return nil, fmt.Errorf("the %q pipe cannot be put after %q pipe in the query [%s]", p, ps, q) } } - return fields, nil + if len(metricFields) == 0 { + return nil, fmt.Errorf("missing metric fields in the results of query [%s]", q) + } + + return byFields, nil } func getLastPipeStatsIdx(pipes []pipe) int { @@ -734,13 +776,14 @@ func ParseQuery(s string) (*Query, error) { // ParseStatsQuery parses s with needed stats query checks. func ParseStatsQuery(s string) (*Query, error) { - timestamp := time.Now().UnixNano() - query, err := ParseQueryAtTimestamp(s, timestamp) + q, err := ParseQuery(s) if err != nil { return nil, err } - _, err = query.GetStatsByFields() - return query, err + if _, err := q.GetStatsByFields(); err != nil { + return nil, err + } + return q, nil } // ParseQueryAtTimestamp parses s in the context of the given timestamp. diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index a8fbc02ed..642b88364 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2242,7 +2242,7 @@ func TestQueryGetStatsByFieldsAddGroupingByTime_Failure(t *testing.T) { f(`* | by (x) count() | keep x`) f(`* | stats by (host) count() total | fields total`) f(`* | stats by (host) count() total | delete host`) - f(`* | stats by (host) count() total | copy host as server`) + f(`* | stats by (host) count() total | copy total as host`) f(`* | stats by (host) count() total | rename host as server | fields host, total`) } @@ -2280,14 +2280,39 @@ func TestQueryGetStatsByFields_Success(t *testing.T) { // math pipe is allowed after stats f(`foo | stats by (x) count() total, count() if (error) errors | math errors / total`, []string{"x"}) + // derive math results + f(`foo | stats count() x | math x / 10 as y | rm x`, []string{}) + f(`foo | stats by (z) count() x | math x / 10 as y | rm x`, []string{"z"}) + + // keep containing all the by(...) fields + f(`foo | stats by (x) count() total | keep x, y, total`, []string{"x"}) + + // keep drops some metrics, but leaves others + f(`foo | stats by (x) count() y, count_uniq() z | keep x, z, abc`, []string{"x"}) + // drop which doesn't contain by(...) fields f(`foo | stats by (x) count() total | drop y`, []string{"x"}) + f(`foo | stats by (x) count() total, count_uniq() z | drop z`, []string{"x"}) // copy which doesn't contain by(...) fields f(`foo | stats by (x) count() total | copy total abc`, []string{"x"}) + // copy by(...) fields + f(`foo | stats by (x) count() | copy x y, y z`, []string{"x", "y", "z"}) + + // copy metrics + f(`foo | stats by (x) count() y | copy y z | drop y`, []string{"x"}) + // mv by(...) fields f(`foo | stats by (x) count() total | mv x y`, []string{"y"}) + + // mv metrics + f(`foo | stats by (x) count() y | mv y z`, []string{"x"}) + f(`foo | stats by (x) count() y | mv y z | rm y`, []string{"x"}) + + // format result is treated as by(...) field + f(`foo | count() | format "foobaz" as x`, []string{"x"}) + f(`foo | by (x) count() | format "foobaz" as y`, []string{"x", "y"}) } func TestQueryGetStatsByFields_Failure(t *testing.T) { @@ -2318,7 +2343,6 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) { f(`foo | count() | field_names`) f(`foo | count() | field_values abc`) f(`foo | by (x) count() | fields a, b`) - f(`foo | count() | format "foobaz"`) f(`foo | count() | pack_json`) f(`foo | count() | pack_logfmt`) f(`foo | rename x y`) @@ -2332,6 +2356,31 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) { f(`foo | count() | unpack_syslog`) f(`foo | count() | unroll by (x)`) + // drop by(...) field f(`* | by (x) count() as rows | math rows * 10, rows / 10 | drop x`) - f(`* | by (x) count() total | keep x, y`) + + // missing metric fields + f(`* | count() x | fields y`) + f(`* | by (x) count() y | fields x`) + + // math results override by(...) fields + f(`* | by (x) count() y | math y*100 as x`) + + // copy to existing by(...) field + f(`* | by (x) count() | cp a x`) + + // copy to the remaining metric field + f(`* | by (x) count() y | cp a y`) + + // mv to existing by(...) field + f(`* | by (x) count() | mv a x`) + + // mv to the remaining metric fields + f(`* | by (x) count() y | mv x y`) + + // format to by(...) field + f(`* | by (x) count() | format 'foo' as x`) + + // format to the remaining metric field + f(`* | by (x) count() y | format 'foo' as y`) } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index d6fe4924f..2e269f85e 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -119,9 +119,6 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, }) minTimestamp, maxTimestamp := q.GetFilterTimeRange() - if minTimestamp > maxTimestamp { - return fmt.Errorf("invalid query time range: minTimestamp=%d cannot be bigger than maxTimestamp=%d", minTimestamp, maxTimestamp) - } neededColumnNames, unneededColumnNames := q.getNeededColumns() so := &genericSearchOptions{