lib/storage: remove obsolete code

Aliaksandr Valialkin 2020-03-13 22:42:22 +02:00
parent 3c7c71a49c
commit df91d2d91f


@@ -1210,15 +1210,6 @@ func mergeTSIDs(a, b []TSID) []TSID {
 }
 
 func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
-	// Verify whether `is` contains data for the given tr.
-	ok, err := is.containsTimeRange(tr)
-	if err != nil {
-		return nil, fmt.Errorf("error in containsTimeRange(%s): %s", &tr, err)
-	}
-	if !ok {
-		// Fast path: nothing to search.
-		return nil, nil
-	}
 	metricIDs, err := is.searchMetricIDs(tfss, tr, maxMetrics)
 	if err != nil {
 		return nil, err
@@ -1700,14 +1691,14 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		return bytes.Compare(a.prefix, b.prefix) < 0
 	})
 
-	ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
-	if err != nil {
-		return err
-	}
-	if ok {
+	err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
+	if err == nil {
 		// Fast path: found metricIDs by date range.
 		return nil
 	}
+	if err != errFallbackToMetricNameMatch {
+		return err
+	}
 
 	// Slow path - try searching over the whole inverted index.
 	minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
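Note: the hunk above replaces the (ok bool, err error) return of tryUpdatingMetricIDsForDateRange with a sentinel error, so the caller distinguishes "fall back to the slow path" from a real failure by comparing the error value. The standalone Go sketch below illustrates that pattern; all names in it (errFallbackToSlowPath, tryFastSearch, slowSearch) are made up for illustration and are not VictoriaMetrics code.

// Minimal, self-contained sketch of the sentinel-error fallback pattern.
package main

import (
	"errors"
	"fmt"
)

// errFallbackToSlowPath signals "no hard failure, but the fast path cannot answer;
// use the slow path instead".
var errFallbackToSlowPath = errors.New("fall back to slow path")

// tryFastSearch pretends to be an index lookup that only handles short queries.
func tryFastSearch(query string) ([]string, error) {
	if len(query) > 10 {
		return nil, errFallbackToSlowPath
	}
	return []string{"fast:" + query}, nil
}

// slowSearch pretends to be the exhaustive fallback.
func slowSearch(query string) []string {
	return []string{"slow:" + query}
}

func search(query string) ([]string, error) {
	results, err := tryFastSearch(query)
	if err == nil {
		// Fast path: the index answered the query.
		return results, nil
	}
	if err != errFallbackToSlowPath {
		// Real error - propagate it.
		return nil, err
	}
	// Sentinel error: fall back to the slow path.
	return slowSearch(query), nil
}

func main() {
	r, _ := search("http_requests_total")
	fmt.Println(r)
}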
@@ -2051,30 +2042,41 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 		// Too much dates must be covered. Give up.
 		return nil, errMissingMetricIDsForDate
 	}
+	if minDate == maxDate {
+		// Fast path - query on a single day.
+		metricIDs, err := is.getMetricIDsForDate(minDate, maxMetrics)
+		if err != nil {
+			return nil, err
+		}
+		atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1)
+		return metricIDs, nil
+	}
 
-	// Search for metricIDs for each day in parallel.
+	// Slower path - query over multiple days in parallel.
 	metricIDs = &uint64set.Set{}
 	var wg sync.WaitGroup
 	var errGlobal error
 	var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
 	for minDate <= maxDate {
-		date := minDate
-		isLocal := is.db.getIndexSearch()
 		wg.Add(1)
-		go func() {
+		go func(date uint64) {
			defer wg.Done()
+			isLocal := is.db.getIndexSearch()
 			defer is.db.putIndexSearch(isLocal)
-			var result uint64set.Set
-			err := isLocal.getMetricIDsForDate(date, &result, maxMetrics)
+			m, err := isLocal.getMetricIDsForDate(date, maxMetrics)
 			mu.Lock()
-			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(&result)
+			defer mu.Unlock()
+			if errGlobal != nil {
+				return
 			}
 			if err != nil {
 				errGlobal = err
+				return
 			}
-			mu.Unlock()
-		}()
+			if metricIDs.Len() < maxMetrics {
+				metricIDs.UnionMayOwn(m)
+			}
+		}(minDate)
 		minDate++
 	}
 	wg.Wait()
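Note: the refactored loop above fans out one goroutine per day, passes the day as a goroutine argument instead of capturing a shared loop variable, merges results under a mutex, and lets the first recorded error make later goroutines return early. The sketch below shows the same concurrency shape with plain maps and invented helper names (metricIDsForDay, metricIDsForRange); it is illustrative only, not the actual VictoriaMetrics code.

// Self-contained sketch of the per-day fan-out/merge pattern.
package main

import (
	"fmt"
	"sync"
)

// metricIDsForDay stands in for a per-day index lookup.
func metricIDsForDay(day uint64) (map[uint64]bool, error) {
	return map[uint64]bool{day * 10: true, day*10 + 1: true}, nil
}

func metricIDsForRange(minDay, maxDay uint64, maxMetrics int) (map[uint64]bool, error) {
	merged := make(map[uint64]bool)
	var wg sync.WaitGroup
	var mu sync.Mutex // protects merged and errGlobal
	var errGlobal error
	for day := minDay; day <= maxDay; day++ {
		wg.Add(1)
		go func(day uint64) {
			defer wg.Done()
			m, err := metricIDsForDay(day)
			mu.Lock()
			defer mu.Unlock()
			if errGlobal != nil {
				// Another goroutine already failed; drop this result.
				return
			}
			if err != nil {
				errGlobal = err
				return
			}
			if len(merged) < maxMetrics {
				for id := range m {
					merged[id] = true
				}
			}
		}(day)
	}
	wg.Wait()
	if errGlobal != nil {
		return nil, errGlobal
	}
	return merged, nil
}

func main() {
	m, err := metricIDsForRange(18330, 18333, 1000)
	fmt.Println(len(m), err)
}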
@@ -2085,57 +2087,72 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 	return metricIDs, nil
 }
 
-func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) (bool, error) {
+func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
 	atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
 	minDate := uint64(tr.MinTimestamp) / msecPerDay
 	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
 	if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate {
 		// Per-day inverted index doesn't cover the selected date range.
-		return false, nil
+		return errFallbackToMetricNameMatch
 	}
 	if maxDate-minDate > maxDaysForDateMetricIDs {
 		// Too much dates must be covered. Give up, since it may be slow.
-		return false, nil
+		return errFallbackToMetricNameMatch
 	}
+	if minDate == maxDate {
+		// Fast path - query only a single date.
+		m, err := is.getMetricIDsForDateAndFilters(minDate, tfs, maxMetrics)
+		if err != nil {
+			return err
+		}
+		metricIDs.UnionMayOwn(m)
+		atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
+		return nil
+	}
 
-	// Search for metricIDs for each day in parallel.
+	// Slower path - search for metricIDs for each day in parallel.
 	var wg sync.WaitGroup
 	var errGlobal error
-	okGlobal := true
-	var mu sync.Mutex // protects metricIDs + *Global vars from concurrent access below
+	var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
 	for minDate <= maxDate {
-		date := minDate
-		isLocal := is.db.getIndexSearch()
 		wg.Add(1)
-		go func() {
+		go func(date uint64) {
 			defer wg.Done()
+			isLocal := is.db.getIndexSearch()
 			defer is.db.putIndexSearch(isLocal)
-			var result uint64set.Set
-			ok, err := isLocal.tryUpdatingMetricIDsForDate(date, &result, tfs, maxMetrics)
+			m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
 			mu.Lock()
-			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(&result)
-			}
-			if !ok {
-				okGlobal = ok
+			defer mu.Unlock()
+			if errGlobal != nil {
+				return
 			}
 			if err != nil {
+				if err == errFallbackToMetricNameMatch {
+					// The per-date search is too expensive. Probably it is faster to perform global search
+					// using metric name match.
+					errGlobal = err
+					return
+				}
 				dateStr := time.Unix(int64(date*24*3600), 0)
 				errGlobal = fmt.Errorf("cannot search for metricIDs for %s: %s", dateStr, err)
+				return
 			}
-			mu.Unlock()
-		}()
+			if metricIDs.Len() < maxMetrics {
+				metricIDs.UnionMayOwn(m)
+			}
+		}(minDate)
 		minDate++
 	}
 	wg.Wait()
 	if errGlobal != nil {
-		return false, errGlobal
+		return errGlobal
 	}
 	atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
-	return okGlobal, nil
+	return nil
 }
 
-func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint64set.Set, tfs *TagFilters, maxMetrics int) (bool, error) {
+func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
+	// Populate metricIDs with the first non-negative filter.
 	var tfFirst *tagFilter
 	for i := range tfs.tfs {
 		tf := &tfs.tfs[i]
@@ -2145,37 +2162,36 @@ func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint6
 		tfFirst = tf
 		break
 	}
-
-	var result *uint64set.Set
+	var metricIDs *uint64set.Set
 	maxDateMetrics := maxMetrics * 50
 	if tfFirst == nil {
-		result = &uint64set.Set{}
-		if err := is.updateMetricIDsForDateAll(result, date, maxDateMetrics); err != nil {
+		// All the filters in tfs are negative. Populate all the metricIDs for the given (date),
+		// so later they can be filtered out with negative filters.
+		m, err := is.getMetricIDsForDate(date, maxDateMetrics)
+		if err != nil {
 			if err == errMissingMetricIDsForDate {
-				// Zero data points were written on the given date.
+				// Zero time series were written on the given date.
 				// It is OK, since (date, metricID) entries must exist for the given date
 				// according to startDateForPerDayInvertedIndex.
-				return true, nil
+				return nil, nil
 			}
-			return false, fmt.Errorf("cannot obtain all the metricIDs: %s", err)
+			return nil, fmt.Errorf("cannot obtain all the metricIDs: %s", err)
 		}
+		metricIDs = m
 	} else {
+		// Populate metricIDs for the given tfFirst on the given (date)
 		m, err := is.getMetricIDsForDateTagFilter(tfFirst, date, tfs.commonPrefix, maxDateMetrics)
 		if err != nil {
-			if err == errFallbackToMetricNameMatch {
-				// The per-date search is too expensive. Probably it is better to perform global search
-				// using metric name match.
-				return false, nil
-			}
-			return false, err
+			return nil, err
 		}
-		result = m
+		metricIDs = m
 	}
-	if result.Len() >= maxDateMetrics {
+	if metricIDs.Len() >= maxDateMetrics {
 		// Too many time series found by a single tag filter. Fall back to global search.
-		return false, nil
+		return nil, errFallbackToMetricNameMatch
 	}
 
+	// Intersect metricIDs with the rest of filters.
 	for i := range tfs.tfs {
 		tf := &tfs.tfs[i]
 		if tf == tfFirst {
@@ -2183,28 +2199,23 @@ func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint6
 		}
 		m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
 		if err != nil {
-			if err == errFallbackToMetricNameMatch {
-				// The per-date search is too expensive. Probably it is better to perform global search
-				// using metric name match.
-				return false, nil
-			}
-			return false, err
+			return nil, err
 		}
 		if m.Len() >= maxDateMetrics {
 			// Too many time series found by a single tag filter. Fall back to global search.
-			return false, nil
+			return nil, errFallbackToMetricNameMatch
 		}
 		if tf.isNegative {
-			result.Subtract(m)
+			metricIDs.Subtract(m)
 		} else {
-			result.Intersect(m)
+			metricIDs.Intersect(m)
 		}
-		if result.Len() == 0 {
-			return true, nil
+		if metricIDs.Len() == 0 {
+			// Short circuit - there is no need in applying the remaining filters to empty set.
+			return nil, nil
 		}
 	}
-	metricIDs.UnionMayOwn(result)
-	return true, nil
+	return metricIDs, nil
 }
 
 func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
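Note: in the new getMetricIDsForDateAndFilters above, the result set is seeded from the first non-negative filter and then narrowed by intersecting with each remaining positive filter and subtracting each negative one, stopping early once the set is empty. The sketch below shows that narrowing loop with plain map-based sets and invented names (filter, applyFilters); it is illustrative only and does not use the uint64set package.

// Self-contained sketch of seeding a set and narrowing it with positive/negative filters.
package main

import "fmt"

type filter struct {
	ids      map[uint64]bool // ids matched by this filter
	negative bool            // true: exclude matches; false: require matches
}

func applyFilters(seed map[uint64]bool, filters []filter) map[uint64]bool {
	// Copy the seed so the caller's set stays untouched.
	result := make(map[uint64]bool, len(seed))
	for id := range seed {
		result[id] = true
	}
	for _, f := range filters {
		for id := range result {
			matched := f.ids[id]
			if (f.negative && matched) || (!f.negative && !matched) {
				// Drop ids matched by a negative filter or missed by a positive one.
				delete(result, id)
			}
		}
		if len(result) == 0 {
			// Short circuit - no need to apply the remaining filters to an empty set.
			return result
		}
	}
	return result
}

func main() {
	seed := map[uint64]bool{1: true, 2: true, 3: true}
	filters := []filter{
		{ids: map[uint64]bool{1: true, 2: true}, negative: false}, // keep 1, 2
		{ids: map[uint64]bool{2: true}, negative: true},           // drop 2
	}
	fmt.Println(applyFilters(seed, filters)) // map[1:true]
}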
@@ -2332,67 +2343,23 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
 	return is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
 }
 
-func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int) error {
-	ts := &is.ts
-	kb := &is.kb
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
-	kb.B = encoding.MarshalUint64(kb.B, date)
-	ts.Seek(kb.B)
-	items := 0
-	for metricIDs.Len() < maxMetrics && ts.NextItem() {
-		if !bytes.HasPrefix(ts.Item, kb.B) {
-			break
-		}
-		// Extract MetricID from ts.Item (the last 8 bytes).
-		v := ts.Item[len(kb.B):]
-		if len(v) != 8 {
-			return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v))
-		}
-		metricID := encoding.UnmarshalUint64(v)
-		metricIDs.Add(metricID)
-		items++
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("error when searching for metricIDs for date %d: %s", date, err)
-	}
-	if items == 0 {
-		// There are no metricIDs for the given date.
-		// This may be the case for old data when Date -> MetricID wasn't available.
-		return errMissingMetricIDsForDate
-	}
-	return nil
-}
-
-func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
-	ts := &is.ts
-	kb := &is.kb
-
-	// Verify whether the maximum date in `ts` covers tr.MinTimestamp.
-	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
-	kb.B = encoding.MarshalUint64(kb.B, minDate)
-	ts.Seek(kb.B)
-	if !ts.NextItem() {
-		if err := ts.Error(); err != nil {
-			return false, fmt.Errorf("error when searching for minDate=%d, prefix %q: %s", minDate, kb.B, err)
-		}
-		return false, nil
-	}
-	if !bytes.HasPrefix(ts.Item, kb.B[:1]) {
-		// minDate exceeds max date from ts.
-		return false, nil
-	}
-	return true, nil
-}
-
-func (is *indexSearch) updateMetricIDsForDateAll(metricIDs *uint64set.Set, date uint64, maxMetrics int) error {
+func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64set.Set, error) {
 	// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
 	kb := kbPool.Get()
 	defer kbPool.Put(kb)
-	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
 	kb.B = encoding.MarshalUint64(kb.B, date)
 	kb.B = marshalTagValue(kb.B, nil)
-	return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
+	var metricIDs uint64set.Set
+	if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
+		return nil, err
+	}
+	if metricIDs.Len() == 0 {
+		// There are no metricIDs for the given date.
+		// This may be the case for old data where (data, __name__=value)->metricIDs entries weren't available.
+		return nil, errMissingMetricIDsForDate
+	}
+	return &metricIDs, nil
 }
 
 func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {