diff --git a/lib/storage/merge.go b/lib/storage/merge.go
index 8b326bf9d..347f2c792 100644
--- a/lib/storage/merge.go
+++ b/lib/storage/merge.go
@@ -7,7 +7,6 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
 // mergeBlockStreams merges bsrs into bsw and updates ph.
@@ -15,13 +14,13 @@ import (
 // mergeBlockStreams returns immediately if stopCh is closed.
 //
 // rowsMerged is atomically updated with the number of merged rows during the merge.
-func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
-	dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
+func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, s *Storage, retentionDeadline int64,
+	rowsMerged, rowsDeleted *uint64) error {
 	ph.Reset()
 
 	bsm := bsmPool.Get().(*blockStreamMerger)
 	bsm.Init(bsrs, retentionDeadline)
-	err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
+	err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, s, rowsMerged, rowsDeleted)
 	bsm.reset()
 	bsmPool.Put(bsm)
 	bsw.MustClose()
@@ -39,8 +38,8 @@ var bsmPool = &sync.Pool{
 
 var errForciblyStopped = fmt.Errorf("forcibly stopped")
 
-func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
-	dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
+func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *uint64) error {
+	dmis := s.getDeletedMetricIDs()
 	pendingBlockIsEmpty := true
 	pendingBlock := getBlock()
 	defer putBlock(pendingBlock)
diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go
index dd12ccaff..a329340bf 100644
--- a/lib/storage/merge_test.go
+++ b/lib/storage/merge_test.go
@@ -365,7 +365,10 @@ func TestMergeForciblyStop(t *testing.T) {
 	ch := make(chan struct{})
 	var rowsMerged, rowsDeleted uint64
 	close(ch)
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
+
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
 		t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
 	}
 	if rowsMerged != 0 {
@@ -384,8 +387,10 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
 	var bsw blockStreamWriter
 	bsw.InitFromInmemoryPart(&mp)
 
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	var rowsMerged, rowsDeleted uint64
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
 		t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
 	}
 
diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go
index 38d2f5ba6..5c68bae5e 100644
--- a/lib/storage/merge_timing_test.go
+++ b/lib/storage/merge_timing_test.go
@@ -24,6 +24,8 @@ func BenchmarkMergeBlockStreamsFourSourcesBestCase(b *testing.B) {
 
 func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop int64) {
 	var rowsMerged, rowsDeleted uint64
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	b.ReportAllocs()
 	b.SetBytes(rowsPerLoop)
 
@@ -41,7 +43,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
 			}
 			mpOut.Reset()
 			bsw.InitFromInmemoryPart(&mpOut)
-			if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
+			if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
 				panic(fmt.Errorf("cannot merge block streams: %w", err))
 			}
 		}
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 62f7982cd..d5b38a62d 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -21,7 +21,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
 func maxSmallPartSize() uint64 {
@@ -118,8 +117,8 @@ type partition struct {
 	smallPartsPath string
 	bigPartsPath   string
 
-	// The callack that returns deleted metric ids which must be skipped during merge.
-	getDeletedMetricIDs func() *uint64set.Set
+	// The parent storage.
+	s *Storage
 
 	// data retention in milliseconds.
 	// Used for deleting data outside the retention during background merge.
@@ -202,8 +201,7 @@ func (pw *partWrapper) decRef() {
 
 // createPartition creates new partition for the given timestamp and the given paths
 // to small and big partitions.
-func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string,
-	getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
+func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
 	name := timestampToPartitionName(timestamp)
 	smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
 	bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@@ -216,7 +214,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
 		return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
 	}
 
-	pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
+	pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
 	pt.tr.fromPartitionTimestamp(timestamp)
 	pt.startMergeWorkers()
 	pt.startRawRowsFlusher()
@@ -242,7 +240,7 @@ func (pt *partition) Drop() {
 }
 
 // openPartition opens the existing partition from the given paths.
-func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
+func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
 	smallPartsPath = filepath.Clean(smallPartsPath)
 	bigPartsPath = filepath.Clean(bigPartsPath)
 
@@ -266,7 +264,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
 		return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
 	}
 
-	pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
+	pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
 	pt.smallParts = smallParts
 	pt.bigParts = bigParts
 	if err := pt.tr.fromPartitionName(name); err != nil {
@@ -280,15 +278,15 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
 	return pt, nil
 }
 
-func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) *partition {
+func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) *partition {
 	p := &partition{
 		name:           name,
 		smallPartsPath: smallPartsPath,
 		bigPartsPath:   bigPartsPath,
 
-		getDeletedMetricIDs: getDeletedMetricIDs,
-		retentionMsecs:      retentionMsecs,
-		isReadOnly:          isReadOnly,
+		s:              s,
+		retentionMsecs: retentionMsecs,
+		isReadOnly:     isReadOnly,
 
 		mergeIdx: uint64(time.Now().UnixNano()),
 		stopCh:   make(chan struct{}),
@@ -1205,7 +1203,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	}
 
 	// Merge parts.
-	dmis := pt.getDeletedMetricIDs()
 	var ph partHeader
 	rowsMerged := &pt.smallRowsMerged
 	rowsDeleted := &pt.smallRowsDeleted
@@ -1219,7 +1216,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 		atomic.AddUint64(&pt.activeSmallMerges, 1)
 	}
 	retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
-	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
+	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted)
 	if isBigPart {
 		atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
 	} else {
@@ -1252,7 +1249,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	dstPartPath := ""
 	if ph.RowsCount > 0 {
 		// The destination part may have no rows if they are deleted
-		// during the merge due to dmis.
+		// during the merge due to deleted time series.
 		dstPartPath = ph.Path(ptPath, mergeIdx)
 	}
 	fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go
index 630796d36..6e4fb62d6 100644
--- a/lib/storage/partition_search_test.go
+++ b/lib/storage/partition_search_test.go
@@ -7,8 +7,6 @@ import (
 	"sort"
 	"testing"
 	"time"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
 func TestPartitionSearch(t *testing.T) {
@@ -167,9 +165,11 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 	})
 
 	// Create partition from rowss and test search on it.
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
 	var isReadOnly uint32
-	pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
+	pt, err := createPartition(ptt, "./small-table", "./big-table", strg, retentionMsecs, &isReadOnly)
 	if err != nil {
 		t.Fatalf("cannot create partition: %s", err)
 	}
@@ -193,7 +193,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 	pt.MustClose()
 
 	// Open the created partition and test search on it.
-	pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
+	pt, err = openPartition(smallPartsPath, bigPartsPath, strg, retentionMsecs, &isReadOnly)
 	if err != nil {
 		t.Fatalf("cannot open partition: %s", err)
 	}
@@ -278,7 +278,3 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
 
 	return nil
 }
-
-func nilGetDeletedMetricIDs() *uint64set.Set {
-	return nil
-}
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 3280229b9..8d8cecc6f 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -258,7 +258,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 
 	// Load data
 	tablePath := path + "/data"
-	tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs, &s.isReadOnly)
+	tb, err := openTable(tablePath, s, retentionMsecs, &s.isReadOnly)
 	if err != nil {
 		s.idb().MustClose()
 		return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)
diff --git a/lib/storage/table.go b/lib/storage/table.go
index cf005814b..e24b15a95 100644
--- a/lib/storage/table.go
+++ b/lib/storage/table.go
@@ -12,7 +12,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
 // table represents a single table with time series data.
@@ -21,9 +20,9 @@ type table struct {
 	smallPartitionsPath string
 	bigPartitionsPath   string
 
-	getDeletedMetricIDs func() *uint64set.Set
-	retentionMsecs      int64
-	isReadOnly          *uint32
+	s              *Storage
+	retentionMsecs int64
+	isReadOnly     *uint32
 
 	ptws     []*partitionWrapper
 	ptwsLock sync.Mutex
@@ -84,7 +83,7 @@ func (ptw *partitionWrapper) scheduleToDrop() {
 // The table is created if it doesn't exist.
 //
 // Data older than the retentionMsecs may be dropped at any time.
-func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
+func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
 	path = filepath.Clean(path)
 
 	// Create a directory for the table if it doesn't exist yet.
@@ -122,7 +121,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
 	fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
 
 	// Open partitions.
-	pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
+	pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s, retentionMsecs, isReadOnly)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
 	}
@@ -131,7 +130,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
 		path:                path,
 		smallPartitionsPath: smallPartitionsPath,
 		bigPartitionsPath:   bigPartitionsPath,
-		getDeletedMetricIDs: getDeletedMetricIDs,
+		s:                   s,
 		retentionMsecs:      retentionMsecs,
 		isReadOnly:          isReadOnly,
 
@@ -366,7 +365,7 @@ func (tb *table) AddRows(rows []rawRow) error {
 			continue
 		}
 
-		pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs, tb.isReadOnly)
+		pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s, tb.retentionMsecs, tb.isReadOnly)
 		if err != nil {
 			// Return only the first error, since it has no sense in returning all errors.
 			tb.ptwsLock.Unlock()
@@ -504,7 +503,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
 	}
 }
 
-func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
+func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
 	// Certain partition directories in either `big` or `small` dir may be missing
 	// after restoring from backup. So populate partition names from both dirs.
 	ptNames := make(map[string]bool)
@@ -518,7 +517,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
 	for ptName := range ptNames {
 		smallPartsPath := smallPartitionsPath + "/" + ptName
 		bigPartsPath := bigPartitionsPath + "/" + ptName
-		pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
+		pt, err := openPartition(smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
 		if err != nil {
 			mustClosePartitions(pts)
 			return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)
diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go
index fe30243dc..7f4bf08c7 100644
--- a/lib/storage/table_search_test.go
+++ b/lib/storage/table_search_test.go
@@ -181,8 +181,10 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
 	})
 
 	// Create a table from rowss and test search on it.
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	var isReadOnly uint32
-	tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
+	tb, err := openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
 	if err != nil {
 		t.Fatalf("cannot create table: %s", err)
 	}
@@ -203,7 +205,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
 	tb.MustClose()
 
 	// Open the created table and test search on it.
-	tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
+	tb, err = openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
 	if err != nil {
 		t.Fatalf("cannot open table: %s", err)
 	}
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index 342765076..5c0140176 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -44,8 +44,10 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
 		createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 		createdBenchTables[path] = true
 	}
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	var isReadOnly uint32
-	tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
+	tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
 	if err != nil {
 		b.Fatalf("cnanot open table %q: %s", path, err)
 	}
@@ -68,8 +70,10 @@ var createdBenchTables = make(map[string]bool)
 
 func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
 	b.Helper()
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	var isReadOnly uint32
-	tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
+	tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", path, err)
 	}
diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go
index 1d1ce4cd5..2a3533a24 100644
--- a/lib/storage/table_test.go
+++ b/lib/storage/table_test.go
@@ -17,8 +17,10 @@ func TestTableOpenClose(t *testing.T) {
 	}()
 
 	// Create a new table
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	var isReadOnly uint32
-	tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
+	tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
 	if err != nil {
 		t.Fatalf("cannot create new table: %s", err)
 	}
@@ -28,7 +30,7 @@ func TestTableOpenClose(t *testing.T) {
 
 	// Re-open created table multiple times.
 	for i := 0; i < 10; i++ {
-		tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
+		tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
 		if err != nil {
 			t.Fatalf("cannot open created table: %s", err)
 		}
@@ -44,15 +46,17 @@ func TestTableOpenMultipleTimes(t *testing.T) {
 		_ = os.RemoveAll(path)
 	}()
 
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	var isReadOnly uint32
-	tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
+	tb1, err := openTable(path, strg, retentionMsecs, &isReadOnly)
 	if err != nil {
 		t.Fatalf("cannot open table the first time: %s", err)
 	}
 	defer tb1.MustClose()
 
 	for i := 0; i < 10; i++ {
-		tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
+		tb2, err := openTable(path, strg, retentionMsecs, &isReadOnly)
 		if err == nil {
 			tb2.MustClose()
 			t.Fatalf("expecting non-nil error when opening already opened table")
diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go
index 8125d83ce..2b4cd6316 100644
--- a/lib/storage/table_timing_test.go
+++ b/lib/storage/table_timing_test.go
@@ -45,9 +45,11 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	b.ReportAllocs()
 	b.SetBytes(int64(rowsCountExpected))
 	tablePath := "./benchmarkTableAddRows"
+	strg := &Storage{}
+	strg.setDeletedMetricIDs(nil)
 	for i := 0; i < b.N; i++ {
 		var isReadOnly uint32
-		tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
+		tb, err := openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
 		if err != nil {
 			b.Fatalf("cannot open table %q: %s", tablePath, err)
 		}
@@ -95,7 +97,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 		tb.MustClose()
 
 		// Open the table from files and verify the rows count on it
-		tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
+		tb, err = openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
 		if err != nil {
 			b.Fatalf("cannot open table %q: %s", tablePath, err)
 		}