This commit is contained in:
Aliaksandr Valialkin 2024-05-18 17:52:53 +02:00
parent 22f35a7340
commit c4da926c9c
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
10 changed files with 459 additions and 94 deletions

View file

@ -107,7 +107,7 @@ func MustAddRows(lr *logstorage.LogRows) {
}
// RunQuery runs the given q and calls writeBlock for the returned data blocks
func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) error {
func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock logstorage.WriteBlockFunc) error {
return strg.RunQuery(ctx, tenantIDs, q, writeBlock)
}

View file

@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
## tip
* FEATURE: add ability to put arbitrary [queries](https://docs.victoriametrics.com/victorialogs/logsql/#query-syntax) inside [`in()` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter).
* FEATURE: add support for post-filtering of query results with [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe).
* FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters).
* FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them.

View file

@ -652,16 +652,16 @@ log.level:in("error", "fatal")
It works very fast for long lists passed to `in()`.
The future VictoriaLogs versions will allow passing arbitrary [queries](#query-syntax) into `in()` filter.
For example, the following query selects all the logs for the last hour for users, who visited pages with `admin` [word](#word) in the `path`
It is possible to pass arbitrary [query](#query-syntax) inside `in(...)` filter in order to match against the results of this query.
The query inside `in(...)` must end with [`fields`](#fields-pipe) pipe containing a single field name, so VictoriaLogs could
fetch results from this field. For example, the following query selects all the logs for the last 5 minutes for users,
who visited pages with `admin` [word](#word) in the `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
during the last day:
```logsql
_time:1h AND user_id:in(_time:1d AND path:admin | fields user_id)
_time:5m AND user_id:in(_time:1d AND path:admin | fields user_id)
```
See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details.
See also:
- [Exact filter](#exact-filter)

View file

@ -37,7 +37,6 @@ The following functionality is planned in the future versions of VictoriaLogs:
- Add missing functionality to [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html):
- [Stream context](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#stream-context).
- [Transformation functions](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#transformations).
- The ability to use subqueries inside [in()](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#multi-exact-filter) function.
- Live tailing for [LogsQL filters](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#filters) aka `tail -f`.
- Web UI with the following abilities:
- Explore the ingested logs ([partially done](https://docs.victoriametrics.com/VictoriaLogs/querying/#web-ui)).

View file

@ -14,3 +14,95 @@ type filter interface {
// applyToBlockResult must update bm according to the filter applied to the given br block
applyToBlockResult(br *blockResult, bm *bitmap)
}
// visitFilter sequentially calls visitFunc for filters inside f.
//
// It stops calling visitFunc on the remaining filters as soon as visitFunc returns true.
// It returns the result of the last visitFunc call.
func visitFilter(f filter, visitFunc func(f filter) bool) bool {
switch t := f.(type) {
case *filterAnd:
return visitFilters(t.filters, visitFunc)
case *filterOr:
return visitFilters(t.filters, visitFunc)
case *filterNot:
return visitFilter(t.f, visitFunc)
default:
return visitFunc(f)
}
}
// visitFilters calls visitFunc per each filter in filters.
//
// It stops calling visitFunc on the remaining filters as soon as visitFunc returns true.
// It returns the result of the last visitFunc call.
func visitFilters(filters []filter, visitFunc func(f filter) bool) bool {
for _, f := range filters {
if visitFilter(f, visitFunc) {
return true
}
}
return false
}
// copyFilter recursively copies f filters with the help of copyFunc if visitFunc returns true for them.
//
// It doesn't copy other filters by returning them as is.
func copyFilter(f filter, visitFunc func(f filter) bool, copyFunc func(f filter) (filter, error)) (filter, error) {
switch t := f.(type) {
case *filterAnd:
filters, err := copyFilters(t.filters, visitFunc, copyFunc)
if err != nil {
return nil, err
}
fa := &filterAnd{
filters: filters,
}
return fa, nil
case *filterOr:
filters, err := copyFilters(t.filters, visitFunc, copyFunc)
if err != nil {
return nil, err
}
fo := &filterOr{
filters: filters,
}
return fo, nil
case *filterNot:
f, err := copyFilter(t.f, visitFunc, copyFunc)
if err != nil {
return nil, err
}
fn := &filterNot{
f: f,
}
return fn, nil
default:
if !visitFunc(t) {
// Nothing to copy
return t, nil
}
return copyFunc(t)
}
}
// copyFilters recursively copies filters with the help of copyfunc if visitFunc returns true for them.
//
// It doesn't copy other filters by returning them as is.
func copyFilters(filters []filter, visitFunc func(f filter) bool, copyFunc func(f filter) (filter, error)) ([]filter, error) {
if !visitFilters(filters, visitFunc) {
// Nothing to copy
return filters, nil
}
// Copy filters.
filtersNew := make([]filter, len(filters))
for i, f := range filters {
fNew, err := copyFilter(f, visitFunc, copyFunc)
if err != nil {
return nil, err
}
filtersNew[i] = fNew
}
return filtersNew, nil
}

View file

@ -18,6 +18,15 @@ type filterIn struct {
fieldName string
values []string
// needeExecuteQuery is set to true if q must be executed for populating values before filter execution.
needExecuteQuery bool
// If q is non-nil, then values must be populated from q before filter execution.
q *Query
// qFieldName must be set to field name for obtaining values from if q is non-nil.
qFieldName string
tokenSetsOnce sync.Once
tokenSets [][]string
@ -47,12 +56,18 @@ type filterIn struct {
}
func (fi *filterIn) String() string {
values := fi.values
a := make([]string, len(values))
for i, value := range values {
a[i] = quoteTokenIfNeeded(value)
args := ""
if fi.q != nil {
args = fi.q.String()
} else {
values := fi.values
a := make([]string, len(values))
for i, value := range values {
a[i] = quoteTokenIfNeeded(value)
}
args = strings.Join(a, ",")
}
return fmt.Sprintf("%sin(%s)", quoteFieldNameIfNeeded(fi.fieldName), strings.Join(a, ","))
return fmt.Sprintf("%sin(%s)", quoteFieldNameIfNeeded(fi.fieldName), args)
}
func (fi *filterIn) updateNeededFields(neededFields fieldsSet) {

View file

@ -38,6 +38,20 @@ type lexer struct {
currentTimestamp int64
}
type lexerState struct {
lex lexer
}
func (lex *lexer) backupState() *lexerState {
return &lexerState{
lex: *lex,
}
}
func (lex *lexer) restoreState(ls *lexerState) {
*lex = ls.lex
}
// newLexer returns new lexer for the given s.
//
// The lex.token points to the first token in s.
@ -251,6 +265,30 @@ func (q *Query) Optimize() {
q.pipes = append(q.pipes[:0], q.pipes[1:]...)
}
}
// Optimize 'in(query)' filters
optimizeFilterIn(q.f)
for _, p := range q.pipes {
switch t := p.(type) {
case *pipeStats:
for _, f := range t.funcs {
if f.iff != nil {
optimizeFilterIn(f.iff)
}
}
}
}
}
func optimizeFilterIn(f filter) {
visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn)
if ok && fi.q != nil {
fi.q.Optimize()
}
return false
}
_ = visitFilter(f, visitFunc)
}
func optimizeSortOffsetPipes(pipes []pipe) []pipe {
@ -355,7 +393,17 @@ func (q *Query) getNeededColumns() ([]string, []string) {
// ParseQuery parses s.
func ParseQuery(s string) (*Query, error) {
lex := newLexer(s)
q, err := parseQuery(lex)
if err != nil {
return nil, err
}
if !lex.isEnd() {
return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]; tail: [%s]", lex.context(), lex.s)
}
return q, nil
}
func parseQuery(lex *lexer) (*Query, error) {
f, err := parseFilter(lex)
if err != nil {
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context())
@ -370,10 +418,6 @@ func ParseQuery(s string) (*Query, error) {
}
q.pipes = pipes
if !lex.isEnd() {
return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]; tail: [%s]", lex.context(), lex.s)
}
return q, nil
}
@ -538,7 +582,7 @@ func getCompoundFuncArg(lex *lexer) string {
rawArg := lex.rawToken
lex.nextToken()
suffix := ""
for !lex.isSkippedSpace && !lex.isKeyword("*", ",", ")", "") {
for !lex.isSkippedSpace && !lex.isKeyword("*", ",", ")", "|", "") {
suffix += lex.rawToken
lex.nextToken()
}
@ -759,13 +803,72 @@ func tryParseIPv4CIDR(s string) (uint32, uint32, bool) {
}
func parseFilterIn(lex *lexer, fieldName string) (filter, error) {
return parseFuncArgs(lex, fieldName, func(args []string) (filter, error) {
f := &filterIn{
if !lex.isKeyword("in") {
return nil, fmt.Errorf("expecting 'in' keyword")
}
// Try parsing in(arg1, ..., argN) at first
lexState := lex.backupState()
fi, err := parseFuncArgs(lex, fieldName, func(args []string) (filter, error) {
fi := &filterIn{
fieldName: fieldName,
values: args,
}
return f, nil
return fi, nil
})
if err == nil {
return fi, nil
}
// Parse in(query | fields someField) then
lex.restoreState(lexState)
lex.nextToken()
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing '(' after 'in'")
}
lex.nextToken()
q, err := parseQuery(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse query inside 'in(...)': %w", err)
}
if !lex.isKeyword(")") {
return nil, fmt.Errorf("missing ')' after 'in(%s)'", q)
}
lex.nextToken()
qFieldName, err := getFieldNameFromPipes(q.pipes)
if err != nil {
return nil, fmt.Errorf("cannot determine field name for values in 'in(%s)': %w", q, err)
}
fi = &filterIn{
fieldName: fieldName,
needExecuteQuery: true,
q: q,
qFieldName: qFieldName,
}
return fi, nil
}
func getFieldNameFromPipes(pipes []pipe) (string, error) {
if len(pipes) == 0 {
return "", fmt.Errorf("missing 'fields' or 'uniq' pipes at the end of query")
}
switch t := pipes[len(pipes)-1].(type) {
case *pipeFields:
if t.containsStar || len(t.fields) != 1 {
return "", fmt.Errorf("'%s' pipe must contain only a single non-star field name", t)
}
return t.fields[0], nil
case *pipeUniq:
if len(t.byFields) != 1 {
return "", fmt.Errorf("'%s' pipe must contain only a single non-star field name", t)
}
return t.byFields[0], nil
default:
return "", fmt.Errorf("missing 'fields' or 'uniq' pipe at the end of query")
}
}
func parseFilterSequence(lex *lexer, fieldName string) (filter, error) {

View file

@ -277,6 +277,10 @@ func TestParseFilterIn(t *testing.T) {
f(`:in("foo bar,baz")`, ``, []string{"foo bar,baz"})
f(`ip:in(1.2.3.4, 5.6.7.8, 9.10.11.12)`, `ip`, []string{"1.2.3.4", "5.6.7.8", "9.10.11.12"})
f(`foo-bar:in(foo,bar-baz.aa"bb","c,)d")`, `foo-bar`, []string{"foo", `bar-baz.aa"bb"`, "c,)d"})
// verify `in(query)` - it shouldn't set values
f(`in(x|fields foo)`, ``, nil)
f(`a:in(* | fields bar)`, `a`, nil)
}
func TestParseFilterIPv4Range(t *testing.T) {
@ -687,8 +691,8 @@ func TestParseQuerySuccess(t *testing.T) {
f("exact(foo*)", `exact(foo*)`)
f("exact('foo bar),|baz')", `exact("foo bar),|baz")`)
f("exact('foo bar),|baz'*)", `exact("foo bar),|baz"*)`)
f(`exact(foo|b:ar)`, `exact("foo|b:ar")`)
f(`foo:exact(foo|b:ar*)`, `foo:exact("foo|b:ar"*)`)
f(`exact(foo/b:ar)`, `exact("foo/b:ar")`)
f(`foo:exact(foo/b:ar*)`, `foo:exact("foo/b:ar"*)`)
// i filter
f("i(foo)", `i(foo)`)
@ -696,14 +700,19 @@ func TestParseQuerySuccess(t *testing.T) {
f("i(`foo`* )", `i(foo*)`)
f("i(' foo ) bar')", `i(" foo ) bar")`)
f("i('foo bar'*)", `i("foo bar"*)`)
f(`foo:i(foo:bar-baz|aa+bb)`, `foo:i("foo:bar-baz|aa+bb")`)
f(`foo:i(foo:bar-baz/aa+bb)`, `foo:i("foo:bar-baz/aa+bb")`)
// in filter
// in filter with values
f(`in()`, `in()`)
f(`in(foo)`, `in(foo)`)
f(`in(foo, bar)`, `in(foo,bar)`)
f(`in("foo bar", baz)`, `in("foo bar",baz)`)
f(`foo:in(foo-bar|baz)`, `foo:in("foo-bar|baz")`)
f(`foo:in(foo-bar/baz)`, `foo:in("foo-bar/baz")`)
// in filter with query
f(`in(err|fields x)`, `in(err | fields x)`)
f(`ip:in(foo and user:in(admin, moderator)|fields ip)`, `ip:in(foo user:in(admin,moderator) | fields ip)`)
f(`x:in(_time:5m y:in(*|fields z) | stats by (q) count() rows|fields q)`, `x:in(_time:5m y:in(* | fields z) | stats by (q) count(*) as rows | fields q)`)
// ipv4_range filter
f(`ipv4_range(1.2.3.4, "5.6.7.8")`, `ipv4_range(1.2.3.4, 5.6.7.8)`)
@ -743,7 +752,7 @@ func TestParseQuerySuccess(t *testing.T) {
// re filter
f("re('foo|ba(r.+)')", `re("foo|ba(r.+)")`)
f("re(foo)", `re("foo")`)
f(`foo:re(foo-bar|baz.)`, `foo:re("foo-bar|baz.")`)
f(`foo:re(foo-bar/baz.)`, `foo:re("foo-bar/baz.")`)
// seq filter
f(`seq()`, `seq()`)
@ -945,6 +954,7 @@ func TestParseQuerySuccess(t *testing.T) {
count() if (is_admin:true or _msg:"foo bar"*) as foo,
sum(duration) if (host:in('foo.com', 'bar.com') and path:/foobar) as bar`,
`* | stats by (_time:1d offset -2h, f2) count(*) if (is_admin:true or "foo bar"*) as foo, sum(duration) if (host:in(foo.com,bar.com) path:"/foobar") as bar`)
f(`* | stats count(x) if (error ip:in(_time:1d | fields ip)) rows`, `* | stats count(x) if (error ip:in(_time:1d | fields ip)) as rows`)
f(`* | stats count() if () rows`, `* | stats count(*) if () as rows`)
// sort pipe
@ -1116,6 +1126,10 @@ func TestParseQueryFailure(t *testing.T) {
f(`in(foo, "bar baz"*, abc)`)
f(`in(foo bar)`)
f(`in(foo, bar`)
f(`in(foo|bar)`)
f(`in(|foo`)
f(`in(x | limit 10)`)
f(`in(x | fields a,b)`)
// invalid ipv4_range
f(`ipv4_range(`)

View file

@ -2,12 +2,14 @@ package logstorage
import (
"context"
"fmt"
"math"
"slices"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// genericSearchOptions contain options used for search.
@ -60,8 +62,21 @@ type searchOptions struct {
needAllColumns bool
}
// WriteBlockFunc must write a block with the given timestamps and columns.
//
// WriteBlockFunc cannot hold references to timestamps and columns after returning.
type WriteBlockFunc func(workerID uint, timestamps []int64, columns []BlockColumn)
// RunQuery runs the given q and calls writeBlock for results.
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) error {
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error {
qNew, err := s.initFilterInValues(ctx, tenantIDs, q)
if err != nil {
return err
}
return s.runQuery(ctx, tenantIDs, qNew, writeBlock)
}
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error {
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
@ -71,8 +86,6 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
needAllColumns: slices.Contains(neededColumnNames, "*"),
}
workersCount := cgroup.AvailableCPUs()
pp := newDefaultPipeProcessor(func(workerID uint, br *blockResult) {
brs := getBlockRows()
csDst := brs.cs
@ -90,6 +103,8 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
putBlockRows(brs)
})
workersCount := cgroup.AvailableCPUs()
ppMain := pp
stopCh := ctx.Done()
cancels := make([]func(), len(q.pipes))
@ -121,6 +136,159 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
return errFlush
}
// GetUniqueValuesForColumn returns unique values for the column with the given columnName returned by q for the given tenantIDs.
//
// If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order because of performance reasons.
// The caller may sort the returned values if needed.
func (s *Storage) GetUniqueValuesForColumn(ctx context.Context, tenantIDs []TenantID, q *Query, columnName string, limit uint64) ([]string, error) {
// add 'uniq columnName' to the end of q.pipes
if !endsWithPipeUniqSingleColumn(q.pipes, columnName) {
pipes := append([]pipe{}, q.pipes...)
pipes = append(pipes, &pipeUniq{
byFields: []string{columnName},
limit: limit,
})
q = &Query{
f: q.f,
pipes: pipes,
}
}
var values []string
var valuesLock sync.Mutex
writeBlock := func(workerID uint, timestamps []int64, columns []BlockColumn) {
if len(columns) != 1 {
logger.Panicf("BUG: expecting only a single column; got %d columns", len(columns))
}
valuesLock.Lock()
values = append(values, columns[0].Values...)
valuesLock.Unlock()
}
err := s.runQuery(ctx, tenantIDs, q, writeBlock)
if err != nil {
return nil, err
}
return values, nil
}
func endsWithPipeUniqSingleColumn(pipes []pipe, columnName string) bool {
if len(pipes) == 0 {
return false
}
pu, ok := pipes[len(pipes)-1].(*pipeUniq)
if !ok {
return false
}
return len(pu.byFields) == 1 && pu.byFields[0] == columnName
}
func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) {
if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) {
return q, nil
}
getUniqueValues := func(q *Query, columnName string) ([]string, error) {
return s.GetUniqueValuesForColumn(ctx, tenantIDs, q, columnName, 0)
}
cache := make(map[string][]string)
fNew, err := initFilterInValuesForFilter(cache, q.f, getUniqueValues)
if err != nil {
return nil, err
}
pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getUniqueValues)
if err != nil {
return nil, err
}
qNew := &Query{
f: fNew,
pipes: pipesNew,
}
return qNew, nil
}
func hasFilterInWithQueryForFilter(f filter) bool {
visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn)
return ok && fi.needExecuteQuery
}
return visitFilter(f, visitFunc)
}
func hasFilterInWithQueryForPipes(pipes []pipe) bool {
for _, p := range pipes {
ps, ok := p.(*pipeStats)
if !ok {
continue
}
for _, f := range ps.funcs {
if f.iff != nil && hasFilterInWithQueryForFilter(f.iff) {
return true
}
}
}
return false
}
type getUniqueValuesFunc func(q *Query, columnName string) ([]string, error)
func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueValuesFunc getUniqueValuesFunc) (filter, error) {
visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn)
return ok && fi.needExecuteQuery
}
copyFunc := func(f filter) (filter, error) {
fi := f.(*filterIn)
qStr := fi.q.String()
values, ok := cache[qStr]
if !ok {
vs, err := getUniqueValuesFunc(fi.q, fi.qFieldName)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err)
}
cache[qStr] = vs
values = vs
}
fiNew := &filterIn{
fieldName: fi.fieldName,
q: fi.q,
values: values,
}
return fiNew, nil
}
return copyFilter(f, visitFunc, copyFunc)
}
func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getUniqueValuesFunc getUniqueValuesFunc) ([]pipe, error) {
pipesNew := make([]pipe, len(pipes))
for i, p := range pipes {
switch t := p.(type) {
case *pipeStats:
funcsNew := make([]pipeStatsFunc, len(t.funcs))
for j, f := range t.funcs {
if f.iff != nil {
fNew, err := initFilterInValuesForFilter(cache, f.iff, getUniqueValuesFunc)
if err != nil {
return nil, err
}
f.iff = fNew
}
funcsNew[j] = f
}
pipesNew[i] = &pipeStats{
byFields: t.byFields,
funcs: funcsNew,
}
default:
pipesNew[i] = p
}
}
return pipesNew, nil
}
type blockRows struct {
cs []BlockColumn
}
@ -169,7 +337,7 @@ type searchResultFunc func(workerID uint, br *blockResult)
// search searches for the matching rows according to so.
//
// It calls processBlockResult for each found matching block.
// It calls processBlockResult for each matching block.
func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) {
// Spin up workers
var wgWorkers sync.WaitGroup
@ -294,60 +462,32 @@ func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *gene
}
func hasStreamFilters(f filter) bool {
switch t := f.(type) {
case *filterAnd:
return hasStreamFiltersInList(t.filters)
case *filterOr:
return hasStreamFiltersInList(t.filters)
case *filterNot:
return hasStreamFilters(t.f)
case *filterStream:
return true
default:
return false
visitFunc := func(f filter) bool {
_, ok := f.(*filterStream)
return ok
}
}
func hasStreamFiltersInList(filters []filter) bool {
for _, f := range filters {
if hasStreamFilters(f) {
return true
}
}
return false
return visitFilter(f, visitFunc)
}
func initStreamFilters(tenantIDs []TenantID, idb *indexdb, f filter) filter {
switch t := f.(type) {
case *filterAnd:
return &filterAnd{
filters: initStreamFiltersList(tenantIDs, idb, t.filters),
}
case *filterOr:
return &filterOr{
filters: initStreamFiltersList(tenantIDs, idb, t.filters),
}
case *filterNot:
return &filterNot{
f: initStreamFilters(tenantIDs, idb, t.f),
}
case *filterStream:
return &filterStream{
f: t.f,
visitFunc := func(f filter) bool {
_, ok := f.(*filterStream)
return ok
}
copyFunc := func(f filter) (filter, error) {
fs := f.(*filterStream)
fsNew := &filterStream{
f: fs.f,
tenantIDs: tenantIDs,
idb: idb,
}
default:
return t
return fsNew, nil
}
}
func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) []filter {
result := make([]filter, len(filters))
for i, f := range filters {
result[i] = initStreamFilters(tenantIDs, idb, f)
f, err := copyFilter(f, visitFunc, copyFunc)
if err != nil {
logger.Panicf("BUG: unexpected error: %s", err)
}
return result
return f
}
func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {

View file

@ -78,6 +78,14 @@ func TestStorageRunQuery(t *testing.T) {
}
s.debugFlush()
mustRunQuery := func(tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) {
t.Helper()
err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
if err != nil {
t.Fatalf("unexpected error returned from the query %s: %s", q, err)
}
}
// run tests on the storage data
t.Run("missing-tenant", func(_ *testing.T) {
q := mustParseQuery(`"log message"`)
@ -89,7 +97,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
})
t.Run("missing-message-text", func(_ *testing.T) {
q := mustParseQuery(`foobar`)
@ -101,7 +109,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
})
t.Run("matching-tenant-id", func(t *testing.T) {
q := mustParseQuery(`tenant.id:*`)
@ -135,7 +143,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -149,7 +157,7 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock))
mustRunQuery(allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -162,7 +170,7 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock))
mustRunQuery(allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -174,7 +182,7 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock))
mustRunQuery(allTenantIDs, q, writeBlock)
})
t.Run("matching-stream-id", func(t *testing.T) {
for i := 0; i < streamsPerTenant; i++ {
@ -208,7 +216,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -227,7 +235,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -247,7 +255,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -267,7 +275,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -286,7 +294,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
})
t.Run("missing-time-range", func(_ *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
@ -300,7 +308,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
mustRunQuery(tenantIDs, q, writeBlock)
})
// Close the storage and delete its data
@ -308,13 +316,6 @@ func TestStorageRunQuery(t *testing.T) {
fs.MustRemoveAll(path)
}
func checkErr(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
}
func mustParseQuery(query string) *Query {
q, err := ParseQuery(query)
if err != nil {