lib/storage: remove obsolete code

This commit is contained in:
Aliaksandr Valialkin 2020-11-02 19:11:48 +02:00
parent d396c265a6
commit fe289331dd
4 changed files with 32 additions and 89 deletions

View file

@ -103,9 +103,6 @@ type indexDB struct {
mustDrop uint64
// Start date fully covered by per-day inverted index.
startDateForPerDayInvertedIndex uint64
name string
tb *mergeset.Table
@ -186,15 +183,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
return nil, fmt.Errorf("cannot load deleted metricIDs: %w", err)
}
db.setDeletedMetricIDs(dmis)
is = db.getIndexSearch(noDeadline)
date, err := is.getStartDateForPerDayInvertedIndex()
db.putIndexSearch(is)
if err != nil {
return nil, fmt.Errorf("cannot obtain start date for per-day inverted index: %w", err)
}
db.startDateForPerDayInvertedIndex = date
return db, nil
}
@ -1406,36 +1394,6 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
db.deletedMetricIDsUpdateLock.Unlock()
}
func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
kb := &is.kb
ts := &is.ts
kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)
prefix := kb.B
ts.Seek(kb.B)
if ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
// The database doesn't contain per-day inverted index yet.
// Return the minimum possible date, i.e. 0.
return 0, nil
}
suffix := item[len(prefix):]
// Suffix must contain encoded 64-bit date.
if len(suffix) < 8 {
return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix))
}
minDate := encoding.UnmarshalUint64(suffix)
return minDate, nil
}
if err := ts.Error(); err != nil {
return 0, err
}
// There are no (date,tag)->metricIDs entries in the database yet.
// Return the minimum possible date, i.e. 0.
return 0, nil
}
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
dmis := &uint64set.Set{}
ts := &is.ts
@ -1780,14 +1738,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
maxTimeRangeMetrics := 20 * maxMetrics
metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1)
if err == errMissingMetricIDsForDate {
// Slow path: try to find the tag filter without maxMetrics adjustment.
minTf, minMetricIDs, err = is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
if err == nil {
return minTf, minMetricIDs, nil
}
if err != errTooManyMetrics {
return nil, nil, err
}
return nil, nil, fmt.Errorf("cannot find tag filter matching less than %d time series; "+
"either increase -search.maxUniqueTimeseries or use more specific tag filters", maxMetrics)
}
@ -1797,15 +1747,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
return nil, metricIDsForTimeRange, nil
}
// Slow path: try to select the tag filter without maxMetrics adjustment.
minTf, minMetricIDs, err = is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
if err == nil {
return minTf, minMetricIDs, nil
}
if err != errTooManyMetrics {
return nil, nil, err
}
return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
maxMetrics, tr.String())
}
@ -2498,7 +2439,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate {
if maxDate < minDate {
// Per-day inverted index doesn't cover the selected date range.
return errFallbackToMetricNameMatch
}
@ -2627,8 +2568,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
if err != nil {
if err == errMissingMetricIDsForDate {
// Zero time series were written on the given date.
// It is OK, since (date, metricID) entries must exist for the given date
// according to startDateForPerDayInvertedIndex.
return nil, nil
}
return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
@ -2848,11 +2787,6 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64
if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
return nil, err
}
if metricIDs.Len() == 0 {
// There are no metricIDs for the given date.
// This may be the case for old data where (date, __name__=value)->metricIDs entries weren't available.
return nil, errMissingMetricIDsForDate
}
return &metricIDs, nil
}

View file

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
@ -1484,6 +1485,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
now := uint64(timestampFromTime(theDay))
baseDate := now / msecPerDay
var metricNameBuf []byte
perDayMetricIDs := make(map[uint64]*uint64set.Set)
var allMetricIDs uint64set.Set
for day := 0; day < days; day++ {
var tsids []TSID
for metric := 0; metric < metricsPerDay; metric++ {
@ -1513,17 +1516,44 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
// Add the metrics to the per-day stores
date := baseDate - uint64(day)
var metricIDs uint64set.Set
for i := range tsids {
tsid := &tsids[i]
metricIDs.Add(tsid.MetricID)
if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
}
}
allMetricIDs.Union(&metricIDs)
perDayMetricIDs[date] = &metricIDs
}
// Flush index to disk, so it becomes visible for search
db.tb.DebugFlush()
is2 := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is2)
// Check that all the metrics are found for all the days.
for date := baseDate - days + 1; date <= baseDate; date++ {
metricIDs, err := is2.getMetricIDsForDate(date, metricsPerDay)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !perDayMetricIDs[date].Equal(metricIDs) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), perDayMetricIDs[date].AppendTo(nil))
}
}
// Check that all the metrics are found in updateMetricIDsAll
var metricIDs uint64set.Set
if err := is2.updateMetricIDsAll(&metricIDs, metricsPerDay*days); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !allMetricIDs.Equal(&metricIDs) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
}
// Create a filter that will match series that occur across multiple days
tfs := NewTagFilters()
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {

View file

@ -71,16 +71,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
}
func TestSearch(t *testing.T) {
t.Run("global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false)
})
t.Run("perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true)
})
}
func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex)
path := fmt.Sprintf("TestSearch")
st, err := OpenStorage(path, 0)
if err != nil {
t.Fatalf("cannot open storage %q: %s", path, err)
@ -134,13 +125,6 @@ func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
if err != nil {
t.Fatalf("cannot re-open storage %q: %s", path, err)
}
if forcePerDayInvertedIndex {
idb := st.idb()
idb.startDateForPerDayInvertedIndex = 0
idb.doExtDB(func(extDB *indexDB) {
extDB.startDateForPerDayInvertedIndex = 0
})
}
// Run search.
tr := TimeRange{

View file

@ -1739,11 +1739,6 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
}
// Adjust startDateForPerDayInvertedIndex for the previous index.
if prev.startDateForPerDayInvertedIndex > curr.startDateForPerDayInvertedIndex {
prev.startDateForPerDayInvertedIndex = curr.startDateForPerDayInvertedIndex
}
return curr, prev, nil
}