lib/storage: add -disableRecentHourIndex flag for disabling inmemory index for recent hour

This may be useful for saving RAM when working with a high number of time series (aka high cardinality).
This commit is contained in:
Aliaksandr Valialkin 2019-11-13 15:02:49 +02:00
parent f1620ba7c0
commit 633dd81bb5
3 changed files with 41 additions and 15 deletions

View file

@ -26,6 +26,8 @@ var (
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+
"This may be useful in order to reduce memory usage when working with high number of time series")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
) )
@ -35,6 +37,9 @@ func main() {
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
if *disableRecentHourIndex {
storage.DisableRecentHourIndex()
}
storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)

View file

@ -2231,6 +2231,10 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
} }
func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool { func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
if disableRecentHourIndex {
return false
}
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1) atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
k := accountProjectKey{ k := accountProjectKey{
AccountID: tfs.accountID, AccountID: tfs.accountID,

View file

@ -27,6 +27,17 @@ import (
const maxRetentionMonths = 12 * 100 const maxRetentionMonths = 12 * 100
// disableRecentHourIndex tells whether the in-memory inverted index
// for recent hour is disabled.
//
// It is false by default (the zero value) and is switched to true
// by DisableRecentHourIndex. The flag is written with a plain,
// unsynchronized assignment.
var disableRecentHourIndex bool

// DisableRecentHourIndex disables in-memory inverted index for recent hour.
//
// This may be useful in order to save RAM for high cardinality data.
//
// This function must be called before OpenStorage.
func DisableRecentHourIndex() {
	disableRecentHourIndex = true
}
// Storage represents TSDB storage. // Storage represents TSDB storage.
type Storage struct { type Storage struct {
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
@ -593,19 +604,21 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
// Unmarshal hm.iidx // Unmarshal hm.iidx
iidx := newInmemoryInvertedIndex() iidx := newInmemoryInvertedIndex()
tail, err := iidx.Unmarshal(src) if !disableRecentHourIndex {
if err != nil { tail, err := iidx.Unmarshal(src)
logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err) if err != nil {
return &hourMetricIDs{ logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err)
iidx: newInmemoryInvertedIndex(), return &hourMetricIDs{
hour: hour, iidx: newInmemoryInvertedIndex(),
hour: hour,
}
} }
} if len(tail) > 0 {
if len(tail) > 0 { logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail))
logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail)) return &hourMetricIDs{
return &hourMetricIDs{ iidx: newInmemoryInvertedIndex(),
iidx: newInmemoryInvertedIndex(), hour: hour,
hour: hour, }
} }
} }
@ -652,8 +665,10 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
} }
} }
// Marshal hm.iidx if !disableRecentHourIndex {
dst = hm.iidx.Marshal(dst) // Marshal hm.iidx
dst = hm.iidx.Marshal(dst)
}
if err := ioutil.WriteFile(path, dst, 0644); err != nil { if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
@ -1015,7 +1030,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
} }
s.pendingHourEntries = append(s.pendingHourEntries, e) s.pendingHourEntries = append(s.pendingHourEntries, e)
s.pendingHourEntriesLock.Unlock() s.pendingHourEntriesLock.Unlock()
hm.iidx.AddMetricID(idb, e) if !disableRecentHourIndex {
hm.iidx.AddMetricID(idb, e)
}
} }
// Slower path: check global cache for (date, metricID) entry. // Slower path: check global cache for (date, metricID) entry.