mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent: add -remoteWrite.removeDanglingQueues
flag (#4017)
* app/vmagent: add `-remoteWrite.removeDanglingQueues` flag which allows to automatically remove dangling persistent queue contents Related issue: #4014 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmagent: address review feedback - remove persistent queues files by default - rename `remoteWrite.removeDanglingQueues` to `remoteWrite.keepDanglingQueues` - update docs to reflect changed behaviour Related issue: #4014 * Apply suggestions from code review --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
9b1e002287
commit
f3a51e8b1d
5 changed files with 48 additions and 2 deletions
|
@ -1518,6 +1518,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
-remoteWrite.relabelConfig string
|
-remoteWrite.relabelConfig string
|
||||||
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
|
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
|
||||||
|
-remoteWrite.keepDanglingQueues
|
||||||
|
Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.
|
||||||
-remoteWrite.roundDigits array
|
-remoteWrite.roundDigits array
|
||||||
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
|
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
|
|
|
@ -4,17 +4,22 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cespare/xxhash/v2"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||||
|
@ -24,7 +29,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
"github.com/cespare/xxhash/v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -36,6 +40,8 @@ var (
|
||||||
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
|
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
|
||||||
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
|
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
|
||||||
"See also -remoteWrite.maxDiskUsagePerURL")
|
"See also -remoteWrite.maxDiskUsagePerURL")
|
||||||
|
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
|
||||||
|
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.")
|
||||||
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
||||||
"isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs")
|
"isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs")
|
||||||
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
|
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
|
||||||
|
@ -94,6 +100,8 @@ var allRelabelConfigs atomic.Value
|
||||||
// since it may lead to high memory usage due to big number of buffers.
|
// since it may lead to high memory usage due to big number of buffers.
|
||||||
var maxQueues = cgroup.AvailableCPUs() * 16
|
var maxQueues = cgroup.AvailableCPUs() * 16
|
||||||
|
|
||||||
|
const persistentQueueDir = "persistent-queue"
|
||||||
|
|
||||||
// InitSecretFlags must be called after flag.Parse and before any logging.
|
// InitSecretFlags must be called after flag.Parse and before any logging.
|
||||||
func InitSecretFlags() {
|
func InitSecretFlags() {
|
||||||
if !*showRemoteWriteURL {
|
if !*showRemoteWriteURL {
|
||||||
|
@ -225,6 +233,34 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
||||||
}
|
}
|
||||||
rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !*keepDanglingQueues {
|
||||||
|
// Remove dangling queues, if any.
|
||||||
|
// This is required for the case when the number of queues has been changed or URL have been changed.
|
||||||
|
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
||||||
|
existingQueues := make(map[string]struct{}, len(rwctxs))
|
||||||
|
for _, rwctx := range rwctxs {
|
||||||
|
existingQueues[rwctx.fq.Dir()] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDir)
|
||||||
|
files, err := os.ReadDir(queuesDir)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err)
|
||||||
|
}
|
||||||
|
removed := 0
|
||||||
|
for _, f := range files {
|
||||||
|
fullPath := filepath.Join(queuesDir, f.Name())
|
||||||
|
if _, ok := existingQueues[fullPath]; !ok {
|
||||||
|
logger.Infof("removing dangling queue %q", fullPath)
|
||||||
|
fs.MustRemoveAll(fullPath)
|
||||||
|
removed++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
|
||||||
|
}
|
||||||
|
|
||||||
return rwctxs
|
return rwctxs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,7 +502,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
||||||
pqURL.RawQuery = ""
|
pqURL.RawQuery = ""
|
||||||
pqURL.Fragment = ""
|
pqURL.Fragment = ""
|
||||||
h := xxhash.Sum64([]byte(pqURL.String()))
|
h := xxhash.Sum64([]byte(pqURL.String()))
|
||||||
queuePath := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h)
|
queuePath := fmt.Sprintf("%s/%s/%d_%016X", *tmpDataPath, persistentQueueDir, argIdx+1, h)
|
||||||
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0)
|
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0)
|
||||||
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes)
|
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes)
|
||||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||||
|
|
|
@ -32,6 +32,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
|
||||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986).
|
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986).
|
||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823).
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823).
|
||||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990).
|
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990).
|
||||||
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.keepDanglingQueues` command-line flag. It allows to force `vmagent` to keep dangling queues on the disk when `vmagent` is started. By default, persistent queues without matching `remoteWrite.url` will be deleted. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
|
||||||
|
|
||||||
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
||||||
|
|
|
@ -1522,6 +1522,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
-remoteWrite.relabelConfig string
|
-remoteWrite.relabelConfig string
|
||||||
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
|
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
|
||||||
|
-remoteWrite.keepDanglingQueues
|
||||||
|
Keep persistent queues contents in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.
|
||||||
-remoteWrite.roundDigits array
|
-remoteWrite.roundDigits array
|
||||||
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
|
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
|
|
|
@ -199,3 +199,8 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||||
fq.cond.Wait()
|
fq.cond.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dir returns the directory for persistent queue.
|
||||||
|
func (fq *FastQueue) Dir() string {
|
||||||
|
return fq.pq.dir
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue