From d87635cca422bf3e937b596804d6fd70b42c447d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 18 May 2024 23:58:01 +0200 Subject: [PATCH] wip --- app/vlselect/logsql/logsql.go | 2 +- lib/logstorage/filter_time.go | 2 +- lib/logstorage/parser.go | 6 ++--- lib/logstorage/storage_search.go | 39 +++++++++++++++++--------------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index b078179a83..de438009ae 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -28,7 +28,7 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt q.Optimize() fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q) if err != nil { - httpserver.Errorf(w, r, "cannot obtain field names: %w", err) + httpserver.Errorf(w, r, "cannot obtain field names: %s", err) return } diff --git a/lib/logstorage/filter_time.go b/lib/logstorage/filter_time.go index 155a812442..976ce6891b 100644 --- a/lib/logstorage/filter_time.go +++ b/lib/logstorage/filter_time.go @@ -6,7 +6,7 @@ import ( // filterTime filters by time. // -// It is expressed as `_time:(start, end]` in LogsQL. +// It is expressed as `_time:[start, end]` in LogsQL. type filterTime struct { // mintimestamp is the minimum timestamp in nanoseconds to find minTimestamp int64 diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index af98534215..69b850cb55 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -268,7 +268,7 @@ func (q *Query) Optimize() { } } - // Optimize `q | field_names ...` by marking pipeFieldNames as first pipe + // Optimize `q | field_names ...` by marking pipeFieldNames as first pipe. if len(q.pipes) > 0 { pf, ok := q.pipes[0].(*pipeFieldNames) if ok { @@ -276,10 +276,10 @@ func (q *Query) Optimize() { } } - // Substitute '*' prefixFilter with filterNoop. + // Substitute '*' prefixFilter with filterNoop in order to avoid reading _msg data. q.f = removeStarFilters(q.f) - // Optimize 'in(query)' filters + // Call Optimize for queries from 'in(query)' filters. optimizeFilterIn(q.f) for _, p := range q.pipes { switch t := p.(type) { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index dc80be15a7..6ed305ecaa 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -397,18 +397,19 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch }(uint(i)) } - // Obtain common time filter from so.filter - ft, f := getCommonFilterTime(so.filter) + // Obtain time range from so.filter + f := so.filter + minTimestamp, maxTimestamp := getFilterTimeRange(f) // Select partitions according to the selected time range s.partitionsLock.Lock() ptws := s.partitions - minDay := ft.minTimestamp / nsecPerDay + minDay := minTimestamp / nsecPerDay n := sort.Search(len(ptws), func(i int) bool { return ptws[i].day >= minDay }) ptws = ptws[n:] - maxDay := ft.maxTimestamp / nsecPerDay + maxDay := maxTimestamp / nsecPerDay n = sort.Search(len(ptws), func(i int) bool { return ptws[i].day > maxDay }) @@ -429,7 +430,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch partitionSearchConcurrencyLimitCh <- struct{}{} wgSearchers.Add(1) go func(idx int, pt *partition) { - psfs[idx] = pt.search(ft, sf, f, so, workCh, stopCh) + psfs[idx] = pt.search(minTimestamp, maxTimestamp, sf, f, so, workCh, stopCh) wgSearchers.Done() <-partitionSearchConcurrencyLimitCh }(i, ptw.pt) @@ -458,7 +459,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() -func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { +func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { if needStop(stopCh) { // Do not spend CPU time on search, since it is already stopped. return func() {} @@ -476,8 +477,8 @@ func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *gene soInternal := &searchOptions{ tenantIDs: tenantIDs, streamIDs: streamIDs, - minTimestamp: ft.minTimestamp, - maxTimestamp: ft.maxTimestamp, + minTimestamp: minTimestamp, + maxTimestamp: maxTimestamp, filter: f, neededColumnNames: so.neededColumnNames, unneededColumnNames: so.unneededColumnNames, @@ -813,23 +814,25 @@ func getCommonStreamFilter(f filter) (*StreamFilter, filter) { return nil, f } -func getCommonFilterTime(f filter) (*filterTime, filter) { +func getFilterTimeRange(f filter) (int64, int64) { switch t := f.(type) { case *filterAnd: + minTimestamp := int64(math.MinInt64) + maxTimestamp := int64(math.MaxInt64) for _, filter := range t.filters { ft, ok := filter.(*filterTime) if ok { - // The ft must remain in t.filters order to properly filter out rows outside the selected time range - return ft, f + if ft.minTimestamp > minTimestamp { + minTimestamp = ft.minTimestamp + } + if ft.maxTimestamp < maxTimestamp { + maxTimestamp = ft.maxTimestamp + } } } + return minTimestamp, maxTimestamp case *filterTime: - return t, f + return t.minTimestamp, t.maxTimestamp } - return allFilterTime, f -} - -var allFilterTime = &filterTime{ - minTimestamp: math.MinInt64, - maxTimestamp: math.MaxInt64, + return math.MinInt64, math.MaxInt64 }