lib/storage: pass Storage to table and partition instead of getDeletedMetricIDs callback

This improves code readability a bit.
This commit is contained in:
Aliaksandr Valialkin 2022-10-23 16:08:54 +03:00
parent 54f35c175c
commit d2d30581a0
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
11 changed files with 63 additions and 53 deletions

View file

@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
// mergeBlockStreams merges bsrs into bsw and updates ph.
@ -15,13 +14,13 @@ import (
// mergeBlockStreams returns immediately if stopCh is closed.
//
// rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, s *Storage, retentionDeadline int64,
rowsMerged, rowsDeleted *uint64) error {
ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger)
bsm.Init(bsrs, retentionDeadline)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, s, rowsMerged, rowsDeleted)
bsm.reset()
bsmPool.Put(bsm)
bsw.MustClose()
@ -39,8 +38,8 @@ var bsmPool = &sync.Pool{
var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *uint64) error {
dmis := s.getDeletedMetricIDs()
pendingBlockIsEmpty := true
pendingBlock := getBlock()
defer putBlock(pendingBlock)

View file

@ -365,7 +365,10 @@ func TestMergeForciblyStop(t *testing.T) {
ch := make(chan struct{})
var rowsMerged, rowsDeleted uint64
close(ch)
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
}
if rowsMerged != 0 {
@ -384,8 +387,10 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
var bsw blockStreamWriter
bsw.InitFromInmemoryPart(&mp)
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
var rowsMerged, rowsDeleted uint64
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
}

View file

@ -24,6 +24,8 @@ func BenchmarkMergeBlockStreamsFourSourcesBestCase(b *testing.B) {
func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop int64) {
var rowsMerged, rowsDeleted uint64
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
b.ReportAllocs()
b.SetBytes(rowsPerLoop)
@ -41,7 +43,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
}
mpOut.Reset()
bsw.InitFromInmemoryPart(&mpOut)
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
panic(fmt.Errorf("cannot merge block streams: %w", err))
}
}

View file

@ -21,7 +21,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func maxSmallPartSize() uint64 {
@ -118,8 +117,8 @@ type partition struct {
smallPartsPath string
bigPartsPath string
// The callack that returns deleted metric ids which must be skipped during merge.
getDeletedMetricIDs func() *uint64set.Set
// The parent storage.
s *Storage
// data retention in milliseconds.
// Used for deleting data outside the retention during background merge.
@ -202,8 +201,7 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions.
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string,
getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@ -216,7 +214,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers()
pt.startRawRowsFlusher()
@ -242,7 +240,7 @@ func (pt *partition) Drop() {
}
// openPartition opens the existing partition from the given paths.
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
smallPartsPath = filepath.Clean(smallPartsPath)
bigPartsPath = filepath.Clean(bigPartsPath)
@ -266,7 +264,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
pt.smallParts = smallParts
pt.bigParts = bigParts
if err := pt.tr.fromPartitionName(name); err != nil {
@ -280,15 +278,15 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return pt, nil
}
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) *partition {
func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) *partition {
p := &partition{
name: name,
smallPartsPath: smallPartsPath,
bigPartsPath: bigPartsPath,
getDeletedMetricIDs: getDeletedMetricIDs,
retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
s: s,
retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
mergeIdx: uint64(time.Now().UnixNano()),
stopCh: make(chan struct{}),
@ -1205,7 +1203,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
}
// Merge parts.
dmis := pt.getDeletedMetricIDs()
var ph partHeader
rowsMerged := &pt.smallRowsMerged
rowsDeleted := &pt.smallRowsDeleted
@ -1219,7 +1216,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
atomic.AddUint64(&pt.activeSmallMerges, 1)
}
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted)
if isBigPart {
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
} else {
@ -1252,7 +1249,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
dstPartPath := ""
if ph.RowsCount > 0 {
// The destination part may have no rows if they are deleted
// during the merge due to dmis.
// during the merge due to deleted time series.
dstPartPath = ph.Path(ptPath, mergeIdx)
}
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)

View file

@ -7,8 +7,6 @@ import (
"sort"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func TestPartitionSearch(t *testing.T) {
@ -167,9 +165,11 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
})
// Create partition from rowss and test search on it.
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
var isReadOnly uint32
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
pt, err := createPartition(ptt, "./small-table", "./big-table", strg, retentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot create partition: %s", err)
}
@ -193,7 +193,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt.MustClose()
// Open the created partition and test search on it.
pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
pt, err = openPartition(smallPartsPath, bigPartsPath, strg, retentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot open partition: %s", err)
}
@ -278,7 +278,3 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
return nil
}
func nilGetDeletedMetricIDs() *uint64set.Set {
return nil
}

View file

@ -258,7 +258,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
// Load data
tablePath := path + "/data"
tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs, &s.isReadOnly)
tb, err := openTable(tablePath, s, retentionMsecs, &s.isReadOnly)
if err != nil {
s.idb().MustClose()
return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)

View file

@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
// table represents a single table with time series data.
@ -21,9 +20,9 @@ type table struct {
smallPartitionsPath string
bigPartitionsPath string
getDeletedMetricIDs func() *uint64set.Set
retentionMsecs int64
isReadOnly *uint32
s *Storage
retentionMsecs int64
isReadOnly *uint32
ptws []*partitionWrapper
ptwsLock sync.Mutex
@ -84,7 +83,7 @@ func (ptw *partitionWrapper) scheduleToDrop() {
// The table is created if it doesn't exist.
//
// Data older than the retentionMsecs may be dropped at any time.
func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet.
@ -122,7 +121,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
// Open partitions.
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s, retentionMsecs, isReadOnly)
if err != nil {
return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
}
@ -131,7 +130,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
path: path,
smallPartitionsPath: smallPartitionsPath,
bigPartitionsPath: bigPartitionsPath,
getDeletedMetricIDs: getDeletedMetricIDs,
s: s,
retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
@ -366,7 +365,7 @@ func (tb *table) AddRows(rows []rawRow) error {
continue
}
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs, tb.isReadOnly)
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s, tb.retentionMsecs, tb.isReadOnly)
if err != nil {
// Return only the first error, since it has no sense in returning all errors.
tb.ptwsLock.Unlock()
@ -504,7 +503,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
}
}
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
// Certain partition directories in either `big` or `small` dir may be missing
// after restoring from backup. So populate partition names from both dirs.
ptNames := make(map[string]bool)
@ -518,7 +517,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
for ptName := range ptNames {
smallPartsPath := smallPartitionsPath + "/" + ptName
bigPartsPath := bigPartitionsPath + "/" + ptName
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs, isReadOnly)
pt, err := openPartition(smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
if err != nil {
mustClosePartitions(pts)
return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)

View file

@ -181,8 +181,10 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
})
// Create a table from rowss and test search on it.
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
var isReadOnly uint32
tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
tb, err := openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot create table: %s", err)
}
@ -203,7 +205,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
tb.MustClose()
// Open the created table and test search on it.
tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
tb, err = openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot open table: %s", err)
}

View file

@ -44,8 +44,10 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
createdBenchTables[path] = true
}
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
var isReadOnly uint32
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
if err != nil {
b.Fatalf("cnanot open table %q: %s", path, err)
}
@ -68,8 +70,10 @@ var createdBenchTables = make(map[string]bool)
func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
b.Helper()
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
var isReadOnly uint32
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
if err != nil {
b.Fatalf("cannot open table %q: %s", path, err)
}

View file

@ -17,8 +17,10 @@ func TestTableOpenClose(t *testing.T) {
}()
// Create a new table
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
var isReadOnly uint32
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot create new table: %s", err)
}
@ -28,7 +30,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times.
for i := 0; i < 10; i++ {
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot open created table: %s", err)
}
@ -44,15 +46,17 @@ func TestTableOpenMultipleTimes(t *testing.T) {
_ = os.RemoveAll(path)
}()
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
var isReadOnly uint32
tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
tb1, err := openTable(path, strg, retentionMsecs, &isReadOnly)
if err != nil {
t.Fatalf("cannot open table the first time: %s", err)
}
defer tb1.MustClose()
for i := 0; i < 10; i++ {
tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs, &isReadOnly)
tb2, err := openTable(path, strg, retentionMsecs, &isReadOnly)
if err == nil {
tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table")

View file

@ -45,9 +45,11 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
b.ReportAllocs()
b.SetBytes(int64(rowsCountExpected))
tablePath := "./benchmarkTableAddRows"
strg := &Storage{}
strg.setDeletedMetricIDs(nil)
for i := 0; i < b.N; i++ {
var isReadOnly uint32
tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
tb, err := openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err)
}
@ -95,7 +97,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
tb.MustClose()
// Open the table from files and verify the rows count on it
tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs, &isReadOnly)
tb, err = openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err)
}