diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6003ded54..bfd636495 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -12,6 +12,7 @@ Thanks to @johnseekins! * BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047). +* BUGFIX: vmagent: eliminate possible data race when obtaining value for the metric `vm_persistentqueue_bytes_pending`. The data race could result in incorrect value for this metric. * BUGFIX: vmstorage: remove empty directories on startup. Such directories can be left after unclean shutdown on NFS storage. Previously such directories could lead to crashloop until manually removed. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142). diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 7dda73a86..9c99f2b01 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -1,11 +1,13 @@ package persistentqueue import ( + "fmt" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" ) // FastQueue is fast persistent queue, which prefers sending data via memory. @@ -47,6 +49,12 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int } fq.cond.L = &fq.mu fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() + _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 { + fq.mu.Lock() + n := fq.pq.GetPendingBytes() + fq.mu.Unlock() + return float64(n) + }) pendingBytes := fq.GetPendingBytes() logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes) return fq diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 4c082a9f8..474c559a5 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -61,8 +61,6 @@ type queue struct { blocksRead *metrics.Counter bytesRead *metrics.Counter - - bytesPending *metrics.Gauge } // ResetIfEmpty resets q if it is empty. @@ -173,9 +171,6 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path)) q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path)) q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path)) - q.bytesPending = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 { - return float64(q.GetPendingBytes()) - }) cleanOnError := func() { if q.reader != nil {