mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: remove obsolete code
This commit is contained in:
parent
9715d9a3a8
commit
f3a7e6f6e3
4 changed files with 32 additions and 97 deletions
|
@ -103,9 +103,6 @@ type indexDB struct {
|
||||||
|
|
||||||
mustDrop uint64
|
mustDrop uint64
|
||||||
|
|
||||||
// Start date fully covered by per-day inverted index.
|
|
||||||
startDateForPerDayInvertedIndex uint64
|
|
||||||
|
|
||||||
name string
|
name string
|
||||||
tb *mergeset.Table
|
tb *mergeset.Table
|
||||||
|
|
||||||
|
@ -186,15 +183,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
|
||||||
return nil, fmt.Errorf("cannot load deleted metricIDs: %w", err)
|
return nil, fmt.Errorf("cannot load deleted metricIDs: %w", err)
|
||||||
}
|
}
|
||||||
db.setDeletedMetricIDs(dmis)
|
db.setDeletedMetricIDs(dmis)
|
||||||
|
|
||||||
is = db.getIndexSearch(0, 0, noDeadline)
|
|
||||||
date, err := is.getStartDateForPerDayInvertedIndex()
|
|
||||||
db.putIndexSearch(is)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot obtain start date for per-day inverted index: %w", err)
|
|
||||||
}
|
|
||||||
db.startDateForPerDayInvertedIndex = date
|
|
||||||
|
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1429,44 +1417,6 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
|
||||||
db.deletedMetricIDsUpdateLock.Unlock()
|
db.deletedMetricIDsUpdateLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
|
|
||||||
minDate := uint64(0)
|
|
||||||
kb := &is.kb
|
|
||||||
ts := &is.ts
|
|
||||||
kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
|
||||||
prefix := kb.B
|
|
||||||
ts.Seek(kb.B)
|
|
||||||
for ts.NextItem() {
|
|
||||||
item := ts.Item
|
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
suffix := item[len(prefix):]
|
|
||||||
|
|
||||||
// Suffix must contain encoded 32-bit (accountID, projectID) plus 64-bit date.
|
|
||||||
// Summary 16 bytes.
|
|
||||||
if len(suffix) < 16 {
|
|
||||||
return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 16 bytes; got %d bytes", len(suffix))
|
|
||||||
}
|
|
||||||
apNum := encoding.UnmarshalUint64(suffix[:8])
|
|
||||||
date := encoding.UnmarshalUint64(suffix[8:])
|
|
||||||
if date < minDate {
|
|
||||||
minDate = date
|
|
||||||
}
|
|
||||||
// Seek for the next (accountID, projectID) in order to obtain min date there.
|
|
||||||
apNumNext := apNum + 1
|
|
||||||
if apNumNext > apNum {
|
|
||||||
kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
|
||||||
kb.B = encoding.MarshalUint64(kb.B, apNumNext)
|
|
||||||
ts.Seek(kb.B)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := ts.Error(); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return minDate, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
||||||
dmis := &uint64set.Set{}
|
dmis := &uint64set.Set{}
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
|
@ -1813,14 +1763,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
|
||||||
maxTimeRangeMetrics := 20 * maxMetrics
|
maxTimeRangeMetrics := 20 * maxMetrics
|
||||||
metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1)
|
metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1)
|
||||||
if err == errMissingMetricIDsForDate {
|
if err == errMissingMetricIDsForDate {
|
||||||
// Slow path: try to find the tag filter without maxMetrics adjustement.
|
|
||||||
minTf, minMetricIDs, err = is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
|
|
||||||
if err == nil {
|
|
||||||
return minTf, minMetricIDs, nil
|
|
||||||
}
|
|
||||||
if err != errTooManyMetrics {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
return nil, nil, fmt.Errorf("cannot find tag filter matching less than %d time series; "+
|
return nil, nil, fmt.Errorf("cannot find tag filter matching less than %d time series; "+
|
||||||
"either increase -search.maxUniqueTimeseries or use more specific tag filters", maxMetrics)
|
"either increase -search.maxUniqueTimeseries or use more specific tag filters", maxMetrics)
|
||||||
}
|
}
|
||||||
|
@ -1830,15 +1772,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
|
||||||
if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
|
if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
|
||||||
return nil, metricIDsForTimeRange, nil
|
return nil, metricIDsForTimeRange, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path: try to select the tag filter without maxMetrics adjustement.
|
|
||||||
minTf, minMetricIDs, err = is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
|
|
||||||
if err == nil {
|
|
||||||
return minTf, minMetricIDs, nil
|
|
||||||
}
|
|
||||||
if err != errTooManyMetrics {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
|
return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
|
||||||
maxMetrics, tr.String())
|
maxMetrics, tr.String())
|
||||||
}
|
}
|
||||||
|
@ -2531,7 +2464,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||||
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
|
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
|
||||||
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
||||||
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
||||||
if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate {
|
if maxDate < minDate {
|
||||||
// Per-day inverted index doesn't cover the selected date range.
|
// Per-day inverted index doesn't cover the selected date range.
|
||||||
return errFallbackToMetricNameMatch
|
return errFallbackToMetricNameMatch
|
||||||
}
|
}
|
||||||
|
@ -2660,8 +2593,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errMissingMetricIDsForDate {
|
if err == errMissingMetricIDsForDate {
|
||||||
// Zero time series were written on the given date.
|
// Zero time series were written on the given date.
|
||||||
// It is OK, since (date, metricID) entries must exist for the given date
|
|
||||||
// according to startDateForPerDayInvertedIndex.
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
|
return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
|
||||||
|
@ -2881,11 +2812,6 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64
|
||||||
if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
|
if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if metricIDs.Len() == 0 {
|
|
||||||
// There are no metricIDs for the given date.
|
|
||||||
// This may be the case for old data where (data, __name__=value)->metricIDs entries weren't available.
|
|
||||||
return nil, errMissingMetricIDsForDate
|
|
||||||
}
|
|
||||||
return &metricIDs, nil
|
return &metricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1556,6 +1557,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
now := uint64(timestampFromTime(theDay))
|
now := uint64(timestampFromTime(theDay))
|
||||||
baseDate := now / msecPerDay
|
baseDate := now / msecPerDay
|
||||||
var metricNameBuf []byte
|
var metricNameBuf []byte
|
||||||
|
perDayMetricIDs := make(map[uint64]*uint64set.Set)
|
||||||
|
var allMetricIDs uint64set.Set
|
||||||
for day := 0; day < days; day++ {
|
for day := 0; day < days; day++ {
|
||||||
var tsids []TSID
|
var tsids []TSID
|
||||||
for metric := 0; metric < metricsPerDay; metric++ {
|
for metric := 0; metric < metricsPerDay; metric++ {
|
||||||
|
@ -1593,17 +1596,44 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
|
|
||||||
// Add the metrics to the per-day stores
|
// Add the metrics to the per-day stores
|
||||||
date := baseDate - uint64(day)
|
date := baseDate - uint64(day)
|
||||||
|
var metricIDs uint64set.Set
|
||||||
for i := range tsids {
|
for i := range tsids {
|
||||||
tsid := &tsids[i]
|
tsid := &tsids[i]
|
||||||
|
metricIDs.Add(tsid.MetricID)
|
||||||
if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
|
if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
|
||||||
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
|
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
allMetricIDs.Union(&metricIDs)
|
||||||
|
perDayMetricIDs[date] = &metricIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush index to disk, so it becomes visible for search
|
// Flush index to disk, so it becomes visible for search
|
||||||
db.tb.DebugFlush()
|
db.tb.DebugFlush()
|
||||||
|
|
||||||
|
is2 := db.getIndexSearch(accountID, projectID, noDeadline)
|
||||||
|
defer db.putIndexSearch(is2)
|
||||||
|
|
||||||
|
// Check that all the metrics are found for all the days.
|
||||||
|
for date := baseDate - days + 1; date <= baseDate; date++ {
|
||||||
|
metricIDs, err := is2.getMetricIDsForDate(date, metricsPerDay)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if !perDayMetricIDs[date].Equal(metricIDs) {
|
||||||
|
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), perDayMetricIDs[date].AppendTo(nil))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that all the metrics are found in updateMetricIDsAll
|
||||||
|
var metricIDs uint64set.Set
|
||||||
|
if err := is2.updateMetricIDsAll(&metricIDs, metricsPerDay*days); err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if !allMetricIDs.Equal(&metricIDs) {
|
||||||
|
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
|
||||||
|
}
|
||||||
|
|
||||||
// Create a filter that will match series that occur across multiple days
|
// Create a filter that will match series that occur across multiple days
|
||||||
tfs := NewTagFilters(accountID, projectID)
|
tfs := NewTagFilters(accountID, projectID)
|
||||||
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {
|
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {
|
||||||
|
|
|
@ -77,16 +77,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSearch(t *testing.T) {
|
func TestSearch(t *testing.T) {
|
||||||
t.Run("global_inverted_index", func(t *testing.T) {
|
path := fmt.Sprintf("TestSearch")
|
||||||
testSearchGeneric(t, false)
|
|
||||||
})
|
|
||||||
t.Run("perday_inverted_index", func(t *testing.T) {
|
|
||||||
testSearchGeneric(t, true)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
|
|
||||||
path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex)
|
|
||||||
st, err := OpenStorage(path, 0)
|
st, err := OpenStorage(path, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cannot open storage %q: %s", path, err)
|
t.Fatalf("cannot open storage %q: %s", path, err)
|
||||||
|
@ -141,13 +132,6 @@ func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cannot re-open storage %q: %s", path, err)
|
t.Fatalf("cannot re-open storage %q: %s", path, err)
|
||||||
}
|
}
|
||||||
if forcePerDayInvertedIndex {
|
|
||||||
idb := st.idb()
|
|
||||||
idb.startDateForPerDayInvertedIndex = 0
|
|
||||||
idb.doExtDB(func(extDB *indexDB) {
|
|
||||||
extDB.startDateForPerDayInvertedIndex = 0
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run search.
|
// Run search.
|
||||||
tr := TimeRange{
|
tr := TimeRange{
|
||||||
|
|
|
@ -1857,11 +1857,6 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
|
||||||
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
|
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adjust startDateForPerDayInvertedIndex for the previous index.
|
|
||||||
if prev.startDateForPerDayInvertedIndex > curr.startDateForPerDayInvertedIndex {
|
|
||||||
prev.startDateForPerDayInvertedIndex = curr.startDateForPerDayInvertedIndex
|
|
||||||
}
|
|
||||||
|
|
||||||
return curr, prev, nil
|
return curr, prev, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue