From df91d2d91f841501370ca6eadc6001f6607f56e2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 13 Mar 2020 22:42:22 +0200 Subject: [PATCH] lib/storage: remove obsolete code --- lib/storage/index_db.go | 227 +++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 130 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 2b260c569..6e54704c1 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1210,15 +1210,6 @@ func mergeTSIDs(a, b []TSID) []TSID { } func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { - // Verify whether `is` contains data for the given tr. - ok, err := is.containsTimeRange(tr) - if err != nil { - return nil, fmt.Errorf("error in containsTimeRange(%s): %s", &tr, err) - } - if !ok { - // Fast path: nothing to search. - return nil, nil - } metricIDs, err := is.searchMetricIDs(tfss, tr, maxMetrics) if err != nil { return nil, err @@ -1700,14 +1691,14 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf return bytes.Compare(a.prefix, b.prefix) < 0 }) - ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) - if err != nil { - return err - } - if ok { + err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) + if err == nil { // Fast path: found metricIDs by date range. return nil } + if err != errFallbackToMetricNameMatch { + return err + } // Slow path - try searching over the whole inverted index. minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics) @@ -2051,30 +2042,41 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (* // Too much dates must be covered. Give up. return nil, errMissingMetricIDsForDate } + if minDate == maxDate { + // Fast path - query on a single day. + metricIDs, err := is.getMetricIDsForDate(minDate, maxMetrics) + if err != nil { + return nil, err + } + atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1) + return metricIDs, nil + } - // Search for metricIDs for each day in parallel. + // Slower path - query over multiple days in parallel. metricIDs = &uint64set.Set{} var wg sync.WaitGroup var errGlobal error var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below. for minDate <= maxDate { - date := minDate - isLocal := is.db.getIndexSearch() wg.Add(1) - go func() { + go func(date uint64) { defer wg.Done() + isLocal := is.db.getIndexSearch() defer is.db.putIndexSearch(isLocal) - var result uint64set.Set - err := isLocal.getMetricIDsForDate(date, &result, maxMetrics) + m, err := isLocal.getMetricIDsForDate(date, maxMetrics) mu.Lock() - if metricIDs.Len() < maxMetrics { - metricIDs.UnionMayOwn(&result) + defer mu.Unlock() + if errGlobal != nil { + return } if err != nil { errGlobal = err + return } - mu.Unlock() - }() + if metricIDs.Len() < maxMetrics { + metricIDs.UnionMayOwn(m) + } + }(minDate) minDate++ } wg.Wait() @@ -2085,57 +2087,72 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (* return metricIDs, nil } -func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) (bool, error) { +func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { atomic.AddUint64(&is.db.dateRangeSearchCalls, 1) minDate := uint64(tr.MinTimestamp) / msecPerDay maxDate := uint64(tr.MaxTimestamp) / msecPerDay if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate { // Per-day inverted index doesn't cover the selected date range. - return false, nil + return errFallbackToMetricNameMatch } if maxDate-minDate > maxDaysForDateMetricIDs { // Too much dates must be covered. Give up, since it may be slow. - return false, nil + return errFallbackToMetricNameMatch + } + if minDate == maxDate { + // Fast path - query only a single date. + m, err := is.getMetricIDsForDateAndFilters(minDate, tfs, maxMetrics) + if err != nil { + return err + } + metricIDs.UnionMayOwn(m) + atomic.AddUint64(&is.db.dateRangeSearchHits, 1) + return nil } - // Search for metricIDs for each day in parallel. + // Slower path - search for metricIDs for each day in parallel. var wg sync.WaitGroup var errGlobal error - okGlobal := true - var mu sync.Mutex // protects metricIDs + *Global vars from concurrent access below + var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below for minDate <= maxDate { - date := minDate - isLocal := is.db.getIndexSearch() wg.Add(1) - go func() { + go func(date uint64) { defer wg.Done() + isLocal := is.db.getIndexSearch() defer is.db.putIndexSearch(isLocal) - var result uint64set.Set - ok, err := isLocal.tryUpdatingMetricIDsForDate(date, &result, tfs, maxMetrics) + m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics) mu.Lock() - if metricIDs.Len() < maxMetrics { - metricIDs.UnionMayOwn(&result) - } - if !ok { - okGlobal = ok + defer mu.Unlock() + if errGlobal != nil { + return } if err != nil { + if err == errFallbackToMetricNameMatch { + // The per-date search is too expensive. Probably it is faster to perform global search + // using metric name match. + errGlobal = err + return + } dateStr := time.Unix(int64(date*24*3600), 0) errGlobal = fmt.Errorf("cannot search for metricIDs for %s: %s", dateStr, err) + return } - mu.Unlock() - }() + if metricIDs.Len() < maxMetrics { + metricIDs.UnionMayOwn(m) + } + }(minDate) minDate++ } wg.Wait() if errGlobal != nil { - return false, errGlobal + return errGlobal } atomic.AddUint64(&is.db.dateRangeSearchHits, 1) - return okGlobal, nil + return nil } -func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint64set.Set, tfs *TagFilters, maxMetrics int) (bool, error) { +func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) { + // Populate metricIDs with the first non-negative filter. var tfFirst *tagFilter for i := range tfs.tfs { tf := &tfs.tfs[i] @@ -2145,37 +2162,36 @@ func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint6 tfFirst = tf break } - - var result *uint64set.Set + var metricIDs *uint64set.Set maxDateMetrics := maxMetrics * 50 if tfFirst == nil { - result = &uint64set.Set{} - if err := is.updateMetricIDsForDateAll(result, date, maxDateMetrics); err != nil { + // All the filters in tfs are negative. Populate all the metricIDs for the given (date), + // so later they can be filtered out with negative filters. + m, err := is.getMetricIDsForDate(date, maxDateMetrics) + if err != nil { if err == errMissingMetricIDsForDate { - // Zero data points were written on the given date. + // Zero time series were written on the given date. // It is OK, since (date, metricID) entries must exist for the given date // according to startDateForPerDayInvertedIndex. - return true, nil + return nil, nil } - return false, fmt.Errorf("cannot obtain all the metricIDs: %s", err) + return nil, fmt.Errorf("cannot obtain all the metricIDs: %s", err) } + metricIDs = m } else { + // Populate metricIDs for the given tfFirst on the given (date) m, err := is.getMetricIDsForDateTagFilter(tfFirst, date, tfs.commonPrefix, maxDateMetrics) if err != nil { - if err == errFallbackToMetricNameMatch { - // The per-date search is too expensive. Probably it is better to perform global search - // using metric name match. - return false, nil - } - return false, err + return nil, err } - result = m + metricIDs = m } - if result.Len() >= maxDateMetrics { + if metricIDs.Len() >= maxDateMetrics { // Too many time series found by a single tag filter. Fall back to global search. - return false, nil + return nil, errFallbackToMetricNameMatch } + // Intersect metricIDs with the rest of filters. for i := range tfs.tfs { tf := &tfs.tfs[i] if tf == tfFirst { @@ -2183,28 +2199,23 @@ func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint6 } m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics) if err != nil { - if err == errFallbackToMetricNameMatch { - // The per-date search is too expensive. Probably it is better to perform global search - // using metric name match. - return false, nil - } - return false, err + return nil, err } if m.Len() >= maxDateMetrics { // Too many time series found by a single tag filter. Fall back to global search. - return false, nil + return nil, errFallbackToMetricNameMatch } if tf.isNegative { - result.Subtract(m) + metricIDs.Subtract(m) } else { - result.Intersect(m) + metricIDs.Intersect(m) } - if result.Len() == 0 { - return true, nil + if metricIDs.Len() == 0 { + // Short circuit - there is no need in applying the remaining filters to empty set. + return nil, nil } } - metricIDs.UnionMayOwn(result) - return true, nil + return metricIDs, nil } func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) { @@ -2332,67 +2343,23 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, return is.getMetricIDsForTagFilter(&tfNew, maxMetrics) } -func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int) error { - ts := &is.ts - kb := &is.kb - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) - kb.B = encoding.MarshalUint64(kb.B, date) - ts.Seek(kb.B) - items := 0 - for metricIDs.Len() < maxMetrics && ts.NextItem() { - if !bytes.HasPrefix(ts.Item, kb.B) { - break - } - // Extract MetricID from ts.Item (the last 8 bytes). - v := ts.Item[len(kb.B):] - if len(v) != 8 { - return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v)) - } - metricID := encoding.UnmarshalUint64(v) - metricIDs.Add(metricID) - items++ - } - if err := ts.Error(); err != nil { - return fmt.Errorf("error when searching for metricIDs for date %d: %s", date, err) - } - if items == 0 { - // There are no metricIDs for the given date. - // This may be the case for old data when Date -> MetricID wasn't available. - return errMissingMetricIDsForDate - } - return nil -} - -func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) { - ts := &is.ts - kb := &is.kb - - // Verify whether the maximum date in `ts` covers tr.MinTimestamp. - minDate := uint64(tr.MinTimestamp) / msecPerDay - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) - kb.B = encoding.MarshalUint64(kb.B, minDate) - ts.Seek(kb.B) - if !ts.NextItem() { - if err := ts.Error(); err != nil { - return false, fmt.Errorf("error when searching for minDate=%d, prefix %q: %s", minDate, kb.B, err) - } - return false, nil - } - if !bytes.HasPrefix(ts.Item, kb.B[:1]) { - // minDate exceeds max date from ts. - return false, nil - } - return true, nil -} - -func (is *indexSearch) updateMetricIDsForDateAll(metricIDs *uint64set.Set, date uint64, maxMetrics int) error { +func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64set.Set, error) { // Extract all the metricIDs from (date, __name__=value)->metricIDs entries. kb := kbPool.Get() defer kbPool.Put(kb) - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) kb.B = marshalTagValue(kb.B, nil) - return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics) + var metricIDs uint64set.Set + if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil { + return nil, err + } + if metricIDs.Len() == 0 { + // There are no metricIDs for the given date. + // This may be the case for old data where (data, __name__=value)->metricIDs entries weren't available. + return nil, errMissingMetricIDsForDate + } + return &metricIDs, nil } func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {