mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-11 14:53:49 +00:00
app/vmselect/netstorage: prevent calling the processBlock callback after exiting the ProcessBlocks function
This should prevent a panic in multi-level vmselect setups when the top-level vmselect is configured with -replicationFactor > 1
This commit is contained in:
parent
aea2bcd495
commit
bc254cb497
2 changed files with 21 additions and 27 deletions
|
@ -1179,16 +1179,9 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
||||||
MinTimestamp: sq.MinTimestamp,
|
MinTimestamp: sq.MinTimestamp,
|
||||||
MaxTimestamp: sq.MaxTimestamp,
|
MaxTimestamp: sq.MaxTimestamp,
|
||||||
}
|
}
|
||||||
var wg syncwg.WaitGroup
|
|
||||||
var stopped uint32
|
|
||||||
var blocksRead uint64
|
var blocksRead uint64
|
||||||
var samples uint64
|
var samples uint64
|
||||||
processBlock := func(mb *storage.MetricBlock) error {
|
processBlock := func(mb *storage.MetricBlock) error {
|
||||||
wg.Add(1)
|
|
||||||
defer wg.Done()
|
|
||||||
if atomic.LoadUint32(&stopped) != 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
mn := metricNamePool.Get().(*storage.MetricName)
|
mn := metricNamePool.Get().(*storage.MetricName)
|
||||||
if err := mn.Unmarshal(mb.MetricName); err != nil {
|
if err := mn.Unmarshal(mb.MetricName); err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal metricName: %w", err)
|
return fmt.Errorf("cannot unmarshal metricName: %w", err)
|
||||||
|
@ -1203,12 +1196,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
|
_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
|
||||||
|
qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead, samples, err)
|
||||||
// Make sure processBlock isn't called anymore in order to prevent from data races.
|
|
||||||
atomic.StoreUint32(&stopped, 1)
|
|
||||||
wg.Wait()
|
|
||||||
qt.Printf("export blocks=%d, samples=%d", blocksRead, samples)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error occured during export: %w", err)
|
return fmt.Errorf("error occured during export: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -1288,16 +1276,9 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
|
||||||
tbf: getTmpBlocksFile(),
|
tbf: getTmpBlocksFile(),
|
||||||
m: make(map[string][]tmpBlockAddr),
|
m: make(map[string][]tmpBlockAddr),
|
||||||
}
|
}
|
||||||
var wg syncwg.WaitGroup
|
|
||||||
var stopped uint32
|
|
||||||
var blocksRead uint64
|
var blocksRead uint64
|
||||||
var samples uint64
|
var samples uint64
|
||||||
processBlock := func(mb *storage.MetricBlock) error {
|
processBlock := func(mb *storage.MetricBlock) error {
|
||||||
wg.Add(1)
|
|
||||||
defer wg.Done()
|
|
||||||
if atomic.LoadUint32(&stopped) != 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
atomic.AddUint64(&blocksRead, 1)
|
atomic.AddUint64(&blocksRead, 1)
|
||||||
n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
|
n := atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
|
||||||
if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) {
|
if *maxSamplesPerQuery > 0 && n > uint64(*maxSamplesPerQuery) {
|
||||||
|
@ -1309,11 +1290,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
|
isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
|
||||||
|
|
||||||
// Make sure processBlock isn't called anymore in order to protect from data races.
|
|
||||||
atomic.StoreUint32(&stopped, 1)
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
putTmpBlocksFile(tbfw.tbf)
|
putTmpBlocksFile(tbfw.tbf)
|
||||||
return nil, false, fmt.Errorf("error occured during search: %w", err)
|
return nil, false, fmt.Errorf("error occured during search: %w", err)
|
||||||
|
@ -1344,10 +1320,22 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
|
||||||
processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
|
processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
|
||||||
requestData := sq.Marshal(nil)
|
requestData := sq.Marshal(nil)
|
||||||
|
|
||||||
|
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
|
||||||
|
var stopped uint32
|
||||||
|
var wg syncwg.WaitGroup
|
||||||
|
f := func(mb *storage.MetricBlock) error {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
|
if atomic.LoadUint32(&stopped) != 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return processBlock(mb)
|
||||||
|
}
|
||||||
|
|
||||||
// Send the query to all the storage nodes in parallel.
|
// Send the query to all the storage nodes in parallel.
|
||||||
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
|
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
|
||||||
sn.searchRequests.Inc()
|
sn.searchRequests.Inc()
|
||||||
err := sn.processSearchQuery(qt, requestData, processBlock, deadline)
|
err := sn.processSearchQuery(qt, requestData, f, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sn.searchErrors.Inc()
|
sn.searchErrors.Inc()
|
||||||
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
|
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||||
|
@ -1360,6 +1348,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
|
||||||
errP := result.(*error)
|
errP := result.(*error)
|
||||||
return *errP
|
return *errP
|
||||||
})
|
})
|
||||||
|
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
|
||||||
|
atomic.StoreUint32(&stopped, 1)
|
||||||
|
wg.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
|
return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,10 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* [How to build vmauth](https://docs.victoriametrics.com/vmauth.html#how-to-build-from-sources)
|
* [How to build vmauth](https://docs.victoriametrics.com/vmauth.html#how-to-build-from-sources)
|
||||||
* [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build)
|
* [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build)
|
||||||
|
|
||||||
## tip
|
## v1.79.x long-time support release (LTS)
|
||||||
|
|
||||||
|
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): fix a potential panic in a [multi-level cluster setup](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) when the top-level `vmselect` is configured with `-replicationFactor` greater than 1. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2961).
|
||||||
|
|
||||||
|
|
||||||
## [v1.79.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.79.1)
|
## [v1.79.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.79.1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue