Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2023-03-27 20:06:34 -07:00
commit 4280cc281a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
8 changed files with 106 additions and 18 deletions

View file

@ -1518,6 +1518,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.relabelConfig string
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
-remoteWrite.keepDanglingQueues
Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.
-remoteWrite.roundDigits array
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
Supports array of values separated by comma or specified via multiple flags.

View file

@ -4,17 +4,22 @@ import (
"flag"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
@ -24,7 +29,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
)
var (
@ -36,6 +40,8 @@ var (
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
"See also -remoteWrite.maxDiskUsagePerURL")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.")
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
"isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
@ -94,6 +100,8 @@ var allRelabelConfigs atomic.Value
// since it may lead to high memory usage due to big number of buffers.
var maxQueues = cgroup.AvailableCPUs() * 16
const persistentQueueDirname = "persistent-queue"
// InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags() {
if !*showRemoteWriteURL {
@ -225,6 +233,36 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
}
rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
}
if !*keepDanglingQueues {
// Remove dangling queues, if any.
// This is required for the case when the number of queues has been changed or URL have been changed.
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
existingQueues := make(map[string]struct{}, len(rwctxs))
for _, rwctx := range rwctxs {
existingQueues[rwctx.fq.Dirname()] = struct{}{}
}
queuesDir := filepath.Join(*tmpDataPath, persistentQueueDirname)
files, err := os.ReadDir(queuesDir)
if err != nil {
logger.Fatalf("cannot read queues dir %q: %s", queuesDir, err)
}
removed := 0
for _, f := range files {
dirname := f.Name()
if _, ok := existingQueues[dirname]; !ok {
logger.Infof("removing dangling queue %q", dirname)
fullPath := filepath.Join(queuesDir, dirname)
fs.MustRemoveAll(fullPath)
removed++
}
}
if removed > 0 {
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
}
}
return rwctxs
}
@ -466,7 +504,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
pqURL.RawQuery = ""
pqURL.Fragment = ""
h := xxhash.Sum64([]byte(pqURL.String()))
queuePath := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h)
queuePath := filepath.Join(*tmpDataPath, persistentQueueDirname, fmt.Sprintf("%d_%016X", argIdx+1, h))
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArgOrDefault(argIdx, 0)
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {

View file

@ -179,14 +179,12 @@ func (tbf *tmpBlocksFile) MustClose() {
}
fname := tbf.f.Name()
// Remove the file at first, then close it.
// This way the OS shouldn't try to flush file contents to storage
// on close.
if err := os.Remove(fname); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
}
if err := tbf.f.Close(); err != nil {
logger.Panicf("FATAL: cannot close %q: %s", fname, err)
}
// We cannot remove unclosed at non-posix filesystems, like windows
if err := os.Remove(fname); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
}
tbf.f = nil
}

View file

@ -556,18 +556,28 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
for _, xs := range xss {
ts := xs.ts
if isZeroTS(ts) {
// Skip time series with zeros. They are substituted by xssNew below.
xsPrev = xs
// Skip buckets with zero values - they will be merged into a single bucket
// when the next non-zero bucket appears.
// Do not store xs in xsPrev in order to properly create `le` time series
// for zero buckets.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021
continue
}
if xs.start != xsPrev.end && uniqTs[xs.startStr] == nil {
uniqTs[xs.startStr] = xs.ts
xssNew = append(xssNew, x{
endStr: xs.startStr,
end: xs.start,
ts: copyTS(ts, xs.startStr),
})
if xs.start != xsPrev.end {
// There is a gap between the previous bucket and the current bucket
// or the previous bucket is skipped because it was zero.
// Fill it with a time series with le=xs.start.
if uniqTs[xs.startStr] == nil {
uniqTs[xs.startStr] = xs.ts
xssNew = append(xssNew, x{
endStr: xs.startStr,
end: xs.start,
ts: copyTS(ts, xs.startStr),
})
}
}
// Convert the current time series to a time series with le=xs.end
ts.MetricName.AddTag("le", xs.endStr)
prevTs := uniqTs[xs.endStr]
if prevTs != nil {
@ -579,7 +589,7 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
}
xsPrev = xs
}
if !math.IsInf(xsPrev.end, 1) && !isZeroTS(xsPrev.ts) {
if xsPrev.ts != nil && !math.IsInf(xsPrev.end, 1) && !isZeroTS(xsPrev.ts) {
xssNew = append(xssNew, x{
endStr: "+Inf",
end: math.Inf(1),

View file

@ -78,6 +78,36 @@ foo{le="+Inf"} 1.23 456`,
foo{le="+Inf"} 5.3 0`,
)
// Adjacent empty vmrange bucket
f(
`foo{vmrange="7.743e+05...8.799e+05"} 5 123
foo{vmrange="6.813e+05...7.743e+05"} 0 123`,
`foo{le="7.743e+05"} 0 123
foo{le="8.799e+05"} 5 123
foo{le="+Inf"} 5 123`,
)
// Multiple adjacent empty vmrange bucket
f(
`foo{vmrange="7.743e+05...8.799e+05"} 5 123
foo{vmrange="6.813e+05...7.743e+05"} 0 123
foo{vmrange="5.813e+05...6.813e+05"} 0 123
`,
`foo{le="7.743e+05"} 0 123
foo{le="8.799e+05"} 5 123
foo{le="+Inf"} 5 123`,
)
f(
`foo{vmrange="8.799e+05...9.813e+05"} 0 123
foo{vmrange="7.743e+05...8.799e+05"} 5 123
foo{vmrange="6.813e+05...7.743e+05"} 0 123
foo{vmrange="5.813e+05...6.813e+05"} 0 123
`,
`foo{le="7.743e+05"} 0 123
foo{le="8.799e+05"} 5 123
foo{le="+Inf"} 5 123`,
)
// Multiple non-empty vmrange buckets
f(
`foo{vmrange="4.084e+02...4.642e+02"} 2 123

View file

@ -25,6 +25,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
@ -39,6 +40,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly convert [VictoriaMetrics historgram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) to Prometheus histogram buckets when VictoriaMetrics histogram contain zero buckets. Previously these buckets were ignored, and this could lead to missing Prometheus histogram buckets after the conversion. Thanks to @zklapow for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021).
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)

View file

@ -1522,6 +1522,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.relabelConfig string
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
-remoteWrite.keepDanglingQueues
Keep persistent queues contents in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.
-remoteWrite.roundDigits array
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
Supports array of values separated by comma or specified via multiple flags.

View file

@ -2,6 +2,7 @@ package persistentqueue
import (
"fmt"
"path/filepath"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -199,3 +200,8 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.cond.Wait()
}
}
// Dirname returns the directory name for persistent queue.
func (fq *FastQueue) Dirname() string {
return filepath.Base(fq.pq.dir)
}