VictoriaMetrics/lib/storage/inmemory_part.go
Aliaksandr Valialkin 25f089de9d
lib/{mergeset,storage}: properly fsync part directory listing after writing in-memory part to disk
This is a follow-up after 42bba64aa7

Previously the part directory listing was fsync'ed implicitly inside partHeader.WriteMetadata()
by calling fs.WriteFileAtomically(). Now it must be fsync'ed explicitly.

There is no need to fsync the parent directory, since the caller fsyncs it
when updating the parts.json file.
2023-04-13 21:19:04 -07:00

115 lines
3.4 KiB
Go

package storage
import (
"fmt"
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// inmemoryPart holds the contents of a single part entirely in memory.
//
// The byte buffers carry the data that StoreToDisk writes to the
// corresponding files of the part directory.
type inmemoryPart struct {
	// ph is the header of the part; StoreToDisk persists it via WriteMetadata.
	ph partHeader

	// Data buffers, one per file written by StoreToDisk.
	timestampsData bytesutil.ByteBuffer
	valuesData     bytesutil.ByteBuffer
	indexData      bytesutil.ByteBuffer
	metaindexData  bytesutil.ByteBuffer

	// creationTime is assigned from fasttime.UnixTimestamp() by InitFromRows
	// and zeroed by Reset.
	creationTime uint64
}
// Reset returns mp to its empty state: the creation time is zeroed and
// the part header together with every data buffer is reset.
func (mp *inmemoryPart) Reset() {
	mp.creationTime = 0
	mp.ph.Reset()

	// The buffers are independent of each other, so the order does not matter.
	mp.metaindexData.Reset()
	mp.indexData.Reset()
	mp.valuesData.Reset()
	mp.timestampsData.Reset()
}
// StoreToDisk writes the contents of mp into the directory at path.
//
// The directory is prepared with fs.MkdirAllIfNotExist, then the timestamps,
// values, index and metaindex buffers are written with fs.WriteFileAndSync,
// followed by the part header metadata. Finally path itself is synced.
//
// A non-nil error describes the first step that failed; the remaining
// steps are skipped in that case.
func (mp *inmemoryPart) StoreToDisk(path string) error {
	err := fs.MkdirAllIfNotExist(path)
	if err != nil {
		return fmt.Errorf("cannot create directory %q: %w", path, err)
	}

	err = fs.WriteFileAndSync(filepath.Join(path, timestampsFilename), mp.timestampsData.B)
	if err != nil {
		return fmt.Errorf("cannot store timestamps: %w", err)
	}

	err = fs.WriteFileAndSync(filepath.Join(path, valuesFilename), mp.valuesData.B)
	if err != nil {
		return fmt.Errorf("cannot store values: %w", err)
	}

	err = fs.WriteFileAndSync(filepath.Join(path, indexFilename), mp.indexData.B)
	if err != nil {
		return fmt.Errorf("cannot store index: %w", err)
	}

	err = fs.WriteFileAndSync(filepath.Join(path, metaindexFilename), mp.metaindexData.B)
	if err != nil {
		return fmt.Errorf("cannot store metaindex: %w", err)
	}

	err = mp.ph.WriteMetadata(path)
	if err != nil {
		return fmt.Errorf("cannot store metadata: %w", err)
	}

	// Sync the part directory itself after all its files have been written.
	// The parent directory is intentionally left alone here - syncing it
	// is the caller's responsibility.
	fs.MustSyncPath(path)
	return nil
}
// InitFromRows resets mp and fills it with the marshaled form of rows.
//
// rows must contain at least one row; an empty slice is reported
// as a bug via logger.Panicf.
func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
	if len(rows) < 1 {
		logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row")
	}

	mp.Reset()

	// Borrow a marshaler, let it populate mp and hand it back right away.
	marshaler := getRawRowsMarshaler()
	marshaler.marshalToInmemoryPart(mp, rows)
	putRawRowsMarshaler(marshaler)

	// Remember when the part has been built.
	mp.creationTime = fasttime.UnixTimestamp()
}
// NewPart builds a part backed by the header and data buffers of mp.
//
// NewPart may be called multiple times on the same mp.
// mp must not be re-used while the returned part is in use,
// since the part refers to mp's header and buffers by pointer.
func (mp *inmemoryPart) NewPart() (*part, error) {
	partSize := mp.size()
	metaindexReader := mp.metaindexData.NewReader()
	// The empty string is passed as the second argument of newPart.
	return newPart(&mp.ph, "", partSize, metaindexReader, &mp.timestampsData, &mp.valuesData, &mp.indexData)
}
// size returns the sum of the capacities (not the lengths)
// of the four data buffers held by mp.
func (mp *inmemoryPart) size() uint64 {
	total := cap(mp.timestampsData.B)
	total += cap(mp.valuesData.B)
	total += cap(mp.indexData.B)
	total += cap(mp.metaindexData.B)
	return uint64(total)
}
// getInmemoryPart returns an inmemoryPart taken from mpPool,
// or a newly allocated one when mpPool has nothing available.
//
// The call never blocks.
func getInmemoryPart() *inmemoryPart {
	select {
	case pooled := <-mpPool:
		return pooled
	default:
	}
	return &inmemoryPart{}
}
// putInmemoryPart resets mp and offers it to mpPool for later re-use.
//
// The send is non-blocking: when mpPool is full, mp is simply discarded.
func putInmemoryPart(mp *inmemoryPart) {
	mp.Reset()

	select {
	case mpPool <- mp:
		return
	default:
		// The pool is full. Drop mp in order to reduce memory usage.
	}
}
// mpPool is a pool of reusable inmemoryPart objects used by getInmemoryPart and putInmemoryPart.
//
// Its capacity is cgroup.AvailableCPUs(), so at most that many idle objects are kept around.
//
// A chan is used instead of sync.Pool in order to reduce memory usage on systems with a big number of CPU cores,
// since sync.Pool maintains a per-CPU pool of inmemoryPart objects.
//
// The inmemoryPart object size can exceed 64KB, so it is better to use a chan instead of sync.Pool for reducing memory usage.
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())