app/vmselect/netstorage: remove common contention points related to inter-CPU communications

This should improve vmselect performance scalability on systems with many CPU cores.

The following changes were made:

- Use separate temporary files for storing the data read from each vmstorage node.
  This may result in the following potential issues:
  - Up to N times higher memory usage per query, where N is the number
    of vmstorage nodes known to vmselect.
    This shouldn't increase the chances of out-of-memory errors in most cases,
    since the per-query memory overhead is quite low compared to the overall vmselect memory usage.
  - Up to N times higher number of open temporary files, where N is the number
    of vmstorage nodes known to vmselect.
    This can be addressed by raising the limit on the number of open files.

- Use separate per-vmstorage-node counters for various stats calculations
  when reading data from vmstorage nodes.
  (A simplified sketch of the per-worker sharding pattern behind both changes follows this list.)
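
Both changes follow the same per-worker sharding idea: every concurrent reader (one per
vmstorage node) owns its own slot of state, namely its own temporary file and its own padded
counter, updates that slot without taking any shared lock, and the slots are merged only once
after the parallel phase. The sketch below is a simplified, self-contained illustration; the
names (paddedCounter, blockAddr, files) are hypothetical stand-ins for the uint64WithPadding,
tmpBlockAddr.tbfIdx and per-node tmpBlocksFile pieces that appear in the diff:

package main

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// paddedCounter keeps each per-worker counter on its own cache line, so
// concurrent atomic updates from different CPUs do not invalidate each other.
type paddedCounter struct {
	n   uint64
	pad [128 - unsafe.Sizeof(uint64(0))%128]byte
}

// blockAddr records where a block was written: which per-worker file and where in it.
type blockAddr struct {
	fileIdx int
	offset  int
	size    int
}

func main() {
	const numNodes = 4 // stands in for len(storageNodes)

	// Per-worker state: each worker owns one "file" (a buffer here) and one counter,
	// so the hot path never touches shared mutable memory.
	files := make([]bytes.Buffer, numNodes)
	blocksRead := make([]paddedCounter, numNodes)
	addrs := make([][]blockAddr, numNodes)

	var wg sync.WaitGroup
	for workerIdx := 0; workerIdx < numNodes; workerIdx++ {
		wg.Add(1)
		go func(workerIdx int) {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				block := []byte(fmt.Sprintf("block-%d-%d", workerIdx, i))
				f := &files[workerIdx]
				addr := blockAddr{fileIdx: workerIdx, offset: f.Len(), size: len(block)}
				f.Write(block) // worker-local write, no mutex needed
				addrs[workerIdx] = append(addrs[workerIdx], addr)
				atomic.AddUint64(&blocksRead[workerIdx].n, 1) // shard-local counter
			}
		}(workerIdx)
	}
	wg.Wait()

	// Merge phase: totals and addresses are combined once, after the parallel part.
	var total uint64
	for i := range blocksRead {
		total += blocksRead[i].n
	}
	a := addrs[2][1] // resolve one block back through its (fileIdx, offset, size) address
	fmt.Println("blocks read:", total)
	fmt.Println("resolved block:", string(files[a.fileIdx].Bytes()[a.offset:a.offset+a.size]))
}
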
Aliaksandr Valialkin, 2022-08-11 23:22:53 +03:00
commit 1a254ea20c
parent ec3df0b913
4 changed files with 151 additions and 81 deletions


@@ -112,7 +112,7 @@ func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *stor
bi.workCh = make(chan workItem, 16)
bi.wg.Add(1)
go func() {
_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock) error {
_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerIdx int) error {
wi := workItem{
mb: mb,
doneCh: make(chan struct{}),


@@ -65,7 +65,7 @@ type Results struct {
tr storage.TimeRange
deadline searchutils.Deadline
tbf *tmpBlocksFile
tbfs []*tmpBlocksFile
packedTimeseries []packedTimeseries
}
@@ -77,8 +77,18 @@ func (rss *Results) Len() int {
// Cancel cancels rss work.
func (rss *Results) Cancel() {
putTmpBlocksFile(rss.tbf)
rss.tbf = nil
rss.closeTmpBlockFiles()
}
func (rss *Results) closeTmpBlockFiles() {
closeTmpBlockFiles(rss.tbfs)
rss.tbfs = nil
}
func closeTmpBlockFiles(tbfs []*tmpBlocksFile) {
for _, tbf := range tbfs {
putTmpBlocksFile(tbf)
}
}
type timeseriesWork struct {
@@ -124,7 +134,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
atomic.StoreUint32(tsw.mustStop, 1)
return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
}
if err := tsw.pts.Unpack(r, rss.tbf, rss.tr); err != nil {
if err := tsw.pts.Unpack(r, rss.tbfs, rss.tr); err != nil {
atomic.StoreUint32(tsw.mustStop, 1)
return fmt.Errorf("error during time series unpacking: %w", err)
}
@@ -173,10 +183,7 @@ var resultPool sync.Pool
// rss becomes unusable after the call to RunParallel.
func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
qt = qt.NewChild("parallel process of fetched data")
defer func() {
putTmpBlocksFile(rss.tbf)
rss.tbf = nil
}()
defer rss.closeTmpBlockFiles()
// Prepare work for workers.
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
@@ -281,7 +288,7 @@ type unpackWorkItem struct {
type unpackWork struct {
ws []unpackWorkItem
tbf *tmpBlocksFile
tbfs []*tmpBlocksFile
sbs []*sortBlock
doneCh chan error
}
@@ -294,7 +301,7 @@ func (upw *unpackWork) reset() {
w.tr = storage.TimeRange{}
}
upw.ws = upw.ws[:0]
upw.tbf = nil
upw.tbfs = nil
sbs := upw.sbs
for i := range sbs {
sbs[i] = nil
@@ -308,7 +315,7 @@ func (upw *unpackWork) reset() {
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
for _, w := range upw.ws {
sb := getSortBlock()
if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr); err != nil {
if err := sb.unpackFrom(tmpBlock, upw.tbfs, w.addr, w.tr); err != nil {
putSortBlock(sb)
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
return
@@ -380,7 +387,7 @@ var tmpBlockPool sync.Pool
var unpackBatchSize = 5000
// Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr storage.TimeRange) error {
dst.reset()
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
@@ -415,13 +422,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
// Feed workers with work
upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
upw := getUnpackWork()
upw.tbf = tbf
upw.tbfs = tbfs
for _, addr := range pts.addrs {
if len(upw.ws) >= unpackBatchSize {
scheduleUnpackWork(workChs, upw)
upws = append(upws, upw)
upw = getUnpackWork()
upw.tbf = tbf
upw.tbfs = tbfs
}
upw.ws = append(upw.ws, unpackWorkItem{
addr: addr,
@@ -583,9 +590,9 @@ func (sb *sortBlock) reset() {
sb.NextIdx = 0
}
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbfs []*tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
tmpBlock.Reset()
tbf.MustReadBlockAt(tmpBlock, addr)
tbfs[addr.tbfIdx].MustReadBlockAt(tmpBlock, addr)
if err := tmpBlock.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %w", err)
}
@@ -1110,45 +1117,74 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
}
type tmpBlocksFileWrapper struct {
mu sync.Mutex
tbf *tmpBlocksFile
m map[string][]tmpBlockAddr
orderedMetricNames []string
tbfs []*tmpBlocksFile
ms []map[string][]tmpBlockAddr
orderedMetricNamess [][]string
}
func (tbfw *tmpBlocksFileWrapper) RegisterEmptyBlock(mb *storage.MetricBlock) {
metricName := mb.MetricName
tbfw.mu.Lock()
if addrs := tbfw.m[string(metricName)]; addrs == nil {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNames and tbfw.m.
tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = []tmpBlockAddr{{}}
func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper {
n := len(storageNodes)
tbfs := make([]*tmpBlocksFile, n)
for i := range tbfs {
tbfs[i] = getTmpBlocksFile()
}
ms := make([]map[string][]tmpBlockAddr, n)
for i := range ms {
ms[i] = make(map[string][]tmpBlockAddr)
}
return &tmpBlocksFileWrapper{
tbfs: tbfs,
ms: ms,
orderedMetricNamess: make([][]string, n),
}
tbfw.mu.Unlock()
}
func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock) error {
bb := tmpBufPool.Get()
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
tbfw.mu.Lock()
addr, err := tbfw.tbf.WriteBlockData(bb.B)
tmpBufPool.Put(bb)
if err == nil {
metricName := mb.MetricName
addrs := tbfw.m[string(metricName)]
addrs = append(addrs, addr)
if len(addrs) > 1 {
tbfw.m[string(metricName)] = addrs
} else {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNames and tbfw.m.
tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = addrs
func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string][]tmpBlockAddr, uint64, error) {
var bytesTotal uint64
for i, tbf := range tbfw.tbfs {
if err := tbf.Finalize(); err != nil {
// Close the remaining tbfs before returning the error
closeTmpBlockFiles(tbfw.tbfs[i:])
return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
}
bytesTotal += tbf.Len()
}
orderedMetricNames := tbfw.orderedMetricNamess[0]
addrsByMetricName := make(map[string][]tmpBlockAddr)
for i, m := range tbfw.ms {
for _, metricName := range tbfw.orderedMetricNamess[i] {
dstAddrs, ok := addrsByMetricName[metricName]
if !ok {
orderedMetricNames = append(orderedMetricNames, metricName)
}
addrsByMetricName[metricName] = append(dstAddrs, m[metricName]...)
}
}
tbfw.mu.Unlock()
return err
return orderedMetricNames, addrsByMetricName, bytesTotal, nil
}
func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerIdx int) error {
bb := tmpBufPool.Get()
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
addr, err := tbfw.tbfs[workerIdx].WriteBlockData(bb.B, workerIdx)
tmpBufPool.Put(bb)
if err != nil {
return err
}
metricName := mb.MetricName
m := tbfw.ms[workerIdx]
addrs := m[string(metricName)]
addrs = append(addrs, addr)
if len(addrs) > 1 {
m[string(metricName)] = addrs
} else {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNamess and tbfw.ms.
tbfw.orderedMetricNamess[workerIdx] = append(tbfw.orderedMetricNamess[workerIdx], string(metricName))
metricNameStr := tbfw.orderedMetricNamess[workerIdx][len(tbfw.orderedMetricNamess[workerIdx])-1]
m[metricNameStr] = addrs
}
return nil
}
var metricNamePool = &sync.Pool{
@@ -1173,9 +1209,9 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
var blocksRead uint64
var samples uint64
processBlock := func(mb *storage.MetricBlock) error {
blocksRead := newPerNodeCounter()
samples := newPerNodeCounter()
processBlock := func(mb *storage.MetricBlock, workerIdx int) error {
mn := metricNamePool.Get().(*storage.MetricName)
if err := mn.Unmarshal(mb.MetricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName: %w", err)
@@ -1185,12 +1221,12 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
}
mn.Reset()
metricNamePool.Put(mn)
atomic.AddUint64(&blocksRead, 1)
atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
blocksRead.Add(workerIdx, 1)
samples.Add(workerIdx, uint64(mb.Block.RowsCount()))
return nil
}
_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead, samples, err)
qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead.GetTotal(), samples.GetTotal(), err)
if err != nil {
return fmt.Errorf("error occured during export: %w", err)
}
@@ -1266,43 +1302,43 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
tbfw := &tmpBlocksFileWrapper{
tbf: getTmpBlocksFile(),
m: make(map[string][]tmpBlockAddr),
}
var blocksRead uint64
var samples uint64
processBlock := func(mb *storage.MetricBlock) error {
atomic.AddUint64(&blocksRead, 1)
n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) {
return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: to increase the -search.maxSamplesPerQuery; to reduce time range for the query; to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery)
tbfw := newTmpBlocksFileWrapper()
blocksRead := newPerNodeCounter()
samples := newPerNodeCounter()
maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(storageNodes))
processBlock := func(mb *storage.MetricBlock, workerIdx int) error {
blocksRead.Add(workerIdx, 1)
n := samples.Add(workerIdx, uint64(mb.Block.RowsCount()))
if *maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples.GetTotal() > uint64(*maxSamplesPerQuery) {
return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: "+
"to increase the -search.maxSamplesPerQuery; to reduce time range for the query; "+
"to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery)
}
if err := tbfw.RegisterAndWriteBlock(mb); err != nil {
if err := tbfw.RegisterAndWriteBlock(mb, workerIdx); err != nil {
return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err)
}
return nil
}
isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
if err != nil {
putTmpBlocksFile(tbfw.tbf)
closeTmpBlockFiles(tbfw.tbfs)
return nil, false, fmt.Errorf("error occured during search: %w", err)
}
if err := tbfw.tbf.Finalize(); err != nil {
putTmpBlocksFile(tbfw.tbf)
return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %w", len(tbfw.m), err)
orderedMetricNames, addrsByMetricName, bytesTotal, err := tbfw.Finalize()
if err != nil {
return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
}
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(tbfw.m), blocksRead, samples, tbfw.tbf.Len())
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
var rss Results
rss.tr = tr
rss.deadline = deadline
rss.tbf = tbfw.tbf
pts := make([]packedTimeseries, len(tbfw.orderedMetricNames))
for i, metricName := range tbfw.orderedMetricNames {
rss.tbfs = tbfw.tbfs
pts := make([]packedTimeseries, len(orderedMetricNames))
for i, metricName := range orderedMetricNames {
pts[i] = packedTimeseries{
metricName: metricName,
addrs: tbfw.m[metricName],
addrs: addrsByMetricName[metricName],
}
}
rss.packedTimeseries = pts
@@ -1311,7 +1347,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
// ProcessBlocks calls processBlock per each block matching the given sq.
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
processBlock func(mb *storage.MetricBlock, workerIdx int) error, deadline searchutils.Deadline) (bool, error) {
requestData := sq.Marshal(nil)
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
@@ -1332,7 +1368,7 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
if atomic.LoadUint32(&stopped) != 0 {
return nil
}
return processBlock(mb)
return processBlock(mb, workerIdx)
}
// Send the query to all the storage nodes in parallel.
@@ -2367,3 +2403,32 @@ func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) {
}
return dst, nil
}
type uint64WithPadding struct {
n uint64
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(uint64(0))%128]byte
}
type perNodeCounter struct {
ns []uint64WithPadding
}
func newPerNodeCounter() *perNodeCounter {
return &perNodeCounter{
ns: make([]uint64WithPadding, len(storageNodes)),
}
}
func (pnc *perNodeCounter) Add(nodeIdx int, n uint64) uint64 {
return atomic.AddUint64(&pnc.ns[nodeIdx].n, n)
}
func (pnc *perNodeCounter) GetTotal() uint64 {
var total uint64
for _, n := range pnc.ns {
total += n.n
}
return total
}
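
The 128-byte padding above is what prevents false sharing: 128 is a multiple of the cache-line
size on common platforms (64 or 128 bytes), so each per-node counter ends up on its own line.
A hypothetical micro-benchmark along the following lines (not part of this commit) is one way
to observe the effect; running it with "go test -bench=." on a multi-core machine should show
the padded variant scaling noticeably better:

package padding_test

import (
	"sync"
	"sync/atomic"
	"testing"
	"unsafe"
)

// padded mirrors the uint64WithPadding type from the hunk above.
type padded struct {
	n   uint64
	pad [128 - unsafe.Sizeof(uint64(0))%128]byte
}

const workers = 8

// run starts one goroutine per worker; each bumps its own counter n times.
func run(n int, bump func(worker int)) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < n; i++ {
				bump(w)
			}
		}(w)
	}
	wg.Wait()
}

// Adjacent uint64 counters share cache lines, so every atomic add invalidates
// the line for the other CPUs (false sharing).
func BenchmarkUnpadded(b *testing.B) {
	var counters [workers]uint64
	run(b.N, func(w int) { atomic.AddUint64(&counters[w], 1) })
}

// Each padded counter owns a full cache line, so the adds do not interfere.
func BenchmarkPadded(b *testing.B) {
	var counters [workers]padded
	run(b.N, func(w int) { atomic.AddUint64(&counters[w].n, 1) })
}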


@@ -79,10 +79,11 @@ var tmpBlocksFilePool sync.Pool
type tmpBlockAddr struct {
offset uint64
size int
tbfIdx int
}
func (addr tmpBlockAddr) String() string {
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
return fmt.Sprintf("offset %d, size %d, tbfIdx %d", addr.offset, addr.size, addr.tbfIdx)
}
var (
@@ -96,8 +97,9 @@ var (
//
// It returns errors since the operation may fail on space shortage
// and this must be handled.
func (tbf *tmpBlocksFile) WriteBlockData(b []byte) (tmpBlockAddr, error) {
func (tbf *tmpBlocksFile) WriteBlockData(b []byte, tbfIdx int) (tmpBlockAddr, error) {
var addr tmpBlockAddr
addr.tbfIdx = tbfIdx
addr.offset = tbf.offset
addr.size = len(b)
tbf.offset += uint64(addr.size)


@@ -86,10 +86,13 @@ func testTmpBlocksFile() error {
for tbf.offset < uint64(size) {
b := createBlock()
bb.B = storage.MarshalBlock(bb.B[:0], b)
addr, err := tbf.WriteBlockData(bb.B)
addr, err := tbf.WriteBlockData(bb.B, 123)
if err != nil {
return fmt.Errorf("cannot write block at offset %d: %w", tbf.offset, err)
}
if addr.tbfIdx != 123 {
return fmt.Errorf("unexpected tbfIdx; got %d; want 123", addr.tbfIdx)
}
if addr.offset+uint64(addr.size) != tbf.offset {
return fmt.Errorf("unexpected addr=%+v for offset %v", &addr, tbf.offset)
}