mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
This commit is contained in:
commit
b00fcad604
15 changed files with 127 additions and 88 deletions
|
@ -1,7 +1,8 @@
|
||||||
run:
|
run:
|
||||||
timeout: 2m
|
timeout: 2m
|
||||||
|
|
||||||
enable:
|
linters:
|
||||||
|
enable:
|
||||||
- revive
|
- revive
|
||||||
|
|
||||||
issues:
|
issues:
|
||||||
|
@ -9,6 +10,9 @@ issues:
|
||||||
- linters:
|
- linters:
|
||||||
- staticcheck
|
- staticcheck
|
||||||
text: "SA(4003|1019|5011):"
|
text: "SA(4003|1019|5011):"
|
||||||
|
include:
|
||||||
|
- EXC0012
|
||||||
|
- EXC0014
|
||||||
|
|
||||||
linters-settings:
|
linters-settings:
|
||||||
errcheck:
|
errcheck:
|
||||||
|
|
|
@ -193,9 +193,11 @@ VictoriaMetrics remote write protocol provides the following benefits comparing
|
||||||
In this case `vmagent` buffers the incoming data to disk using the VictoriaMetrics remote write format.
|
In this case `vmagent` buffers the incoming data to disk using the VictoriaMetrics remote write format.
|
||||||
This reduces disk read/write IO and disk space usage by 2x-5x comparing to Prometheus remote write format.
|
This reduces disk read/write IO and disk space usage by 2x-5x comparing to Prometheus remote write format.
|
||||||
|
|
||||||
`vmagent` automatically uses VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances,
|
`vmagent` automatically switches to VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances,
|
||||||
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
||||||
or `vminsert` at [cluster version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html).
|
or `vminsert` at [cluster version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html).
|
||||||
|
It is possible to force switch to VictoriaMetrics remote write protocol by specifying `-remoteWrite.forceVMProto`
|
||||||
|
command-line flag for the corresponding `-remoteWrite.url`.
|
||||||
|
|
||||||
`vmagent` automatically switches to Prometheus remote write protocol when it sends data to old versions of VictoriaMetrics components
|
`vmagent` automatically switches to Prometheus remote write protocol when it sends data to old versions of VictoriaMetrics components
|
||||||
or to other Prometheus-compatible remote storage systems. It is possible to force switch to Prometheus remote write protocol
|
or to other Prometheus-compatible remote storage systems. It is possible to force switch to Prometheus remote write protocol
|
||||||
|
@ -1451,6 +1453,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
||||||
-remoteWrite.forcePromProto array
|
-remoteWrite.forcePromProto array
|
||||||
Whether to force Prometheus remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
|
Whether to force Prometheus remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
|
-remoteWrite.forceVMProto array
|
||||||
|
Whether to force VictoriaMetrics remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
|
||||||
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
-remoteWrite.headers array
|
-remoteWrite.headers array
|
||||||
Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'
|
Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'
|
||||||
Supports an array of values separated by comma or specified via multiple flags.
|
Supports an array of values separated by comma or specified via multiple flags.
|
||||||
|
|
|
@ -15,11 +15,17 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
forcePromProto = flagutil.NewArrayBool("remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+
|
||||||
|
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol")
|
||||||
|
forceVMProto = flagutil.NewArrayBool("remoteWrite.forceVMProto", "Whether to force VictoriaMetrics remote write protocol for sending data "+
|
||||||
|
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol")
|
||||||
|
|
||||||
rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+
|
rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+
|
||||||
"By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
|
"By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
|
||||||
"is sent after temporary unavailability of the remote storage")
|
"is sent after temporary unavailability of the remote storage")
|
||||||
|
@ -69,7 +75,10 @@ var (
|
||||||
type client struct {
|
type client struct {
|
||||||
sanitizedURL string
|
sanitizedURL string
|
||||||
remoteWriteURL string
|
remoteWriteURL string
|
||||||
isVMRemoteWrite bool
|
|
||||||
|
// Whether to use VictoriaMetrics remote write protocol for sending the data to remoteWriteURL
|
||||||
|
useVMProto bool
|
||||||
|
|
||||||
fq *persistentqueue.FastQueue
|
fq *persistentqueue.FastQueue
|
||||||
hc *http.Client
|
hc *http.Client
|
||||||
|
|
||||||
|
@ -93,7 +102,7 @@ type client struct {
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int, isVMRemoteWrite bool) *client {
|
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
|
||||||
authCfg, err := getAuthConfig(argIdx)
|
authCfg, err := getAuthConfig(argIdx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("FATAL: cannot initialize auth config for remoteWrite.url=%q: %s", remoteWriteURL, err)
|
logger.Panicf("FATAL: cannot initialize auth config for remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||||
|
@ -130,7 +139,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||||
c := &client{
|
c := &client{
|
||||||
sanitizedURL: sanitizedURL,
|
sanitizedURL: sanitizedURL,
|
||||||
remoteWriteURL: remoteWriteURL,
|
remoteWriteURL: remoteWriteURL,
|
||||||
isVMRemoteWrite: isVMRemoteWrite,
|
|
||||||
authCfg: authCfg,
|
authCfg: authCfg,
|
||||||
awsCfg: awsCfg,
|
awsCfg: awsCfg,
|
||||||
fq: fq,
|
fq: fq,
|
||||||
|
@ -138,10 +146,29 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
c.sendBlock = c.sendBlockHTTP
|
c.sendBlock = c.sendBlockHTTP
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
|
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
|
||||||
|
useVMProto := forceVMProto.GetOptionalArg(argIdx)
|
||||||
|
usePromProto := forcePromProto.GetOptionalArg(argIdx)
|
||||||
|
if useVMProto && usePromProto {
|
||||||
|
logger.Fatalf("-remoteWrite.useVMProto and -remoteWrite.usePromProto cannot be set simultaneously for -remoteWrite.url=%s", sanitizedURL)
|
||||||
|
}
|
||||||
|
if !useVMProto && !usePromProto {
|
||||||
|
// Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol.
|
||||||
|
doRequest := func(url string) (*http.Response, error) {
|
||||||
|
return c.doRequest(url, nil)
|
||||||
|
}
|
||||||
|
useVMProto = common.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest)
|
||||||
|
if !useVMProto {
|
||||||
|
logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+
|
||||||
|
"See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol", sanitizedURL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.useVMProto = useVMProto
|
||||||
|
|
||||||
if bytesPerSec := rateLimit.GetOptionalArgOrDefault(argIdx, 0); bytesPerSec > 0 {
|
if bytesPerSec := rateLimit.GetOptionalArgOrDefault(argIdx, 0); bytesPerSec > 0 {
|
||||||
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
|
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
|
||||||
c.rl.perSecondLimit = int64(bytesPerSec)
|
c.rl.perSecondLimit = int64(bytesPerSec)
|
||||||
|
@ -294,6 +321,33 @@ func (c *client) runWorker() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
|
||||||
|
reqBody := bytes.NewBuffer(body)
|
||||||
|
req, err := http.NewRequest(http.MethodPost, url, reqBody)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", url, err)
|
||||||
|
}
|
||||||
|
c.authCfg.SetHeaders(req, true)
|
||||||
|
h := req.Header
|
||||||
|
h.Set("User-Agent", "vmagent")
|
||||||
|
h.Set("Content-Type", "application/x-protobuf")
|
||||||
|
if c.useVMProto {
|
||||||
|
h.Set("Content-Encoding", "zstd")
|
||||||
|
h.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
|
||||||
|
} else {
|
||||||
|
h.Set("Content-Encoding", "snappy")
|
||||||
|
h.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||||
|
}
|
||||||
|
if c.awsCfg != nil {
|
||||||
|
sigv4Hash := awsapi.HashHex(body)
|
||||||
|
if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil {
|
||||||
|
// there is no need in retry, request will be rejected by client.Do and retried by code below
|
||||||
|
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return c.hc.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
// sendBlockHTTP sends the given block to c.remoteWriteURL.
|
// sendBlockHTTP sends the given block to c.remoteWriteURL.
|
||||||
//
|
//
|
||||||
// The function returns false only if c.stopCh is closed.
|
// The function returns false only if c.stopCh is closed.
|
||||||
|
@ -302,37 +356,10 @@ func (c *client) sendBlockHTTP(block []byte) bool {
|
||||||
c.rl.register(len(block), c.stopCh)
|
c.rl.register(len(block), c.stopCh)
|
||||||
retryDuration := time.Second
|
retryDuration := time.Second
|
||||||
retriesCount := 0
|
retriesCount := 0
|
||||||
c.bytesSent.Add(len(block))
|
|
||||||
c.blocksSent.Inc()
|
|
||||||
sigv4Hash := ""
|
|
||||||
if c.awsCfg != nil {
|
|
||||||
sigv4Hash = awsapi.HashHex(block)
|
|
||||||
}
|
|
||||||
|
|
||||||
again:
|
again:
|
||||||
req, err := http.NewRequest(http.MethodPost, c.remoteWriteURL, bytes.NewBuffer(block))
|
|
||||||
if err != nil {
|
|
||||||
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", c.sanitizedURL, err)
|
|
||||||
}
|
|
||||||
c.authCfg.SetHeaders(req, true)
|
|
||||||
h := req.Header
|
|
||||||
h.Set("User-Agent", "vmagent")
|
|
||||||
h.Set("Content-Type", "application/x-protobuf")
|
|
||||||
if c.isVMRemoteWrite {
|
|
||||||
h.Set("Content-Encoding", "zstd")
|
|
||||||
h.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
|
|
||||||
} else {
|
|
||||||
h.Set("Content-Encoding", "snappy")
|
|
||||||
h.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
|
||||||
}
|
|
||||||
if c.awsCfg != nil {
|
|
||||||
if err := c.awsCfg.SignRequest(req, sigv4Hash); err != nil {
|
|
||||||
// there is no need in retry, request will be rejected by client.Do and retried by code below
|
|
||||||
logger.Warnf("cannot sign remoteWrite request with AWS sigv4: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
resp, err := c.hc.Do(req)
|
resp, err := c.doRequest(c.remoteWriteURL, block)
|
||||||
c.requestDuration.UpdateDuration(startTime)
|
c.requestDuration.UpdateDuration(startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.errorsCount.Inc()
|
c.errorsCount.Inc()
|
||||||
|
@ -357,6 +384,8 @@ again:
|
||||||
if statusCode/100 == 2 {
|
if statusCode/100 == 2 {
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
c.requestsOKCount.Inc()
|
c.requestsOKCount.Inc()
|
||||||
|
c.bytesSent.Add(len(block))
|
||||||
|
c.blocksSent.Inc()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
@ -35,8 +34,6 @@ var (
|
||||||
remoteWriteMultitenantURLs = flagutil.NewArrayString("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+
|
remoteWriteMultitenantURLs = flagutil.NewArrayString("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+
|
||||||
"See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://<vminsert>:8480 . "+
|
"See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://<vminsert>:8480 . "+
|
||||||
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
|
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
|
||||||
forcePromProto = flagutil.NewArrayBool("remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+
|
|
||||||
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol")
|
|
||||||
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
|
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
|
||||||
"See also -remoteWrite.maxDiskUsagePerURL")
|
"See also -remoteWrite.maxDiskUsagePerURL")
|
||||||
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
||||||
|
@ -479,21 +476,10 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
||||||
return float64(fq.GetInmemoryQueueLen())
|
return float64(fq.GetInmemoryQueueLen())
|
||||||
})
|
})
|
||||||
|
|
||||||
// Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol.
|
|
||||||
isVMRemoteWrite := false
|
|
||||||
usePromProto := forcePromProto.GetOptionalArg(argIdx)
|
|
||||||
if !usePromProto {
|
|
||||||
isVMRemoteWrite = common.HandleVMProtoClientHandshake(remoteWriteURL)
|
|
||||||
if !isVMRemoteWrite {
|
|
||||||
logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+
|
|
||||||
"See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol", sanitizedURL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var c *client
|
var c *client
|
||||||
switch remoteWriteURL.Scheme {
|
switch remoteWriteURL.Scheme {
|
||||||
case "http", "https":
|
case "http", "https":
|
||||||
c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues, isVMRemoteWrite)
|
c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues)
|
||||||
default:
|
default:
|
||||||
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
|
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
|
||||||
}
|
}
|
||||||
|
@ -510,7 +496,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
||||||
}
|
}
|
||||||
pss := make([]*pendingSeries, pssLen)
|
pss := make([]*pendingSeries, pssLen)
|
||||||
for i := range pss {
|
for i := range pss {
|
||||||
pss[i] = newPendingSeries(fq.MustWriteBlock, isVMRemoteWrite, sf, rd)
|
pss[i] = newPendingSeries(fq.MustWriteBlock, c.useVMProto, sf, rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
rwctx := &remoteWriteCtx{
|
rwctx := &remoteWriteCtx{
|
||||||
|
|
|
@ -51,27 +51,27 @@ func main() {
|
||||||
|
|
||||||
if len(*snapshotCreateURL) > 0 {
|
if len(*snapshotCreateURL) > 0 {
|
||||||
// create net/url object
|
// create net/url object
|
||||||
createUrl, err := url.Parse(*snapshotCreateURL)
|
createURL, err := url.Parse(*snapshotCreateURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot parse snapshotCreateURL: %s", err)
|
logger.Fatalf("cannot parse snapshotCreateURL: %s", err)
|
||||||
}
|
}
|
||||||
if len(*snapshotName) > 0 {
|
if len(*snapshotName) > 0 {
|
||||||
logger.Fatalf("-snapshotName shouldn't be set if -snapshot.createURL is set, since snapshots are created automatically in this case")
|
logger.Fatalf("-snapshotName shouldn't be set if -snapshot.createURL is set, since snapshots are created automatically in this case")
|
||||||
}
|
}
|
||||||
logger.Infof("Snapshot create url %s", createUrl.Redacted())
|
logger.Infof("Snapshot create url %s", createURL.Redacted())
|
||||||
if len(*snapshotDeleteURL) <= 0 {
|
if len(*snapshotDeleteURL) <= 0 {
|
||||||
err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
|
err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("Failed to set snapshot.deleteURL flag: %v", err)
|
logger.Fatalf("Failed to set snapshot.deleteURL flag: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
deleteUrl, err := url.Parse(*snapshotDeleteURL)
|
deleteURL, err := url.Parse(*snapshotDeleteURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot parse snapshotDeleteURL: %s", err)
|
logger.Fatalf("cannot parse snapshotDeleteURL: %s", err)
|
||||||
}
|
}
|
||||||
logger.Infof("Snapshot delete url %s", deleteUrl.Redacted())
|
logger.Infof("Snapshot delete url %s", deleteURL.Redacted())
|
||||||
|
|
||||||
name, err := snapshot.Create(createUrl.String())
|
name, err := snapshot.Create(createURL.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot create snapshot: %s", err)
|
logger.Fatalf("cannot create snapshot: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
err := snapshot.Delete(deleteUrl.String(), name)
|
err := snapshot.Delete(deleteURL.String(), name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("cannot delete snapshot: %s", err)
|
logger.Fatalf("cannot delete snapshot: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ const (
|
||||||
// retryableFunc describes call back which will repeat on errors
|
// retryableFunc describes call back which will repeat on errors
|
||||||
type retryableFunc func() error
|
type retryableFunc func() error
|
||||||
|
|
||||||
|
// ErrBadRequest is an error returned on bad request
|
||||||
var ErrBadRequest = errors.New("bad request")
|
var ErrBadRequest = errors.New("bad request")
|
||||||
|
|
||||||
// Backoff describes object with backoff policy params
|
// Backoff describes object with backoff policy params
|
||||||
|
|
|
@ -783,7 +783,7 @@ func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getIntK(k float64, kMax int) int {
|
func getIntK(k float64, max int) int {
|
||||||
if math.IsNaN(k) {
|
if math.IsNaN(k) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -791,8 +791,8 @@ func getIntK(k float64, kMax int) int {
|
||||||
if kn < 0 {
|
if kn < 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
if kn > kMax {
|
if kn > max {
|
||||||
return kMax
|
return max
|
||||||
}
|
}
|
||||||
return kn
|
return kn
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.*` auth options when determining whether the remote storage supports [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). Previously the auth options were ignored. This was preventing from automatic switch to VictoriaMetrics remote write protocol.
|
||||||
|
|
||||||
## [v1.88.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.0)
|
## [v1.88.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.0)
|
||||||
|
|
||||||
Released at 2023-02-24
|
Released at 2023-02-24
|
||||||
|
|
|
@ -197,9 +197,11 @@ VictoriaMetrics remote write protocol provides the following benefits comparing
|
||||||
In this case `vmagent` buffers the incoming data to disk using the VictoriaMetrics remote write format.
|
In this case `vmagent` buffers the incoming data to disk using the VictoriaMetrics remote write format.
|
||||||
This reduces disk read/write IO and disk space usage by 2x-5x comparing to Prometheus remote write format.
|
This reduces disk read/write IO and disk space usage by 2x-5x comparing to Prometheus remote write format.
|
||||||
|
|
||||||
`vmagent` automatically uses VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances,
|
`vmagent` automatically switches to VictoriaMetrics remote write protocol when it sends data to VictoriaMetrics components such as other `vmagent` instances,
|
||||||
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
||||||
or `vminsert` at [cluster version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html).
|
or `vminsert` at [cluster version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html).
|
||||||
|
It is possible to force switch to VictoriaMetrics remote write protocol by specifying `-remoteWrite.forceVMProto`
|
||||||
|
command-line flag for the corresponding `-remoteWrite.url`.
|
||||||
|
|
||||||
`vmagent` automatically switches to Prometheus remote write protocol when it sends data to old versions of VictoriaMetrics components
|
`vmagent` automatically switches to Prometheus remote write protocol when it sends data to old versions of VictoriaMetrics components
|
||||||
or to other Prometheus-compatible remote storage systems. It is possible to force switch to Prometheus remote write protocol
|
or to other Prometheus-compatible remote storage systems. It is possible to force switch to Prometheus remote write protocol
|
||||||
|
@ -1455,6 +1457,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
||||||
-remoteWrite.forcePromProto array
|
-remoteWrite.forcePromProto array
|
||||||
Whether to force Prometheus remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
|
Whether to force Prometheus remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
|
||||||
Supports array of values separated by comma or specified via multiple flags.
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
|
-remoteWrite.forceVMProto array
|
||||||
|
Whether to force VictoriaMetrics remote write protocol for sending data to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
|
||||||
|
Supports array of values separated by comma or specified via multiple flags.
|
||||||
-remoteWrite.headers array
|
-remoteWrite.headers array
|
||||||
Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'
|
Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'
|
||||||
Supports an array of values separated by comma or specified via multiple flags.
|
Supports an array of values separated by comma or specified via multiple flags.
|
||||||
|
|
|
@ -84,10 +84,10 @@ func signRequestWithTime(req *http.Request, service, region, payloadHash string,
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSignatureKey(key, datestamp, region, service string) string {
|
func getSignatureKey(key, datestamp, region, service string) string {
|
||||||
kDate := hmacBin("AWS4"+key, datestamp)
|
dateKey := hmacBin("AWS4"+key, datestamp)
|
||||||
kRegion := hmacBin(kDate, region)
|
regionKey := hmacBin(dateKey, region)
|
||||||
kService := hmacBin(kRegion, service)
|
serviceKey := hmacBin(regionKey, service)
|
||||||
return hmacBin(kService, "aws4_request")
|
return hmacBin(serviceKey, "aws4_request")
|
||||||
}
|
}
|
||||||
|
|
||||||
func hashHex(s string) string {
|
func hashHex(s string) string {
|
||||||
|
|
|
@ -468,6 +468,7 @@ func isHTTPURL(targetURL string) bool {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsScheduledForRemoval(name string) bool {
|
// IsScheduledForRemoval returns true if the filename contains .must-remove. substring
|
||||||
return strings.Contains(name, ".must-remove.")
|
func IsScheduledForRemoval(filename string) bool {
|
||||||
|
return strings.Contains(filename, ".must-remove.")
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ func concatTwoStrings(x, y string) string {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(sw *ScrapeWork, ctx context.Context) *client {
|
func newClient(ctx context.Context, sw *ScrapeWork) *client {
|
||||||
var u fasthttp.URI
|
var u fasthttp.URI
|
||||||
u.Update(sw.ScrapeURL)
|
u.Update(sw.ScrapeURL)
|
||||||
hostPort := string(u.Host())
|
hostPort := string(u.Host())
|
||||||
|
|
|
@ -22,6 +22,8 @@ func getServiceLabels(cfg *apiConfig) []*promutils.Labels {
|
||||||
return ms
|
return ms
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceList is a list of Nomad services.
|
||||||
|
// See https://developer.hashicorp.com/nomad/api-docs/services#list-services
|
||||||
type ServiceList struct {
|
type ServiceList struct {
|
||||||
Namespace string `json:"Namespace"`
|
Namespace string `json:"Namespace"`
|
||||||
Services []struct {
|
Services []struct {
|
||||||
|
|
|
@ -442,7 +442,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
stoppedCh: make(chan struct{}),
|
stoppedCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
c := newClient(sw, ctx)
|
c := newClient(ctx, sw)
|
||||||
sc.sw.Config = sw
|
sc.sw.Config = sw
|
||||||
sc.sw.ScrapeGroup = group
|
sc.sw.ScrapeGroup = group
|
||||||
sc.sw.ReadData = c.ReadData
|
sc.sw.ReadData = c.ReadData
|
||||||
|
|
|
@ -3,16 +3,20 @@ package common
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleVMProtoClientHandshake(remoteWriteURL *url.URL) bool {
|
// HandleVMProtoClientHandshake returns true if the server at remoteWriteURL supports VictoriaMetrics remote write protocol.
|
||||||
u := *remoteWriteURL
|
func HandleVMProtoClientHandshake(remoteWriteURL string, doRequest func(handshakeURL string) (*http.Response, error)) bool {
|
||||||
q := u.Query()
|
u := remoteWriteURL
|
||||||
q.Set("get_vm_proto_version", "1")
|
if strings.Contains(u, "?") {
|
||||||
u.RawQuery = q.Encode()
|
u += "&"
|
||||||
resp, err := http.Get(u.String())
|
} else {
|
||||||
|
u += "?"
|
||||||
|
}
|
||||||
|
u += "get_vm_proto_version=1"
|
||||||
|
resp, err := doRequest(u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue