diff --git a/lib/logstorage/filter.go b/lib/logstorage/filter.go index 93a2e6005..ba4e445e2 100644 --- a/lib/logstorage/filter.go +++ b/lib/logstorage/filter.go @@ -72,45 +72,6 @@ func (sf *streamFilter) apply(bs *blockSearch, bm *bitmap) { } } -// timeFilter filters by time. -// -// It is expressed as `_time:(start, end]` in LogsQL. -type timeFilter struct { - minTimestamp int64 - maxTimestamp int64 - - stringRepr string -} - -func (tf *timeFilter) String() string { - return "_time:" + tf.stringRepr -} - -func (tf *timeFilter) apply(bs *blockSearch, bm *bitmap) { - minTimestamp := tf.minTimestamp - maxTimestamp := tf.maxTimestamp - - if minTimestamp > maxTimestamp { - bm.resetBits() - return - } - - th := bs.bsw.bh.timestampsHeader - if minTimestamp > th.maxTimestamp || maxTimestamp < th.minTimestamp { - bm.resetBits() - return - } - if minTimestamp <= th.minTimestamp && maxTimestamp >= th.maxTimestamp { - return - } - - timestamps := bs.getTimestamps() - bm.forEachSetBit(func(idx int) bool { - ts := timestamps[idx] - return ts >= minTimestamp && ts <= maxTimestamp - }) -} - // sequenceFilter matches an ordered sequence of phrases // // Example LogsQL: `fieldName:seq(foo, "bar baz")` diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index 2779f5095..ece7cffbb 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -480,84 +480,6 @@ func TestComplexFilters(t *testing.T) { testFilterMatchForColumns(t, columns, f, "foo", []int{1, 3, 6}) } -func TestTimeFilter(t *testing.T) { - timestamps := []int64{ - 1, - 9, - 123, - 456, - 789, - } - - // match - tf := &timeFilter{ - minTimestamp: -10, - maxTimestamp: 1, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{0}) - - tf = &timeFilter{ - minTimestamp: -10, - maxTimestamp: 10, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{0, 1}) - - tf = &timeFilter{ - minTimestamp: 1, - maxTimestamp: 1, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{0}) - - tf = &timeFilter{ - minTimestamp: 2, - maxTimestamp: 456, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{1, 2, 3}) - - tf = &timeFilter{ - minTimestamp: 2, - maxTimestamp: 457, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{1, 2, 3}) - - tf = &timeFilter{ - minTimestamp: 120, - maxTimestamp: 788, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{2, 3}) - - tf = &timeFilter{ - minTimestamp: 120, - maxTimestamp: 789, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{2, 3, 4}) - - tf = &timeFilter{ - minTimestamp: 120, - maxTimestamp: 10000, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{2, 3, 4}) - - tf = &timeFilter{ - minTimestamp: 789, - maxTimestamp: 1000, - } - testFilterMatchForTimestamps(t, timestamps, tf, []int{4}) - - // mismatch - tf = &timeFilter{ - minTimestamp: -1000, - maxTimestamp: 0, - } - testFilterMatchForTimestamps(t, timestamps, tf, nil) - - tf = &timeFilter{ - minTimestamp: 790, - maxTimestamp: 1000, - } - testFilterMatchForTimestamps(t, timestamps, tf, nil) -} - func TestStreamFilter(t *testing.T) { columns := []column{ { diff --git a/lib/logstorage/filter_time_test.go b/lib/logstorage/filter_time_test.go new file mode 100644 index 000000000..6ac1e20c5 --- /dev/null +++ b/lib/logstorage/filter_time_test.go @@ -0,0 +1,83 @@ +package logstorage + +import ( + "testing" +) + +func TestFilterTime(t *testing.T) { + timestamps := []int64{ + 1, + 9, + 123, + 456, + 789, + } + + // match + ft := &filterTime{ + minTimestamp: -10, + maxTimestamp: 1, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{0}) + + ft = &filterTime{ + minTimestamp: -10, + maxTimestamp: 10, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{0, 1}) + + ft = &filterTime{ + minTimestamp: 1, + maxTimestamp: 1, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{0}) + + ft = &filterTime{ + minTimestamp: 2, + maxTimestamp: 456, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{1, 2, 3}) + + ft = &filterTime{ + minTimestamp: 2, + maxTimestamp: 457, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{1, 2, 3}) + + ft = &filterTime{ + minTimestamp: 120, + maxTimestamp: 788, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{2, 3}) + + ft = &filterTime{ + minTimestamp: 120, + maxTimestamp: 789, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{2, 3, 4}) + + ft = &filterTime{ + minTimestamp: 120, + maxTimestamp: 10000, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{2, 3, 4}) + + ft = &filterTime{ + minTimestamp: 789, + maxTimestamp: 1000, + } + testFilterMatchForTimestamps(t, timestamps, ft, []int{4}) + + // mismatch + ft = &filterTime{ + minTimestamp: -1000, + maxTimestamp: 0, + } + testFilterMatchForTimestamps(t, timestamps, ft, nil) + + ft = &filterTime{ + minTimestamp: 790, + maxTimestamp: 1000, + } + testFilterMatchForTimestamps(t, timestamps, ft, nil) +} diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 2072f33dd..e8cd88711 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -432,7 +432,7 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error) } switch fieldName { case "_time": - return parseTimeFilterWithOffset(lex) + return parseFilterTimeWithOffset(lex) case "_stream": return parseStreamFilter(lex) default: @@ -812,30 +812,30 @@ func startsWithYear(s string) bool { return c == '-' || c == '+' || c == 'Z' || c == 'z' } -func parseTimeFilterWithOffset(lex *lexer) (*timeFilter, error) { - tf, err := parseTimeFilter(lex) +func parseFilterTimeWithOffset(lex *lexer) (*filterTime, error) { + ft, err := parseFilterTime(lex) if err != nil { return nil, err } if !lex.isKeyword("offset") { - return tf, nil + return ft, nil } if !lex.mustNextToken() { - return nil, fmt.Errorf("missing offset for _time filter %s", tf) + return nil, fmt.Errorf("missing offset for _time filter %s", ft) } s := getCompoundToken(lex) d, err := promutils.ParseDuration(s) if err != nil { - return nil, fmt.Errorf("cannot parse offset for _time filter %s: %w", tf, err) + return nil, fmt.Errorf("cannot parse offset for _time filter %s: %w", ft, err) } offset := int64(d) - tf.minTimestamp -= offset - tf.maxTimestamp -= offset - tf.stringRepr += " offset " + s - return tf, nil + ft.minTimestamp -= offset + ft.maxTimestamp -= offset + ft.stringRepr += " offset " + s + return ft, nil } -func parseTimeFilter(lex *lexer) (*timeFilter, error) { +func parseFilterTime(lex *lexer) (*filterTime, error) { startTimeInclude := false switch { case lex.isKeyword("["): @@ -853,13 +853,13 @@ func parseTimeFilter(lex *lexer) (*timeFilter, error) { } startTime := int64(t * 1e9) endTime := getMatchingEndTime(startTime, s) - tf := &timeFilter{ + ft := &filterTime{ minTimestamp: startTime, maxTimestamp: endTime, stringRepr: s, } - return tf, nil + return ft, nil } // Parse _time:duration, which transforms to '_time:(now-duration, now]' d, err := promutils.ParseDuration(s) @@ -869,13 +869,13 @@ func parseTimeFilter(lex *lexer) (*timeFilter, error) { if d < 0 { d = -d } - tf := &timeFilter{ + ft := &filterTime{ minTimestamp: lex.currentTimestamp - int64(d), maxTimestamp: lex.currentTimestamp, stringRepr: s, } - return tf, nil + return ft, nil } if !lex.mustNextToken() { return nil, fmt.Errorf("missing start time in _time filter") @@ -926,13 +926,13 @@ func parseTimeFilter(lex *lexer) (*timeFilter, error) { endTime-- } - tf := &timeFilter{ + ft := &filterTime{ minTimestamp: startTime, maxTimestamp: endTime, stringRepr: stringRepr, } - return tf, nil + return ft, nil } func getMatchingEndTime(startTime int64, stringRepr string) int64 { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 6d2901a4d..25e1846a0 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -86,14 +86,14 @@ func TestParseTimeDuration(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %s", err) } - tf, ok := q.f.(*timeFilter) + ft, ok := q.f.(*filterTime) if !ok { - t.Fatalf("unexpected filter; got %T; want *timeFilter; filter: %s", q.f, q.f) + t.Fatalf("unexpected filter; got %T; want *filterTime; filter: %s", q.f, q.f) } - if tf.stringRepr != s { - t.Fatalf("unexpected string represenation for timeFilter; got %q; want %q", tf.stringRepr, s) + if ft.stringRepr != s { + t.Fatalf("unexpected string represenation for filterTime; got %q; want %q", ft.stringRepr, s) } - duration := time.Duration(tf.maxTimestamp - tf.minTimestamp) + duration := time.Duration(ft.maxTimestamp - ft.minTimestamp) if duration != durationExpected { t.Fatalf("unexpected duration; got %s; want %s", duration, durationExpected) } @@ -114,18 +114,18 @@ func TestParseTimeRange(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %s", err) } - tf, ok := q.f.(*timeFilter) + ft, ok := q.f.(*filterTime) if !ok { - t.Fatalf("unexpected filter; got %T; want *timeFilter; filter: %s", q.f, q.f) + t.Fatalf("unexpected filter; got %T; want *filterTime; filter: %s", q.f, q.f) } - if tf.stringRepr != s { - t.Fatalf("unexpected string represenation for timeFilter; got %q; want %q", tf.stringRepr, s) + if ft.stringRepr != s { + t.Fatalf("unexpected string represenation for filterTime; got %q; want %q", ft.stringRepr, s) } - if tf.minTimestamp != minTimestampExpected { - t.Fatalf("unexpected minTimestamp; got %s; want %s", timestampToString(tf.minTimestamp), timestampToString(minTimestampExpected)) + if ft.minTimestamp != minTimestampExpected { + t.Fatalf("unexpected minTimestamp; got %s; want %s", timestampToString(ft.minTimestamp), timestampToString(minTimestampExpected)) } - if tf.maxTimestamp != maxTimestampExpected { - t.Fatalf("unexpected maxTimestamp; got %s; want %s", timestampToString(tf.maxTimestamp), timestampToString(maxTimestampExpected)) + if ft.maxTimestamp != maxTimestampExpected { + t.Fatalf("unexpected maxTimestamp; got %s; want %s", timestampToString(ft.maxTimestamp), timestampToString(maxTimestampExpected)) } } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index b09bfba85..e94b46972 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -248,17 +248,17 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } // Obtain common time filter from so.filter - tf, f := getCommonTimeFilter(so.filter) + ft, f := getCommonFilterTime(so.filter) // Select partitions according to the selected time range s.partitionsLock.Lock() ptws := s.partitions - minDay := tf.minTimestamp / nsecPerDay + minDay := ft.minTimestamp / nsecPerDay n := sort.Search(len(ptws), func(i int) bool { return ptws[i].day >= minDay }) ptws = ptws[n:] - maxDay := tf.maxTimestamp / nsecPerDay + maxDay := ft.maxTimestamp / nsecPerDay n = sort.Search(len(ptws), func(i int) bool { return ptws[i].day > maxDay }) @@ -279,7 +279,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch partitionSearchConcurrencyLimitCh <- struct{}{} wgSearchers.Add(1) go func(idx int, pt *partition) { - psfs[idx] = pt.search(tf, sf, f, so, workCh, stopCh) + psfs[idx] = pt.search(ft, sf, f, so, workCh, stopCh) wgSearchers.Done() <-partitionSearchConcurrencyLimitCh }(i, ptw.pt) @@ -308,7 +308,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() -func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { select { case <-stopCh: // Do not spend CPU time on search, since it is already stopped. @@ -328,8 +328,8 @@ func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *gene soInternal := &searchOptions{ tenantIDs: tenantIDs, streamIDs: streamIDs, - minTimestamp: tf.minTimestamp, - maxTimestamp: tf.maxTimestamp, + minTimestamp: ft.minTimestamp, + maxTimestamp: ft.maxTimestamp, filter: f, resultColumnNames: so.resultColumnNames, needAllColumns: so.needAllColumns, @@ -713,23 +713,23 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) { return nil, f } -func getCommonTimeFilter(f filter) (*timeFilter, filter) { +func getCommonFilterTime(f filter) (*filterTime, filter) { switch t := f.(type) { case *filterAnd: for _, filter := range t.filters { - tf, ok := filter.(*timeFilter) + ft, ok := filter.(*filterTime) if ok { - // The tf must remain in t.filters order to properly filter out rows outside the selected time range - return tf, f + // The ft must remain in t.filters order to properly filter out rows outside the selected time range + return ft, f } } - case *timeFilter: + case *filterTime: return t, f } - return allTimeFilter, f + return allFilterTime, f } -var allTimeFilter = &timeFilter{ +var allFilterTime = &filterTime{ minTimestamp: math.MinInt64, maxTimestamp: math.MaxInt64, } diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 067b143ee..c2a0ad56f 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -379,7 +379,7 @@ func TestStorageSearch(t *testing.T) { getBaseFilter := func(minTimestamp, maxTimestamp int64, sf *StreamFilter) filter { var filters []filter - filters = append(filters, &timeFilter{ + filters = append(filters, &filterTime{ minTimestamp: minTimestamp, maxTimestamp: maxTimestamp, })