app/vmselect/netstorage: substitute pointer to blockAddrs by addrssPools index in the metricName->blockAddrs map

This should reduce the pressure on the Go GC, since it will see a lower number of pointers.

This change has been extracted from https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5527
This commit is contained in:
Aliaksandr Valialkin 2024-01-22 22:15:08 +02:00
parent b289f15f02
commit 5b05224eb9
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -1348,18 +1348,23 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
}
// tmpBlocksFileWrapper accumulates per-worker state while blocks are being registered.
//
// metricNamesBufs, addrssPools, ms and orderedMetricNamess are indexed by workerID.
type tmpBlocksFileWrapper struct {
	tbfs []*tmpBlocksFile

	// metricNamesBufs are per-worker bufs for holding all the loaded unique metric names.
	// It should reduce pressure on Go GC by reducing the number of string allocations
	// when constructing metricName string from byte slice.
	metricNamesBufs [][]byte

	// addrssPools are per-worker bufs for holding all the blockAddrs across all the loaded time series.
	// It should reduce pressure on Go GC by reducing the number of blockAddrs object allocations.
	addrssPools [][]blockAddrs

	// ms maps metricName to the addrssPools index.
	//
	// The value stored at ms[workerID][metricName] is an index into addrssPools[workerID].
	// Storing an index instead of a *blockAddrs keeps pointers out of the map values.
	ms []map[string]int

	// orderedMetricNamess contain metric names in the order of their load.
	// This order is important for sequential read of data from tmpBlocksFile.
	orderedMetricNamess [][]string
}
type blockAddrs struct {
@ -1367,17 +1372,18 @@ type blockAddrs struct {
addrs []tmpBlockAddr
}
func (tbfw *tmpBlocksFileWrapper) newBlockAddrs(workerID uint) *blockAddrs {
addrssBuf := tbfw.addrssBufs[workerID]
if cap(addrssBuf) > len(addrssBuf) {
addrssBuf = addrssBuf[:len(addrssBuf)+1]
func (tbfw *tmpBlocksFileWrapper) newBlockAddrs(workerID uint) int {
addrssPool := tbfw.addrssPools[workerID]
if cap(addrssPool) > len(addrssPool) {
addrssPool = addrssPool[:len(addrssPool)+1]
} else {
addrssBuf = append(addrssBuf, blockAddrs{})
addrssPool = append(addrssPool, blockAddrs{})
}
addrs := &addrssBuf[len(addrssBuf)-1]
tbfw.addrssBufs[workerID] = addrssBuf
tbfw.addrssPools[workerID] = addrssPool
idx := len(addrssPool) - 1
addrs := &addrssPool[idx]
addrs.addrs = addrs.addrsPrealloc[:0]
return addrs
return idx
}
func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
@ -1386,16 +1392,16 @@ func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
for i := range tbfs {
tbfs[i] = getTmpBlocksFile()
}
ms := make([]map[string]*blockAddrs, n)
ms := make([]map[string]int, n)
for i := range ms {
ms[i] = make(map[string]*blockAddrs)
ms[i] = make(map[string]int)
}
return &tmpBlocksFileWrapper{
tbfs: tbfs,
ms: ms,
orderedMetricNamess: make([][]string, n),
metricNamesBufs: make([][]byte, n),
addrssBufs: make([][]blockAddrs, n),
addrssPools: make([][]blockAddrs, n),
}
}
@ -1409,10 +1415,11 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
}
metricName := mb.MetricName
m := tbfw.ms[workerID]
addrs := m[string(metricName)]
if addrs == nil {
addrs = tbfw.newBlockAddrs(workerID)
addrsIdx, ok := m[string(metricName)]
if !ok {
addrsIdx = tbfw.newBlockAddrs(workerID)
}
addrs := &tbfw.addrssPools[workerID][addrsIdx]
addrs.addrs = append(addrs.addrs, addr)
if len(addrs.addrs) == 1 {
metricNamesBuf := tbfw.metricNamesBufs[workerID]
@ -1422,7 +1429,7 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
orderedMetricNames := tbfw.orderedMetricNamess[workerID]
orderedMetricNames = append(orderedMetricNames, metricNameStr)
m[metricNameStr] = addrs
m[metricNameStr] = addrsIdx
tbfw.orderedMetricNamess[workerID] = orderedMetricNames
tbfw.metricNamesBufs[workerID] = metricNamesBuf
@ -1430,29 +1437,31 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
return nil
}
func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string]*blockAddrs, uint64, error) {
func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, []blockAddrs, map[string]int, uint64, error) {
var bytesTotal uint64
for i, tbf := range tbfw.tbfs {
if err := tbf.Finalize(); err != nil {
closeTmpBlockFiles(tbfw.tbfs)
return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
return nil, nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
}
bytesTotal += tbf.Len()
}
orderedMetricNames := tbfw.orderedMetricNamess[0]
addrsByMetricName := tbfw.ms[0]
for i, m := range tbfw.ms[1:] {
addrssPools := tbfw.addrssPools[i]
for _, metricName := range tbfw.orderedMetricNamess[i+1] {
dstAddrs, ok := addrsByMetricName[metricName]
dstAddrsIdx, ok := addrsByMetricName[metricName]
if !ok {
orderedMetricNames = append(orderedMetricNames, metricName)
dstAddrs = tbfw.newBlockAddrs(uint(i))
addrsByMetricName[metricName] = dstAddrs
dstAddrsIdx = tbfw.newBlockAddrs(0)
addrsByMetricName[metricName] = dstAddrsIdx
}
dstAddrs.addrs = append(dstAddrs.addrs, m[metricName].addrs...)
dstAddrs := &tbfw.addrssPools[0][dstAddrsIdx]
dstAddrs.addrs = append(dstAddrs.addrs, addrssPools[m[metricName]].addrs...)
}
}
return orderedMetricNames, addrsByMetricName, bytesTotal, nil
return orderedMetricNames, tbfw.addrssPools[0], addrsByMetricName, bytesTotal, nil
}
var metricNamePool = &sync.Pool{
@ -1607,11 +1616,11 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
closeTmpBlockFiles(tbfw.tbfs)
return nil, false, fmt.Errorf("error occured during search: %w", err)
}
orderedMetricNames, addrsByMetricName, bytesTotal, err := tbfw.Finalize()
orderedMetricNames, addrssPool, m, bytesTotal, err := tbfw.Finalize()
if err != nil {
return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
}
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(m), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
var rss Results
rss.tr = tr
@ -1621,7 +1630,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
for i, metricName := range orderedMetricNames {
pts[i] = packedTimeseries{
metricName: metricName,
addrs: addrsByMetricName[metricName].addrs,
addrs: addrssPool[m[metricName]].addrs,
}
}
rss.packedTimeseries = pts