lib/logstorage: improve error logging for incorrect queries passed to /select/logsql/stats_query and /select/logsql/stats_query_range functions

This commit is contained in:
Aliaksandr Valialkin 2024-09-08 11:24:44 +02:00
parent 1cd06ace5a
commit eaee2d7db4
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 51 additions and 28 deletions

View file

@ -593,9 +593,8 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
// Obtain `by(...)` fields from the last `| stats` pipe in q.
// Add `_time:step` to the `by(...)` list.
byFields, ok := q.GetStatsByFields(int64(step))
if !ok {
err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q)
byFields, err := q.GetStatsByFieldsAddGroupingByTime(int64(step))
if err != nil {
httpserver.SendPrometheusError(w, r, err)
return
}
@ -706,9 +705,8 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
}
// Obtain `by(...)` fields from the last `| stats` pipe in q.
byFields, ok := q.GetStatsByFields(0)
if !ok {
err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q)
byFields, err := q.GetStatsByFields()
if err != nil {
httpserver.SendPrometheusError(w, r, err)
return
}

View file

@ -289,7 +289,7 @@ for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) a
in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries).
The `<query>` must contain [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats is converted into metrics
with labels enumerated in `by(...)` clause of the `| stats by(...)` pipe.
with labels from `by(...)` clause of the `| stats by(...)` pipe.
The `<t>` arg can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<t>` is missing, then it equals to the current time.
@ -360,7 +360,7 @@ for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) o
The stats is returned in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries).
The `<query>` must contain [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats is converted into metrics
with labels enumerated in `by(...)` clause of the `| stats by(...)` pipe.
with labels from `by(...)` clause of the `| stats by(...)` pipe.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.

View file

@ -455,16 +455,19 @@ func (q *Query) Optimize() {
}
// GetStatsByFields returns `by (...)` fields from the last `stats` pipe at q.
func (q *Query) GetStatsByFields() ([]string, error) {
return q.GetStatsByFieldsAddGroupingByTime(0)
}
// GetStatsByFieldsAddGroupingByTime returns `by (...)` fields from the last `stats` pipe at q.
//
// If step > 0, then _time:step field is added to the last `stats by(...)` pipe at q.
//
// False is returned if q doesn't contain safe `| stats ...` pipe.
func (q *Query) GetStatsByFields(step int64) ([]string, bool) {
// if step > 0, then _time:step is added to the last `stats by (...)` pipe at q.
func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error) {
pipes := q.pipes
idx := getLastPipeStatsIdx(pipes)
if idx < 0 {
return nil, false
return nil, fmt.Errorf("missing `| stats ...` pipe in the query [%s]", q)
}
ps := pipes[idx].(*pipeStats)
@ -491,21 +494,21 @@ func (q *Query) GetStatsByFields(step int64) ([]string, bool) {
// `| fields ...` pipe must contain all the by(...) fields, otherwise it breaks output.
for _, f := range fields {
if !slices.Contains(t.fields, f) {
return nil, false
return nil, fmt.Errorf("missing %q field at %q pipe in the query [%s]", f, p, q)
}
}
case *pipeDelete:
// Disallow deleting by(...) fields, since this breaks output.
for _, f := range t.fields {
if slices.Contains(fields, f) {
return nil, false
return nil, fmt.Errorf("the %q field cannot be deleted via %q in the query [%s]", f, p, q)
}
}
case *pipeCopy:
// Disallow copying by(...) fields, since this breaks output.
for _, f := range t.srcFields {
if slices.Contains(fields, f) {
return nil, false
return nil, fmt.Errorf("the %q field cannot be copied via %q in the query [%s]", f, p, q)
}
}
case *pipeRename:
@ -516,11 +519,11 @@ func (q *Query) GetStatsByFields(step int64) ([]string, bool) {
}
}
default:
return nil, false
return nil, fmt.Errorf("the %q pipe cannot be put after %q pipe in the query [%s]", p, ps, q)
}
}
return fields, true
return fields, nil
}
func getLastPipeStatsIdx(pipes []pipe) int {

View file

@ -2101,7 +2101,7 @@ func TestQueryDropAllPipes(t *testing.T) {
f(`foo | filter bar:baz | stats by (x) min(y)`, `foo bar:baz`)
}
func TestQueryGetStatsByFields_PositiveStep(t *testing.T) {
func TestQueryGetStatsByFieldsAddGroupingByTime_Success(t *testing.T) {
f := func(qStr string, step int64, fieldsExpected []string, qExpected string) {
t.Helper()
@ -2109,9 +2109,9 @@ func TestQueryGetStatsByFields_PositiveStep(t *testing.T) {
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
fields, ok := q.GetStatsByFields(step)
if !ok {
t.Fatalf("cannot obtain byFields from the query [%s]", qStr)
fields, err := q.GetStatsByFieldsAddGroupingByTime(step)
if err != nil {
t.Fatalf("unexpected error in GetStatsByFieldsAddGroupingByTime(): %s", err)
}
if !reflect.DeepEqual(fields, fieldsExpected) {
t.Fatalf("unexpected byFields;\ngot\n%q\nwant\n%q", fields, fieldsExpected)
@ -2130,6 +2130,28 @@ func TestQueryGetStatsByFields_PositiveStep(t *testing.T) {
f(`* | by (_time:1m offset 30s,level) count() x, count_uniq(z) y`, nsecsPerDay, []string{"_time", "level"}, `* | stats by (_time:86400000000000, level) count(*) as x, count_uniq(z) as y`)
}
func TestQueryGetStatsByFieldsAddGroupingByTime_Failure(t *testing.T) {
f := func(qStr string) {
t.Helper()
q, err := ParseQuery(qStr)
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
fields, err := q.GetStatsByFieldsAddGroupingByTime(nsecsPerHour)
if err == nil {
t.Fatalf("expecting non-nil error")
}
if fields != nil {
t.Fatalf("unexpected non-nil fields: %q", fields)
}
}
f(`*`)
f(`_time:5m | count() | drop _time`)
f(`* | by (x) count() | keep x`)
}
func TestQueryGetStatsByFields_Success(t *testing.T) {
f := func(qStr string, fieldsExpected []string) {
t.Helper()
@ -2138,9 +2160,9 @@ func TestQueryGetStatsByFields_Success(t *testing.T) {
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
fields, ok := q.GetStatsByFields(0)
if !ok {
t.Fatalf("cannot obtain byFields from the query [%s]", qStr)
fields, err := q.GetStatsByFields()
if err != nil {
t.Fatalf("unexpected error in GetStatsByFields(): %s", err)
}
if !reflect.DeepEqual(fields, fieldsExpected) {
t.Fatalf("unexpected byFields;\ngot\n%q\nwant\n%q", fields, fieldsExpected)
@ -2185,9 +2207,9 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
fields, ok := q.GetStatsByFields(0)
if ok {
t.Fatalf("expecting failure to get byFields for the query [%s]", qStr)
fields, err := q.GetStatsByFields()
if err == nil {
t.Fatalf("expecting non-nil error")
}
if fields != nil {
t.Fatalf("expectig nil fields; got %q", fields)