This commit is contained in:
Aliaksandr Valialkin 2024-04-29 04:37:57 +02:00
parent 7ae71cab2e
commit b6a1576e0d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 128 additions and 162 deletions

View file

@ -72,45 +72,6 @@ func (sf *streamFilter) apply(bs *blockSearch, bm *bitmap) {
} }
} }
// timeFilter filters by time.
//
// It is expressed as `_time:(start, end]` in LogsQL.
type timeFilter struct {
	// minTimestamp is the minimum matching timestamp, inclusive.
	// Timestamps are Unix nanoseconds (the parser multiplies seconds by 1e9).
	minTimestamp int64

	// maxTimestamp is the maximum matching timestamp, inclusive.
	maxTimestamp int64

	// stringRepr is the original LogsQL representation of the filter,
	// returned by String() with the "_time:" prefix.
	stringRepr string
}
// String returns the LogsQL representation of tf.
func (tf *timeFilter) String() string {
	s := "_time:"
	s += tf.stringRepr
	return s
}
// apply clears bits in bm for rows whose timestamps fall outside
// [tf.minTimestamp, tf.maxTimestamp].
func (tf *timeFilter) apply(bs *blockSearch, bm *bitmap) {
	minTimestamp := tf.minTimestamp
	maxTimestamp := tf.maxTimestamp

	if minTimestamp > maxTimestamp {
		// Empty time range - nothing can match.
		bm.resetBits()
		return
	}

	// Fast paths based on the per-block timestamps header,
	// avoiding per-row timestamp checks where possible.
	th := bs.bsw.bh.timestampsHeader
	if minTimestamp > th.maxTimestamp || maxTimestamp < th.minTimestamp {
		// The filter range doesn't intersect the block's time range - drop all rows.
		bm.resetBits()
		return
	}
	if minTimestamp <= th.minTimestamp && maxTimestamp >= th.maxTimestamp {
		// The filter range covers the whole block - keep every row as-is.
		return
	}

	// Slow path - check every row's timestamp individually,
	// keeping only the bits whose timestamp is inside the range.
	timestamps := bs.getTimestamps()
	bm.forEachSetBit(func(idx int) bool {
		ts := timestamps[idx]
		return ts >= minTimestamp && ts <= maxTimestamp
	})
}
// sequenceFilter matches an ordered sequence of phrases // sequenceFilter matches an ordered sequence of phrases
// //
// Example LogsQL: `fieldName:seq(foo, "bar baz")` // Example LogsQL: `fieldName:seq(foo, "bar baz")`

View file

@ -480,84 +480,6 @@ func TestComplexFilters(t *testing.T) {
testFilterMatchForColumns(t, columns, f, "foo", []int{1, 3, 6}) testFilterMatchForColumns(t, columns, f, "foo", []int{1, 3, 6})
} }
func TestTimeFilter(t *testing.T) {
	timestamps := []int64{1, 9, 123, 456, 789}

	// Table-driven cases; expected holds the matching row indexes
	// (nil means no rows must match).
	cases := []struct {
		minTimestamp int64
		maxTimestamp int64
		expected     []int
	}{
		// match
		{-10, 1, []int{0}},
		{-10, 10, []int{0, 1}},
		{1, 1, []int{0}},
		{2, 456, []int{1, 2, 3}},
		{2, 457, []int{1, 2, 3}},
		{120, 788, []int{2, 3}},
		{120, 789, []int{2, 3, 4}},
		{120, 10000, []int{2, 3, 4}},
		{789, 1000, []int{4}},

		// mismatch
		{-1000, 0, nil},
		{790, 1000, nil},
	}
	for _, c := range cases {
		tf := &timeFilter{
			minTimestamp: c.minTimestamp,
			maxTimestamp: c.maxTimestamp,
		}
		testFilterMatchForTimestamps(t, timestamps, tf, c.expected)
	}
}
func TestStreamFilter(t *testing.T) { func TestStreamFilter(t *testing.T) {
columns := []column{ columns := []column{
{ {

View file

@ -0,0 +1,83 @@
package logstorage
import (
"testing"
)
func TestFilterTime(t *testing.T) {
	timestamps := []int64{1, 9, 123, 456, 789}

	// Table-driven cases; expected holds the matching row indexes
	// (nil means no rows must match).
	cases := []struct {
		minTimestamp int64
		maxTimestamp int64
		expected     []int
	}{
		// match
		{-10, 1, []int{0}},
		{-10, 10, []int{0, 1}},
		{1, 1, []int{0}},
		{2, 456, []int{1, 2, 3}},
		{2, 457, []int{1, 2, 3}},
		{120, 788, []int{2, 3}},
		{120, 789, []int{2, 3, 4}},
		{120, 10000, []int{2, 3, 4}},
		{789, 1000, []int{4}},

		// mismatch
		{-1000, 0, nil},
		{790, 1000, nil},
	}
	for _, c := range cases {
		ft := &filterTime{
			minTimestamp: c.minTimestamp,
			maxTimestamp: c.maxTimestamp,
		}
		testFilterMatchForTimestamps(t, timestamps, ft, c.expected)
	}
}

View file

@ -432,7 +432,7 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error)
} }
switch fieldName { switch fieldName {
case "_time": case "_time":
return parseTimeFilterWithOffset(lex) return parseFilterTimeWithOffset(lex)
case "_stream": case "_stream":
return parseStreamFilter(lex) return parseStreamFilter(lex)
default: default:
@ -812,30 +812,30 @@ func startsWithYear(s string) bool {
return c == '-' || c == '+' || c == 'Z' || c == 'z' return c == '-' || c == '+' || c == 'Z' || c == 'z'
} }
func parseTimeFilterWithOffset(lex *lexer) (*timeFilter, error) { func parseFilterTimeWithOffset(lex *lexer) (*filterTime, error) {
tf, err := parseTimeFilter(lex) ft, err := parseFilterTime(lex)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !lex.isKeyword("offset") { if !lex.isKeyword("offset") {
return tf, nil return ft, nil
} }
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing offset for _time filter %s", tf) return nil, fmt.Errorf("missing offset for _time filter %s", ft)
} }
s := getCompoundToken(lex) s := getCompoundToken(lex)
d, err := promutils.ParseDuration(s) d, err := promutils.ParseDuration(s)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse offset for _time filter %s: %w", tf, err) return nil, fmt.Errorf("cannot parse offset for _time filter %s: %w", ft, err)
} }
offset := int64(d) offset := int64(d)
tf.minTimestamp -= offset ft.minTimestamp -= offset
tf.maxTimestamp -= offset ft.maxTimestamp -= offset
tf.stringRepr += " offset " + s ft.stringRepr += " offset " + s
return tf, nil return ft, nil
} }
func parseTimeFilter(lex *lexer) (*timeFilter, error) { func parseFilterTime(lex *lexer) (*filterTime, error) {
startTimeInclude := false startTimeInclude := false
switch { switch {
case lex.isKeyword("["): case lex.isKeyword("["):
@ -853,13 +853,13 @@ func parseTimeFilter(lex *lexer) (*timeFilter, error) {
} }
startTime := int64(t * 1e9) startTime := int64(t * 1e9)
endTime := getMatchingEndTime(startTime, s) endTime := getMatchingEndTime(startTime, s)
tf := &timeFilter{ ft := &filterTime{
minTimestamp: startTime, minTimestamp: startTime,
maxTimestamp: endTime, maxTimestamp: endTime,
stringRepr: s, stringRepr: s,
} }
return tf, nil return ft, nil
} }
// Parse _time:duration, which transforms to '_time:(now-duration, now]' // Parse _time:duration, which transforms to '_time:(now-duration, now]'
d, err := promutils.ParseDuration(s) d, err := promutils.ParseDuration(s)
@ -869,13 +869,13 @@ func parseTimeFilter(lex *lexer) (*timeFilter, error) {
if d < 0 { if d < 0 {
d = -d d = -d
} }
tf := &timeFilter{ ft := &filterTime{
minTimestamp: lex.currentTimestamp - int64(d), minTimestamp: lex.currentTimestamp - int64(d),
maxTimestamp: lex.currentTimestamp, maxTimestamp: lex.currentTimestamp,
stringRepr: s, stringRepr: s,
} }
return tf, nil return ft, nil
} }
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing start time in _time filter") return nil, fmt.Errorf("missing start time in _time filter")
@ -926,13 +926,13 @@ func parseTimeFilter(lex *lexer) (*timeFilter, error) {
endTime-- endTime--
} }
tf := &timeFilter{ ft := &filterTime{
minTimestamp: startTime, minTimestamp: startTime,
maxTimestamp: endTime, maxTimestamp: endTime,
stringRepr: stringRepr, stringRepr: stringRepr,
} }
return tf, nil return ft, nil
} }
func getMatchingEndTime(startTime int64, stringRepr string) int64 { func getMatchingEndTime(startTime int64, stringRepr string) int64 {

View file

@ -86,14 +86,14 @@ func TestParseTimeDuration(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
tf, ok := q.f.(*timeFilter) ft, ok := q.f.(*filterTime)
if !ok { if !ok {
t.Fatalf("unexpected filter; got %T; want *timeFilter; filter: %s", q.f, q.f) t.Fatalf("unexpected filter; got %T; want *filterTime; filter: %s", q.f, q.f)
} }
if tf.stringRepr != s { if ft.stringRepr != s {
t.Fatalf("unexpected string represenation for timeFilter; got %q; want %q", tf.stringRepr, s) t.Fatalf("unexpected string represenation for filterTime; got %q; want %q", ft.stringRepr, s)
} }
duration := time.Duration(tf.maxTimestamp - tf.minTimestamp) duration := time.Duration(ft.maxTimestamp - ft.minTimestamp)
if duration != durationExpected { if duration != durationExpected {
t.Fatalf("unexpected duration; got %s; want %s", duration, durationExpected) t.Fatalf("unexpected duration; got %s; want %s", duration, durationExpected)
} }
@ -114,18 +114,18 @@ func TestParseTimeRange(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
tf, ok := q.f.(*timeFilter) ft, ok := q.f.(*filterTime)
if !ok { if !ok {
t.Fatalf("unexpected filter; got %T; want *timeFilter; filter: %s", q.f, q.f) t.Fatalf("unexpected filter; got %T; want *filterTime; filter: %s", q.f, q.f)
} }
if tf.stringRepr != s { if ft.stringRepr != s {
t.Fatalf("unexpected string represenation for timeFilter; got %q; want %q", tf.stringRepr, s) t.Fatalf("unexpected string represenation for filterTime; got %q; want %q", ft.stringRepr, s)
} }
if tf.minTimestamp != minTimestampExpected { if ft.minTimestamp != minTimestampExpected {
t.Fatalf("unexpected minTimestamp; got %s; want %s", timestampToString(tf.minTimestamp), timestampToString(minTimestampExpected)) t.Fatalf("unexpected minTimestamp; got %s; want %s", timestampToString(ft.minTimestamp), timestampToString(minTimestampExpected))
} }
if tf.maxTimestamp != maxTimestampExpected { if ft.maxTimestamp != maxTimestampExpected {
t.Fatalf("unexpected maxTimestamp; got %s; want %s", timestampToString(tf.maxTimestamp), timestampToString(maxTimestampExpected)) t.Fatalf("unexpected maxTimestamp; got %s; want %s", timestampToString(ft.maxTimestamp), timestampToString(maxTimestampExpected))
} }
} }

View file

@ -248,17 +248,17 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
} }
// Obtain common time filter from so.filter // Obtain common time filter from so.filter
tf, f := getCommonTimeFilter(so.filter) ft, f := getCommonFilterTime(so.filter)
// Select partitions according to the selected time range // Select partitions according to the selected time range
s.partitionsLock.Lock() s.partitionsLock.Lock()
ptws := s.partitions ptws := s.partitions
minDay := tf.minTimestamp / nsecPerDay minDay := ft.minTimestamp / nsecPerDay
n := sort.Search(len(ptws), func(i int) bool { n := sort.Search(len(ptws), func(i int) bool {
return ptws[i].day >= minDay return ptws[i].day >= minDay
}) })
ptws = ptws[n:] ptws = ptws[n:]
maxDay := tf.maxTimestamp / nsecPerDay maxDay := ft.maxTimestamp / nsecPerDay
n = sort.Search(len(ptws), func(i int) bool { n = sort.Search(len(ptws), func(i int) bool {
return ptws[i].day > maxDay return ptws[i].day > maxDay
}) })
@ -279,7 +279,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
partitionSearchConcurrencyLimitCh <- struct{}{} partitionSearchConcurrencyLimitCh <- struct{}{}
wgSearchers.Add(1) wgSearchers.Add(1)
go func(idx int, pt *partition) { go func(idx int, pt *partition) {
psfs[idx] = pt.search(tf, sf, f, so, workCh, stopCh) psfs[idx] = pt.search(ft, sf, f, so, workCh, stopCh)
wgSearchers.Done() wgSearchers.Done()
<-partitionSearchConcurrencyLimitCh <-partitionSearchConcurrencyLimitCh
}(i, ptw.pt) }(i, ptw.pt)
@ -308,7 +308,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs
type partitionSearchFinalizer func() type partitionSearchFinalizer func()
func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer {
select { select {
case <-stopCh: case <-stopCh:
// Do not spend CPU time on search, since it is already stopped. // Do not spend CPU time on search, since it is already stopped.
@ -328,8 +328,8 @@ func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *gene
soInternal := &searchOptions{ soInternal := &searchOptions{
tenantIDs: tenantIDs, tenantIDs: tenantIDs,
streamIDs: streamIDs, streamIDs: streamIDs,
minTimestamp: tf.minTimestamp, minTimestamp: ft.minTimestamp,
maxTimestamp: tf.maxTimestamp, maxTimestamp: ft.maxTimestamp,
filter: f, filter: f,
resultColumnNames: so.resultColumnNames, resultColumnNames: so.resultColumnNames,
needAllColumns: so.needAllColumns, needAllColumns: so.needAllColumns,
@ -713,23 +713,23 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) {
return nil, f return nil, f
} }
func getCommonTimeFilter(f filter) (*timeFilter, filter) { func getCommonFilterTime(f filter) (*filterTime, filter) {
switch t := f.(type) { switch t := f.(type) {
case *filterAnd: case *filterAnd:
for _, filter := range t.filters { for _, filter := range t.filters {
tf, ok := filter.(*timeFilter) ft, ok := filter.(*filterTime)
if ok { if ok {
// The tf must remain in t.filters order to properly filter out rows outside the selected time range // The ft must remain in t.filters order to properly filter out rows outside the selected time range
return tf, f return ft, f
} }
} }
case *timeFilter: case *filterTime:
return t, f return t, f
} }
return allTimeFilter, f return allFilterTime, f
} }
var allTimeFilter = &timeFilter{ var allFilterTime = &filterTime{
minTimestamp: math.MinInt64, minTimestamp: math.MinInt64,
maxTimestamp: math.MaxInt64, maxTimestamp: math.MaxInt64,
} }

View file

@ -379,7 +379,7 @@ func TestStorageSearch(t *testing.T) {
getBaseFilter := func(minTimestamp, maxTimestamp int64, sf *StreamFilter) filter { getBaseFilter := func(minTimestamp, maxTimestamp int64, sf *StreamFilter) filter {
var filters []filter var filters []filter
filters = append(filters, &timeFilter{ filters = append(filters, &filterTime{
minTimestamp: minTimestamp, minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp, maxTimestamp: maxTimestamp,
}) })