Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
Commit 079888f719
12 changed files with 106 additions and 16 deletions
@@ -8,6 +8,7 @@ import (
     "io/ioutil"
     "net/http"
     "net/url"
+    "strconv"
     "strings"
     "sync"
     "time"

@@ -16,10 +17,14 @@ import (
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
     "github.com/VictoriaMetrics/metrics"
 )
 
 var (
+    rateLimit = flagutil.NewArray("remoteWrite.rateLimit", "Optional rate limit in bytes per second for data sent to -remoteWrite.url. "+
+        "By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
+        "is sent after temporary unavailability of the remote storage")
     sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", "Timeout for sending a single block of data to -remoteWrite.url")
     proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+
         "Example: -remoteWrite.proxyURL=socks5://proxy:1234")

@@ -49,6 +54,8 @@ type client struct {
     fq *persistentqueue.FastQueue
     hc *http.Client
 
+    rl rateLimiter
+
     bytesSent       *metrics.Counter
     blocksSent      *metrics.Counter
     requestDuration *metrics.Histogram

@@ -113,6 +120,18 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu
         },
         stopCh: make(chan struct{}),
     }
+    if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec != "" {
+        limit, err := strconv.ParseInt(bytesPerSec, 10, 64)
+        if err != nil {
+            logger.Fatalf("cannot parse -remoteWrite.rateLimit=%q for -remoteWrite.url=%q: %s", bytesPerSec, sanitizedURL, err)
+        }
+        if limit > 0 {
+            logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", limit, sanitizedURL)
+            c.rl.perSecondLimit = limit
+        }
+    }
+    c.rl.limitReached = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remote_write_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
+
     c.bytesSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_bytes_sent_total{url=%q}`, c.sanitizedURL))
     c.blocksSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_blocks_sent_total{url=%q}`, c.sanitizedURL))
     c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.sanitizedURL))

@@ -189,6 +208,7 @@ func (c *client) runWorker() {
 }
 
 func (c *client) sendBlock(block []byte) {
+    c.rl.register(len(block), c.stopCh)
     retryDuration := time.Second
     retriesCount := 0
     c.bytesSent.Add(len(block))

@@ -219,12 +239,13 @@ again:
     }
     logger.Errorf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds",
         len(block), c.sanitizedURL, err, retryDuration.Seconds())
-    t := time.NewTimer(retryDuration)
+    t := timerpool.Get(retryDuration)
     select {
     case <-c.stopCh:
-        t.Stop()
+        timerpool.Put(t)
         return
     case <-t.C:
+        timerpool.Put(t)
     }
     c.retriesCount.Inc()
     goto again

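The switch from time.NewTimer to timerpool.Get/timerpool.Put above reuses timer objects across retries instead of allocating a fresh timer for every backoff wait. Below is a minimal sketch of such a pool, assuming it simply wraps sync.Pool; the actual lib/timerpool implementation may differ.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    var timerPool sync.Pool

    // Get returns a timer that fires after d, reusing a pooled timer when possible.
    func Get(d time.Duration) *time.Timer {
        if v := timerPool.Get(); v != nil {
            t := v.(*time.Timer)
            t.Reset(d)
            return t
        }
        return time.NewTimer(d)
    }

    // Put stops the timer, drains its channel if it already fired, and pools it,
    // so the next Get can safely Reset it without receiving a stale tick.
    func Put(t *time.Timer) {
        if !t.Stop() {
            select {
            case <-t.C:
            default:
            }
        }
        timerPool.Put(t)
    }

    func main() {
        for i := 0; i < 3; i++ {
            t := Get(10 * time.Millisecond) // reused after the first iteration
            <-t.C
            Put(t)
        }
        fmt.Println("done")
    }
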
@@ -261,13 +282,50 @@ again:
         logger.Errorf("unexpected status code received after sending a block with size %d bytes to %q during retry #%d: %d; response body=%q; "+
             "re-sending the block in %.3f seconds", len(block), c.sanitizedURL, retriesCount, statusCode, body, retryDuration.Seconds())
     }
-    t := time.NewTimer(retryDuration)
+    t := timerpool.Get(retryDuration)
     select {
     case <-c.stopCh:
-        t.Stop()
+        timerpool.Put(t)
         return
     case <-t.C:
+        timerpool.Put(t)
     }
     c.retriesCount.Inc()
     goto again
 }
+
+type rateLimiter struct {
+    perSecondLimit int64
+
+    // The current budget. It is increased by perSecondLimit every second.
+    budget int64
+
+    // The next deadline for increasing the budget by perSecondLimit
+    deadline time.Time
+
+    limitReached *metrics.Counter
+}
+
+func (rl *rateLimiter) register(dataLen int, stopCh <-chan struct{}) {
+    limit := rl.perSecondLimit
+    if limit <= 0 {
+        return
+    }
+    for rl.budget <= 0 {
+        now := time.Now()
+        if d := rl.deadline.Sub(now); d > 0 {
+            rl.limitReached.Inc()
+            t := timerpool.Get(d)
+            select {
+            case <-stopCh:
+                timerpool.Put(t)
+                return
+            case <-t.C:
+                timerpool.Put(t)
+            }
+        }
+        rl.budget += limit
+        rl.deadline = now.Add(time.Second)
+    }
+    rl.budget -= int64(dataLen)
+}

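The rateLimiter added at the end of this file is a per-second budget: every second the budget grows by perSecondLimit bytes, and register blocks while the budget is exhausted. Here is a self-contained sketch of the same idea, with plain time.Sleep standing in for the pooled timer and stop channel; the names and numbers are illustrative only.

    package main

    import (
        "fmt"
        "time"
    )

    type rateLimiter struct {
        perSecondLimit int64
        budget         int64
        deadline       time.Time
    }

    // register blocks until the limiter has budget for dataLen bytes.
    func (rl *rateLimiter) register(dataLen int) {
        if rl.perSecondLimit <= 0 {
            return
        }
        for rl.budget <= 0 {
            if d := time.Until(rl.deadline); d > 0 {
                time.Sleep(d) // wait for the next one-second window
            }
            rl.budget += rl.perSecondLimit
            rl.deadline = time.Now().Add(time.Second)
        }
        rl.budget -= int64(dataLen)
    }

    func main() {
        rl := &rateLimiter{perSecondLimit: 1000} // 1000 bytes per second
        start := time.Now()
        for i := 0; i < 5; i++ {
            rl.register(600) // each block is 600 bytes, so roughly two fit per second
            fmt.Printf("block %d sent after %v\n", i, time.Since(start).Round(time.Millisecond))
        }
    }
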
@@ -24,6 +24,8 @@ var (
 
     lookBack = flag.Duration("datasource.lookback", 0, "Lookback defines how far to look into past when evaluating queries. "+
         "For example, if datasource.lookback=5m then param \"time\" with value now()-5m will be added to every query.")
+    queryStep = flag.Duration("datasource.queryStep", 0, "queryStep defines how far a value can fallback to when evaluating queries. "+
+        "For example, if datasource.queryStep=15s then param \"step\" with value \"15s\" will be added to every query.")
     maxIdleConnections = flag.Int("datasource.maxIdleConnections", 100, "Defines the number of idle (keep-alive connections) to configured datasource."+
         "Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state.")
 )

@@ -39,5 +41,5 @@ func Init() (Querier, error) {
     }
     tr.MaxIdleConns = *maxIdleConnections
     c := &http.Client{Transport: tr}
-    return NewVMStorage(*addr, *basicAuthUsername, *basicAuthPassword, *lookBack, c), nil
+    return NewVMStorage(*addr, *basicAuthUsername, *basicAuthPassword, *lookBack, *queryStep, c), nil
 }

@@ -53,18 +53,20 @@ type VMStorage struct {
     basicAuthUser string
     basicAuthPass string
     lookBack      time.Duration
+    queryStep     time.Duration
 }
 
 const queryPath = "/api/v1/query?query="
 
 // NewVMStorage is a constructor for VMStorage
-func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Duration, c *http.Client) *VMStorage {
+func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Duration, queryStep time.Duration, c *http.Client) *VMStorage {
     return &VMStorage{
         c:             c,
         basicAuthUser: basicAuthUser,
         basicAuthPass: basicAuthPass,
         queryURL:      strings.TrimSuffix(baseURL, "/") + queryPath,
         lookBack:      lookBack,
+        queryStep:     queryStep,
     }
 }

@@ -78,6 +80,9 @@ func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
         lookBack := time.Now().Add(-s.lookBack)
         q += fmt.Sprintf("&time=%d", lookBack.Unix())
     }
+    if s.queryStep > 0 {
+        q += fmt.Sprintf("&step=%s", s.queryStep.String())
+    }
     req, err := http.NewRequest("POST", q, nil)
     if err != nil {
         return nil, err

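With both -datasource.lookback and -datasource.queryStep set, the request built above carries extra time and step query args next to the query itself. A small sketch of the resulting URL, using a hypothetical vmselect address and query:

    package main

    import (
        "fmt"
        "net/url"
        "time"
    )

    func main() {
        lookBack := 5 * time.Minute   // -datasource.lookback=5m
        queryStep := 15 * time.Second // -datasource.queryStep=15s

        // Hypothetical datasource address and alerting expression.
        q := "http://vmselect:8481/api/v1/query?query=" + url.QueryEscape(`up == 0`)
        if lookBack > 0 {
            q += fmt.Sprintf("&time=%d", time.Now().Add(-lookBack).Unix())
        }
        if queryStep > 0 {
            q += fmt.Sprintf("&step=%s", queryStep.String())
        }
        // e.g. .../api/v1/query?query=up+%3D%3D+0&time=1611234567&step=15s
        fmt.Println(q)
    }
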
@@ -61,7 +61,7 @@ func TestVMSelectQuery(t *testing.T) {
 
     srv := httptest.NewServer(mux)
     defer srv.Close()
-    am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, srv.Client())
+    am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, srv.Client())
     if _, err := am.Query(ctx, query); err == nil {
         t.Fatalf("expected connection error got nil")
     }

@@ -35,5 +35,5 @@ func Init() (datasource.Querier, error) {
         return nil, fmt.Errorf("failed to create transport: %w", err)
     }
     c := &http.Client{Transport: tr}
-    return datasource.NewVMStorage(*addr, *basicAuthUsername, *basicAuthPassword, 0, c), nil
+    return datasource.NewVMStorage(*addr, *basicAuthUsername, *basicAuthPassword, 0, 0, c), nil
 }

@@ -19,7 +19,9 @@ import (
 
 var (
     disableCache           = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful during data backfilling")
-    maxPointsPerTimeseries = flag.Int("search.maxPointsPerTimeseries", 30e3, "The maximum points per a single timeseries returned from the search")
+    maxPointsPerTimeseries = flag.Int("search.maxPointsPerTimeseries", 30e3, "The maximum points per a single timeseries returned from /api/v1/query_range. "+
+        "This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points "+
+        "returned to graphing UI such as Grafana. There is no sense in setting this limit to values significantly exceeding horizontal resolution of the graph")
 )
 
 // The minimum number of points per timeseries for enabling time rounding.

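The number of points a single series carries in a /api/v1/query_range response is roughly (end - start) / step + 1, which is what -search.maxPointsPerTimeseries caps. A quick check with hypothetical request values:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        const maxPointsPerTimeseries = 30000 // default value of -search.maxPointsPerTimeseries

        // Hypothetical query_range request: one month of data at 1m resolution.
        start := time.Now().Add(-30 * 24 * time.Hour)
        end := time.Now()
        step := time.Minute

        points := int64(end.Sub(start)/step) + 1
        fmt.Printf("points per series: %d, exceeds limit: %v\n", points, points > maxPointsPerTimeseries)
        // 43201 points > 30000, so either the step must grow or the flag must be raised.
    }
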
@@ -2,9 +2,14 @@
 # tip
 
-* FEATURE: added `search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default.
+* FEATURE: added `-loggerTimezone` command-line flag for adjusting time zone for timestamps in log messages. By default UTC is used.
+* FEATURE: added `-search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default.
+* FEATURE: vmalert: added `-datasource.queryStep` command-line flag for passing the `step` query arg to the `/api/v1/query` endpoint. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1025
+* FEATURE: vmagent: added `-remoteWrite.rateLimit` command-line flag for limiting the data transfer rate to `-remoteWrite.url`. This may be useful when big amounts of buffered data are sent after temporary unavailability of the remote storage. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1035
+* FEATURE: vmagent: export `vm_promscrape_scrapes_failed_per_url_total` and `vm_promscrape_scrapes_skipped_by_sample_limit_per_url_total` counters, which may help identify improperly working scrape targets.
 
 * BUGFIX: vmagent: reduce the HTTP reconnection rate to scrape targets. Previously vmagent could erroneously close HTTP keep-alive connections more often than needed.
+* BUGFIX: vmagent: retry scrape and service discovery requests when the remote server closes an HTTP keep-alive connection. Previously the `disable_keepalive: true` option could be used under the `scrape_configs` section when working with such servers.
 
 
 # [v1.52.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.52.0)

@@ -198,7 +198,7 @@ func MustOpenReaderAt(path string) *ReaderAt {
 }
 
 func pageCacheBitmapCleaner(pcbm *atomic.Value, stopCh <-chan struct{}) {
-    t := time.NewTimer(time.Minute)
+    t := time.NewTicker(time.Minute)
     for {
         select {
         case <-stopCh:

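This change matters because time.NewTimer fires once while time.NewTicker fires repeatedly, so a periodic cleaner loop waiting on a timer's channel would only ever run a single iteration. A minimal illustration of the difference:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        ticker := time.NewTicker(100 * time.Millisecond) // fires every 100ms
        defer ticker.Stop()

        timer := time.NewTimer(100 * time.Millisecond) // fires exactly once
        defer timer.Stop()

        for i := 0; i < 3; i++ {
            <-ticker.C // receives three times
            fmt.Println("tick", i)
        }

        <-timer.C // receives once; a second receive here would block forever
        fmt.Println("timer fired")
    }
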
@@ -20,6 +20,7 @@ var (
     loggerLevel  = flag.String("loggerLevel", "INFO", "Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC")
     loggerFormat = flag.String("loggerFormat", "default", "Format for logs. Possible values: default, json")
     loggerOutput = flag.String("loggerOutput", "stderr", "Output for the logs. Supported values: stderr, stdout")
+    loggerTimezone = flag.String("loggerTimezone", "UTC", "Timezone to use for timestamps in logs. Local timezone can be used")
     disableTimestamps = flag.Bool("loggerDisableTimestamps", false, "Whether to disable writing timestamps in logs")
 
     errorsPerSecondLimit = flag.Int("loggerErrorsPerSecondLimit", 0, "Per-second limit on the number of ERROR messages. If more than the given number of errors "+

@@ -37,10 +38,23 @@ func Init() {
     setLoggerOutput()
     validateLoggerLevel()
     validateLoggerFormat()
+    initTimezone()
     go logLimiterCleaner()
     logAllFlags()
+
 }
 
+func initTimezone() {
+    tz, err := time.LoadLocation(*loggerTimezone)
+    if err != nil {
+        log.Printf("cannot load timezone %q, so using UTC; error: %s", *loggerTimezone, err)
+        tz = time.UTC
+    }
+    timezone = tz
+}
+
+var timezone = time.UTC
+
 func setLoggerOutput() {
     switch *loggerOutput {
     case "stderr":

@@ -192,7 +206,7 @@ func (lw *logWriter) Write(p []byte) (int, error) {
 func logMessage(level, msg string, skipframes int) {
     timestamp := ""
     if !*disableTimestamps {
-        timestamp = time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
+        timestamp = time.Now().In(timezone).Format("2006-01-02T15:04:05.000Z0700")
     }
     levelLowercase := strings.ToLower(level)
     _, file, line, ok := runtime.Caller(skipframes)

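The new layout ends in Z0700, so UTC timestamps keep their trailing Z while other zones get a numeric offset, and log lines stay unambiguous for any -loggerTimezone value. A short illustration; America/New_York is just an example zone, and the fallback mirrors what the logger does on a bad zone name:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        const layout = "2006-01-02T15:04:05.000Z0700"
        now := time.Now()

        fmt.Println(now.UTC().Format(layout)) // e.g. 2021-01-22T14:03:05.000Z

        tz, err := time.LoadLocation("America/New_York") // hypothetical -loggerTimezone value
        if err != nil {
            tz = time.UTC // fall back to UTC
        }
        fmt.Println(now.In(tz).Format(layout)) // e.g. 2021-01-22T09:03:05.000-0500
    }
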
@@ -252,7 +252,7 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request,
     if err == nil {
         return nil
     }
-    if err != fasthttp.ErrConnectionClosed {
+    if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
         return err
     }
     // Retry request if the server closes the keep-alive connection unless deadline exceeds.

@@ -199,7 +199,7 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request,
     if err == nil {
         return nil
     }
-    if err != fasthttp.ErrConnectionClosed {
+    if err != fasthttp.ErrConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
         return err
     }
     // Retry request if the server closes the keep-alive connection unless deadline exceeds.

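Both copies of doRequestWithPossibleRetry now treat a "broken pipe" error like fasthttp.ErrConnectionClosed: the server dropped the keep-alive connection, so the request is safe to retry until the deadline. A condensed sketch of that retry shape with hypothetical helper names; doOnce stands in for the real fasthttp round trip:

    package main

    import (
        "errors"
        "strings"
        "time"
    )

    var errConnectionClosed = errors.New("the server closed the connection before returning a response")

    // doOnce stands in for a single HTTP round trip over a keep-alive connection.
    func doOnce() error { return errConnectionClosed }

    // doWithRetry retries only when the server closed the keep-alive connection
    // (or the write hit a broken pipe), and only while the deadline has not passed.
    func doWithRetry(deadline time.Time) error {
        for {
            err := doOnce()
            if err == nil {
                return nil
            }
            if err != errConnectionClosed && !strings.Contains(err.Error(), "broken pipe") {
                return err // a real error; do not retry
            }
            if time.Now().After(deadline) {
                return err // out of time
            }
            // Keep-alive connection was closed by the server; retry on a fresh connection.
        }
    }

    func main() {
        _ = doWithRetry(time.Now().Add(10 * time.Millisecond))
    }
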
@@ -18,6 +18,7 @@ import (
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
     parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
     "github.com/VictoriaMetrics/metrics"
     xxhash "github.com/cespare/xxhash/v2"
 )

@@ -193,14 +194,15 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
         randSleep += uint64(scrapeInterval)
     }
     randSleep -= sleepOffset
-    timer := time.NewTimer(time.Duration(randSleep))
+    timer := timerpool.Get(time.Duration(randSleep))
     var timestamp int64
     var ticker *time.Ticker
     select {
     case <-stopCh:
-        timer.Stop()
+        timerpool.Put(timer)
         return
     case <-timer.C:
+        timerpool.Put(timer)
         ticker = time.NewTicker(scrapeInterval)
         timestamp = time.Now().UnixNano() / 1e6
         sw.scrapeAndLogError(timestamp, timestamp)

@@ -267,6 +269,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
     if err != nil {
         up = 0
         scrapesFailed.Inc()
+        metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_failed_per_url_total{url=%q}`, sw.Config.ScrapeURL)).Inc()
     } else {
         bodyString := bytesutil.ToUnsafeString(body.B)
         wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)

@@ -278,6 +281,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
         srcRows = srcRows[:0]
         up = 0
         scrapesSkippedBySampleLimit.Inc()
+        metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_skipped_by_sample_limit_per_url_total{url=%q}`, sw.Config.ScrapeURL)).Inc()
     }
     samplesPostRelabeling := 0
     for i := range srcRows {

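The two new per-URL counters follow the usual VictoriaMetrics/metrics pattern: build the full metric name, labels included, with fmt.Sprintf and let GetOrCreateCounter register it on first use. A sketch of the same pattern with a made-up scrape URL; in vmagent the URL comes from sw.Config.ScrapeURL:

    package main

    import (
        "fmt"

        "github.com/VictoriaMetrics/metrics"
    )

    func main() {
        // Hypothetical scrape target.
        scrapeURL := "http://node-exporter:9100/metrics"

        // GetOrCreateCounter registers the counter on first call and returns the
        // existing one afterwards, so calling it on every failed scrape is cheap and safe.
        metrics.GetOrCreateCounter(
            fmt.Sprintf(`vm_promscrape_scrapes_failed_per_url_total{url=%q}`, scrapeURL),
        ).Inc()

        // Note: one counter per URL means the number of exported series grows with the
        // number of scrape targets; that is the price of per-target visibility.
    }
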