diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 8f1d353743..c28a8b946c 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -82,7 +82,7 @@ func (rss *Results) mustClose() { var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16) type timeseriesWork struct { - mustStop uint64 + mustStop *uint32 rss *Results pts *packedTimeseries f func(rs *Result, workerID uint) error @@ -92,7 +92,7 @@ type timeseriesWork struct { } func (tsw *timeseriesWork) reset() { - tsw.mustStop = 0 + tsw.mustStop = nil tsw.rss = nil tsw.pts = nil tsw.f = nil @@ -129,21 +129,24 @@ func timeseriesWorker(workerID uint) { var rs Result var rsLastResetTime uint64 for tsw := range timeseriesWorkCh { - rss := tsw.rss - if rss.deadline.Exceeded() { - tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) - continue - } - if atomic.LoadUint64(&tsw.mustStop) != 0 { + if atomic.LoadUint32(tsw.mustStop) != 0 { tsw.doneCh <- nil continue } + rss := tsw.rss + if rss.deadline.Exceeded() { + atomic.StoreUint32(tsw.mustStop, 1) + tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) + continue + } if err := tsw.pts.Unpack(&rs, rss.tbf, rss.tr, rss.fetchData); err != nil { + atomic.StoreUint32(tsw.mustStop, 1) tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err) continue } if len(rs.Timestamps) > 0 || !rss.fetchData { if err := tsw.f(&rs, workerID); err != nil { + atomic.StoreUint32(tsw.mustStop, 1) tsw.doneCh <- err continue } @@ -171,11 +174,13 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { // Feed workers with work. tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) + var mustStop uint32 for i := range rss.packedTimeseries { tsw := getTimeseriesWork() tsw.rss = rss tsw.pts = &rss.packedTimeseries[i] tsw.f = f + tsw.mustStop = &mustStop timeseriesWorkCh <- tsw tsws[i] = tsw } @@ -186,15 +191,9 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error { var firstErr error rowsProcessedTotal := 0 for _, tsw := range tsws { - err := <-tsw.doneCh - if err != nil && firstErr == nil { - // Return just the first error, since other errors - // are likely duplicate the first error. + if err := <-tsw.doneCh; err != nil && firstErr == nil { + // Return just the first error, since other errors are likely duplicate the first error. firstErr = err - // Notify all the the tsws that they shouldn't be executed. - for _, tsw := range tsws { - atomic.StoreUint64(&tsw.mustStop, 1) - } } rowsProcessedTotal += tsw.rowsProcessed putTimeseriesWork(tsw) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 260b06d610..4d0aef5a55 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,8 @@ * FEATURE: publish vmutils for `GOOS=arm` on [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). +* BUGFIX: prevent from possible incomplete query results after timed out query. + # [v1.57.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.57.0)