changes vmagent api (#1656)

* changes vmagent api
adds auth.Token to promremotewrite InsertHandlerReader
changes remoteWrite client constructor, allows to use multiple remoteWriteUrl schemes, like kafka://
changes url path concatenation for tenant remoteWrite

Update app/vmagent/remotewrite/client.go

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>

* Update app/vmagent/remotewrite/remotewrite.go

* Apply suggestions from code review

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2021-09-29 00:52:07 +03:00 committed by GitHub
parent 5dc84bf210
commit cc72f9428d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 18 deletions

View file

@ -37,10 +37,10 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
} }
// InsertHandlerForReader processes metrics from given reader // InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(r io.Reader) error { func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, func(tss []prompb.TimeSeries) error { return parser.ParseStream(r, func(tss []prompb.TimeSeries) error {
return insertRows(nil, tss, nil) return insertRows(at, tss, nil)
}) })
}) })
} }

View file

@ -67,7 +67,8 @@ type client struct {
fq *persistentqueue.FastQueue fq *persistentqueue.FastQueue
hc *http.Client hc *http.Client
authCfg *promauth.Config sendBlock func(block []byte) bool
authCfg *promauth.Config
rl rateLimiter rl rateLimiter
@ -84,7 +85,7 @@ type client struct {
stopCh chan struct{} stopCh chan struct{}
} }
func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client { func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
authCfg, err := getAuthConfig(argIdx) authCfg, err := getAuthConfig(argIdx)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot initialize auth config: %s", err) logger.Panicf("FATAL: cannot initialize auth config: %s", err)
@ -121,6 +122,11 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu
}, },
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
c.sendBlock = c.sendBlockHTTP
return c
}
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
if bytesPerSec := rateLimit.GetOptionalArgOrDefault(argIdx, 0); bytesPerSec > 0 { if bytesPerSec := rateLimit.GetOptionalArgOrDefault(argIdx, 0); bytesPerSec > 0 {
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL) logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
c.rl.perSecondLimit = int64(bytesPerSec) c.rl.perSecondLimit = int64(bytesPerSec)
@ -143,7 +149,6 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu
}() }()
} }
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL) logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
return c
} }
func (c *client) MustStop() { func (c *client) MustStop() {
@ -237,9 +242,9 @@ func (c *client) runWorker() {
} }
} }
// sendBlock returns false only if c.stopCh is closed. // sendBlockHTTP returns false only if c.stopCh is closed.
// Otherwise it tries sending the block to remote storage indefinitely. // Otherwise it tries sending the block to remote storage indefinitely.
func (c *client) sendBlock(block []byte) bool { func (c *client) sendBlockHTTP(block []byte) bool {
c.rl.register(len(block), c.stopCh) c.rl.register(len(block), c.stopCh)
retryDuration := time.Second retryDuration := time.Second
retriesCount := 0 retriesCount := 0

View file

@ -3,6 +3,7 @@ package remotewrite
import ( import (
"flag" "flag"
"fmt" "fmt"
"net/url"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -181,17 +182,21 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
maxInmemoryBlocks = 2 maxInmemoryBlocks = 2
} }
rwctxs := make([]*remoteWriteCtx, len(urls)) rwctxs := make([]*remoteWriteCtx, len(urls))
for i, remoteWriteURL := range urls { for i, remoteWriteURLRaw := range urls {
remoteWriteURL, err := url.Parse(remoteWriteURLRaw)
if err != nil {
logger.Fatalf("invalid -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
if at != nil { if at != nil {
// Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format // Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
remoteWriteURL = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL, at.AccountID, at.ProjectID) remoteWriteURL.Path = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL.Path, at.AccountID, at.ProjectID)
sanitizedURL = fmt.Sprintf("%s:%d:%d", sanitizedURL, at.AccountID, at.ProjectID) sanitizedURL = fmt.Sprintf("%s:%d:%d", sanitizedURL, at.AccountID, at.ProjectID)
} }
if *showRemoteWriteURL { if *showRemoteWriteURL {
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL) sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL)
} }
rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) rwctxs[i] = newRemoteWriteCtx(i, at, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
} }
return rwctxs return rwctxs
} }
@ -403,17 +408,29 @@ type remoteWriteCtx struct {
relabelMetricsDropped *metrics.Counter relabelMetricsDropped *metrics.Counter
} }
func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx { func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxInmemoryBlocks int, sanitizedURL string) *remoteWriteCtx {
h := xxhash.Sum64([]byte(remoteWriteURL)) // strip query params, otherwise changing params resets pq
path := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h) pqURL := *remoteWriteURL
fq := persistentqueue.MustOpenFastQueue(path, sanitizedURL, maxInmemoryBlocks, maxPendingBytesPerURL.N) pqURL.RawQuery = ""
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, sanitizedURL), func() float64 { pqURL.Fragment = ""
h := xxhash.Sum64([]byte(pqURL.String()))
queuePath := fmt.Sprintf("%s/persistent-queue/%d_%016X", *tmpDataPath, argIdx+1, h)
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytesPerURL.N)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetPendingBytes()) return float64(fq.GetPendingBytes())
}) })
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, path, sanitizedURL), func() float64 { _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetInmemoryQueueLen()) return float64(fq.GetInmemoryQueueLen())
}) })
c := newClient(argIdx, remoteWriteURL, sanitizedURL, fq, *queues) var c *client
switch remoteWriteURL.Scheme {
case "http", "https":
c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, *queues)
default:
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
}
c.init(argIdx, *queues, sanitizedURL)
sf := significantFigures.GetOptionalArgOrDefault(argIdx, 0) sf := significantFigures.GetOptionalArgOrDefault(argIdx, 0)
rd := roundDigits.GetOptionalArgOrDefault(argIdx, 100) rd := roundDigits.GetOptionalArgOrDefault(argIdx, 100)
pssLen := *queues pssLen := *queues
@ -432,7 +449,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int,
c: c, c: c,
pss: pss, pss: pss,
relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, path, sanitizedURL)), relabelMetricsDropped: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
} }
} }