mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/storage: invalidate tagFilters -> TSIDS
cache when newly added index data becomes visible to search
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/163
This commit is contained in:
parent
aeaa5de5fe
commit
e1d76ec1f3
5 changed files with 60 additions and 27 deletions
|
@ -59,6 +59,8 @@ const rawItemsFlushInterval = time.Second
|
|||
type Table struct {
|
||||
path string
|
||||
|
||||
flushCallback func()
|
||||
|
||||
partsLock sync.Mutex
|
||||
parts []*partWrapper
|
||||
|
||||
|
@ -121,8 +123,11 @@ func (pw *partWrapper) decRef() {
|
|||
|
||||
// OpenTable opens a table on the given path.
|
||||
//
|
||||
// Optional flushCallback is called every time new data batch is flushed
|
||||
// to the underlying storage and becomes visible to search.
|
||||
//
|
||||
// The table is created if it doesn't exist yet.
|
||||
func OpenTable(path string) (*Table, error) {
|
||||
func OpenTable(path string, flushCallback func()) (*Table, error) {
|
||||
path = filepath.Clean(path)
|
||||
logger.Infof("opening table %q...", path)
|
||||
startTime := time.Now()
|
||||
|
@ -145,11 +150,12 @@ func OpenTable(path string) (*Table, error) {
|
|||
}
|
||||
|
||||
tb := &Table{
|
||||
path: path,
|
||||
parts: pws,
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
flockF: flockF,
|
||||
stopCh: make(chan struct{}),
|
||||
path: path,
|
||||
flushCallback: flushCallback,
|
||||
parts: pws,
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
flockF: flockF,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
tb.startPartMergers()
|
||||
tb.startRawItemsFlusher()
|
||||
|
@ -444,6 +450,9 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
|
|||
if err := tb.mergeParts(pws, nil, true); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge raw parts: %s", err)
|
||||
}
|
||||
if tb.flushCallback != nil {
|
||||
tb.flushCallback()
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -39,7 +40,7 @@ func TestTableSearchSerial(t *testing.T) {
|
|||
|
||||
func() {
|
||||
// Re-open the table and verify the search works.
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
|
@ -74,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) {
|
|||
|
||||
// Re-open the table and verify the search works.
|
||||
func() {
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
|
@ -146,7 +147,11 @@ func testTableSearchSerial(tb *Table, items []string) error {
|
|||
}
|
||||
|
||||
func newTestTable(path string, itemsCount int) (*Table, []string, error) {
|
||||
tb, err := OpenTable(path)
|
||||
var flushes uint64
|
||||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open table: %s", err)
|
||||
}
|
||||
|
@ -159,6 +164,9 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
|
|||
items[i] = item
|
||||
}
|
||||
tb.DebugFlush()
|
||||
if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 {
|
||||
return nil, nil, fmt.Errorf("unexpeted zero flushes for itemsCount=%d", itemsCount)
|
||||
}
|
||||
|
||||
sort.Strings(items)
|
||||
return tb, items, nil
|
||||
|
|
|
@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
|
|||
|
||||
// Force finishing pending merges
|
||||
tb.MustClose()
|
||||
tb, err = OpenTable(path)
|
||||
tb, err = OpenTable(path, nil)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
@ -20,7 +21,7 @@ func TestTableOpenClose(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Create a new table
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new table: %s", err)
|
||||
}
|
||||
|
@ -30,7 +31,7 @@ func TestTableOpenClose(t *testing.T) {
|
|||
|
||||
// Re-open created table multiple times.
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open created table: %s", err)
|
||||
}
|
||||
|
@ -44,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb1, err := OpenTable(path)
|
||||
tb1, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb2, err := OpenTable(path)
|
||||
tb2, err := OpenTable(path, nil)
|
||||
if err == nil {
|
||||
tb2.MustClose()
|
||||
t.Fatalf("expecting non-nil error when opening already opened table")
|
||||
|
@ -68,7 +69,11 @@ func TestTableAddItemSerial(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path)
|
||||
var flushes uint64
|
||||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -78,6 +83,9 @@ func TestTableAddItemSerial(t *testing.T) {
|
|||
|
||||
// Verify items count after pending items flush.
|
||||
tb.DebugFlush()
|
||||
if atomic.LoadUint64(&flushes) == 0 {
|
||||
t.Fatalf("unexpected zero flushes")
|
||||
}
|
||||
|
||||
var m TableMetrics
|
||||
tb.UpdateMetrics(&m)
|
||||
|
@ -91,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) {
|
|||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path)
|
||||
tb, err = OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -124,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -155,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Verify snapshots contain all the data.
|
||||
tb1, err := OpenTable(snapshot1)
|
||||
tb1, err := OpenTable(snapshot1, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
tb2, err := OpenTable(snapshot2)
|
||||
tb2, err := OpenTable(snapshot2, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -205,7 +213,11 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path)
|
||||
var flushes uint64
|
||||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -215,6 +227,10 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
|
||||
// Verify items count after pending items flush.
|
||||
tb.DebugFlush()
|
||||
if atomic.LoadUint64(&flushes) == 0 {
|
||||
t.Fatalf("unexpected zero flushes")
|
||||
}
|
||||
|
||||
var m TableMetrics
|
||||
tb.UpdateMetrics(&m)
|
||||
if m.ItemsCount != itemsCount {
|
||||
|
@ -227,7 +243,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
|||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path)
|
||||
tb, err = OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
|
@ -269,7 +285,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
|
|||
t.Helper()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot re-open %q: %s", path, err)
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
|
|||
logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
|
||||
}
|
||||
|
||||
tb, err := mergeset.OpenTable(path)
|
||||
tb, err := mergeset.OpenTable(path, invalidateTagCache)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
|
||||
}
|
||||
|
@ -405,7 +405,7 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
|
|||
return dst, nil
|
||||
}
|
||||
|
||||
func (db *indexDB) invalidateTagCache() {
|
||||
func invalidateTagCache() {
|
||||
// This function must be fast, since it is called each
|
||||
// time new timeseries is added.
|
||||
atomic.AddUint64(&tagFiltersKeyGen, 1)
|
||||
|
@ -513,8 +513,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
|
|||
return fmt.Errorf("cannot create indexes: %s", err)
|
||||
}
|
||||
|
||||
// Invalidate tag cache, since it doesn't contain tags for the created mn -> TSID mapping.
|
||||
db.invalidateTagCache()
|
||||
// There is no need in invalidating tag cache, since it is invalidated
|
||||
// on db.tb flush via invalidateTagCache flushCallback passed to OpenTable.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -890,7 +890,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
|||
db.updateDeletedMetricIDs(metricIDs)
|
||||
|
||||
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
||||
db.invalidateTagCache()
|
||||
invalidateTagCache()
|
||||
|
||||
// Do not reset uselessTagFiltersCache, since the found metricIDs
|
||||
// on cache miss are filtered out later with deletedMetricIDs.
|
||||
|
|
Loading…
Reference in a new issue