Aliaksandr Valialkin 2024-05-18 23:58:01 +02:00
parent bf2a031382
commit d87635cca4
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 26 additions and 23 deletions


@@ -28,7 +28,7 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
 	q.Optimize()
 	fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q)
 	if err != nil {
-		httpserver.Errorf(w, r, "cannot obtain field names: %w", err)
+		httpserver.Errorf(w, r, "cannot obtain field names: %s", err)
 		return
 	}
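Note: the hunk above swaps `%w` for `%s` in the `httpserver.Errorf` format string. `%w` is an error-wrapping verb understood by `fmt.Errorf`, and it only pays off when the resulting error is later inspected with `errors.Is`/`errors.As`; a plain `Sprintf`-style formatter renders it as `%!w(...)` noise. A minimal, standalone sketch of the difference between the two verbs (not tied to httpserver's internals):

	package main

	import (
		"errors"
		"fmt"
	)

	func main() {
		base := errors.New("connection refused")

		// fmt.Errorf understands %w and produces a wrapped error
		// that errors.Is/errors.As can unwrap later.
		wrapped := fmt.Errorf("cannot obtain field names: %w", base)
		fmt.Println(errors.Is(wrapped, base)) // true

		// Sprintf-style formatting has no notion of %w; it prints
		// "%!w(*errors.errorString=...)" instead of the message.
		// (go vet flags this call for exactly that reason.)
		fmt.Println(fmt.Sprintf("cannot obtain field names: %w", base))

		// %s formats the error's message, which is what an HTTP error
		// response needs.
		fmt.Println(fmt.Sprintf("cannot obtain field names: %s", base))
	}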


@@ -6,7 +6,7 @@ import (
 // filterTime filters by time.
 //
-// It is expressed as `_time:(start, end]` in LogsQL.
+// It is expressed as `_time:[start, end]` in LogsQL.
 type filterTime struct {
 	// mintimestamp is the minimum timestamp in nanoseconds to find
 	minTimestamp int64


@@ -268,7 +268,7 @@ func (q *Query) Optimize() {
 		}
 	}
-	// Optimize `q | field_names ...` by marking pipeFieldNames as first pipe
+	// Optimize `q | field_names ...` by marking pipeFieldNames as first pipe.
 	if len(q.pipes) > 0 {
 		pf, ok := q.pipes[0].(*pipeFieldNames)
 		if ok {
@@ -276,10 +276,10 @@ func (q *Query) Optimize() {
 		}
 	}
-	// Substitute '*' prefixFilter with filterNoop.
+	// Substitute '*' prefixFilter with filterNoop in order to avoid reading _msg data.
 	q.f = removeStarFilters(q.f)
-	// Optimize 'in(query)' filters
+	// Call Optimize for queries from 'in(query)' filters.
 	optimizeFilterIn(q.f)
 	for _, p := range q.pipes {
 		switch t := p.(type) {


@@ -397,18 +397,19 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 		}(uint(i))
 	}

-	// Obtain common time filter from so.filter
-	ft, f := getCommonFilterTime(so.filter)
+	// Obtain time range from so.filter
+	f := so.filter
+	minTimestamp, maxTimestamp := getFilterTimeRange(f)

 	// Select partitions according to the selected time range
 	s.partitionsLock.Lock()
 	ptws := s.partitions
-	minDay := ft.minTimestamp / nsecPerDay
+	minDay := minTimestamp / nsecPerDay
 	n := sort.Search(len(ptws), func(i int) bool {
 		return ptws[i].day >= minDay
 	})
 	ptws = ptws[n:]
-	maxDay := ft.maxTimestamp / nsecPerDay
+	maxDay := maxTimestamp / nsecPerDay
 	n = sort.Search(len(ptws), func(i int) bool {
 		return ptws[i].day > maxDay
 	})
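Note: the partition list is kept sorted by day, so the two `sort.Search` calls above trim it to the `[minDay, maxDay]` window: the first finds the first partition whose day is >= minDay, the second finds the first partition past maxDay. A standalone sketch of the same idiom over a plain slice of day numbers (the values are made up for illustration and are not tied to the storage types):

	package main

	import (
		"fmt"
		"sort"
	)

	func main() {
		// Days since the Unix epoch for a set of partitions, sorted ascending.
		days := []int64{19850, 19851, 19852, 19855, 19856}

		minDay, maxDay := int64(19851), int64(19855)

		// First index with day >= minDay.
		n := sort.Search(len(days), func(i int) bool {
			return days[i] >= minDay
		})
		days = days[n:]

		// First index with day > maxDay; everything before it is in range.
		n = sort.Search(len(days), func(i int) bool {
			return days[i] > maxDay
		})
		days = days[:n]

		fmt.Println(days) // [19851 19852 19855]
	}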
@@ -429,7 +430,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 		partitionSearchConcurrencyLimitCh <- struct{}{}
 		wgSearchers.Add(1)
 		go func(idx int, pt *partition) {
-			psfs[idx] = pt.search(ft, sf, f, so, workCh, stopCh)
+			psfs[idx] = pt.search(minTimestamp, maxTimestamp, sf, f, so, workCh, stopCh)
 			wgSearchers.Done()
 			<-partitionSearchConcurrencyLimitCh
 		}(i, ptw.pt)
@@ -458,7 +459,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs
 type partitionSearchFinalizer func()

-func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
+func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
 	if needStop(stopCh) {
 		// Do not spend CPU time on search, since it is already stopped.
 		return func() {}
@@ -476,8 +477,8 @@ func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *gene
 	soInternal := &searchOptions{
 		tenantIDs:           tenantIDs,
 		streamIDs:           streamIDs,
-		minTimestamp:        ft.minTimestamp,
-		maxTimestamp:        ft.maxTimestamp,
+		minTimestamp:        minTimestamp,
+		maxTimestamp:        maxTimestamp,
 		filter:              f,
 		neededColumnNames:   so.neededColumnNames,
 		unneededColumnNames: so.unneededColumnNames,
@@ -813,23 +814,25 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) {
 	return nil, f
 }

-func getCommonFilterTime(f filter) (*filterTime, filter) {
+func getFilterTimeRange(f filter) (int64, int64) {
 	switch t := f.(type) {
 	case *filterAnd:
+		minTimestamp := int64(math.MinInt64)
+		maxTimestamp := int64(math.MaxInt64)
 		for _, filter := range t.filters {
 			ft, ok := filter.(*filterTime)
 			if ok {
-				// The ft must remain in t.filters order to properly filter out rows outside the selected time range
-				return ft, f
+				if ft.minTimestamp > minTimestamp {
+					minTimestamp = ft.minTimestamp
+				}
+				if ft.maxTimestamp < maxTimestamp {
+					maxTimestamp = ft.maxTimestamp
+				}
 			}
 		}
+		return minTimestamp, maxTimestamp
 	case *filterTime:
-		return t, f
+		return t.minTimestamp, t.maxTimestamp
 	}
-	return allFilterTime, f
+	return math.MinInt64, math.MaxInt64
 }
-
-var allFilterTime = &filterTime{
-	minTimestamp: math.MinInt64,
-	maxTimestamp: math.MaxInt64,
-}
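Note: after this change the search path no longer threads a `*filterTime` through `partition.search`; it collapses every top-level `_time` filter into a single `[minTimestamp, maxTimestamp]` window by intersecting the ranges, falling back to the full int64 range when no time filter is present. A self-contained sketch of the same intersection logic on plain ranges (the `timeRange` type and sample values are made up for illustration):

	package main

	import (
		"fmt"
		"math"
	)

	// timeRange is a hypothetical stand-in for a parsed `_time:[start, end]` filter,
	// holding nanosecond timestamps like filterTime.minTimestamp/maxTimestamp.
	type timeRange struct {
		minTimestamp int64
		maxTimestamp int64
	}

	// intersectTimeRanges mirrors the getFilterTimeRange logic for a filterAnd:
	// start from the widest possible window and narrow it with every time filter.
	func intersectTimeRanges(trs []timeRange) (int64, int64) {
		minTimestamp := int64(math.MinInt64)
		maxTimestamp := int64(math.MaxInt64)
		for _, tr := range trs {
			if tr.minTimestamp > minTimestamp {
				minTimestamp = tr.minTimestamp
			}
			if tr.maxTimestamp < maxTimestamp {
				maxTimestamp = tr.maxTimestamp
			}
		}
		return minTimestamp, maxTimestamp
	}

	func main() {
		// _time:[100, 500] AND _time:[300, 900] -> effective range [300, 500]
		minTS, maxTS := intersectTimeRanges([]timeRange{
			{minTimestamp: 100, maxTimestamp: 500},
			{minTimestamp: 300, maxTimestamp: 900},
		})
		fmt.Println(minTS, maxTS) // 300 500
	}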