lib/logstorage: properly take into account the end query arg when calculating time range for _time:duration filters

This commit is contained in:
Aliaksandr Valialkin 2024-11-08 15:47:14 +01:00
parent a98fb495c6
commit e5537bc64d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 127 additions and 44 deletions

View file

@ -962,14 +962,29 @@ func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID,
} }
tenantIDs := []logstorage.TenantID{tenantID} tenantIDs := []logstorage.TenantID{tenantID}
// Parse optional start and end args
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
return nil, nil, err
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
return nil, nil, err
}
// Parse optional time arg // Parse optional time arg
timestamp, okTime, err := getTimeNsec(r, "time") timestamp, okTime, err := getTimeNsec(r, "time")
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if !okTime { if !okTime {
// If time arg is missing, then evaluate query at the current timestamp // If time arg is missing, then evaluate query either at the end timestamp (if it is set)
timestamp = time.Now().UnixNano() // or at the current timestamp (if end query arg isn't set)
if okEnd {
timestamp = end
} else {
timestamp = time.Now().UnixNano()
}
} }
// decrease timestamp by one nanosecond in order to avoid capturing logs belonging // decrease timestamp by one nanosecond in order to avoid capturing logs belonging
@ -983,16 +998,8 @@ func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID,
return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err) return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
} }
// Parse optional start and end args
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
return nil, nil, err
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
return nil, nil, err
}
if okStart || okEnd { if okStart || okEnd {
// Add _time:[start, end] filter if start or end args were set.
if !okStart { if !okStart {
start = math.MinInt64 start = math.MinInt64
} }

View file

@ -71,7 +71,7 @@ func (t *Type) ValidateExpr(expr string) error {
return fmt.Errorf("bad prometheus expr: %q, err: %w", expr, err) return fmt.Errorf("bad prometheus expr: %q, err: %w", expr, err)
} }
case "vlogs": case "vlogs":
if _, err := logstorage.ParseStatsQuery(expr); err != nil { if _, err := logstorage.ParseStatsQuery(expr, 0); err != nil {
return fmt.Errorf("bad LogsQL expr: %q, err: %w", expr, err) return fmt.Errorf("bad LogsQL expr: %q, err: %w", expr, err)
} }
default: default:

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
@ -221,6 +222,9 @@ func setIntervalAsTimeFilter(dType, expr string) bool {
if dType != "vlogs" { if dType != "vlogs" {
return false return false
} }
q, _ := logstorage.ParseStatsQuery(expr) q, err := logstorage.ParseStatsQuery(expr, 0)
return !q.ContainAnyTimeFilter() if err != nil {
logger.Panicf("BUG: the LogsQL query must be valid here; got error: %s; query=[%s]", err, expr)
}
return !q.HasGlobalTimeFilter()
} }

View file

@ -18,6 +18,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe): add an ability to add prefix to all the log field names from the joined query, by using `| join by (<by_fields>) (<query>) prefix "some_prefix"` syntax. * FEATURE: [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe): add an ability to add prefix to all the log field names from the joined query, by using `| join by (<by_fields>) (<query>) prefix "some_prefix"` syntax.
* FEATURE: [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter): allow specifying offset without time range. For example, `_time:offset 1d` matches all the logs until `now-1d` in the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is useful when building graphs for time ranges with some offset in the past. * FEATURE: [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter): allow specifying offset without time range. For example, `_time:offset 1d` matches all the logs until `now-1d` in the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). This is useful when building graphs for time ranges with some offset in the past.
* BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): properly take into account the `end` query arg when calculating time range for [`_time:duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). Previously the `_time:duration` filter was treated as `_time:[now-duration, now)`, while it should be treated as `_time:[end-duration, end)`.
## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs) ## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs)
Released at 2024-11-06 Released at 2024-11-06

View file

@ -531,7 +531,7 @@ func (q *Query) AddPipeLimit(n uint64) {
}) })
} }
// optimize tries optimizing the query. // optimize applies various optimizations to q.
func (q *Query) optimize() { func (q *Query) optimize() {
q.pipes = optimizeSortOffsetPipes(q.pipes) q.pipes = optimizeSortOffsetPipes(q.pipes)
q.pipes = optimizeSortLimitPipes(q.pipes) q.pipes = optimizeSortLimitPipes(q.pipes)
@ -555,6 +555,12 @@ func (q *Query) optimize() {
} }
} }
// flatten nested AND filters
q.f = flattenFiltersAnd(q.f)
// flatten nested OR filters
q.f = flattenFiltersOr(q.f)
// Substitute '*' prefixFilter with filterNoop in order to avoid reading _msg data. // Substitute '*' prefixFilter with filterNoop in order to avoid reading _msg data.
q.f = removeStarFilters(q.f) q.f = removeStarFilters(q.f)
} }
@ -734,6 +740,78 @@ func addByTimeField(byFields []*byStatsField, step int64) []*byStatsField {
return dstFields return dstFields
} }
// flattenFiltersAnd merges filterAnd nodes, which are direct children
// of another filterAnd node, into their parent.
//
// For example, `(foo AND bar) AND (baz AND x:y)` becomes `foo bar baz x:y`.
// filterAnd nodes located under other filter types are left nested there.
func flattenFiltersAnd(f filter) filter {
	// needsFlatten reports whether cur is a filterAnd with at least one filterAnd child.
	needsFlatten := func(cur filter) bool {
		parent, isAnd := cur.(*filterAnd)
		if !isAnd {
			return false
		}
		for _, sub := range parent.filters {
			if _, nested := sub.(*filterAnd); nested {
				return true
			}
		}
		return false
	}

	// flatten builds a new filterAnd where the children of every nested filterAnd
	// are spliced in place of that nested node, preserving the original order.
	//
	// NOTE(review): the unchecked type assertion presumes copyFilter calls this
	// only for filters accepted by needsFlatten - confirm against copyFilter.
	flatten := func(cur filter) (filter, error) {
		parent := cur.(*filterAnd)
		var merged []filter
		for _, sub := range parent.filters {
			switch nested := sub.(type) {
			case *filterAnd:
				merged = append(merged, nested.filters...)
			default:
				merged = append(merged, sub)
			}
		}
		return &filterAnd{filters: merged}, nil
	}

	result, err := copyFilter(f, needsFlatten, flatten)
	if err != nil {
		// flatten never returns an error, so this is treated as a bug.
		logger.Fatalf("BUG: unexpected error: %s", err)
	}
	return result
}
// flattenFiltersOr merges filterOr nodes, which are direct children
// of another filterOr node, into their parent.
//
// For example, `(foo OR bar) OR (baz OR x:y)` becomes `foo or bar or baz or x:y`.
// filterOr nodes located under other filter types are left nested there.
func flattenFiltersOr(f filter) filter {
	// needsFlatten reports whether cur is a filterOr with at least one filterOr child.
	needsFlatten := func(cur filter) bool {
		parent, isOr := cur.(*filterOr)
		if !isOr {
			return false
		}
		for _, sub := range parent.filters {
			if _, nested := sub.(*filterOr); nested {
				return true
			}
		}
		return false
	}

	// flatten builds a new filterOr where the children of every nested filterOr
	// are spliced in place of that nested node, preserving the original order.
	//
	// NOTE(review): the unchecked type assertion presumes copyFilter calls this
	// only for filters accepted by needsFlatten - confirm against copyFilter.
	flatten := func(cur filter) (filter, error) {
		parent := cur.(*filterOr)
		var merged []filter
		for _, sub := range parent.filters {
			switch nested := sub.(type) {
			case *filterOr:
				merged = append(merged, nested.filters...)
			default:
				merged = append(merged, sub)
			}
		}
		return &filterOr{filters: merged}, nil
	}

	result, err := copyFilter(f, needsFlatten, flatten)
	if err != nil {
		// flatten never returns an error, so this is treated as a bug.
		logger.Fatalf("BUG: unexpected error: %s", err)
	}
	return result
}
func removeStarFilters(f filter) filter { func removeStarFilters(f filter) filter {
// Substitute `*` filterPrefix with filterNoop // Substitute `*` filterPrefix with filterNoop
visitFunc := func(f filter) bool { visitFunc := func(f filter) bool {
@ -915,9 +993,9 @@ func ParseQuery(s string) (*Query, error) {
return ParseQueryAtTimestamp(s, timestamp) return ParseQueryAtTimestamp(s, timestamp)
} }
// ParseStatsQuery parses s with needed stats query checks. // ParseStatsQuery parses LogsQL query s at the given timestamp with the needed stats query checks.
func ParseStatsQuery(s string) (*Query, error) { func ParseStatsQuery(s string, timestamp int64) (*Query, error) {
q, err := ParseQuery(s) q, err := ParseQueryAtTimestamp(s, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -927,29 +1005,12 @@ func ParseStatsQuery(s string) (*Query, error) {
return q, nil return q, nil
} }
// ContainAnyTimeFilter returns true when query contains a global time filter. // HasGlobalTimeFilter returns true when query contains a global time filter.
func (q *Query) ContainAnyTimeFilter() bool { func (q *Query) HasGlobalTimeFilter() bool {
if hasTimeFilter(q.f) { switch t := q.f.(type) {
return true
}
for _, p := range q.pipes {
if pf, ok := p.(*pipeFilter); ok {
if hasTimeFilter(pf.f) {
return true
}
}
}
return false
}
func hasTimeFilter(f filter) bool {
if f == nil {
return false
}
switch t := f.(type) {
case *filterAnd: case *filterAnd:
for _, subF := range t.filters { for _, subF := range t.filters {
if hasTimeFilter(subF) { if _, ok := subF.(*filterTime); ok {
return true return true
} }
} }

View file

@ -700,6 +700,14 @@ func TestParseQuerySuccess(t *testing.T) {
f("level: ( ((error or warn*) and re(foo))) (not (bar))", `(level:error or level:warn*) level:~foo !bar`) f("level: ( ((error or warn*) and re(foo))) (not (bar))", `(level:error or level:warn*) level:~foo !bar`)
f("!(foo bar or baz and not aa*)", `!(foo bar or baz !aa*)`) f("!(foo bar or baz and not aa*)", `!(foo bar or baz !aa*)`)
// nested AND filters
f(`(foo AND bar) AND (baz AND x:y)`, `foo bar baz x:y`)
f(`(foo AND bar) OR (baz AND x:y)`, `foo bar or baz x:y`)
// nested OR filters
f(`(foo OR bar) OR (baz OR x:y)`, `foo or bar or baz or x:y`)
f(`(foo OR bar) AND (baz OR x:y)`, `(foo or bar) (baz or x:y)`)
// prefix search // prefix search
f(`'foo'* and (a:x* and x:* or y:i(""*)) and i("abc def"*)`, `foo* (a:x* x:* or y:i(*)) i("abc def"*)`) f(`'foo'* and (a:x* and x:* or y:i(""*)) and i("abc def"*)`, `foo* (a:x* x:* or y:i(*)) i("abc def"*)`)
@ -2421,16 +2429,17 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
f(`* | by (x) count() y | format 'foo' as y`) f(`* | by (x) count() y | format 'foo' as y`)
} }
func TestHasTimeFilter(t *testing.T) { func TestQueryHasGlobalTimeFilter(t *testing.T) {
f := func(qStr string, expected bool) { f := func(qStr string, resultExpected bool) {
t.Helper() t.Helper()
q, err := ParseStatsQuery(qStr) q, err := ParseStatsQuery(qStr, 0)
if err != nil { if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err) t.Fatalf("cannot parse [%s]: %s", qStr, err)
} }
if q.ContainAnyTimeFilter() != expected { result := q.HasGlobalTimeFilter()
t.Fatalf("unexpected result for hasTimeFilter(%q); want %v", qStr, expected) if result != resultExpected {
t.Fatalf("unexpected result for hasTimeFilter(%q); got %v; want %v", qStr, result, resultExpected)
} }
} }