mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent/remotewrite: cosmetic updates after f3a51e8b1d
- Compare directory names instead of paths to directory when determining which persistent queues must be deleted This is less error-prone solution, since paths to the same directory can differ, which could lead to accidental directory removal for the existing -remoteWrite.url - Log the `removed %d dangling queues` message when at least a single queue has been removed - Consistently use filepath.Join() for creating paths to persistent queues. This is needed for Windows support (see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70 ) - Clarify the description of the change at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
This commit is contained in:
parent
f3a51e8b1d
commit
aea6df8197
3 changed files with 15 additions and 12 deletions
|
@ -100,7 +100,7 @@ var allRelabelConfigs atomic.Value
|
|||
// since it may lead to high memory usage due to big number of buffers.
|
||||
var maxQueues = cgroup.AvailableCPUs() * 16
|
||||
|
||||
const persistentQueueDir = "persistent-queue"
|
||||
const persistentQueueDirname = "persistent-queue"
|
||||
|
||||
// InitSecretFlags must be called after flag.Parse and before any logging.
|
||||
func InitSecretFlags() {
|
||||
|
@ -240,26 +240,28 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
|||
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
||||
existingQueues := make(map[string]struct{}, len(rwctxs))
|
||||
for _, rwctx := range rwctxs {
|
||||
existingQueues[rwctx.fq.Dir()] = struct{}{}
|
||||
existingQueues[rwctx.fq.Dirname()] = struct{}{}
|
||||
}
|
||||
|
||||
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDir)
|
||||
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
|
||||
files, err := os.ReadDir(queuesDir)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err)
|
||||
}
|
||||
removed := 0
|
||||
for _, f := range files {
|
||||
fullPath := filepath.Join(queuesDir, f.Name())
|
||||
if _, ok := existingQueues[fullPath]; !ok {
|
||||
logger.Infof("removing dangling queue %q", fullPath)
|
||||
dirname := f.Name()
|
||||
if _, ok := existingQueues[dirname]; !ok {
|
||||
logger.Infof("removing dangling queue %q", dirname)
|
||||
fullPath := filepath.Join(queuesDir, dirname)
|
||||
fs.MustRemoveAll(fullPath)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
|
||||
if removed > 0 {
|
||||
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
|
||||
}
|
||||
}
|
||||
|
||||
return rwctxs
|
||||
}
|
||||
|
@ -502,7 +504,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
|||
pqURL.RawQuery = ""
|
||||
pqURL.Fragment = ""
|
||||
h := xxhash.Sum64([]byte(pqURL.String()))
|
||||
queuePath := fmt.Sprintf("%s/%s/%d_%016X", *tmpDataPath, persistentQueueDir, argIdx+1, h)
|
||||
queuePath := filepath.Join(*tmpDataPath, persistentQueueDirname, fmt.Sprintf("%d_%016X", argIdx+1, h))
|
||||
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0)
|
||||
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes)
|
||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
|
|
|
@ -25,6 +25,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
|
|||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
|
||||
|
@ -32,7 +33,6 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
|
|||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): updated usability and the search process in cardinality explorer. Made this process straightforward for user. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3986).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): automatically disable progress bar when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3823).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add `-configCheckInterval` command-line flag, which can be used for automatic re-reading the `-auth.config` file. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3990).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.keepDanglingQueues` command-line flag. It allows to force `vmagent` to keep dangling queues on the disk when `vmagent` is started. By default, persistent queues without matching `remoteWrite.url` will be deleted. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
|
||||
|
||||
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
||||
|
|
|
@ -2,6 +2,7 @@ package persistentqueue
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -201,6 +202,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
|||
}
|
||||
|
||||
// Dir returns the directory for persistent queue.
|
||||
func (fq *FastQueue) Dir() string {
|
||||
return fq.pq.dir
|
||||
func (fq *FastQueue) Dirname() string {
|
||||
return filepath.Base(fq.pq.dir)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue