diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index eb585fedd..bca9f734e 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -2381,7 +2381,13 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set metricIDs := &uint64set.Set{} if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffixes. - loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs) + var loopsCount uint64 + var err error + if filter != nil { + loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter) + } else { + loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs) + } if err != nil { return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf) } @@ -2518,19 +2524,22 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe return loopsCount, nil } -func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error { +func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) (uint64, error) { sortedFilter := filter.AppendTo(nil) kb := kbPool.Get() defer kbPool.Put(kb) + var loopsCount uint64 for _, orSuffix := range tf.orSuffixes { kb.B = append(kb.B[:0], tf.prefix...) kb.B = append(kb.B, orSuffix...) kb.B = append(kb.B, tagSeparatorChar) - if err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative); err != nil { - return err + lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative) + if err != nil { + return loopsCount, err } + loopsCount += lc } - return nil + return loopsCount, nil } func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) { @@ -2564,15 +2573,16 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr return loopsCount, nil } -func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error { +func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) (uint64, error) { if len(sortedFilter) == 0 { - return nil + return 0, nil } firstFilterMetricID := sortedFilter[0] lastFilterMetricID := sortedFilter[len(sortedFilter)-1] ts := &is.ts mp := &is.mp mp.Reset() + var loopsCount uint64 loopsPaceLimiter := 0 ts.Seek(prefix) var sf []uint64 @@ -2580,17 +2590,18 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri for ts.NextItem() { if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 { if err := checkSearchDeadlineAndPace(is.deadline); err != nil { - return err + return loopsCount, err } } loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { - return nil + return loopsCount, nil } if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil { - return err + return loopsCount, err } + loopsCount += uint64(mp.MetricIDsLen()) firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs() if lastMetricID < firstFilterMetricID { // Skip the item, since it contains metricIDs lower @@ -2600,10 +2611,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri if firstMetricID > lastFilterMetricID { // Stop searching, since the current item and all the subsequent items // contain metricIDs higher than metricIDs in sortedFilter. - return nil + return loopsCount, nil } sf = sortedFilter mp.ParseMetricIDs() + matchingMetricIDs := mp.MetricIDs[:0] for _, metricID = range mp.MetricIDs { if len(sf) == 0 { break @@ -2618,18 +2630,23 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri if metricID < sf[0] { continue } - if isNegative { - metricIDs.Del(metricID) - } else { - metricIDs.Add(metricID) - } + matchingMetricIDs = append(matchingMetricIDs, metricID) sf = sf[1:] } + if len(matchingMetricIDs) > 0 { + if isNegative { + for _, metricID := range matchingMetricIDs { + metricIDs.Del(metricID) + } + } else { + metricIDs.AddMulti(matchingMetricIDs) + } + } } if err := ts.Error(); err != nil { - return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) + return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) } - return nil + return loopsCount, nil } func binarySearchUint64(a []uint64, v uint64) uint { @@ -2817,10 +2834,9 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter }) // Populate metricIDs for the first non-negative filter. - var tfsPostponed []*tagFilter var metricIDs *uint64set.Set tfwsRemaining := tfws[:0] - maxDateMetrics := maxMetrics * 100 + maxDateMetrics := maxMetrics * 50 for i := range tfws { tfw := tfws[i] tf := tfw.tf @@ -2829,13 +2845,16 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter continue } m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics) - is.storeLoopsCountForDateFilter(date, tf, loopsCount) + if loopsCount > tfw.loopsCount { + is.storeLoopsCountForDateFilter(date, tf, loopsCount) + } if err != nil { return nil, err } if m.Len() >= maxDateMetrics { - // Too many time series found by a single tag filter. Postpone applying this filter via metricName match. - tfsPostponed = append(tfsPostponed, tf) + // Too many time series found by a single tag filter. Postpone applying this filter. + tfwsRemaining = append(tfwsRemaining, tfw) + tfw.loopsCount = loopsCount continue } metricIDs = m @@ -2871,6 +2890,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // when the intial tag filters significantly reduce the number of found metricIDs, // so the remaining filters could be performed via much faster metricName matching instead // of slow selecting of matching metricIDs. + var tfsPostponed []*tagFilter for i := range tfwsRemaining { tfw := tfwsRemaining[i] tf := tfw.tf @@ -2879,24 +2899,26 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // Short circuit - there is no need in applying the remaining filters to an empty set. break } - if uint64(metricIDsLen)*maxIndexScanLoopsPerMetric < tfw.loopsCount { + if tfw.loopsCount > uint64(metricIDsLen)*loopsCountPerMetricNameMatch { // It should be faster performing metricName match on the remaining filters // instead of scanning big number of entries in the inverted index for these filters. - tfsPostponed = append(tfsPostponed, tf) - // Store stats for non-executed tf, since it could be updated during protection from thundered herd. - is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount) - continue + for i < len(tfwsRemaining) { + tfw := tfwsRemaining[i] + tf := tfw.tf + tfsPostponed = append(tfsPostponed, tf) + // Store stats for non-executed tf, since it could be updated during protection from thundered herd. + is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount) + i++ + } + break + } + m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, 0) + if loopsCount > tfw.loopsCount { + is.storeLoopsCountForDateFilter(date, tf, loopsCount) } - m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics) - is.storeLoopsCountForDateFilter(date, tf, loopsCount) if err != nil { return nil, err } - if m.Len() >= maxDateMetrics { - // Too many time series found by a single tag filter. Postpone applying this filter via metricName match. - tfsPostponed = append(tfsPostponed, tf) - continue - } if tf.isNegative { metricIDs.Subtract(m) } else { @@ -3082,7 +3104,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, // Set high loopsCount for failing filter, so it is moved to the end of filter list. loopsCount = 20e9 } - if metricIDs.Len() >= maxMetrics { + if filter == nil && metricIDs.Len() >= maxMetrics { // Increase loopsCount for tag filter matching too many metrics, // So next time it is moved to the end of filter list. loopsCount *= 2 @@ -3179,10 +3201,8 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64 return nil } -// The maximum number of index scan loops. -// Bigger number of loops is slower than updateMetricIDsByMetricNameMatch -// over the found metrics. -const maxIndexScanLoopsPerMetric = 100 +// The estimated number of index scan loops a single loop in updateMetricIDsByMetricNameMatch takes. +const loopsCountPerMetricNameMatch = 500 func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) { if filter.Len() == 0 { @@ -3194,7 +3214,8 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui } if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffixes. - if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil { + _, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter) + if err != nil { return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf) } return metricIDs, nil