From bc254cb49789ab644299a39322471f8d6641fff2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 8 Aug 2022 12:54:55 +0300 Subject: [PATCH] app/vmselect/netstorage: prevent from calling processBlocks callback after the exit from ProcessBlocks function This should prevent from panic at multi-level vmselect when the top-level vmselect is configured with -replicationFactor > 1 --- app/vmselect/netstorage/netstorage.go | 43 +++++++++++---------------- docs/CHANGELOG.md | 5 +++- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 32bd5e1b1..7fe2366d3 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1179,16 +1179,9 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } - var wg syncwg.WaitGroup - var stopped uint32 var blocksRead uint64 var samples uint64 processBlock := func(mb *storage.MetricBlock) error { - wg.Add(1) - defer wg.Done() - if atomic.LoadUint32(&stopped) != 0 { - return nil - } mn := metricNamePool.Get().(*storage.MetricName) if err := mn.Unmarshal(mb.MetricName); err != nil { return fmt.Errorf("cannot unmarshal metricName: %w", err) @@ -1203,12 +1196,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear return nil } _, err := ProcessBlocks(qt, true, sq, processBlock, deadline) - - // Make sure processBlock isn't called anymore in order to prevent from data races. - atomic.StoreUint32(&stopped, 1) - wg.Wait() - qt.Printf("export blocks=%d, samples=%d", blocksRead, samples) - + qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead, samples, err) if err != nil { return fmt.Errorf("error occured during export: %w", err) } @@ -1288,16 +1276,9 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st tbf: getTmpBlocksFile(), m: make(map[string][]tmpBlockAddr), } - var wg syncwg.WaitGroup - var stopped uint32 var blocksRead uint64 var samples uint64 processBlock := func(mb *storage.MetricBlock) error { - wg.Add(1) - defer wg.Done() - if atomic.LoadUint32(&stopped) != 0 { - return nil - } atomic.AddUint64(&blocksRead, 1) n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount())) if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) { @@ -1309,11 +1290,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st return nil } isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline) - - // Make sure processBlock isn't called anymore in order to protect from data races. - atomic.StoreUint32(&stopped, 1) - wg.Wait() - if err != nil { putTmpBlocksFile(tbfw.tbf) return nil, false, fmt.Errorf("error occured during search: %w", err) @@ -1344,10 +1320,22 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) { requestData := sq.Marshal(nil) + // Make sure that processBlock is no longer called after the exit from ProcessBlocks() function. + var stopped uint32 + var wg syncwg.WaitGroup + f := func(mb *storage.MetricBlock) error { + wg.Add(1) + defer wg.Done() + if atomic.LoadUint32(&stopped) != 0 { + return nil + } + return processBlock(mb) + } + // Send the query to all the storage nodes in parallel. snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} { sn.searchRequests.Inc() - err := sn.processSearchQuery(qt, requestData, processBlock, deadline) + err := sn.processSearchQuery(qt, requestData, f, deadline) if err != nil { sn.searchErrors.Inc() err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) @@ -1360,6 +1348,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage errP := result.(*error) return *errP }) + // Make sure that processBlock is no longer called after the exit from ProcessBlocks() function. + atomic.StoreUint32(&stopped, 1) + wg.Wait() if err != nil { return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 41dfb3ab5..dcb8115d0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -13,7 +13,10 @@ The following tip changes can be tested by building VictoriaMetrics components f * [How to build vmauth](https://docs.victoriametrics.com/vmauth.html#how-to-build-from-sources) * [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build) -## tip +## v1.79.x long-time support release (LTS) + +* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): fix potential panic in [multi-level cluster setup](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) when top-level `vmselect` is configured with `-replicationFactor` bigger than 1. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2961). + ## [v1.79.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.79.1)