From b688960db0e5ac740900b08f89b0365176842a67 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 17 Nov 2021 16:41:35 +0200 Subject: [PATCH] lib/persistentqueue: add vm_persistentqueue_read_duration_seconds_total and vm_persistentqueue_write_duration_seconds_total metrics for determining disk usage saturation at vmagent --- docs/CHANGELOG.md | 1 + lib/persistentqueue/persistentqueue.go | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e6af31900..e25bfc5b8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,7 @@ sort: 15 * FEATURE: add `now()` function to MetricsQL. This function returns the current timestamp in seconds. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#now). * FEATURE: vmauth: allow using optional `name` field in configs. This field is then used as `username` label value for `vmauth_user_requests_total` metric. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1805). +* FEATURE: vmagent: export `vm_persistentqueue_read_duration_seconds_total` and `vm_persistentqueue_write_duration_seconds_total` metrics, which can be used for detecting persistent queue saturation with `rate(vm_persistentqueue_write_duration_seconds_total) > 0.9s` alerting rule. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html), [vmrestore](https://docs.victoriametrics.com/vmrestore.html): add `-s3ForcePathStyle` command-line flag, which can be used for making backups to [Aliyun OSS](https://www.aliyun.com/product/oss). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1802). * BUGFIX: vmstorage [enterprise](https://victoriametrics.com/enterprise.html): added missing `vm_tenant_used_tenant_bytes` metric, which shows the approximate per-tenant disk usage. See [these docs](https://docs.victoriametrics.com/PerTenantStatistic.html) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1605). diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 8d8212bb4..7af7933ce 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -8,6 +8,7 @@ import ( "os" "regexp" "strconv" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -363,8 +364,6 @@ func (q *queue) metainfoPath() string { // MustWriteBlock writes block to q. // // The block size cannot exceed MaxBlockSize. -// -// It is safe calling this function from concurrent goroutines. func (q *queue) MustWriteBlock(block []byte) { if uint64(len(block)) > q.maxBlockSize { logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize) @@ -408,6 +407,10 @@ func (q *queue) MustWriteBlock(block []byte) { var blockBufPool bytesutil.ByteBufferPool func (q *queue) writeBlock(block []byte) error { + startTime := time.Now() + defer func() { + writeDurationSeconds.Add(time.Since(startTime).Seconds()) + }() if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { if err := q.nextChunkFileForWrite(); err != nil { return fmt.Errorf("cannot create next chunk file: %w", err) @@ -433,6 +436,8 @@ func (q *queue) writeBlock(block []byte) error { return q.flushWriterMetainfoIfNeeded() } +var writeDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_write_duration_seconds_total`) + func (q *queue) nextChunkFileForWrite() error { // Finalize the current chunk and start new one. q.writer.MustClose() @@ -478,6 +483,10 @@ func (q *queue) MustReadBlockNonblocking(dst []byte) ([]byte, bool) { } func (q *queue) readBlock(dst []byte) ([]byte, error) { + startTime := time.Now() + defer func() { + readDurationSeconds.Add(time.Since(startTime).Seconds()) + }() if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize { if err := q.nextChunkFileForRead(); err != nil { return dst, fmt.Errorf("cannot open next chunk file: %w", err) @@ -524,6 +533,8 @@ again: return dst, nil } +var readDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_read_duration_seconds_total`) + func (q *queue) skipBrokenChunkFile() error { // Try to recover from broken chunk file by skipping it. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030