mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vmselect: prevent from possible incomplete query results after timed out query
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
parent
9b4e608199
commit
7bafaad46d
2 changed files with 17 additions and 16 deletions
|
@ -82,7 +82,7 @@ func (rss *Results) Cancel() {
|
|||
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
|
||||
|
||||
type timeseriesWork struct {
|
||||
mustStop uint64
|
||||
mustStop *uint32
|
||||
rss *Results
|
||||
pts *packedTimeseries
|
||||
f func(rs *Result, workerID uint) error
|
||||
|
@ -92,7 +92,7 @@ type timeseriesWork struct {
|
|||
}
|
||||
|
||||
func (tsw *timeseriesWork) reset() {
|
||||
tsw.mustStop = 0
|
||||
tsw.mustStop = nil
|
||||
tsw.rss = nil
|
||||
tsw.pts = nil
|
||||
tsw.f = nil
|
||||
|
@ -129,21 +129,24 @@ func timeseriesWorker(workerID uint) {
|
|||
var rs Result
|
||||
var rsLastResetTime uint64
|
||||
for tsw := range timeseriesWorkCh {
|
||||
rss := tsw.rss
|
||||
if rss.deadline.Exceeded() {
|
||||
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
||||
continue
|
||||
}
|
||||
if atomic.LoadUint64(&tsw.mustStop) != 0 {
|
||||
if atomic.LoadUint32(tsw.mustStop) != 0 {
|
||||
tsw.doneCh <- nil
|
||||
continue
|
||||
}
|
||||
rss := tsw.rss
|
||||
if rss.deadline.Exceeded() {
|
||||
atomic.StoreUint32(tsw.mustStop, 1)
|
||||
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
||||
continue
|
||||
}
|
||||
if err := tsw.pts.Unpack(rss.tbf, &rs, rss.tr, rss.fetchData, rss.at); err != nil {
|
||||
atomic.StoreUint32(tsw.mustStop, 1)
|
||||
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
|
||||
continue
|
||||
}
|
||||
if len(rs.Timestamps) > 0 || !rss.fetchData {
|
||||
if err := tsw.f(&rs, workerID); err != nil {
|
||||
atomic.StoreUint32(tsw.mustStop, 1)
|
||||
tsw.doneCh <- err
|
||||
continue
|
||||
}
|
||||
|
@ -174,11 +177,13 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
|
|||
|
||||
// Feed workers with work.
|
||||
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
|
||||
var mustStop uint32
|
||||
for i := range rss.packedTimeseries {
|
||||
tsw := getTimeseriesWork()
|
||||
tsw.rss = rss
|
||||
tsw.pts = &rss.packedTimeseries[i]
|
||||
tsw.f = f
|
||||
tsw.mustStop = &mustStop
|
||||
timeseriesWorkCh <- tsw
|
||||
tsws[i] = tsw
|
||||
}
|
||||
|
@ -189,15 +194,9 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
|
|||
var firstErr error
|
||||
rowsProcessedTotal := 0
|
||||
for _, tsw := range tsws {
|
||||
err := <-tsw.doneCh
|
||||
if err != nil && firstErr == nil {
|
||||
// Return just the first error, since other errors
|
||||
// are likely duplicate the first error.
|
||||
if err := <-tsw.doneCh; err != nil && firstErr == nil {
|
||||
// Return just the first error, since other errors are likely duplicate the first error.
|
||||
firstErr = err
|
||||
// Notify all the the tsws that they shouldn't be executed.
|
||||
for _, tsw := range tsws {
|
||||
atomic.StoreUint64(&tsw.mustStop, 1)
|
||||
}
|
||||
}
|
||||
rowsProcessedTotal += tsw.rowsProcessed
|
||||
putTimeseriesWork(tsw)
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
* FEATURE: publish vmutils for `GOOS=arm` on [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
|
||||
* BUGFIX: prevent from possible incomplete query results after timed out query.
|
||||
|
||||
|
||||
# [v1.57.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.57.0)
|
||||
|
||||
|
|
Loading…
Reference in a new issue