diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index e646b9c87..4df0b82d1 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -59,6 +59,8 @@ const rawItemsFlushInterval = time.Second
 type Table struct {
 	path string
 
+	flushCallback func()
+
 	partsLock sync.Mutex
 	parts     []*partWrapper
 
@@ -121,8 +123,11 @@ func (pw *partWrapper) decRef() {
 
 // OpenTable opens a table on the given path.
 //
+// Optional flushCallback is called every time a new data batch is flushed
+// to the underlying storage and becomes visible to search.
+//
 // The table is created if it doesn't exist yet.
-func OpenTable(path string) (*Table, error) {
+func OpenTable(path string, flushCallback func()) (*Table, error) {
 	path = filepath.Clean(path)
 	logger.Infof("opening table %q...", path)
 	startTime := time.Now()
@@ -145,11 +150,12 @@ func OpenTable(path string) (*Table, error) {
 	}
 
 	tb := &Table{
-		path:     path,
-		parts:    pws,
-		mergeIdx: uint64(time.Now().UnixNano()),
-		flockF:   flockF,
-		stopCh:   make(chan struct{}),
+		path:          path,
+		flushCallback: flushCallback,
+		parts:         pws,
+		mergeIdx:      uint64(time.Now().UnixNano()),
+		flockF:        flockF,
+		stopCh:        make(chan struct{}),
 	}
 	tb.startPartMergers()
 	tb.startRawItemsFlusher()
@@ -444,6 +450,9 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
 		if err := tb.mergeParts(pws, nil, true); err != nil {
 			logger.Panicf("FATAL: cannot merge raw parts: %s", err)
 		}
+		if tb.flushCallback != nil {
+			tb.flushCallback()
+		}
 	}
 
 	for {
diff --git a/lib/mergeset/table_search_test.go b/lib/mergeset/table_search_test.go
index fb09d5f2b..cfb721bb9 100644
--- a/lib/mergeset/table_search_test.go
+++ b/lib/mergeset/table_search_test.go
@@ -5,6 +5,7 @@ import (
 	"math/rand"
 	"os"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -39,7 +40,7 @@ func TestTableSearchSerial(t *testing.T) {
 
 	func() {
 		// Re-open the table and verify the search works.
-		tb, err := OpenTable(path)
+		tb, err := OpenTable(path, nil)
 		if err != nil {
 			t.Fatalf("cannot open table: %s", err)
 		}
@@ -74,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) {
 
 	// Re-open the table and verify the search works.
 	func() {
-		tb, err := OpenTable(path)
+		tb, err := OpenTable(path, nil)
 		if err != nil {
 			t.Fatalf("cannot open table: %s", err)
 		}
@@ -146,7 +147,11 @@ func testTableSearchSerial(tb *Table, items []string) error {
 }
 
 func newTestTable(path string, itemsCount int) (*Table, []string, error) {
-	tb, err := OpenTable(path)
+	var flushes uint64
+	flushCallback := func() {
+		atomic.AddUint64(&flushes, 1)
+	}
+	tb, err := OpenTable(path, flushCallback)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot open table: %s", err)
 	}
@@ -159,6 +164,9 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
 		items[i] = item
 	}
 	tb.DebugFlush()
+	if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 {
+		return nil, nil, fmt.Errorf("unexpected zero flushes for itemsCount=%d", itemsCount)
+	}
 	sort.Strings(items)
 
 	return tb, items, nil
diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go
index ddafc9728..16c2f6035 100644
--- a/lib/mergeset/table_search_timing_test.go
+++ b/lib/mergeset/table_search_timing_test.go
@@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
 
 	// Force finishing pending merges
 	tb.MustClose()
-	tb, err = OpenTable(path)
+	tb, err = OpenTable(path, nil)
 	if err != nil {
 		b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
 	}
diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go
index aeb658c77..db063f1eb 100644
--- a/lib/mergeset/table_test.go
+++ b/lib/mergeset/table_test.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 	"sync"
+	"sync/atomic"
 	"testing"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -20,7 +21,7 @@ func TestTableOpenClose(t *testing.T) {
 	}()
 
 	// Create a new table
-	tb, err := OpenTable(path)
+	tb, err := OpenTable(path, nil)
 	if err != nil {
 		t.Fatalf("cannot create new table: %s", err)
 	}
@@ -30,7 +31,7 @@ func TestTableOpenClose(t *testing.T) {
 
 	// Re-open created table multiple times.
 	for i := 0; i < 10; i++ {
-		tb, err := OpenTable(path)
+		tb, err := OpenTable(path, nil)
 		if err != nil {
 			t.Fatalf("cannot open created table: %s", err)
 		}
@@ -44,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()
 
-	tb1, err := OpenTable(path)
+	tb1, err := OpenTable(path, nil)
 	if err != nil {
 		t.Fatalf("cannot open table: %s", err)
 	}
 	defer tb1.MustClose()
 
 	for i := 0; i < 10; i++ {
-		tb2, err := OpenTable(path)
+		tb2, err := OpenTable(path, nil)
 		if err == nil {
 			tb2.MustClose()
 			t.Fatalf("expecting non-nil error when opening already opened table")
@@ -68,7 +69,11 @@ func TestTableAddItemSerial(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()
 
-	tb, err := OpenTable(path)
+	var flushes uint64
+	flushCallback := func() {
+		atomic.AddUint64(&flushes, 1)
+	}
+	tb, err := OpenTable(path, flushCallback)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
@@ -78,6 +83,9 @@ func TestTableAddItemSerial(t *testing.T) {
 
 	// Verify items count after pending items flush.
 	tb.DebugFlush()
+	if atomic.LoadUint64(&flushes) == 0 {
+		t.Fatalf("unexpected zero flushes")
+	}
 
 	var m TableMetrics
 	tb.UpdateMetrics(&m)
@@ -91,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) {
 	testReopenTable(t, path, itemsCount)
 
 	// Add more items in order to verify merge between inmemory parts and file-based parts.
-	tb, err = OpenTable(path)
+	tb, err = OpenTable(path, nil)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
@@ -124,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()
 
-	tb, err := OpenTable(path)
+	tb, err := OpenTable(path, nil)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
@@ -155,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
 	}()
 
 	// Verify snapshots contain all the data.
-	tb1, err := OpenTable(snapshot1)
+	tb1, err := OpenTable(snapshot1, nil)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
 	defer tb1.MustClose()
 
-	tb2, err := OpenTable(snapshot2)
+	tb2, err := OpenTable(snapshot2, nil)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
@@ -205,7 +213,11 @@ func TestTableAddItemsConcurrent(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()
 
-	tb, err := OpenTable(path)
+	var flushes uint64
+	flushCallback := func() {
+		atomic.AddUint64(&flushes, 1)
+	}
+	tb, err := OpenTable(path, flushCallback)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
@@ -215,6 +227,10 @@ func TestTableAddItemsConcurrent(t *testing.T) {
 
 	// Verify items count after pending items flush.
 	tb.DebugFlush()
+	if atomic.LoadUint64(&flushes) == 0 {
+		t.Fatalf("unexpected zero flushes")
+	}
+
 	var m TableMetrics
 	tb.UpdateMetrics(&m)
 	if m.ItemsCount != itemsCount {
@@ -227,7 +243,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
 	testReopenTable(t, path, itemsCount)
 
 	// Add more items in order to verify merge between inmemory parts and file-based parts.
-	tb, err = OpenTable(path)
+	tb, err = OpenTable(path, nil)
 	if err != nil {
 		t.Fatalf("cannot open %q: %s", path, err)
 	}
@@ -269,7 +285,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
 	t.Helper()
 
 	for i := 0; i < 10; i++ {
-		tb, err := OpenTable(path)
+		tb, err := OpenTable(path, nil)
 		if err != nil {
 			t.Fatalf("cannot re-open %q: %s", path, err)
 		}
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 86b0bf41c..73af080b7 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -65,9 +65,6 @@ type indexDB struct {
 	// matching low number of metrics.
 	uselessTagFiltersCache *workingsetcache.Cache
 
-	tagCachePrefixes     map[accountProjectKey]uint64
-	tagCachePrefixesLock sync.RWMutex
-
 	indexSearchPool sync.Pool
 
 	// An inmemory map[uint64]struct{} of deleted metricIDs.
@@ -104,12 +101,6 @@ type indexDB struct {
 	mustDrop uint64
 }
 
-// accountProjectKey is used for maps keyed by (AccountID, ProjectID).
-type accountProjectKey struct {
-	AccountID uint32
-	ProjectID uint32
-}
-
 // openIndexDB opens index db from the given path with the given caches.
 func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
 	if metricIDCache == nil {
@@ -125,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
 		logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
 	}
 
-	tb, err := mergeset.OpenTable(path)
+	tb, err := mergeset.OpenTable(path, invalidateTagCache)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
 	}
@@ -145,8 +136,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
 		metricNameCache: metricNameCache,
 
 		uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
-		tagCachePrefixes:       make(map[accountProjectKey]uint64),
-
 		currHourMetricIDs: currHourMetricIDs,
 		prevHourMetricIDs: prevHourMetricIDs,
 	}
@@ -377,27 +366,17 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
 	db.metricNameCache.Set(key[:], metricName)
 }
 
-func (db *indexDB) marshalTagFiltersKey(dst []byte, tfss []*TagFilters, versioned bool) []byte {
-	if len(tfss) == 0 {
-		return nil
-	}
+func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, versioned bool) []byte {
 	prefix := ^uint64(0)
 	if versioned {
-		k := accountProjectKey{
-			AccountID: tfss[0].accountID,
-			ProjectID: tfss[0].projectID,
-		}
-		db.tagCachePrefixesLock.RLock()
-		prefix = db.tagCachePrefixes[k]
-		db.tagCachePrefixesLock.RUnlock()
-		if prefix == 0 {
-			// Create missing prefix.
-			// It is OK if multiple concurrent goroutines call invalidateTagCache
-			// for the same (accountID, projectID).
-			prefix = db.invalidateTagCache(k.AccountID, k.ProjectID)
-		}
+		prefix = atomic.LoadUint64(&tagFiltersKeyGen)
 	}
 	dst = encoding.MarshalUint64(dst, prefix)
+	if len(tfss) == 0 {
+		return dst
+	}
+	dst = encoding.MarshalUint32(dst, tfss[0].accountID)
+	dst = encoding.MarshalUint32(dst, tfss[0].projectID)
 	for _, tfs := range tfss {
 		dst = append(dst, 0) // separator between tfs groups.
 		for i := range tfs.tfs {
@@ -439,21 +418,13 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
 	return dst, nil
 }
 
-func (db *indexDB) invalidateTagCache(accountID, projectID uint32) uint64 {
+func invalidateTagCache() {
 	// This function must be fast, since it is called each
 	// time new timeseries is added.
-	prefix := atomic.AddUint64(&tagCacheKeyPrefix, 1)
-	k := accountProjectKey{
-		AccountID: accountID,
-		ProjectID: projectID,
-	}
-	db.tagCachePrefixesLock.Lock()
-	db.tagCachePrefixes[k] = prefix
-	db.tagCachePrefixesLock.Unlock()
-	return prefix
+	atomic.AddUint64(&tagFiltersKeyGen, 1)
 }
 
-var tagCacheKeyPrefix uint64
+var tagFiltersKeyGen uint64
 
 // getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
 //
@@ -555,9 +526,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
 		return fmt.Errorf("cannot create indexes: %s", err)
 	}
 
-	// Invalidate tag cache for the given (AccountID, ProjectID), since
-	// it doesn't contain tags for the created mn -> TSID mapping.
-	_ = db.invalidateTagCache(mn.AccountID, mn.ProjectID)
+	// There is no need to invalidate the tag cache, since it is invalidated
+	// on db.tb flush via the invalidateTagCache flushCallback passed to OpenTable.
 
 	return nil
 }
@@ -904,8 +874,6 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
 	if len(tfss) == 0 {
 		return 0, nil
 	}
-	accountID := tfss[0].accountID
-	projectID := tfss[0].projectID
 
 	// Obtain metricIDs to delete.
 	is := db.getIndexSearch()
@@ -937,7 +905,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
 	db.updateDeletedMetricIDs(metricIDs)
 
 	// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
-	_ = db.invalidateTagCache(accountID, projectID)
+	invalidateTagCache()
 
 	// Do not reset uselessTagFiltersCache, since the found metricIDs
 	// on cache miss are filtered out later with deletedMetricIDs.
@@ -1010,7 +978,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
 	tfKeyBuf := tagFiltersKeyBufPool.Get()
 	defer tagFiltersKeyBufPool.Put(tfKeyBuf)
 
-	tfKeyBuf.B = db.marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, true)
+	tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, true)
 	tsids, ok := db.getFromTagCache(tfKeyBuf.B)
 	if ok {
 		// Fast path - tsids found in the cache.
@@ -1031,7 +999,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
 		defer tagFiltersKeyBufPool.Put(tfKeyExtBuf)
 
 		// Data in extDB cannot be changed, so use unversioned keys for tag cache.
-		tfKeyExtBuf.B = extDB.marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, false)
+		tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, false)
 		tsids, ok := extDB.getFromTagCache(tfKeyExtBuf.B)
 		if ok {
 			extTSIDs = tsids
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go
index a87fce975..4c7a2880e 100644
--- a/lib/storage/index_db_test.go
+++ b/lib/storage/index_db_test.go
@@ -339,6 +339,10 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 		return false
 	}
 
+	type accountProjectKey struct {
+		AccountID uint32
+		ProjectID uint32
+	}
 	allKeys := make(map[accountProjectKey]map[string]bool)
 	timeseriesCounters := make(map[accountProjectKey]map[uint64]bool)
 	var tsidCopy TSID
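
The core idea of this change: instead of tracking per-(AccountID, ProjectID) cache prefixes inside indexDB, a single global generation counter (tagFiltersKeyGen) is bumped by the flushCallback whenever newly flushed index data becomes visible to search, and marshalTagFiltersKey prepends that generation (plus accountID/projectID) to every TagFilters -> TSIDs cache key, so stale entries simply stop matching. Below is a minimal standalone sketch of this generation-based invalidation pattern; marshalKey, filterExpr and the plain map used as a cache are illustrative stand-ins, not the actual VictoriaMetrics types.

package main

import (
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

// tagFiltersKeyGen mirrors the global generation counter introduced by this
// change: bumping it makes every previously marshaled cache key unreachable,
// which invalidates the whole TagFilters -> TSIDs cache without clearing it.
var tagFiltersKeyGen uint64

// invalidateTagCache plays the role of the flush callback: it is invoked once
// a raw-items flush makes newly added index entries visible to search.
func invalidateTagCache() {
	atomic.AddUint64(&tagFiltersKeyGen, 1)
}

// marshalKey (hypothetical helper) prepends the current generation to the
// lookup key, the same way marshalTagFiltersKey prepends tagFiltersKeyGen.
// filterExpr stands in for the marshaled tag filters.
func marshalKey(dst []byte, filterExpr string) []byte {
	var gen [8]byte
	binary.BigEndian.PutUint64(gen[:], atomic.LoadUint64(&tagFiltersKeyGen))
	dst = append(dst, gen[:]...)
	return append(dst, filterExpr...)
}

func main() {
	// A plain map stands in for the real workingsetcache-based tag cache.
	cache := map[string][]uint64{}

	// Populate the cache on a miss, as searchTSIDs does.
	cache[string(marshalKey(nil, `{job="node"}`))] = []uint64{1, 2, 3}

	_, ok := cache[string(marshalKey(nil, `{job="node"}`))]
	fmt.Println("hit before flush:", ok) // true

	// The flush callback fires once new index data is searchable; the old
	// generation no longer matches, so the stale entry is skipped.
	invalidateTagCache()
	_, ok = cache[string(marshalKey(nil, `{job="node"}`))]
	fmt.Println("hit after flush:", ok) // false
}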