diff --git a/app/vmselect/clusternative/vmselect.go b/app/vmselect/clusternative/vmselect.go
index 14ebf116d..2748ea3e2 100644
--- a/app/vmselect/clusternative/vmselect.go
+++ b/app/vmselect/clusternative/vmselect.go
@@ -112,7 +112,7 @@ func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *stor
 	bi.workCh = make(chan workItem, 16)
 	bi.wg.Add(1)
 	go func() {
-		_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock) error {
+		_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerIdx int) error {
 			wi := workItem{
 				mb:     mb,
 				doneCh: make(chan struct{}),
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index ddd80d8fc..b6fb19449 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -65,7 +65,7 @@ type Results struct {
 	tr       storage.TimeRange
 	deadline searchutils.Deadline
 
-	tbf *tmpBlocksFile
+	tbfs []*tmpBlocksFile
 
 	packedTimeseries []packedTimeseries
 }
@@ -77,8 +77,18 @@ func (rss *Results) Len() int {
 
 // Cancel cancels rss work.
 func (rss *Results) Cancel() {
-	putTmpBlocksFile(rss.tbf)
-	rss.tbf = nil
+	rss.closeTmpBlockFiles()
+}
+
+func (rss *Results) closeTmpBlockFiles() {
+	closeTmpBlockFiles(rss.tbfs)
+	rss.tbfs = nil
+}
+
+func closeTmpBlockFiles(tbfs []*tmpBlocksFile) {
+	for _, tbf := range tbfs {
+		putTmpBlocksFile(tbf)
+	}
 }
 
 type timeseriesWork struct {
@@ -124,7 +134,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 		atomic.StoreUint32(tsw.mustStop, 1)
 		return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
 	}
-	if err := tsw.pts.Unpack(r, rss.tbf, rss.tr); err != nil {
+	if err := tsw.pts.Unpack(r, rss.tbfs, rss.tr); err != nil {
 		atomic.StoreUint32(tsw.mustStop, 1)
 		return fmt.Errorf("error during time series unpacking: %w", err)
 	}
@@ -173,10 +183,7 @@ var resultPool sync.Pool
 // rss becomes unusable after the call to RunParallel.
 func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
 	qt = qt.NewChild("parallel process of fetched data")
-	defer func() {
-		putTmpBlocksFile(rss.tbf)
-		rss.tbf = nil
-	}()
+	defer rss.closeTmpBlockFiles()
 
 	// Prepare work for workers.
 	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
@@ -281,7 +288,7 @@ type unpackWorkItem struct {
 
 type unpackWork struct {
 	ws     []unpackWorkItem
-	tbf    *tmpBlocksFile
+	tbfs   []*tmpBlocksFile
 	sbs    []*sortBlock
 	doneCh chan error
 }
@@ -294,7 +301,7 @@ func (upw *unpackWork) reset() {
 		w.tr = storage.TimeRange{}
 	}
 	upw.ws = upw.ws[:0]
-	upw.tbf = nil
+	upw.tbfs = nil
 	sbs := upw.sbs
 	for i := range sbs {
 		sbs[i] = nil
@@ -308,7 +315,7 @@ func (upw *unpackWork) reset() {
 func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
 	for _, w := range upw.ws {
 		sb := getSortBlock()
-		if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr); err != nil {
+		if err := sb.unpackFrom(tmpBlock, upw.tbfs, w.addr, w.tr); err != nil {
 			putSortBlock(sb)
 			upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
 			return
@@ -380,7 +387,7 @@ var tmpBlockPool sync.Pool
 var unpackBatchSize = 5000
 
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
+func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr storage.TimeRange) error {
 	dst.reset()
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
 	}
@@ -415,13 +422,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	// Feed workers with work
 	upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
 	upw := getUnpackWork()
-	upw.tbf = tbf
+	upw.tbfs = tbfs
 	for _, addr := range pts.addrs {
 		if len(upw.ws) >= unpackBatchSize {
 			scheduleUnpackWork(workChs, upw)
 			upws = append(upws, upw)
 			upw = getUnpackWork()
-			upw.tbf = tbf
+			upw.tbfs = tbfs
 		}
 		upw.ws = append(upw.ws, unpackWorkItem{
 			addr: addr,
@@ -583,9 +590,9 @@ func (sb *sortBlock) reset() {
 	sb.NextIdx = 0
 }
 
-func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
+func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbfs []*tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
 	tmpBlock.Reset()
-	tbf.MustReadBlockAt(tmpBlock, addr)
+	tbfs[addr.tbfIdx].MustReadBlockAt(tmpBlock, addr)
 	if err := tmpBlock.UnmarshalData(); err != nil {
 		return fmt.Errorf("cannot unmarshal block: %w", err)
 	}
@@ -1110,45 +1117,74 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
 }
 
 type tmpBlocksFileWrapper struct {
-	mu                 sync.Mutex
-	tbf                *tmpBlocksFile
-	m                  map[string][]tmpBlockAddr
-	orderedMetricNames []string
+	tbfs                []*tmpBlocksFile
+	ms                  []map[string][]tmpBlockAddr
+	orderedMetricNamess [][]string
 }
 
-func (tbfw *tmpBlocksFileWrapper) RegisterEmptyBlock(mb *storage.MetricBlock) {
-	metricName := mb.MetricName
-	tbfw.mu.Lock()
-	if addrs := tbfw.m[string(metricName)]; addrs == nil {
-		// An optimization for big number of time series with long names: store only a single copy of metricNameStr
-		// in both tbfw.orderedMetricNames and tbfw.m.
-		tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
-		tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = []tmpBlockAddr{{}}
+func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper {
+	n := len(storageNodes)
+	tbfs := make([]*tmpBlocksFile, n)
+	for i := range tbfs {
+		tbfs[i] = getTmpBlocksFile()
+	}
+	ms := make([]map[string][]tmpBlockAddr, n)
+	for i := range ms {
+		ms[i] = make(map[string][]tmpBlockAddr)
+	}
+	return &tmpBlocksFileWrapper{
+		tbfs:                tbfs,
+		ms:                  ms,
+		orderedMetricNamess: make([][]string, n),
 	}
-	tbfw.mu.Unlock()
 }
 
-func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock) error {
-	bb := tmpBufPool.Get()
-	bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
-	tbfw.mu.Lock()
-	addr, err := tbfw.tbf.WriteBlockData(bb.B)
-	tmpBufPool.Put(bb)
-	if err == nil {
-		metricName := mb.MetricName
-		addrs := tbfw.m[string(metricName)]
-		addrs = append(addrs, addr)
-		if len(addrs) > 1 {
-			tbfw.m[string(metricName)] = addrs
-		} else {
-			// An optimization for big number of time series with long names: store only a single copy of metricNameStr
-			// in both tbfw.orderedMetricNames and tbfw.m.
-			tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
-			tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = addrs
+func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string][]tmpBlockAddr, uint64, error) {
+	var bytesTotal uint64
+	for i, tbf := range tbfw.tbfs {
+		if err := tbf.Finalize(); err != nil {
+			// Close the remaining tbfs before returning the error
+			closeTmpBlockFiles(tbfw.tbfs[i:])
+			return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
+		}
+		bytesTotal += tbf.Len()
+	}
+	var orderedMetricNames []string
+	addrsByMetricName := make(map[string][]tmpBlockAddr)
+	for i, m := range tbfw.ms {
+		for _, metricName := range tbfw.orderedMetricNamess[i] {
+			dstAddrs, ok := addrsByMetricName[metricName]
+			if !ok {
+				orderedMetricNames = append(orderedMetricNames, metricName)
+			}
+			addrsByMetricName[metricName] = append(dstAddrs, m[metricName]...)
 		}
 	}
-	tbfw.mu.Unlock()
-	return err
+	return orderedMetricNames, addrsByMetricName, bytesTotal, nil
+}
+
+func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerIdx int) error {
+	bb := tmpBufPool.Get()
+	bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
+	addr, err := tbfw.tbfs[workerIdx].WriteBlockData(bb.B, workerIdx)
+	tmpBufPool.Put(bb)
+	if err != nil {
+		return err
+	}
+	metricName := mb.MetricName
+	m := tbfw.ms[workerIdx]
+	addrs := m[string(metricName)]
+	addrs = append(addrs, addr)
+	if len(addrs) > 1 {
+		m[string(metricName)] = addrs
+	} else {
+		// An optimization for big number of time series with long names: store only a single copy of metricNameStr
+		// in both tbfw.orderedMetricNamess and tbfw.ms.
+		tbfw.orderedMetricNamess[workerIdx] = append(tbfw.orderedMetricNamess[workerIdx], string(metricName))
+		metricNameStr := tbfw.orderedMetricNamess[workerIdx][len(tbfw.orderedMetricNamess[workerIdx])-1]
+		m[metricNameStr] = addrs
+	}
+	return nil
 }
 
 var metricNamePool = &sync.Pool{
@@ -1173,9 +1209,9 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		MinTimestamp: sq.MinTimestamp,
 		MaxTimestamp: sq.MaxTimestamp,
 	}
-	var blocksRead uint64
-	var samples uint64
-	processBlock := func(mb *storage.MetricBlock) error {
+	blocksRead := newPerNodeCounter()
+	samples := newPerNodeCounter()
+	processBlock := func(mb *storage.MetricBlock, workerIdx int) error {
 		mn := metricNamePool.Get().(*storage.MetricName)
 		if err := mn.Unmarshal(mb.MetricName); err != nil {
 			return fmt.Errorf("cannot unmarshal metricName: %w", err)
@@ -1185,12 +1221,12 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		}
 		mn.Reset()
 		metricNamePool.Put(mn)
-		atomic.AddUint64(&blocksRead, 1)
-		atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
+		blocksRead.Add(workerIdx, 1)
+		samples.Add(workerIdx, uint64(mb.Block.RowsCount()))
 		return nil
 	}
 	_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
-	qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead, samples, err)
+	qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead.GetTotal(), samples.GetTotal(), err)
 	if err != nil {
 		return fmt.Errorf("error occured during export: %w", err)
 	}
@@ -1266,43 +1302,43 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 		MinTimestamp: sq.MinTimestamp,
 		MaxTimestamp: sq.MaxTimestamp,
 	}
-	tbfw := &tmpBlocksFileWrapper{
-		tbf: getTmpBlocksFile(),
-		m:   make(map[string][]tmpBlockAddr),
-	}
-	var blocksRead uint64
-	var samples uint64
-	processBlock := func(mb *storage.MetricBlock) error {
-		atomic.AddUint64(&blocksRead, 1)
-		n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
-		if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) {
-			return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: to increase the -search.maxSamplesPerQuery; to reduce time range for the query; to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery)
+	tbfw := newTmpBlocksFileWrapper()
+	blocksRead := newPerNodeCounter()
+	samples := newPerNodeCounter()
+	maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(storageNodes))
+	processBlock := func(mb *storage.MetricBlock, workerIdx int) error {
+		blocksRead.Add(workerIdx, 1)
+		n := samples.Add(workerIdx, uint64(mb.Block.RowsCount()))
+		if *maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples.GetTotal() > uint64(*maxSamplesPerQuery) {
+			return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: "+
+				"to increase the -search.maxSamplesPerQuery; to reduce time range for the query; "+
+				"to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery)
 		}
-		if err := tbfw.RegisterAndWriteBlock(mb); err != nil {
+		if err := tbfw.RegisterAndWriteBlock(mb, workerIdx); err != nil {
 			return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err)
 		}
 		return nil
 	}
 	isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
 	if err != nil {
-		putTmpBlocksFile(tbfw.tbf)
+		closeTmpBlockFiles(tbfw.tbfs)
 		return nil, false, fmt.Errorf("error occured during search: %w", err)
 	}
-	if err := tbfw.tbf.Finalize(); err != nil {
-		putTmpBlocksFile(tbfw.tbf)
-		return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %w", len(tbfw.m), err)
+	orderedMetricNames, addrsByMetricName, bytesTotal, err := tbfw.Finalize()
+	if err != nil {
+		return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
 	}
-	qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(tbfw.m), blocksRead, samples, tbfw.tbf.Len())
+	qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
 
 	var rss Results
 	rss.tr = tr
 	rss.deadline = deadline
-	rss.tbf = tbfw.tbf
-	pts := make([]packedTimeseries, len(tbfw.orderedMetricNames))
-	for i, metricName := range tbfw.orderedMetricNames {
+	rss.tbfs = tbfw.tbfs
+	pts := make([]packedTimeseries, len(orderedMetricNames))
+	for i, metricName := range orderedMetricNames {
 		pts[i] = packedTimeseries{
 			metricName: metricName,
-			addrs:      tbfw.m[metricName],
+			addrs:      addrsByMetricName[metricName],
 		}
 	}
 	rss.packedTimeseries = pts
@@ -1311,7 +1347,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 
 // ProcessBlocks calls processBlock per each block matching the given sq.
 func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
-	processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
+	processBlock func(mb *storage.MetricBlock, workerIdx int) error, deadline searchutils.Deadline) (bool, error) {
 	requestData := sq.Marshal(nil)
 
 	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
@@ -1332,7 +1368,7 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 		if atomic.LoadUint32(&stopped) != 0 {
 			return nil
 		}
-		return processBlock(mb)
+		return processBlock(mb, workerIdx)
 	}
 
 	// Send the query to all the storage nodes in parallel.
@@ -2367,3 +2403,32 @@ func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) {
 	}
 	return dst, nil
 }
+
+type uint64WithPadding struct {
+	n uint64
+	// Prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	pad [128 - unsafe.Sizeof(uint64(0))%128]byte
+}
+
+type perNodeCounter struct {
+	ns []uint64WithPadding
+}
+
+func newPerNodeCounter() *perNodeCounter {
+	return &perNodeCounter{
+		ns: make([]uint64WithPadding, len(storageNodes)),
+	}
+}
+
+func (pnc *perNodeCounter) Add(nodeIdx int, n uint64) uint64 {
+	return atomic.AddUint64(&pnc.ns[nodeIdx].n, n)
+}
+
+func (pnc *perNodeCounter) GetTotal() uint64 {
+	var total uint64
+	for _, n := range pnc.ns {
+		total += n.n
+	}
+	return total
+}
diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go
index b558b28bc..cb985801b 100644
--- a/app/vmselect/netstorage/tmp_blocks_file.go
+++ b/app/vmselect/netstorage/tmp_blocks_file.go
@@ -79,10 +79,11 @@ var tmpBlocksFilePool sync.Pool
 type tmpBlockAddr struct {
 	offset uint64
 	size   int
+	tbfIdx int
 }
 
 func (addr tmpBlockAddr) String() string {
-	return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
+	return fmt.Sprintf("offset %d, size %d, tbfIdx %d", addr.offset, addr.size, addr.tbfIdx)
 }
 
 var (
@@ -96,8 +97,9 @@ var (
 //
 // It returns errors since the operation may fail on space shortage
 // and this must be handled.
-func (tbf *tmpBlocksFile) WriteBlockData(b []byte) (tmpBlockAddr, error) {
+func (tbf *tmpBlocksFile) WriteBlockData(b []byte, tbfIdx int) (tmpBlockAddr, error) {
 	var addr tmpBlockAddr
+	addr.tbfIdx = tbfIdx
 	addr.offset = tbf.offset
 	addr.size = len(b)
 	tbf.offset += uint64(addr.size)
diff --git a/app/vmselect/netstorage/tmp_blocks_file_test.go b/app/vmselect/netstorage/tmp_blocks_file_test.go
index 46287ac06..eaef09490 100644
--- a/app/vmselect/netstorage/tmp_blocks_file_test.go
+++ b/app/vmselect/netstorage/tmp_blocks_file_test.go
@@ -86,10 +86,13 @@ func testTmpBlocksFile() error {
 	for tbf.offset < uint64(size) {
 		b := createBlock()
 		bb.B = storage.MarshalBlock(bb.B[:0], b)
-		addr, err := tbf.WriteBlockData(bb.B)
+		addr, err := tbf.WriteBlockData(bb.B, 123)
 		if err != nil {
 			return fmt.Errorf("cannot write block at offset %d: %w", tbf.offset, err)
 		}
+		if addr.tbfIdx != 123 {
+			return fmt.Errorf("unexpected tbfIdx; got %d; want 123", addr.tbfIdx)
+		}
 		if addr.offset+uint64(addr.size) != tbf.offset {
 			return fmt.Errorf("unexpected addr=%+v for offset %v", &addr, tbf.offset)
 		}
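
Note on the counters introduced in this patch: perNodeCounter gives every storage-node worker its own cache-line-padded slot, so the hot atomic.AddUint64 calls in processBlock never contend on a shared cache line, and the comparatively expensive total is only computed on demand (for the -search.maxSamplesPerQuery check and the query-tracer summary). The standalone sketch below illustrates the same sharded-counter idea; the paddedUint64/shardedCounter names and the demo main are illustrative only and are not part of the VictoriaMetrics code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// paddedUint64 mirrors the uint64WithPadding idea from the patch: pad each
// slot to a multiple of 128 bytes so neighbouring slots never share a cache line.
type paddedUint64 struct {
	n uint64
	_ [128 - unsafe.Sizeof(uint64(0))%128]byte
}

// shardedCounter keeps one padded slot per worker (hypothetical name).
type shardedCounter struct {
	shards []paddedUint64
}

func newShardedCounter(workers int) *shardedCounter {
	return &shardedCounter{shards: make([]paddedUint64, workers)}
}

// Add increments the slot owned by workerIdx and returns the new per-worker
// value, like perNodeCounter.Add in the patch.
func (c *shardedCounter) Add(workerIdx int, n uint64) uint64 {
	return atomic.AddUint64(&c.shards[workerIdx].n, n)
}

// Total sums all slots; it is only a best-effort snapshot while writers are
// still running, which is enough for a limit check like -search.maxSamplesPerQuery.
func (c *shardedCounter) Total() uint64 {
	var total uint64
	for i := range c.shards {
		total += atomic.LoadUint64(&c.shards[i].n)
	}
	return total
}

func main() {
	const workers = 4
	c := newShardedCounter(workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(workerIdx int) {
			defer wg.Done()
			for i := 0; i < 100000; i++ {
				c.Add(workerIdx, 1) // uncontended: each goroutine touches only its own cache line
			}
		}(w)
	}
	wg.Wait()
	fmt.Println("total:", c.Total()) // prints: total: 400000
}

Unlike GetTotal in the patch, the sketch loads each slot atomically; in both cases the summed value is approximate while workers are still adding, which is acceptable for reporting and for the per-query sample limit.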