VictoriaMetrics/lib/storage/inmemory_part.go
Aliaksandr Valialkin fc3d826d7f
all: add Windows build for VictoriaMetrics
This commit changes background merge algorithm, so it becomes compatible with Windows file semantics.

The previous algorithm for background merge:

1. Merge source parts into a destination part inside tmp directory.
2. Create a file in txn directory with instructions on how to atomically
   swap source parts with the destination part.
3. Perform instructions from the file.
4. Delete the file with instructions.

This algorithm guarantees that either source parts or destination part
is visible in the partition after unclean shutdown at any step above,
since the remaining file with instructions is replayed on the next restart,
after which the remaining contents of the tmp directory are deleted.

Unfortunately this algorithm doesn't work under Windows, because
Windows disallows removing and moving files that are in use.

So the new algorithm for background merge has been implemented:

1. Merge source parts into a destination part inside the partition directory itself.
   I.e. the partition directory may now contain both complete and incomplete parts.
2. Atomically update the parts.json file with the new list of parts after the merge,
   i.e. remove the source parts from the list and add the destination part to the list
   before storing it to the parts.json file.
3. Remove the source parts from disk when they are no longer used.

This algorithm guarantees that either source parts or destination part
is visible in the partition after unclean shutdown at any step above,
since incomplete parts from step 1 or old source parts from step 3 are removed
on the next startup by inspecting parts.json file.

This algorithm should work under Windows, since it doesn't remove or move files in use.
This algorithm also has the following benefits:

- It should work better for NFS.
- It fits object storage semantics.

The new algorithm changes data storage format, so it is impossible to downgrade
to the previous versions of VictoriaMetrics after upgrading to this algorithm.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70
2023-03-19 23:28:26 -07:00

116 lines
3.4 KiB
Go

package storage
import (
"fmt"
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// inmemoryPart holds the contents of a single part in memory.
//
// It bundles the part header with the byte buffers backing the four
// data files that StoreToDisk writes (timestamps.bin, values.bin,
// index.bin and metaindex.bin).
type inmemoryPart struct {
// ph is the part header; StoreToDisk persists it via ph.WriteMetadata.
ph partHeader
// timestampsData is written to timestamps.bin by StoreToDisk.
timestampsData bytesutil.ByteBuffer
// valuesData is written to values.bin by StoreToDisk.
valuesData bytesutil.ByteBuffer
// indexData is written to index.bin by StoreToDisk.
indexData bytesutil.ByteBuffer
// metaindexData is written to metaindex.bin by StoreToDisk.
// NewPart reads it through metaindexData.NewReader().
metaindexData bytesutil.ByteBuffer
// creationTime is set to fasttime.UnixTimestamp() by InitFromRows
// and cleared to 0 by Reset.
creationTime uint64
}
// Reset returns mp to its empty state.
//
// The creation time is cleared, and the part header together with every
// data buffer is reset through its own Reset method.
func (mp *inmemoryPart) Reset() {
	mp.creationTime = 0
	mp.ph.Reset()

	// The individual buffers are independent of each other.
	mp.metaindexData.Reset()
	mp.indexData.Reset()
	mp.valuesData.Reset()
	mp.timestampsData.Reset()
}
// StoreToDisk stores the mp to the given path on disk.
//
// The directory at path is prepared via fs.MkdirAllIfNotExist. Then
// timestamps.bin, values.bin, index.bin and metaindex.bin are written
// into it in this order, followed by the part header metadata.
// Finally the parent directory of path is synced.
//
// The first error encountered is returned, wrapped with a short description
// of the failed step. Files written before the failure are left in place.
func (mp *inmemoryPart) StoreToDisk(path string) error {
	if err := fs.MkdirAllIfNotExist(path); err != nil {
		return fmt.Errorf("cannot create directory %q: %w", path, err)
	}

	// The write order matters only for keeping behavior stable; the slice preserves it.
	files := []struct {
		what     string // human-readable name used in the error message
		fileName string // file name inside the part directory
		data     []byte
	}{
		{"timestamps", "timestamps.bin", mp.timestampsData.B},
		{"values", "values.bin", mp.valuesData.B},
		{"index", "index.bin", mp.indexData.B},
		{"metaindex", "metaindex.bin", mp.metaindexData.B},
	}
	for _, f := range files {
		// Use filepath.Join instead of concatenating with a hard-coded "/",
		// so the OS-specific separator is used and the path is cleaned.
		filePath := filepath.Join(path, f.fileName)
		if err := fs.WriteFileAndSync(filePath, f.data); err != nil {
			return fmt.Errorf("cannot store %s: %w", f.what, err)
		}
	}

	if err := mp.ph.WriteMetadata(path); err != nil {
		return fmt.Errorf("cannot store metadata: %w", err)
	}

	// Sync parent directory in order to make sure the written files remain visible after hardware reset.
	// NOTE(review): the part directory itself is not synced here; confirm that
	// fs.WriteFileAndSync and WriteMetadata make the new directory entries durable.
	fs.MustSyncPath(filepath.Dir(path))
	return nil
}
// InitFromRows initializes mp from the given rows.
//
// mp is reset first, so any previous contents are discarded. The rows are
// marshaled into mp by a rawRowsMarshaler obtained from getRawRowsMarshaler
// and handed back via putRawRowsMarshaler afterwards. The creation time is
// stamped last, using fasttime.UnixTimestamp().
//
// Passing an empty rows slice is a programming error and triggers logger.Panicf.
func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
	if len(rows) < 1 {
		logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row")
	}

	mp.Reset()

	marshaler := getRawRowsMarshaler()
	marshaler.marshalToInmemoryPart(mp, rows)
	putRawRowsMarshaler(marshaler)

	// Reset above cleared creationTime, so it is set only after the data is in place.
	mp.creationTime = fasttime.UnixTimestamp()
}
// NewPart creates new part from mp.
//
// The returned part is built by newPart from pointers to mp's header and
// data buffers plus a reader over the metaindex data, with an empty path.
//
// It is safe calling NewPart multiple times.
// It is unsafe re-using mp while the returned part is in use.
func (mp *inmemoryPart) NewPart() (*part, error) {
	partSize := mp.size()
	metaindexReader := mp.metaindexData.NewReader()
	return newPart(
		&mp.ph,
		"",
		partSize,
		metaindexReader,
		&mp.timestampsData,
		&mp.valuesData,
		&mp.indexData,
	)
}
// size returns the total capacity in bytes of the four data buffers of mp.
//
// Capacities (not lengths) are summed, so the result reflects the allocated
// buffer space rather than the number of bytes currently stored.
func (mp *inmemoryPart) size() uint64 {
	total := cap(mp.timestampsData.B)
	total += cap(mp.valuesData.B)
	total += cap(mp.indexData.B)
	total += cap(mp.metaindexData.B)
	return uint64(total)
}
// getInmemoryPart returns an inmemoryPart taken from mpPool,
// or a freshly allocated one if the pool is currently empty.
//
// The call never blocks.
func getInmemoryPart() *inmemoryPart {
	var mp *inmemoryPart
	select {
	case mp = <-mpPool:
		// Reuse a pooled object.
	default:
		mp = &inmemoryPart{}
	}
	return mp
}
// putInmemoryPart resets mp and offers it back to mpPool.
//
// The call never blocks: if the pool is full, mp is dropped
// and left to the garbage collector.
func putInmemoryPart(mp *inmemoryPart) {
	mp.Reset()

	select {
	case mpPool <- mp:
		return
	default:
		// Drop mp in order to reduce memory usage.
		return
	}
}
// mpPool is the pool of reusable inmemoryPart objects used by getInmemoryPart and putInmemoryPart.
// It is a buffered channel with capacity cgroup.AvailableCPUs().
//
// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores,
// since sync.Pool maintains per-CPU pool of inmemoryPart objects.
//
// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage.
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())