VictoriaMetrics/lib/storage/inmemory_part.go

71 lines
1.7 KiB
Go

package storage
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// inmemoryPart represents in-memory partition.
type inmemoryPart struct {
ph partHeader
timestampsData bytesutil.ByteBuffer
valuesData bytesutil.ByteBuffer
indexData bytesutil.ByteBuffer
metaindexData bytesutil.ByteBuffer
creationTime uint64
}
// Reset resets mp.
func (mp *inmemoryPart) Reset() {
mp.ph.Reset()
mp.timestampsData.Reset()
mp.valuesData.Reset()
mp.indexData.Reset()
mp.metaindexData.Reset()
mp.creationTime = 0
}
// InitFromRows initializes mp from the given rows.
func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
if len(rows) == 0 {
logger.Panicf("BUG: Inmemory.InitFromRows must accept at least one row")
}
mp.Reset()
rrm := getRawRowsMarshaler()
rrm.marshalToInmemoryPart(mp, rows)
putRawRowsMarshaler(rrm)
mp.creationTime = fasttime.UnixTimestamp()
}
// NewPart creates new part from mp.
//
// It is safe calling NewPart multiple times.
// It is unsafe re-using mp while the returned part is in use.
func (mp *inmemoryPart) NewPart() (*part, error) {
ph := mp.ph
size := uint64(len(mp.timestampsData.B) + len(mp.valuesData.B) + len(mp.indexData.B) + len(mp.metaindexData.B))
return newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.timestampsData, &mp.valuesData, &mp.indexData)
}
func getInmemoryPart() *inmemoryPart {
v := mpPool.Get()
if v == nil {
return &inmemoryPart{}
}
return v.(*inmemoryPart)
}
func putInmemoryPart(mp *inmemoryPart) {
mp.Reset()
mpPool.Put(mp)
}
var mpPool sync.Pool