From 87e0d69bf45d1157f8d79b415ff9292959077c3f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 17 Aug 2022 14:07:49 +0300 Subject: [PATCH] app/vmselect/netstorage: fix a bug introduced in 1a254ea20c561561093d60f91815d775189ff93b The bug results in `duplicate output time series` error because the same time series is added two times into the orderedMetricNames list inside the tmpBlocksFileWrapper.Finalize(). While at it, properly release all the tmpBlocksFile structs on tbf.Finalize() error. Previously only the remaining tbf entries were released. This could result in resource leak. --- app/vmselect/netstorage/netstorage.go | 53 ++++++++++++++------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index b6fb194491..d382822efa 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1139,30 +1139,6 @@ func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper { } } -func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string][]tmpBlockAddr, uint64, error) { - var bytesTotal uint64 - for i, tbf := range tbfw.tbfs { - if err := tbf.Finalize(); err != nil { - // Close the remaining tbfs before returning the error - closeTmpBlockFiles(tbfw.tbfs[i:]) - return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err) - } - bytesTotal += tbf.Len() - } - orderedMetricNames := tbfw.orderedMetricNamess[0] - addrsByMetricName := make(map[string][]tmpBlockAddr) - for i, m := range tbfw.ms { - for _, metricName := range tbfw.orderedMetricNamess[i] { - dstAddrs, ok := addrsByMetricName[metricName] - if !ok { - orderedMetricNames = append(orderedMetricNames, metricName) - } - addrsByMetricName[metricName] = append(dstAddrs, m[metricName]...) - } - } - return orderedMetricNames, addrsByMetricName, bytesTotal, nil -} - func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerIdx int) error { bb := tmpBufPool.Get() bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block) @@ -1180,13 +1156,38 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, } else { // An optimization for big number of time series with long names: store only a single copy of metricNameStr // in both tbfw.orderedMetricNamess and tbfw.ms. - tbfw.orderedMetricNamess[workerIdx] = append(tbfw.orderedMetricNamess[workerIdx], string(metricName)) - metricNameStr := tbfw.orderedMetricNamess[workerIdx][len(tbfw.orderedMetricNamess[workerIdx])-1] + orderedMetricNames := tbfw.orderedMetricNamess[workerIdx] + orderedMetricNames = append(orderedMetricNames, string(metricName)) + metricNameStr := orderedMetricNames[len(orderedMetricNames)-1] m[metricNameStr] = addrs + tbfw.orderedMetricNamess[workerIdx] = orderedMetricNames } return nil } +func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string][]tmpBlockAddr, uint64, error) { + var bytesTotal uint64 + for i, tbf := range tbfw.tbfs { + if err := tbf.Finalize(); err != nil { + closeTmpBlockFiles(tbfw.tbfs) + return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err) + } + bytesTotal += tbf.Len() + } + orderedMetricNames := tbfw.orderedMetricNamess[0] + addrsByMetricName := tbfw.ms[0] + for i, m := range tbfw.ms[1:] { + for _, metricName := range tbfw.orderedMetricNamess[i] { + dstAddrs, ok := addrsByMetricName[metricName] + if !ok { + orderedMetricNames = append(orderedMetricNames, metricName) + } + addrsByMetricName[metricName] = append(dstAddrs, m[metricName]...) + } + } + return orderedMetricNames, addrsByMetricName, bytesTotal, nil +} + var metricNamePool = &sync.Pool{ New: func() interface{} { return &storage.MetricName{}