diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index ddccd83f2..e290db6a0 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -1233,18 +1233,6 @@ func mergeTSIDs(a, b []TSID) []TSID {
 }
 
 func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
-	accountID := tfss[0].accountID
-	projectID := tfss[0].projectID
-
-	// Verify whether `is` contains data for the given tr.
-	ok, err := is.containsTimeRange(tr, accountID, projectID)
-	if err != nil {
-		return nil, fmt.Errorf("error in containsTimeRange(%s): %s", &tr, err)
-	}
-	if !ok {
-		// Fast path: nothing to search.
-		return nil, nil
-	}
 	metricIDs, err := is.searchMetricIDs(tfss, tr, maxMetrics)
 	if err != nil {
 		return nil, err
@@ -1257,6 +1245,8 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
 	// Obtain TSID values for the given metricIDs.
 	tsids := make([]TSID, len(metricIDs))
 	i := 0
+	accountID := tfss[0].accountID
+	projectID := tfss[0].projectID
 	for _, metricID := range metricIDs {
 		// Try obtaining TSIDs from db.tsidCache. This is much faster
 		// than scanning the mergeset if it contains a lot of metricIDs.
@@ -1726,14 +1716,14 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		return bytes.Compare(a.prefix, b.prefix) < 0
 	})
 
-	ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
-	if err != nil {
-		return err
-	}
-	if ok {
+	err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
+	if err == nil {
 		// Fast path: found metricIDs by date range.
 		return nil
 	}
+	if err != errFallbackToMetricNameMatch {
+		return err
+	}
 
 	// Slow path - try searching over the whole inverted index.
 	minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
@@ -2077,30 +2067,41 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
 		// Too much dates must be covered. Give up.
 		return nil, errMissingMetricIDsForDate
 	}
+	if minDate == maxDate {
+		// Fast path - query on a single day.
+		metricIDs, err := is.getMetricIDsForDate(minDate, accountID, projectID, maxMetrics)
+		if err != nil {
+			return nil, err
+		}
+		atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1)
+		return metricIDs, nil
+	}
 
-	// Search for metricIDs for each day in parallel.
+	// Slower path - query over multiple days in parallel.
 	metricIDs = &uint64set.Set{}
 	var wg sync.WaitGroup
 	var errGlobal error
 	var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
 	for minDate <= maxDate {
-		date := minDate
-		isLocal := is.db.getIndexSearch()
 		wg.Add(1)
-		go func() {
+		go func(date uint64) {
 			defer wg.Done()
+			isLocal := is.db.getIndexSearch()
 			defer is.db.putIndexSearch(isLocal)
-			var result uint64set.Set
-			err := isLocal.getMetricIDsForDate(date, &result, maxMetrics, accountID, projectID)
+			m, err := isLocal.getMetricIDsForDate(date, accountID, projectID, maxMetrics)
 			mu.Lock()
-			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(&result)
+			defer mu.Unlock()
+			if errGlobal != nil {
+				return
 			}
 			if err != nil {
 				errGlobal = err
+				return
 			}
-			mu.Unlock()
-		}()
+			if metricIDs.Len() < maxMetrics {
+				metricIDs.UnionMayOwn(m)
+			}
+		}(minDate)
 		minDate++
 	}
 	wg.Wait()
@@ -2111,57 +2112,72 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
 	return metricIDs, nil
 }
 
-func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) (bool, error) {
+func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
 	atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
 	minDate := uint64(tr.MinTimestamp) / msecPerDay
 	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
 	if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate {
 		// Per-day inverted index doesn't cover the selected date range.
-		return false, nil
+		return errFallbackToMetricNameMatch
 	}
 	if maxDate-minDate > maxDaysForDateMetricIDs {
 		// Too much dates must be covered. Give up, since it may be slow.
-		return false, nil
+		return errFallbackToMetricNameMatch
+	}
+	if minDate == maxDate {
+		// Fast path - query only a single date.
+		m, err := is.getMetricIDsForDateAndFilters(minDate, tfs, maxMetrics)
+		if err != nil {
+			return err
+		}
+		metricIDs.UnionMayOwn(m)
+		atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
+		return nil
 	}
 
-	// Search for metricIDs for each day in parallel.
+	// Slower path - search for metricIDs for each day in parallel.
 	var wg sync.WaitGroup
 	var errGlobal error
-	okGlobal := true
-	var mu sync.Mutex // protects metricIDs + *Global vars from concurrent access below
+	var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
 	for minDate <= maxDate {
-		date := minDate
-		isLocal := is.db.getIndexSearch()
 		wg.Add(1)
-		go func() {
+		go func(date uint64) {
 			defer wg.Done()
+			isLocal := is.db.getIndexSearch()
 			defer is.db.putIndexSearch(isLocal)
-			var result uint64set.Set
-			ok, err := isLocal.tryUpdatingMetricIDsForDate(date, &result, tfs, maxMetrics)
+			m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
 			mu.Lock()
-			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(&result)
-			}
-			if !ok {
-				okGlobal = ok
+			defer mu.Unlock()
+			if errGlobal != nil {
+				return
 			}
 			if err != nil {
+				if err == errFallbackToMetricNameMatch {
+					// The per-date search is too expensive. Probably it is faster to perform global search
+					// using metric name match.
+					errGlobal = err
+					return
+				}
 				dateStr := time.Unix(int64(date*24*3600), 0)
 				errGlobal = fmt.Errorf("cannot search for metricIDs for %s: %s", dateStr, err)
+				return
 			}
-			mu.Unlock()
-		}()
+			if metricIDs.Len() < maxMetrics {
+				metricIDs.UnionMayOwn(m)
+			}
+		}(minDate)
 		minDate++
 	}
 	wg.Wait()
 	if errGlobal != nil {
-		return false, errGlobal
+		return errGlobal
 	}
 	atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
-	return okGlobal, nil
+	return nil
 }
 
-func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint64set.Set, tfs *TagFilters, maxMetrics int) (bool, error) {
+func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
+	// Populate metricIDs with the first non-negative filter.
 	var tfFirst *tagFilter
 	for i := range tfs.tfs {
 		tf := &tfs.tfs[i]
@@ -2171,37 +2187,36 @@ func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint6
 		tfFirst = tf
 		break
 	}
-
-	var result *uint64set.Set
+	var metricIDs *uint64set.Set
 	maxDateMetrics := maxMetrics * 50
 	if tfFirst == nil {
-		result = &uint64set.Set{}
-		if err := is.updateMetricIDsForDateAll(result, date, tfs.accountID, tfs.projectID, maxDateMetrics); err != nil {
+		// All the filters in tfs are negative. Populate all the metricIDs for the given (date, accountID, projectID),
+		// so later they can be filtered out with negative filters.
+		m, err := is.getMetricIDsForDate(date, tfs.accountID, tfs.projectID, maxDateMetrics)
+		if err != nil {
 			if err == errMissingMetricIDsForDate {
-				// Zero data points were written on the given date.
+				// Zero time series were written on the given date.
 				// It is OK, since (date, metricID) entries must exist for the given date
 				// according to startDateForPerDayInvertedIndex.
-				return true, nil
+				return nil, nil
 			}
-			return false, fmt.Errorf("cannot obtain all the metricIDs: %s", err)
+			return nil, fmt.Errorf("cannot obtain all the metricIDs: %s", err)
 		}
+		metricIDs = m
 	} else {
+		// Populate metricIDs for the given tfFirst on the given (date, accountId, projectID)
 		m, err := is.getMetricIDsForDateTagFilter(tfFirst, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics)
 		if err != nil {
-			if err == errFallbackToMetricNameMatch {
-				// The per-date search is too expensive. Probably it is better to perform global search
-				// using metric name match.
-				return false, nil
-			}
-			return false, err
+			return nil, err
 		}
-		result = m
+		metricIDs = m
 	}
-	if result.Len() >= maxDateMetrics {
+	if metricIDs.Len() >= maxDateMetrics {
 		// Too many time series found by a single tag filter. Fall back to global search.
-		return false, nil
+		return nil, errFallbackToMetricNameMatch
 	}
 
+	// Intersect metricIDs with the rest of filters.
 	for i := range tfs.tfs {
 		tf := &tfs.tfs[i]
 		if tf == tfFirst {
@@ -2209,28 +2224,23 @@ func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint6
 		}
 		m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics)
 		if err != nil {
-			if err == errFallbackToMetricNameMatch {
-				// The per-date search is too expensive. Probably it is better to perform global search
-				// using metric name match.
-				return false, nil
-			}
-			return false, err
+			return nil, err
 		}
 		if m.Len() >= maxDateMetrics {
 			// Too many time series found by a single tag filter. Fall back to global search.
-			return false, nil
+			return nil, errFallbackToMetricNameMatch
 		}
 		if tf.isNegative {
-			result.Subtract(m)
+			metricIDs.Subtract(m)
 		} else {
-			result.Intersect(m)
+			metricIDs.Intersect(m)
 		}
-		if result.Len() == 0 {
-			return true, nil
+		if metricIDs.Len() == 0 {
+			// Short circuit - there is no need in applying the remaining filters to empty set.
+			return nil, nil
 		}
 	}
-	metricIDs.UnionMayOwn(result)
-	return true, nil
+	return metricIDs, nil
 }
 
 func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool) {
@@ -2369,67 +2379,23 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
 	return is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
 }
 
-func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int, accountID, projectID uint32) error {
-	ts := &is.ts
-	kb := &is.kb
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID)
-	kb.B = encoding.MarshalUint64(kb.B, date)
-	ts.Seek(kb.B)
-	items := 0
-	for metricIDs.Len() < maxMetrics && ts.NextItem() {
-		if !bytes.HasPrefix(ts.Item, kb.B) {
-			break
-		}
-		// Extract MetricID from ts.Item (the last 8 bytes).
-		v := ts.Item[len(kb.B):]
-		if len(v) != 8 {
-			return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v))
-		}
-		metricID := encoding.UnmarshalUint64(v)
-		metricIDs.Add(metricID)
-		items++
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("error when searching for metricIDs for date %d: %s", date, err)
-	}
-	if items == 0 {
-		// There are no metricIDs for the given date.
-		// This may be the case for old data when Date -> MetricID wasn't available.
-		return errMissingMetricIDsForDate
-	}
-	return nil
-}
-
-func (is *indexSearch) containsTimeRange(tr TimeRange, accountID, projectID uint32) (bool, error) {
-	ts := &is.ts
-	kb := &is.kb
-
-	// Verify whether the maximum date in `ts` covers tr.MinTimestamp.
-	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID)
-	kb.B = encoding.MarshalUint64(kb.B, minDate)
-	ts.Seek(kb.B)
-	if !ts.NextItem() {
-		if err := ts.Error(); err != nil {
-			return false, fmt.Errorf("error when searching for minDate=%d, prefix %q: %s", minDate, kb.B, err)
-		}
-		return false, nil
-	}
-	if !bytes.HasPrefix(ts.Item, kb.B[:1]) {
-		// minDate exceeds max date from ts.
-		return false, nil
-	}
-	return true, nil
-}
-
-func (is *indexSearch) updateMetricIDsForDateAll(metricIDs *uint64set.Set, date uint64, accountID, projectID uint32, maxMetrics int) error {
+func (is *indexSearch) getMetricIDsForDate(date uint64, accountID, projectID uint32, maxMetrics int) (*uint64set.Set, error) {
 	// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
 	kb := kbPool.Get()
 	defer kbPool.Put(kb)
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
 	kb.B = encoding.MarshalUint64(kb.B, date)
 	kb.B = marshalTagValue(kb.B, nil)
-	return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
+	var metricIDs uint64set.Set
	if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
+		return nil, err
+	}
+	if metricIDs.Len() == 0 {
+		// There are no metricIDs for the given date.
+		// This may be the case for old data where (data, __name__=value)->metricIDs entries weren't available.
+		return nil, errMissingMetricIDsForDate
+	}
+	return &metricIDs, nil
 }
 
 func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, projectID uint32, maxMetrics int) error {
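Note (editor's addition, not part of the patch): the hunks above replace the old (ok bool, err error) contract of tryUpdatingMetricIDsForDateRange with the errFallbackToMetricNameMatch sentinel, pass the loop date into each per-day goroutine instead of capturing it, and merge per-day results under a mutex with an early return once errGlobal is set. The standalone sketch below shows the same control flow under simplified assumptions: searchDay is a hypothetical per-day lookup, and a plain map[uint64]struct{} stands in for uint64set.Set.

// Standalone sketch of the per-day fan-out pattern introduced by the patch.
// All names here (searchDay, searchDateRange) are hypothetical stand-ins.
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFallbackToMetricNameMatch mirrors the sentinel used in the patch: not a real
// failure, just a signal that the per-date index search is too expensive.
var errFallbackToMetricNameMatch = errors.New("fall back to metric name match")

// searchDay is a hypothetical per-day lookup returning metricIDs for one date.
func searchDay(date uint64) (map[uint64]struct{}, error) {
	// Pretend each day contributes two metricIDs.
	m := make(map[uint64]struct{})
	m[date*10] = struct{}{}
	m[date*10+1] = struct{}{}
	return m, nil
}

// searchDateRange fans out one goroutine per day, like
// tryUpdatingMetricIDsForDateRange after the patch.
func searchDateRange(minDate, maxDate uint64, maxMetrics int) (map[uint64]struct{}, error) {
	metricIDs := make(map[uint64]struct{})
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // protects metricIDs and errGlobal
		errGlobal error
	)
	for date := minDate; date <= maxDate; date++ {
		wg.Add(1)
		go func(date uint64) {
			defer wg.Done()
			m, err := searchDay(date)
			mu.Lock()
			defer mu.Unlock()
			if errGlobal != nil {
				return // another goroutine already failed; skip the merge
			}
			if err != nil {
				errGlobal = err
				return
			}
			if len(metricIDs) < maxMetrics {
				for id := range m {
					metricIDs[id] = struct{}{}
				}
			}
		}(date)
	}
	wg.Wait()
	if errGlobal != nil {
		return nil, errGlobal
	}
	return metricIDs, nil
}

func main() {
	m, err := searchDateRange(100, 102, 1000)
	if err == errFallbackToMetricNameMatch {
		fmt.Println("per-day index too expensive; would run the global search")
		return
	}
	if err != nil {
		panic(err)
	}
	fmt.Println("found", len(m), "metricIDs")
}

Checking errGlobal before merging lets the remaining goroutines return early once any day has failed, which mirrors the short-circuit the patch adds right after mu.Lock().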