all: allow using KB, MB, GB, KiB, MiB and GiB suffixes in command-line flag values related to byte sizes or byte rates

commit 147c35ebd4
parent 7c0d6a8b88
Author: Aliaksandr Valialkin
Date:   2020-08-16 17:05:52 +03:00

13 changed files with 198 additions and 35 deletions
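
For example, any byte-size flag defined via the new flagutil.NewBytes accepts values such as 64KB, 16MiB or 1.5GiB in addition to plain numbers. A minimal sketch (illustrative only; the example.maxSize flag is hypothetical, while flagutil.NewBytes and the Bytes.N field come from this commit):

    package main

    import (
        "flag"
        "fmt"

        "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
    )

    // The flag accepts plain byte counts as well as KB, MB, GB, KiB, MiB and GiB suffixes.
    var maxSize = flagutil.NewBytes("example.maxSize", 16*1024*1024, "The maximum size in bytes")

    func main() {
        flag.Parse()
        // Running with -example.maxSize=64MiB prints 67108864 (64*1024*1024).
        fmt.Println(maxSize.N)
    }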

@@ -7,6 +7,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/metrics"
@@ -17,7 +18,7 @@ var (
 	flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
 		"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+
 		"Minimum supported interval is 1 second")
-	maxUnpackedBlockSize = flag.Int("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
+	maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
 		"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
 )
@@ -164,7 +165,7 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
 	}
 	bb := writeRequestBufPool.Get()
 	bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
-	if len(bb.B) <= *maxUnpackedBlockSize {
+	if len(bb.B) <= maxUnpackedBlockSize.N {
 		zb := snappyBufPool.Get()
 		zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
 		writeRequestBufPool.Put(bb)

@@ -27,7 +27,7 @@ var (
 		"isn't enough for sending high volume of collected data to remote storage")
 	showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
 		"It is hidden by default, since it can contain sensitive info such as auth key")
-	maxPendingBytesPerURL = flag.Int("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
+	maxPendingBytesPerURL = flagutil.NewBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
 		"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+
 		"Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. "+
 		"Disk usage is unlimited if the value is set to 0")
@@ -183,7 +183,7 @@ type remoteWriteCtx struct {
 func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx {
 	h := xxhash.Sum64([]byte(remoteWriteURL))
 	path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
-	fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL)
+	fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, maxPendingBytesPerURL.N)
 	_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, urlLabelValue), func() float64 {
 		return float64(fq.GetPendingBytes())
 	})

@@ -13,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -28,7 +29,7 @@ var (
 		"-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded")
 	origin      = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups")
 	concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration")
-	maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0")
+	maxBytesPerSecond = flagutil.NewBytes("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0")
 )

 func main() {
@@ -126,7 +127,7 @@ func newSrcFS() (*fslocal.FS, error) {
 	fs := &fslocal.FS{
 		Dir:               snapshotPath,
-		MaxBytesPerSecond: *maxBytesPerSecond,
+		MaxBytesPerSecond: maxBytesPerSecond.N,
 	}
 	if err := fs.Init(); err != nil {
 		return nil, fmt.Errorf("cannot initialize fs: %w", err)

@@ -11,6 +11,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -21,7 +22,7 @@ var (
 		"VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case the contents of -storageDataPath dir "+
 		"is synchronized with -src contents, i.e. it works like 'rsync --delete'")
 	concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
-	maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
+	maxBytesPerSecond = flagutil.NewBytes("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
 	skipBackupCompleteCheck = flag.Bool("skipBackupCompleteCheck", false, "Whether to skip checking for 'backup complete' file in -src. This may be useful for restoring from old backups, which were created without 'backup complete' file")
 )
@@ -71,7 +72,7 @@ func newDstFS() (*fslocal.FS, error) {
 	}
 	fs := &fslocal.FS{
 		Dir:               *storageDataPath,
-		MaxBytesPerSecond: *maxBytesPerSecond,
+		MaxBytesPerSecond: maxBytesPerSecond.N,
 	}
 	if err := fs.Init(); err != nil {
 		return nil, fmt.Errorf("cannot initialize local fs: %w", err)

@@ -15,6 +15,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -29,7 +30,7 @@ var (
 		"Too small value can result in incomplete last points for query results")
 	maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
 	maxQueryDuration  = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for search query execution")
-	maxQueryLen       = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
+	maxQueryLen       = flagutil.NewBytes("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
 	maxLookback       = flag.Duration("search.maxLookback", 0, "Synonim to -search.lookback-delta from Prometheus. "+
 		"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
 		"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons")
@@ -648,8 +649,8 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e
 	}
 	deadline := getDeadlineForQuery(r, startTime)
-	if len(query) > *maxQueryLen {
-		return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen)
+	if len(query) > maxQueryLen.N {
+		return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
 	}
 	queryOffset := getLatencyOffsetMilliseconds()
 	if !getBool(r, "nocache") && ct-start < queryOffset {
@@ -773,8 +774,8 @@ func queryRangeHandler(startTime time.Time, w http.ResponseWriter, query string,
 	}
 	// Validate input args.
-	if len(query) > *maxQueryLen {
-		return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen)
+	if len(query) > maxQueryLen.N {
+		return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
 	}
 	if start > end {
 		end = start + defaultStep

@@ -9,9 +9,9 @@ import (
 // NewArray returns new Array with the given name and description.
 func NewArray(name, description string) *Array {
+	var a Array
 	description += "\nSupports `array` of values separated by comma" +
 		" or specified via multiple flags."
-	var a Array
 	flag.Var(&a, name, description)
 	return &a
 }

lib/flagutil/bytes.go (new file, 102 lines)

@@ -0,0 +1,102 @@
+package flagutil
+
+import (
+	"flag"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+// NewBytes returns new `bytes` flag with the given name, defaultValue and description.
+func NewBytes(name string, defaultValue int, description string) *Bytes {
+	description += "\nSupports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB"
+	b := Bytes{
+		N:           defaultValue,
+		valueString: fmt.Sprintf("%d", defaultValue),
+	}
+	flag.Var(&b, name, description)
+	return &b
+}
+
+// Bytes is a flag for holding size in bytes.
+//
+// It supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB.
+type Bytes struct {
+	// N contains parsed value for the given flag.
+	N int
+
+	valueString string
+}
+
+// String implements flag.Value interface
+func (b *Bytes) String() string {
+	return b.valueString
+}
+
+// Set implements flag.Value interface
+func (b *Bytes) Set(value string) error {
+	value = normalizeBytesString(value)
+	switch {
+	case strings.HasSuffix(value, "KB"):
+		f, err := strconv.ParseFloat(value[:len(value)-2], 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f * 1000)
+		b.valueString = value
+		return nil
+	case strings.HasSuffix(value, "MB"):
+		f, err := strconv.ParseFloat(value[:len(value)-2], 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f * 1000 * 1000)
+		b.valueString = value
+		return nil
+	case strings.HasSuffix(value, "GB"):
+		f, err := strconv.ParseFloat(value[:len(value)-2], 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f * 1000 * 1000 * 1000)
+		b.valueString = value
+		return nil
+	case strings.HasSuffix(value, "KiB"):
+		f, err := strconv.ParseFloat(value[:len(value)-3], 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f * 1024)
+		b.valueString = value
+		return nil
+	case strings.HasSuffix(value, "MiB"):
+		f, err := strconv.ParseFloat(value[:len(value)-3], 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f * 1024 * 1024)
+		b.valueString = value
+		return nil
+	case strings.HasSuffix(value, "GiB"):
+		f, err := strconv.ParseFloat(value[:len(value)-3], 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f * 1024 * 1024 * 1024)
+		b.valueString = value
+		return nil
+	default:
+		f, err := strconv.ParseFloat(value, 64)
+		if err != nil {
+			return err
+		}
+		b.N = int(f)
+		b.valueString = value
+		return nil
+	}
+}
+
+func normalizeBytesString(s string) string {
+	s = strings.ToUpper(s)
+	return strings.ReplaceAll(s, "I", "i")
+}
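
Note the parsing semantics implemented above: decimal suffixes (KB, MB, GB) scale by powers of 1000, binary suffixes (KiB, MiB, GiB) by powers of 1024, suffix matching is case-insensitive thanks to normalizeBytesString, and fractional results are truncated to int. A short behavior sketch (assumed from the code above, not part of the diff):

    var b flagutil.Bytes
    _ = b.Set("1.5kib")  // suffix normalized to "KiB": b.N == 1536, b.String() == "1.5KiB"
    _ = b.Set("23MB")    // decimal suffix: b.N == 23 * 1000 * 1000
    _ = b.Set("123.456") // no suffix: value truncated to b.N == 123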

New test file in lib/flagutil (54 lines)

@@ -0,0 +1,54 @@
+package flagutil
+
+import (
+	"testing"
+)
+
+func TestBytesSetFailure(t *testing.T) {
+	f := func(value string) {
+		t.Helper()
+		var b Bytes
+		if err := b.Set(value); err == nil {
+			t.Fatalf("expecting non-nil error in b.Set(%q)", value)
+		}
+	}
+	f("")
+	f("foobar")
+	f("5foobar")
+	f("aKB")
+	f("134xMB")
+	f("2.43sdfGb")
+	f("aKiB")
+	f("134xMiB")
+	f("2.43sdfGIb")
+}
+
+func TestBytesSetSuccess(t *testing.T) {
+	f := func(value string, expectedResult int) {
+		t.Helper()
+		var b Bytes
+		if err := b.Set(value); err != nil {
+			t.Fatalf("unexpected error in b.Set(%q): %s", value, err)
+		}
+		if b.N != expectedResult {
+			t.Fatalf("unexpected result; got %d; want %d", b.N, expectedResult)
+		}
+		valueString := b.String()
+		valueExpected := normalizeBytesString(value)
+		if valueString != valueExpected {
+			t.Fatalf("unexpected valueString; got %q; want %q", valueString, valueExpected)
+		}
+	}
+	f("0", 0)
+	f("1", 1)
+	f("-1234", -1234)
+	f("123.456", 123)
+	f("1KiB", 1024)
+	f("1.5kib", 1.5*1024)
+	f("23MiB", 23*1024*1024)
+	f("5.25GiB", 5.25*1024*1024*1024)
+	f("1KB", 1000)
+	f("1.5kb", 1.5*1000)
+	f("23MB", 23*1000*1000)
+	f("5.25GB", 5.25*1000*1000*1000)
+}

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"sync"

+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -13,7 +14,7 @@ var (
 		"See also -memory.allowedBytes. "+
 		"Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. "+
 		"Too high value may evict too much data from OS page cache, which will result in higher disk IO usage")
-	allowedBytes = flag.Int("memory.allowedBytes", 0, "Allowed size of system memory VictoriaMetrics caches may occupy. "+
+	allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, "Allowed size of system memory VictoriaMetrics caches may occupy. "+
 		"This option overrides -memory.allowedPercent if set to non-zero value. "+
 		"Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. "+
 		"Too high value may evict too much data from OS page cache, which will result in higher disk IO usage")
@@ -32,7 +33,7 @@ func initOnce() {
 		panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call"))
 	}
 	mem := sysTotalMemory()
-	if *allowedBytes <= 0 {
+	if allowedBytes.N <= 0 {
 		if *allowedPercent < 1 || *allowedPercent > 200 {
 			logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %f", *allowedPercent)
 		}
@@ -41,9 +42,9 @@ func initOnce() {
 		remainingMemory = mem - allowedMemory
 		logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%f", allowedMemory, remainingMemory, *allowedPercent)
 	} else {
-		allowedMemory = *allowedBytes
+		allowedMemory = allowedBytes.N
 		remainingMemory = mem - allowedMemory
-		logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%d", allowedMemory, remainingMemory, *allowedBytes)
+		logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
 	}
 }

@@ -7,12 +7,13 @@ import (
 	"strings"
 	"time"

+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/fasthttp"
 	"github.com/VictoriaMetrics/metrics"
 )

 var (
-	maxScrapeSize = flag.Int("promscrape.maxScrapeSize", 16*1024*1024, "The maximum size of scrape response in bytes to process from Prometheus targets. "+
+	maxScrapeSize = flagutil.NewBytes("promscrape.maxScrapeSize", 16*1024*1024, "The maximum size of scrape response in bytes to process from Prometheus targets. "+
 		"Bigger responses are rejected")
 	disableCompression = flag.Bool("promscrape.disableCompression", false, "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. "+
 		"This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. "+
@@ -60,7 +61,7 @@ func newClient(sw *ScrapeWork) *client {
 		MaxIdleConnDuration: 2 * sw.ScrapeInterval,
 		ReadTimeout:         sw.ScrapeTimeout,
 		WriteTimeout:        10 * time.Second,
-		MaxResponseBodySize: *maxScrapeSize,
+		MaxResponseBodySize: maxScrapeSize.N,
 		MaxIdempotentRequestAttempts: 1,
 	}
 	return &client{
@@ -117,7 +118,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
 	}
 	if err == fasthttp.ErrBodyTooLarge {
 		return dst, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d; "+
-			"either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, *maxScrapeSize)
+			"either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, maxScrapeSize.N)
 	}
 	return dst, fmt.Errorf("error when scraping %q: %w", c.scrapeURL, err)
 }

@@ -11,12 +11,13 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )

 var (
-	maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
+	maxInsertRequestSize = flagutil.NewBytes("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
 	trimTimestamp = flag.Duration("opentsdbhttpTrimTimestamp", time.Millisecond, "Trim timestamps for OpenTSDB HTTP data to this duration. "+
 		"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
 )
@@ -43,15 +44,15 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 	defer putStreamContext(ctx)

 	// Read the request in ctx.reqBuf
-	lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1)
+	lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1)
 	reqLen, err := ctx.reqBuf.ReadFrom(lr)
 	if err != nil {
 		readErrors.Inc()
 		return fmt.Errorf("cannot read HTTP OpenTSDB request: %w", err)
 	}
-	if reqLen > int64(*maxInsertRequestSize) {
+	if reqLen > int64(maxInsertRequestSize.N) {
 		readErrors.Inc()
-		return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
+		return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
 	}

 	// Unmarshal the request to ctx.Rows
@@ -1,7 +1,6 @@
 package promremotewrite

 import (
-	"flag"
 	"fmt"
 	"io"
 	"net/http"
@@ -9,12 +8,13 @@ import (
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
 	"github.com/VictoriaMetrics/metrics"
 	"github.com/golang/snappy"
 )

-var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
+var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")

 // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
 //
@@ -93,15 +93,15 @@
 var pushCtxPool sync.Pool
 var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))

 func readSnappy(dst []byte, r io.Reader) ([]byte, error) {
-	lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1)
+	lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1)
 	bb := bodyBufferPool.Get()
 	reqLen, err := bb.ReadFrom(lr)
 	if err != nil {
 		bodyBufferPool.Put(bb)
 		return dst, fmt.Errorf("cannot read compressed request: %w", err)
 	}
-	if reqLen > int64(*maxInsertRequestSize) {
-		return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
+	if reqLen > int64(maxInsertRequestSize.N) {
+		return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
 	}

 	buf := dst[len(dst):cap(dst)]
@@ -111,8 +111,8 @@ func readSnappy(dst []byte, r io.Reader) ([]byte, error) {
 		err = fmt.Errorf("cannot decompress request with length %d: %w", reqLen, err)
 		return dst, err
 	}
-	if len(buf) > *maxInsertRequestSize {
-		return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", *maxInsertRequestSize, len(buf))
+	if len(buf) > maxInsertRequestSize.N {
+		return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(buf))
 	}
 	if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] {
 		dst = dst[:len(dst)+len(buf)]
@@ -1,7 +1,6 @@
 package vmimport

 import (
-	"flag"
 	"fmt"
 	"io"
 	"net/http"
@@ -9,11 +8,12 @@ import (
 	"sync"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )

-var maxLineLen = flag.Int("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import")
+var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import")

 // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows.
 //
@@ -46,7 +46,7 @@ func (ctx *streamContext) Read(r io.Reader) bool {
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, *maxLineLen)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, maxLineLen.N)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()