lib/storage: implement per-day inverted index

This commit is contained in:
Aliaksandr Valialkin 2019-11-09 23:17:42 +02:00
parent 5810ba57c2
commit ee7765b10d
6 changed files with 453 additions and 55 deletions

View file

@ -423,6 +423,13 @@ func registerStorageMetrics() {
return float64(idbm().RecentHourInvertedIndexSearchHits)
})
metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 {
return float64(idbm().DateRangeSearchCalls)
})
metrics.NewGauge(`vm_date_range_hits_total`, func() float64 {
return float64(idbm().DateRangeSearchHits)
})
metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheSize)
})

View file

@ -42,6 +42,9 @@ const (
// Prefix for Date->MetricID entries.
nsPrefixDateToMetricID = 5
// Prefix for (Date,Tag)->MetricID entries.
nsPrefixDateTagToMetricIDs = 6
)
func shouldCacheBlock(item []byte) bool {
@ -50,8 +53,8 @@ func shouldCacheBlock(item []byte) bool {
}
// Do not cache items starting from
switch item[0] {
case nsPrefixTagToMetricIDs:
// Do not cache blocks with tag->metricIDs items, since:
case nsPrefixTagToMetricIDs, nsPrefixDateTagToMetricIDs:
// Do not cache blocks with tag->metricIDs and (date,tag)->metricIDs items, since:
// - these blocks are scanned sequentially, so the overhead
// on their unmarshaling is amortized by the sequential scan.
// - these blocks can occupy high amounts of RAM in cache
@ -98,8 +101,17 @@ type indexDB struct {
// The number of hits for recent hour searches over inverted index.
recentHourInvertedIndexSearchHits uint64
// The number of calls for date range searches.
dateRangeSearchCalls uint64
// The number of hits for date range searches.
dateRangeSearchHits uint64
mustDrop uint64
// Start date fully covered by per-day inverted index.
startDateForPerDayInvertedIndex uint64
name string
tb *mergeset.Table
@ -184,6 +196,14 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
}
db.setDeletedMetricIDs(dmis)
is = db.getIndexSearch()
date, err := is.getStartDateForPerDayInvertedIndex()
db.putIndexSearch(is)
if err != nil {
return nil, fmt.Errorf("cannot obtain start date for per-day inverted index: %s", err)
}
db.startDateForPerDayInvertedIndex = date
return db, nil
}
@ -214,6 +234,9 @@ type IndexDBMetrics struct {
RecentHourInvertedIndexSearchCalls uint64
RecentHourInvertedIndexSearchHits uint64
DateRangeSearchCalls uint64
DateRangeSearchHits uint64
IndexBlocksWithMetricIDsProcessed uint64
IndexBlocksWithMetricIDsIncorrectOrder uint64
@ -255,6 +278,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
@ -734,7 +760,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item); err != nil {
if err := mp.Init(item, nsPrefixTagToMetricIDs); err != nil {
return err
}
if mp.IsDeletedTag(dmis) {
@ -802,7 +828,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item); err != nil {
if err := mp.Init(item, nsPrefixTagToMetricIDs); err != nil {
return err
}
if mp.IsDeletedTag(dmis) {
@ -960,6 +986,35 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
db.deletedMetricIDsUpdateLock.Unlock()
}
func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
minDate := uint64(timestampFromTime(time.Now())) / msecPerDay
kb := &is.kb
ts := &is.ts
kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)
prefix := kb.B
ts.Seek(kb.B)
for ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
suffix := item[len(prefix):]
// Suffix must contain encoded 64-bit date.
if len(suffix) < 8 {
return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix))
}
minDate = encoding.UnmarshalUint64(suffix)
break
}
if err := ts.Error(); err != nil {
return 0, err
}
// The minDate can contain incomplete inverted index, so increment it.
minDate++
return minDate, nil
}
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
dmis := &uint64set.Set{}
ts := &is.ts
@ -1614,7 +1669,16 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
return nil
}
ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
if err != nil {
return err
}
if ok {
// Fast path: found metricIDs by date range.
return nil
}
// Slow path - try searching over the whole inverted index.
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
if err != nil {
return err
@ -1922,7 +1986,8 @@ var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
if metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics); ok {
metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics)
if ok {
// Fast path: tr covers the current and / or the previous hour.
// Return the full list of metric ids for this time range.
atomic.AddUint64(&is.db.recentHourMetricIDsSearchHits, 1)
@ -1937,17 +2002,160 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
// Too much dates must be covered. Give up.
return nil, errMissingMetricIDsForDate
}
metricIDs := &uint64set.Set{}
// Search for metricIDs for each day in parallel.
metricIDs = &uint64set.Set{}
var wg sync.WaitGroup
var errGlobal error
var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
for minDate <= maxDate {
if err := is.getMetricIDsForDate(minDate, metricIDs, maxMetrics); err != nil {
return nil, err
}
date := minDate
isLocal := is.db.getIndexSearch()
wg.Add(1)
go func() {
defer wg.Done()
defer is.db.putIndexSearch(isLocal)
var result uint64set.Set
err := isLocal.getMetricIDsForDate(date, &result, maxMetrics)
mu.Lock()
if metricIDs.Len() < maxMetrics {
metricIDs.Union(&result)
}
if err != nil {
errGlobal = err
}
mu.Unlock()
}()
minDate++
}
wg.Wait()
if errGlobal != nil {
return nil, errGlobal
}
atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1)
return metricIDs, nil
}
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) (bool, error) {
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate {
// Per-day inverted index doesn't cover the selected date range.
return false, nil
}
if maxDate-minDate > maxDaysForDateMetricIDs {
// Too much dates must be covered. Give up, since it may be slow.
return false, nil
}
// Search for metricIDs for each day in parallel.
var wg sync.WaitGroup
var errGlobal error
okGlobal := true
var mu sync.Mutex // protects metricIDs + *Global vars from concurrent access below
for minDate <= maxDate {
date := minDate
isLocal := is.db.getIndexSearch()
wg.Add(1)
go func() {
defer wg.Done()
defer is.db.putIndexSearch(isLocal)
var result uint64set.Set
ok, err := isLocal.tryUpdatingMetricIDsForDate(date, &result, tfs, maxMetrics)
mu.Lock()
if metricIDs.Len() < maxMetrics {
metricIDs.Union(&result)
}
if !ok {
okGlobal = ok
}
if err != nil {
errGlobal = fmt.Errorf("cannot search for metricIDs on date %d: %s", date, err)
}
mu.Unlock()
}()
minDate++
}
wg.Wait()
if errGlobal != nil {
return false, errGlobal
}
atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
return okGlobal, nil
}
func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint64set.Set, tfs *TagFilters, maxMetrics int) (bool, error) {
var tfFirst *tagFilter
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf.isNegative {
continue
}
tfFirst = tf
break
}
var result *uint64set.Set
maxDateMetrics := maxMetrics * 20
if tfFirst == nil {
result = &uint64set.Set{}
if err := is.updateMetricIDsForDateAll(result, date, maxDateMetrics); err != nil {
if err == errMissingMetricIDsForDate {
// Zero data points were written on the given date.
// It is OK, since (date, metricID) entries must exist for the given date
// according to startDateForPerDayInvertedIndex.
return true, nil
}
return false, fmt.Errorf("cannot obtain all the metricIDs for date %d: %s", date, err)
}
} else {
m, err := is.getMetricIDsForDateTagFilter(tfFirst, date, tfs.commonPrefix, maxDateMetrics)
if err != nil {
if err == errFallbackToMetricNameMatch {
// The per-date search is too expensive. Probably it is better to perform global search
// using metric name match.
return false, nil
}
return false, err
}
result = m
}
if result.Len() >= maxDateMetrics {
return false, fmt.Errorf("more than %d time series found; narrow down the query or increase `-search.maxUniqueTimeseries`", maxDateMetrics)
}
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == tfFirst {
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
if err != nil {
if err == errFallbackToMetricNameMatch {
// The per-date search is too expensive. Probably it is better to perform global search
// using metric name match.
return false, nil
}
return false, err
}
if m.Len() >= maxDateMetrics {
return false, fmt.Errorf("more than %d time series found for tag filter %s; narrow down the query or increase `-search.maxUniqueTimeseries`",
maxDateMetrics, tf)
}
if tf.isNegative {
result.Subtract(m)
} else {
result.Intersect(m)
}
if result.Len() == 0 {
return true, nil
}
}
metricIDs.Union(result)
return true, nil
}
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
@ -2023,15 +2231,47 @@ func (db *indexDB) storeDateMetricID(date, metricID uint64) error {
return nil
}
// Slow path: create (date, metricID) entry.
// Slow path: create (date, metricID) entries.
items := getIndexItems()
items.B = marshalCommonPrefix(items.B[:0], nsPrefixDateToMetricID)
defer putIndexItems(items)
items.B = marshalCommonPrefix(items.B, nsPrefixDateToMetricID)
items.B = encoding.MarshalUint64(items.B, date)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
err = db.tb.AddItems(items.Items)
putIndexItems(items)
return err
// Create per-day inverted index entries for metricID.
kb := kbPool.Get()
defer kbPool.Put(kb)
mn := GetMetricName()
defer PutMetricName(mn)
kb.B, err = db.searchMetricName(kb.B[:0], metricID)
if err != nil {
return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
}
if err = mn.Unmarshal(kb.B); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q obtained by metricID %d: %s", metricID, kb.B, err)
}
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
items.B = append(items.B, kb.B...)
items.B = marshalTagValue(items.B, nil)
items.B = marshalTagValue(items.B, mn.MetricGroup)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
for i := range mn.Tags {
tag := &mn.Tags[i]
items.B = append(items.B, kb.B...)
items.B = tag.Marshal(items.B)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
if err = db.tb.AddItems(items.Items); err != nil {
return fmt.Errorf("cannot add per-day entires for metricID %d: %s", metricID, err)
}
return nil
}
func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
@ -2052,6 +2292,23 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil
}
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
// Augument tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
}
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
return is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
}
func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts
kb := &is.kb
@ -2105,15 +2362,28 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
return true, nil
}
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
func (is *indexSearch) updateMetricIDsForDateAll(metricIDs *uint64set.Set, date uint64, maxMetrics int) error {
// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, nil)
return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
}
// Extract all the mtricIDs from (__name__=value) metricIDs.
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
kb := kbPool.Get()
defer kbPool.Put(kb)
// Extract all the metricIDs from (__name__=value)->metricIDs entries.
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil)
prefix := kb.B
return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
}
func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts
mp := &is.mp
ts.Seek(prefix)
for ts.NextItem() {
item := ts.Item
@ -2248,6 +2518,13 @@ func unmarshalCommonPrefix(src []byte) ([]byte, byte, error) {
const commonPrefixLen = 1
type tagToMetricIDsRowParser struct {
// NSPrefix contains the first byte parsed from the row after Init call.
// This is either nsPrefixTagToMetricIDs or nsPrefixDateTagToMetricIDs.
NSPrefix byte
// Date contains parsed date for nsPrefixDateTagToMetricIDs rows after Init call
Date uint64
// MetricIDs contains parsed MetricIDs after ParseMetricIDs call
MetricIDs []uint64
@ -2259,6 +2536,8 @@ type tagToMetricIDsRowParser struct {
}
func (mp *tagToMetricIDsRowParser) Reset() {
mp.NSPrefix = 0
mp.Date = 0
mp.MetricIDs = mp.MetricIDs[:0]
mp.Tag.Reset()
mp.tail = nil
@ -2267,14 +2546,23 @@ func (mp *tagToMetricIDsRowParser) Reset() {
// Init initializes mp from b, which should contain encoded tag->metricIDs row.
//
// b cannot be re-used until Reset call.
func (mp *tagToMetricIDsRowParser) Init(b []byte) error {
tail, prefix, err := unmarshalCommonPrefix(b)
func (mp *tagToMetricIDsRowParser) Init(b []byte, nsPrefixExpected byte) error {
tail, nsPrefix, err := unmarshalCommonPrefix(b)
if err != nil {
return fmt.Errorf("invalid tag->metricIDs row %q: %s", b, err)
}
if prefix != nsPrefixTagToMetricIDs {
return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, prefix, nsPrefixTagToMetricIDs)
if nsPrefix != nsPrefixExpected {
return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixExpected)
}
if nsPrefix == nsPrefixDateTagToMetricIDs {
// unmarshal date.
if len(tail) < 8 {
return fmt.Errorf("cannot unmarshal date from (date, tag)->metricIDs row %q from %d bytes; want at least 8 bytes", b, len(tail))
}
mp.Date = encoding.UnmarshalUint64(tail)
tail = tail[8:]
}
mp.NSPrefix = nsPrefix
tail, err = mp.Tag.Unmarshal(tail)
if err != nil {
return fmt.Errorf("cannot unmarshal tag from tag->metricIDs row %q: %s", b, err)
@ -2282,6 +2570,16 @@ func (mp *tagToMetricIDsRowParser) Init(b []byte) error {
return mp.InitOnlyTail(b, tail)
}
// MarshalPrefix marshals row prefix without tail to dst.
func (mp *tagToMetricIDsRowParser) MarshalPrefix(dst []byte) []byte {
dst = marshalCommonPrefix(dst, mp.NSPrefix)
if mp.NSPrefix == nsPrefixDateTagToMetricIDs {
dst = encoding.MarshalUint64(dst, mp.Date)
}
dst = mp.Tag.Marshal(dst)
return dst
}
// InitOnlyTail initializes mp.tail from tail.
//
// b must contain tag->metricIDs row.
@ -2301,7 +2599,10 @@ func (mp *tagToMetricIDsRowParser) InitOnlyTail(b, tail []byte) error {
//
// Prefix contains (tag)
func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool {
return mp.Tag.Equal(&x.Tag)
if !mp.Tag.Equal(&x.Tag) {
return false
}
return mp.Date == x.Date && mp.NSPrefix == x.NSPrefix
}
// FirstAndLastMetricIDs returns the first and the last metricIDs in the mp.tail.
@ -2364,22 +2665,28 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool {
}
func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
// Perform quick checks whether items contain tag->metricIDs rows
data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs)
data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs)
return data, items
}
func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) ([]byte, [][]byte) {
// Perform quick checks whether items contain rows starting from nsPrefix
// based on the fact that items are sorted.
if len(items) <= 2 {
// The first and the last row must remain unchanged.
return data, items
}
firstItem := items[0]
if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToMetricIDs {
if len(firstItem) > 0 && firstItem[0] > nsPrefix {
return data, items
}
lastItem := items[len(items)-1]
if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToMetricIDs {
if len(lastItem) > 0 && lastItem[0] < nsPrefix {
return data, items
}
// items contain at least one tag->metricIDs row. Merge rows with common tag.
// items contain at least one row starting from nsPrefix. Merge rows with common tag.
tmm := getTagToMetricIDsRowsMerger()
tmm.dataCopy = append(tmm.dataCopy[:0], data...)
tmm.itemsCopy = append(tmm.itemsCopy[:0], items...)
@ -2388,29 +2695,25 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
dstData := data[:0]
dstItems := items[:0]
for i, item := range items {
if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs || i == 0 || i == len(items)-1 {
// Write rows other than tag->metricIDs as-is.
if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 {
// Write rows not starting with nsPrefix as-is.
// Additionally write the first and the last row as-is in order to preserve
// sort order for adjancent blocks.
if len(tmm.pendingMetricIDs) > 0 {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
dstData = append(dstData, item...)
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
continue
}
if err := mp.Init(item); err != nil {
logger.Panicf("FATAL: cannot parse tag->metricIDs row during merge: %s", err)
if err := mp.Init(item, nsPrefix); err != nil {
logger.Panicf("FATAL: cannot parse row starting with nsPrefix %d during merge: %s", nsPrefix, err)
}
if mp.MetricIDsLen() >= maxMetricIDsPerRow {
if len(tmm.pendingMetricIDs) > 0 {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
dstData = append(dstData, item...)
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
continue
}
if len(tmm.pendingMetricIDs) > 0 && !mp.EqualPrefix(mpPrev) {
if !mp.EqualPrefix(mpPrev) {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
mp.ParseMetricIDs()
@ -2510,7 +2813,8 @@ func (tmm *tagToMetricIDsRowsMerger) Reset() {
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) {
if len(tmm.pendingMetricIDs) == 0 {
logger.Panicf("BUG: pendingMetricIDs must be non-empty")
// Nothing to flush
return dstData, dstItems
}
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
sort.Sort(&tmm.pendingMetricIDs)
@ -2518,8 +2822,7 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
// Marshal pendingMetricIDs
dstDataLen := len(dstData)
dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs)
dstData = mp.Tag.Marshal(dstData)
dstData = mp.MarshalPrefix(dstData)
for _, metricID := range tmm.pendingMetricIDs {
dstData = encoding.MarshalUint64(dstData, metricID)
}

View file

@ -52,8 +52,11 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems)
}
}
x := func(key, value string, metricIDs []uint64) string {
dst := marshalCommonPrefix(nil, nsPrefixTagToMetricIDs)
xy := func(nsPrefix byte, key, value string, metricIDs []uint64) string {
dst := marshalCommonPrefix(nil, nsPrefix)
if nsPrefix == nsPrefixDateTagToMetricIDs {
dst = encoding.MarshalUint64(dst, 1234567901233)
}
t := &Tag{
Key: []byte(key),
Value: []byte(value),
@ -64,6 +67,12 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
}
return string(dst)
}
x := func(key, value string, metricIDs []uint64) string {
return xy(nsPrefixTagToMetricIDs, key, value, metricIDs)
}
y := func(key, value string, metricIDs []uint64) string {
return xy(nsPrefixDateTagToMetricIDs, key, value, metricIDs)
}
f(nil, nil)
f([]string{}, nil)
@ -80,6 +89,19 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
x("", "", []uint64{0}),
x("", "", []uint64{0}),
})
f([]string{
x("", "", []uint64{0}),
x("", "", []uint64{0}),
x("", "", []uint64{0}),
y("", "", []uint64{0}),
y("", "", []uint64{0}),
y("", "", []uint64{0}),
}, []string{
x("", "", []uint64{0}),
x("", "", []uint64{0}),
y("", "", []uint64{0}),
y("", "", []uint64{0}),
})
f([]string{
x("", "", []uint64{0}),
x("", "", []uint64{0}),
@ -102,6 +124,17 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
x("", "", []uint64{0}),
x("", "", []uint64{0}),
})
f([]string{
"\x00asdf",
y("", "", []uint64{0}),
y("", "", []uint64{0}),
y("", "", []uint64{0}),
y("", "", []uint64{0}),
}, []string{
"\x00asdf",
y("", "", []uint64{0}),
y("", "", []uint64{0}),
})
f([]string{
"\x00asdf",
x("", "", []uint64{0}),
@ -114,6 +147,19 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
x("", "", []uint64{0}),
"xyz",
})
f([]string{
"\x00asdf",
x("", "", []uint64{0}),
x("", "", []uint64{0}),
y("", "", []uint64{0}),
y("", "", []uint64{0}),
"xyz",
}, []string{
"\x00asdf",
x("", "", []uint64{0}),
y("", "", []uint64{0}),
"xyz",
})
f([]string{
"\x00asdf",
x("", "", []uint64{1}),
@ -210,10 +256,13 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
"\x00aa",
x("foo", "bar", metricIDs),
x("foo", "bar", metricIDs),
y("foo", "bar", metricIDs),
y("foo", "bar", metricIDs),
"x",
}, []string{
"\x00aa",
x("foo", "bar", metricIDs),
y("foo", "bar", metricIDs),
"x",
})
@ -271,10 +320,12 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
"\x00aa",
x("foo", "bar", metricIDs),
x("foo", "bar", metricIDs),
y("foo", "bar", metricIDs),
"x",
}, []string{
"\x00aa",
x("foo", "bar", []uint64{123}),
y("foo", "bar", []uint64{123}),
"x",
})
@ -301,11 +352,13 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
x("foo", "bar", metricIDs),
x("foo", "bar", []uint64{123, 123, 125}),
x("foo", "bar", []uint64{123, 124}),
y("foo", "bar", []uint64{123, 124}),
}, []string{
"\x00aa",
x("foo", "bar", metricIDs),
x("foo", "bar", []uint64{123, 123, 125}),
x("foo", "bar", []uint64{123, 124}),
y("foo", "bar", []uint64{123, 124}),
})
f([]string{
x("foo", "bar", metricIDs),

View file

@ -71,7 +71,22 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
}
func TestSearch(t *testing.T) {
path := "TestSearch"
t.Run("global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false, false)
})
t.Run("perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false, true)
})
t.Run("recent_hour_global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true, false)
})
t.Run("recent_hour_perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true, true)
})
}
func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) {
path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex)
st, err := OpenStorage(path, 0)
if err != nil {
t.Fatalf("cannot open storage %q: %s", path, err)
@ -125,6 +140,17 @@ func TestSearch(t *testing.T) {
if err != nil {
t.Fatalf("cannot re-open storage %q: %s", path, err)
}
if forcePerDayInvertedIndex {
idb := st.idb()
idb.startDateForPerDayInvertedIndex = 0
idb.doExtDB(func(extDB *indexDB) {
extDB.startDateForPerDayInvertedIndex = 0
})
}
if forceRecentHourInvertedIndex {
hm := st.currHourMetricIDs.Load().(*hourMetricIDs)
hm.isFull = true
}
// Run search.
tr := TimeRange{
@ -133,7 +159,7 @@ func TestSearch(t *testing.T) {
}
t.Run("serial", func(t *testing.T) {
if err := testSearch(st, tr, mrs, accountsCount); err != nil {
if err := testSearchInternal(st, tr, mrs, accountsCount); err != nil {
t.Fatalf("unexpected error: %s", err)
}
})
@ -142,7 +168,7 @@ func TestSearch(t *testing.T) {
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
ch <- testSearch(st, tr, mrs, accountsCount)
ch <- testSearchInternal(st, tr, mrs, accountsCount)
}()
}
for i := 0; i < cap(ch); i++ {
@ -158,7 +184,7 @@ func TestSearch(t *testing.T) {
})
}
func testSearch(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) error {
func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) error {
var s Search
for i := 0; i < 10; i++ {
// Prepare TagFilters for search.

View file

@ -883,7 +883,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if err := s.tb.AddRows(rows); err != nil {
lastError = fmt.Errorf("cannot add rows to table: %s", err)
}
if err := s.updateDateMetricIDCache(rows, lastError); err != nil {
if err := s.updatePerDateData(rows, lastError); err != nil {
lastError = err
}
if lastError != nil {
@ -892,7 +892,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
return rows, nil
}
func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error {
func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
var date uint64
var hour uint64
var prevTimestamp int64
@ -910,6 +910,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
// The r belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index.
continue
}
s.pendingHourEntriesLock.Lock()
@ -920,17 +921,20 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
// Slower path: check global cache for (date, metricID) entry.
if s.dateMetricIDCache.Has(date, metricID) {
// The metricID has been already added to per-day inverted index.
continue
}
// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
// Slow path: store the entry (date, metricID) entry in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
s.dateMetricIDCache.Set(date, metricID)
if err := idb.storeDateMetricID(date, metricID); err != nil {
lastError = err
continue
}
// The metric must be added to cache only after it has been successfully added to indexDB.
s.dateMetricIDCache.Set(date, metricID)
}
return lastError
}
@ -1192,6 +1196,11 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetca
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %s", prevPath, err)
}
// Adjust startDateForPerDayInvertedIndex for the previous index.
if prev.startDateForPerDayInvertedIndex > curr.startDateForPerDayInvertedIndex {
prev.startDateForPerDayInvertedIndex = curr.startDateForPerDayInvertedIndex
}
return curr, prev, nil
}

View file

@ -385,7 +385,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
t.Run("serial", func(t *testing.T) {
for i := 0; i < 3; i++ {
if err = testStorageDeleteMetrics(s, 0); err != nil {
t.Fatalf("unexpected error: %s", err)
t.Fatalf("unexpected error on iteration %d: %s", i, err)
}
// Re-open the storage in order to check how deleted metricIDs
@ -393,7 +393,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
s.MustClose()
s, err = OpenStorage(path, 0)
if err != nil {
t.Fatalf("cannot open storage after closing: %s", err)
t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
}
}
})