lib/storage: remove inmemory index for recent hour, since it uses too much memory

Production workload shows that the index requires ~4Kb of RAM per active time series.
This is too much for high number of active time series, so let's delete this index.

Now the queries should fall back to the index for the current day instead of the index
for the recent hour. The query performance for the current day index should be good enough
given the 100M rows/sec scan speed per CPU core.
This commit is contained in:
Aliaksandr Valialkin 2019-11-13 17:58:05 +02:00
parent 33895d4a0f
commit 86a1cd700b
8 changed files with 18 additions and 487 deletions

View file

@ -25,8 +25,6 @@ var (
// DataPath is a path to storage data.
DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+
"This may be useful in order to reduce memory usage when working with high number of time series")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
)
@ -45,9 +43,6 @@ func InitWithoutMetrics() {
logger.Fatalf("invalid `-precisionBits`: %s", err)
}
if *disableRecentHourIndex {
storage.DisableRecentHourIndex()
}
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
@ -412,25 +407,6 @@ func registerStorageMetrics() {
return float64(idbm().ItemsCount)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 {
return float64(m().RecentHourInvertedIndexSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_size_bytes`, func() float64 {
return float64(m().RecentHourInvertedIndexSizeBytes)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 {
return float64(m().RecentHourInvertedIndexUniqueTagPairsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 {
return float64(m().RecentHourInvertedIndexPendingMetricIDsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchCalls)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchHits)
})
metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 {
return float64(idbm().DateRangeSearchCalls)
})
@ -491,6 +467,9 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 {
return float64(m().DateMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheSizeBytes)
})

View file

@ -95,12 +95,6 @@ type indexDB struct {
// The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64
// The number of calls for recent hour searches over inverted index.
recentHourInvertedIndexSearchCalls uint64
// The number of hits for recent hour searches over inverted index.
recentHourInvertedIndexSearchHits uint64
// The number of calls for date range searches.
dateRangeSearchCalls uint64
@ -231,9 +225,6 @@ type IndexDBMetrics struct {
DateMetricIDsSearchCalls uint64
DateMetricIDsSearchHits uint64
RecentHourInvertedIndexSearchCalls uint64
RecentHourInvertedIndexSearchHits uint64
DateRangeSearchCalls uint64
DateRangeSearchHits uint64
@ -275,9 +266,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
@ -1667,10 +1655,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
return bytes.Compare(a.prefix, b.prefix) < 0
})
if is.tryUpdatingMetricIDsForRecentHour(metricIDs, tfs, tr) {
// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
return nil
}
ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
if err != nil {
return err
@ -2193,38 +2177,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
return nil, false
}
func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
if disableRecentHourIndex {
return false
}
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return true
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return true
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return true
}
return false
}
func (db *indexDB) storeDateMetricID(date, metricID uint64) error {
is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID)

View file

@ -1418,10 +1418,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
prevMetricIDs.m.Add(tsids[i].MetricID)
currMetricIDs.m.Add(tsids[i].MetricID)
}
prevMetricIDs.iidx = newInmemoryInvertedIndex()
prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.m)
currMetricIDs.iidx = newInmemoryInvertedIndex()
currMetricIDs.iidx.MustUpdate(db, currMetricIDs.m)
}
}

View file

@ -1,283 +0,0 @@
package storage
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
type inmemoryInvertedIndex struct {
mu sync.RWMutex
m map[string]*uint64set.Set
pendingMetricIDs []uint64
}
func (iidx *inmemoryInvertedIndex) Marshal(dst []byte) []byte {
iidx.mu.RLock()
defer iidx.mu.RUnlock()
// Marshal iidx.m
var metricIDs []uint64
dst = encoding.MarshalUint64(dst, uint64(len(iidx.m)))
for k, v := range iidx.m {
dst = encoding.MarshalBytes(dst, []byte(k))
metricIDs = v.AppendTo(metricIDs[:0])
dst = marshalMetricIDs(dst, metricIDs)
}
// Marshal iidx.pendingMetricIDs
dst = marshalMetricIDs(dst, iidx.pendingMetricIDs)
return dst
}
func (iidx *inmemoryInvertedIndex) Unmarshal(src []byte) ([]byte, error) {
iidx.mu.Lock()
defer iidx.mu.Unlock()
// Unmarshal iidx.m
if len(src) < 8 {
return src, fmt.Errorf("cannot read len(iidx.m) from %d bytes; want at least 8 bytes", len(src))
}
mLen := int(encoding.UnmarshalUint64(src))
src = src[8:]
m := make(map[string]*uint64set.Set, mLen)
var metricIDs []uint64
for i := 0; i < mLen; i++ {
tail, k, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal key #%d for iidx.m: %s", i, err)
}
src = tail
tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal value #%d for iidx.m: %s", i, err)
}
src = tail
var v uint64set.Set
for _, metricID := range metricIDs {
v.Add(metricID)
}
m[string(k)] = &v
}
iidx.m = m
// Unmarshal iidx.pendingMetricIDs
var err error
var tail []byte
tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal iidx.pendingMetricIDs: %s", err)
}
src = tail
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs[:0], metricIDs...)
return src, nil
}
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
for _, metricID := range metricIDs {
dst = encoding.MarshalUint64(dst, metricID)
}
return dst
}
func unmarshalMetricIDs(dst []uint64, src []byte) ([]byte, []uint64, error) {
if len(src) < 8 {
return src, dst, fmt.Errorf("cannot unmarshal metricIDs len from %d bytes; want at least 8 bytes", len(src))
}
metricIDsLen := int(encoding.UnmarshalUint64(src))
src = src[8:]
if len(src) < 8*metricIDsLen {
return src, dst, fmt.Errorf("not enough bytes for unmarshaling %d metricIDs; want %d bytes; got %d bytes", metricIDsLen, 8*metricIDsLen, len(src))
}
for i := 0; i < metricIDsLen; i++ {
metricID := encoding.UnmarshalUint64(src)
src = src[8:]
dst = append(dst, metricID)
}
return src, dst, nil
}
func (iidx *inmemoryInvertedIndex) SizeBytes() uint64 {
n := uint64(0)
iidx.mu.RLock()
for k, v := range iidx.m {
n += uint64(len(k))
n += v.SizeBytes()
}
n += uint64(len(iidx.pendingMetricIDs)) * 8
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.m)
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetEntriesCount() int {
if iidx == nil {
return 0
}
n := 0
iidx.mu.RLock()
for _, v := range iidx.m {
n += v.Len()
}
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.pendingMetricIDs)
iidx.mu.RUnlock()
return n
}
func newInmemoryInvertedIndex() *inmemoryInvertedIndex {
return &inmemoryInvertedIndex{
m: make(map[string]*uint64set.Set),
}
}
func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, src *uint64set.Set) {
metricIDs := src.AppendTo(nil)
iidx.mu.Lock()
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricIDs...)
if err := iidx.addPendingEntriesLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, metricID uint64) {
iidx.mu.Lock()
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
if err := iidx.addPendingEntriesLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) {
if iidx == nil {
return
}
var result *uint64set.Set
var tfFirst *tagFilter
for i := range tfs.tfs {
if tfs.tfs[i].isNegative {
continue
}
tfFirst = &tfs.tfs[i]
break
}
iidx.mu.RLock()
defer iidx.mu.RUnlock()
if tfFirst == nil {
result = allMetricIDs.Clone()
} else {
result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix)
}
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == tfFirst {
continue
}
m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix)
if tf.isNegative {
result.Subtract(m)
} else {
result.Intersect(m)
}
if result.Len() == 0 {
return
}
}
metricIDs.Union(result)
}
func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set {
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix)
}
prefix := tf.prefix[len(commonPrefix):]
var m uint64set.Set
kb := kbPool.Get()
defer kbPool.Put(kb)
for k, v := range iidx.m {
if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) {
continue
}
kb.B = append(kb.B[:0], k[len(prefix):]...)
ok, err := tf.matchSuffix(kb.B)
if err != nil {
logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err)
}
if !ok {
continue
}
m.Union(v)
}
return &m
}
func (iidx *inmemoryInvertedIndex) addPendingEntriesLocked(idb *indexDB) error {
metricIDs := iidx.pendingMetricIDs
iidx.pendingMetricIDs = iidx.pendingMetricIDs[:0]
kb := kbPool.Get()
defer kbPool.Put(kb)
mn := GetMetricName()
defer PutMetricName(mn)
for _, metricID := range metricIDs {
var err error
kb.B, err = idb.searchMetricName(kb.B[:0], metricID)
if err != nil {
if err == io.EOF {
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
continue
}
return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
}
if err = mn.Unmarshal(kb.B); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err)
}
kb.B = marshalTagValue(kb.B[:0], nil)
kb.B = marshalTagValue(kb.B, mn.MetricGroup)
iidx.addMetricIDLocked(kb.B, metricID)
for i := range mn.Tags {
kb.B = mn.Tags[i].Marshal(kb.B[:0])
iidx.addMetricIDLocked(kb.B, metricID)
}
}
return nil
}
func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) {
v := iidx.m[string(key)]
if v == nil {
v = &uint64set.Set{}
iidx.m[string(key)] = v
}
v.Add(metricID)
}

View file

@ -1,44 +0,0 @@
package storage
import (
"fmt"
"reflect"
"testing"
)
func TestInmemoryInvertedIndexMarshalUnmarshal(t *testing.T) {
iidx := newInmemoryInvertedIndex()
const keysCount = 100
const metricIDsCount = 10000
for i := 0; i < metricIDsCount; i++ {
k := fmt.Sprintf("key %d", i%keysCount)
iidx.addMetricIDLocked([]byte(k), uint64(i))
}
for i := 0; i < 10; i++ {
metricID := uint64(i * 43)
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
}
data := iidx.Marshal(nil)
iidx2 := newInmemoryInvertedIndex()
tail, err := iidx2.Unmarshal(data)
if err != nil {
t.Fatalf("cannot unmarshal iidx: %s", err)
}
if len(tail) != 0 {
t.Fatalf("unexpected tail left after iidx unmarshaling: %d bytes", len(tail))
}
if len(iidx.m) != len(iidx2.m) {
t.Fatalf("unexpected len(iidx2.m); got %d; want %d", len(iidx2.m), len(iidx.m))
}
if !reflect.DeepEqual(iidx.pendingMetricIDs, iidx2.pendingMetricIDs) {
t.Fatalf("unexpected pendingMetricIDs; got\n%d;\nwant\n%d", iidx2.pendingMetricIDs, iidx.pendingMetricIDs)
}
for k, v := range iidx.m {
v2 := iidx2.m[k]
if !v.Equal(v2) {
t.Fatalf("unexpected set for key %q", k)
}
}
}

View file

@ -72,21 +72,15 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
func TestSearch(t *testing.T) {
t.Run("global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false, false)
testSearchGeneric(t, false)
})
t.Run("perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false, true)
})
t.Run("recent_hour_global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true, false)
})
t.Run("recent_hour_perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true, true)
testSearchGeneric(t, true)
})
}
func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) {
path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex)
func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex)
st, err := OpenStorage(path, 0)
if err != nil {
t.Fatalf("cannot open storage %q: %s", path, err)
@ -147,10 +141,6 @@ func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayIn
extDB.startDateForPerDayInvertedIndex = 0
})
}
if forceRecentHourInvertedIndex {
hm := st.currHourMetricIDs.Load().(*hourMetricIDs)
hm.isFull = true
}
// Run search.
tr := TimeRange{

View file

@ -27,17 +27,6 @@ import (
const maxRetentionMonths = 12 * 100
var disableRecentHourIndex = false
// DisableRecentHourIndex disables in-memory inverted index for recent hour.
//
// This may be useful in order to save RAM for high cardinality data.
//
// This function must be called before OpenStorage.
func DisableRecentHourIndex() {
disableRecentHourIndex = true
}
// Storage represents TSDB storage.
type Storage struct {
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
@ -317,16 +306,12 @@ type Metrics struct {
MetricNameCacheCollisions uint64
DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheResetsCount uint64
HourMetricIDCacheSize uint64
RecentHourInvertedIndexSize uint64
RecentHourInvertedIndexSizeBytes uint64
RecentHourInvertedIndexUniqueTagPairsSize uint64
RecentHourInvertedIndexPendingMetricIDsSize uint64
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
@ -372,6 +357,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.MetricNameCacheCollisions += cs.Collisions
m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes())
m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)
@ -383,18 +369,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
}
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount())
m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount())
m.RecentHourInvertedIndexSizeBytes += hmPrev.iidx.SizeBytes()
m.RecentHourInvertedIndexSizeBytes += hmCurr.iidx.SizeBytes()
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen())
s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics)
}
@ -509,7 +483,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
@ -521,7 +494,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
@ -534,7 +506,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if hourLoaded != hour {
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
@ -545,7 +516,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if uint64(len(src)) < 8*hmLen {
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
@ -556,30 +526,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
m.Add(metricID)
}
// Unmarshal hm.iidx
iidx := newInmemoryInvertedIndex()
if !disableRecentHourIndex {
tail, err := iidx.Unmarshal(src)
if err != nil {
logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err)
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
if len(tail) > 0 {
logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail))
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
}
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
return &hourMetricIDs{
m: m,
iidx: iidx,
hour: hourLoaded,
isFull: isFull != 0,
}
@ -605,11 +554,6 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
dst = encoding.MarshalUint64(dst, metricID)
}
if !disableRecentHourIndex {
// Marshal hm.iidx
dst = hm.iidx.Marshal(dst)
}
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
@ -960,9 +904,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
s.pendingHourEntriesLock.Lock()
s.pendingHourEntries.Add(metricID)
s.pendingHourEntriesLock.Unlock()
if !disableRecentHourIndex {
hm.iidx.AddMetricID(idb, metricID)
}
}
// Slower path: check global cache for (date, metricID) entry.
@ -1028,6 +969,15 @@ func (dmc *dateMetricIDCache) EntriesCount() int {
return n
}
func (dmc *dateMetricIDCache) SizeBytes() uint64 {
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
n := uint64(0)
for _, e := range byDate.m {
n += e.v.SizeBytes()
}
return n
}
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
v := byDate.get(date)
@ -1140,20 +1090,16 @@ func (s *Storage) updateCurrHourMetricIDs() {
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
var m *uint64set.Set
var iidx *inmemoryInvertedIndex
isFull := hm.isFull
if hm.hour == hour {
m = hm.m.Clone()
iidx = hm.iidx
} else {
m = &uint64set.Set{}
iidx = newInmemoryInvertedIndex()
isFull = true
}
m.Union(newMetricIDs)
hmNew := &hourMetricIDs{
m: m,
iidx: iidx,
hour: hour,
isFull: isFull,
}
@ -1165,7 +1111,6 @@ func (s *Storage) updateCurrHourMetricIDs() {
type hourMetricIDs struct {
m *uint64set.Set
iidx *inmemoryInvertedIndex
hour uint64
isFull bool
}

View file

@ -108,7 +108,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123,
}
hmOrig.m.Add(12)
@ -144,7 +143,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
hmOrig.m.Add(12)
@ -189,7 +187,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123,
}
hmOrig.m.Add(12)
@ -231,7 +228,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
hmOrig.m.Add(12)