lib/persistentqueue: eliminate possible data race when obtaining vm_persistentqueue_bytes_pending metric value

This commit is contained in:
Aliaksandr Valialkin 2021-04-27 00:23:25 +03:00
parent 87018650dd
commit 60947fb2d5
3 changed files with 9 additions and 5 deletions

View file

@ -12,6 +12,7 @@ Thanks to @johnseekins!
* BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240).
* BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047).
* BUGFIX: vmagent: eliminate possible data race when obtaining value for the metric `vm_persistentqueue_bytes_pending`. The data race could result in incorrect value for this metric.
* BUGFIX: vmstorage: remove empty directories on startup. Such directories can be left after unclean shutdown on NFS storage. Previously such directories could lead to crashloop until manually removed. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142).

View file

@ -1,11 +1,13 @@
package persistentqueue
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// FastQueue is fast persistent queue, which prefers sending data via memory.
@ -47,6 +49,12 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
}
fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
fq.mu.Lock()
n := fq.pq.GetPendingBytes()
fq.mu.Unlock()
return float64(n)
})
pendingBytes := fq.GetPendingBytes()
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes)
return fq

View file

@ -61,8 +61,6 @@ type queue struct {
blocksRead *metrics.Counter
bytesRead *metrics.Counter
bytesPending *metrics.Gauge
}
// ResetIfEmpty resets q if it is empty.
@ -173,9 +171,6 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path))
q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path))
q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path))
q.bytesPending = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
return float64(q.GetPendingBytes())
})
cleanOnError := func() {
if q.reader != nil {