diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index e2b5f4ba4..508e8932a 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/metrics" @@ -17,7 +18,7 @@ var ( flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+ "Minimum supported interval is 1 second") - maxUnpackedBlockSize = flag.Int("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ + maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ "It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics") ) @@ -164,7 +165,7 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt } bb := writeRequestBufPool.Get() bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) - if len(bb.B) <= *maxUnpackedBlockSize { + if len(bb.B) <= maxUnpackedBlockSize.N { zb := snappyBufPool.Get() zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B) writeRequestBufPool.Put(bb) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 555544345..383fe8c4c 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -27,7 +27,7 @@ var ( "isn't enough for sending high volume of collected data to remote storage") showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+ "It is hidden by default, since it can contain sensitive info such as auth key") - maxPendingBytesPerURL = flag.Int("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+ + maxPendingBytesPerURL = flagutil.NewBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+ "for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+ "Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. "+ "Disk usage is unlimited if the value is set to 0") @@ -183,7 +183,7 @@ type remoteWriteCtx struct { func newRemoteWriteCtx(argIdx int, remoteWriteURL string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx { h := xxhash.Sum64([]byte(remoteWriteURL)) path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h) - fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, *maxPendingBytesPerURL) + fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks, maxPendingBytesPerURL.N) _ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, path, urlLabelValue), func() float64 { return float64(fq.GetPendingBytes()) }) diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 7a46ef9c9..ef446ad93 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -28,7 +29,7 @@ var ( "-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded") origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups") concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration") - maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0") + maxBytesPerSecond = flagutil.NewBytes("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0") ) func main() { @@ -126,7 +127,7 @@ func newSrcFS() (*fslocal.FS, error) { fs := &fslocal.FS{ Dir: snapshotPath, - MaxBytesPerSecond: *maxBytesPerSecond, + MaxBytesPerSecond: maxBytesPerSecond.N, } if err := fs.Init(); err != nil { return nil, fmt.Errorf("cannot initialize fs: %w", err) diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index 0f75d00da..b4d156b6a 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -21,7 +22,7 @@ var ( "VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case the contents of -storageDataPath dir "+ "is synchronized with -src contents, i.e. it works like 'rsync --delete'") concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration") - maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0") + maxBytesPerSecond = flagutil.NewBytes("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0") skipBackupCompleteCheck = flag.Bool("skipBackupCompleteCheck", false, "Whether to skip checking for 'backup complete' file in -src. This may be useful for restoring from old backups, which were created without 'backup complete' file") ) @@ -71,7 +72,7 @@ func newDstFS() (*fslocal.FS, error) { } fs := &fslocal.FS{ Dir: *storageDataPath, - MaxBytesPerSecond: *maxBytesPerSecond, + MaxBytesPerSecond: maxBytesPerSecond.N, } if err := fs.Init(); err != nil { return nil, fmt.Errorf("cannot initialize local fs: %w", err) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 940be3792..72f38957a 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -29,7 +30,7 @@ var ( "Too small value can result in incomplete last points for query results") maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call") maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for search query execution") - maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") + maxQueryLen = flagutil.NewBytes("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to -search.lookback-delta from Prometheus. "+ "The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+ "See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons") @@ -648,8 +649,8 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e } deadline := getDeadlineForQuery(r, startTime) - if len(query) > *maxQueryLen { - return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen) + if len(query) > maxQueryLen.N { + return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N) } queryOffset := getLatencyOffsetMilliseconds() if !getBool(r, "nocache") && ct-start < queryOffset { @@ -773,8 +774,8 @@ func queryRangeHandler(startTime time.Time, w http.ResponseWriter, query string, } // Validate input args. - if len(query) > *maxQueryLen { - return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen) + if len(query) > maxQueryLen.N { + return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N) } if start > end { end = start + defaultStep diff --git a/lib/flagutil/array.go b/lib/flagutil/array.go index ebd81a20b..d4454795f 100644 --- a/lib/flagutil/array.go +++ b/lib/flagutil/array.go @@ -9,9 +9,9 @@ import ( // NewArray returns new Array with the given name and description. func NewArray(name, description string) *Array { - var a Array description += "\nSupports `array` of values separated by comma" + " or specified via multiple flags." + var a Array flag.Var(&a, name, description) return &a } diff --git a/lib/flagutil/bytes.go b/lib/flagutil/bytes.go new file mode 100644 index 000000000..08f012fc0 --- /dev/null +++ b/lib/flagutil/bytes.go @@ -0,0 +1,102 @@ +package flagutil + +import ( + "flag" + "fmt" + "strconv" + "strings" +) + +// NewBytes returns new `bytes` flag with the given name, defaultValue and description. +func NewBytes(name string, defaultValue int, description string) *Bytes { + description += "\nSupports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB" + b := Bytes{ + N: defaultValue, + valueString: fmt.Sprintf("%d", defaultValue), + } + flag.Var(&b, name, description) + return &b +} + +// Bytes is a flag for holding size in bytes. +// +// It supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB. +type Bytes struct { + // N contains parsed value for the given flag. + N int + + valueString string +} + +// String implements flag.Value interface +func (b *Bytes) String() string { + return b.valueString +} + +// Set implements flag.Value interface +func (b *Bytes) Set(value string) error { + value = normalizeBytesString(value) + switch { + case strings.HasSuffix(value, "KB"): + f, err := strconv.ParseFloat(value[:len(value)-2], 64) + if err != nil { + return err + } + b.N = int(f * 1000) + b.valueString = value + return nil + case strings.HasSuffix(value, "MB"): + f, err := strconv.ParseFloat(value[:len(value)-2], 64) + if err != nil { + return err + } + b.N = int(f * 1000 * 1000) + b.valueString = value + return nil + case strings.HasSuffix(value, "GB"): + f, err := strconv.ParseFloat(value[:len(value)-2], 64) + if err != nil { + return err + } + b.N = int(f * 1000 * 1000 * 1000) + b.valueString = value + return nil + case strings.HasSuffix(value, "KiB"): + f, err := strconv.ParseFloat(value[:len(value)-3], 64) + if err != nil { + return err + } + b.N = int(f * 1024) + b.valueString = value + return nil + case strings.HasSuffix(value, "MiB"): + f, err := strconv.ParseFloat(value[:len(value)-3], 64) + if err != nil { + return err + } + b.N = int(f * 1024 * 1024) + b.valueString = value + return nil + case strings.HasSuffix(value, "GiB"): + f, err := strconv.ParseFloat(value[:len(value)-3], 64) + if err != nil { + return err + } + b.N = int(f * 1024 * 1024 * 1024) + b.valueString = value + return nil + default: + f, err := strconv.ParseFloat(value, 64) + if err != nil { + return err + } + b.N = int(f) + b.valueString = value + return nil + } +} + +func normalizeBytesString(s string) string { + s = strings.ToUpper(s) + return strings.ReplaceAll(s, "I", "i") +} diff --git a/lib/flagutil/bytes_test.go b/lib/flagutil/bytes_test.go new file mode 100644 index 000000000..c3a719729 --- /dev/null +++ b/lib/flagutil/bytes_test.go @@ -0,0 +1,54 @@ +package flagutil + +import ( + "testing" +) + +func TestBytesSetFailure(t *testing.T) { + f := func(value string) { + t.Helper() + var b Bytes + if err := b.Set(value); err == nil { + t.Fatalf("expecting non-nil error in b.Set(%q)", value) + } + } + f("") + f("foobar") + f("5foobar") + f("aKB") + f("134xMB") + f("2.43sdfGb") + f("aKiB") + f("134xMiB") + f("2.43sdfGIb") +} + +func TestBytesSetSuccess(t *testing.T) { + f := func(value string, expectedResult int) { + t.Helper() + var b Bytes + if err := b.Set(value); err != nil { + t.Fatalf("unexpected error in b.Set(%q): %s", value, err) + } + if b.N != expectedResult { + t.Fatalf("unexpected result; got %d; want %d", b.N, expectedResult) + } + valueString := b.String() + valueExpected := normalizeBytesString(value) + if valueString != valueExpected { + t.Fatalf("unexpected valueString; got %q; want %q", valueString, valueExpected) + } + } + f("0", 0) + f("1", 1) + f("-1234", -1234) + f("123.456", 123) + f("1KiB", 1024) + f("1.5kib", 1.5*1024) + f("23MiB", 23*1024*1024) + f("5.25GiB", 5.25*1024*1024*1024) + f("1KB", 1000) + f("1.5kb", 1.5*1000) + f("23MB", 23*1000*1000) + f("5.25GB", 5.25*1000*1000*1000) +} diff --git a/lib/memory/memory.go b/lib/memory/memory.go index b7ce68fbc..aeab21812 100644 --- a/lib/memory/memory.go +++ b/lib/memory/memory.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -13,7 +14,7 @@ var ( "See also -memory.allowedBytes. "+ "Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. "+ "Too high value may evict too much data from OS page cache, which will result in higher disk IO usage") - allowedBytes = flag.Int("memory.allowedBytes", 0, "Allowed size of system memory VictoriaMetrics caches may occupy. "+ + allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, "Allowed size of system memory VictoriaMetrics caches may occupy. "+ "This option overrides -memory.allowedPercent if set to non-zero value. "+ "Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. "+ "Too high value may evict too much data from OS page cache, which will result in higher disk IO usage") @@ -32,7 +33,7 @@ func initOnce() { panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call")) } mem := sysTotalMemory() - if *allowedBytes <= 0 { + if allowedBytes.N <= 0 { if *allowedPercent < 1 || *allowedPercent > 200 { logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %f", *allowedPercent) } @@ -41,9 +42,9 @@ func initOnce() { remainingMemory = mem - allowedMemory logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%f", allowedMemory, remainingMemory, *allowedPercent) } else { - allowedMemory = *allowedBytes + allowedMemory = allowedBytes.N remainingMemory = mem - allowedMemory - logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%d", allowedMemory, remainingMemory, *allowedBytes) + logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String()) } } diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index d8a9701da..17894f2bb 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -7,12 +7,13 @@ import ( "strings" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) var ( - maxScrapeSize = flag.Int("promscrape.maxScrapeSize", 16*1024*1024, "The maximum size of scrape response in bytes to process from Prometheus targets. "+ + maxScrapeSize = flagutil.NewBytes("promscrape.maxScrapeSize", 16*1024*1024, "The maximum size of scrape response in bytes to process from Prometheus targets. "+ "Bigger responses are rejected") disableCompression = flag.Bool("promscrape.disableCompression", false, "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. "+ "This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. "+ @@ -60,7 +61,7 @@ func newClient(sw *ScrapeWork) *client { MaxIdleConnDuration: 2 * sw.ScrapeInterval, ReadTimeout: sw.ScrapeTimeout, WriteTimeout: 10 * time.Second, - MaxResponseBodySize: *maxScrapeSize, + MaxResponseBodySize: maxScrapeSize.N, MaxIdempotentRequestAttempts: 1, } return &client{ @@ -117,7 +118,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { } if err == fasthttp.ErrBodyTooLarge { return dst, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d; "+ - "either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, *maxScrapeSize) + "either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, maxScrapeSize.N) } return dst, fmt.Errorf("error when scraping %q: %w", c.scrapeURL, err) } diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index c9713f7df..3ba3bb11a 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -11,12 +11,13 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) var ( - maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") + maxInsertRequestSize = flagutil.NewBytes("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") trimTimestamp = flag.Duration("opentsdbhttpTrimTimestamp", time.Millisecond, "Trim timestamps for OpenTSDB HTTP data to this duration. "+ "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") ) @@ -43,15 +44,15 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer putStreamContext(ctx) // Read the request in ctx.reqBuf - lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1) + lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1) reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() return fmt.Errorf("cannot read HTTP OpenTSDB request: %w", err) } - if reqLen > int64(*maxInsertRequestSize) { + if reqLen > int64(maxInsertRequestSize.N) { readErrors.Inc() - return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) + return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) } // Unmarshal the request to ctx.Rows diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 19e75d2e2..265e5df70 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -1,7 +1,6 @@ package promremotewrite import ( - "flag" "fmt" "io" "net/http" @@ -9,12 +8,13 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/metrics" "github.com/golang/snappy" ) -var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") +var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries. // @@ -93,15 +93,15 @@ var pushCtxPool sync.Pool var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) func readSnappy(dst []byte, r io.Reader) ([]byte, error) { - lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1) + lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1) bb := bodyBufferPool.Get() reqLen, err := bb.ReadFrom(lr) if err != nil { bodyBufferPool.Put(bb) return dst, fmt.Errorf("cannot read compressed request: %w", err) } - if reqLen > int64(*maxInsertRequestSize) { - return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) + if reqLen > int64(maxInsertRequestSize.N) { + return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) } buf := dst[len(dst):cap(dst)] @@ -111,8 +111,8 @@ func readSnappy(dst []byte, r io.Reader) ([]byte, error) { err = fmt.Errorf("cannot decompress request with length %d: %w", reqLen, err) return dst, err } - if len(buf) > *maxInsertRequestSize { - return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", *maxInsertRequestSize, len(buf)) + if len(buf) > maxInsertRequestSize.N { + return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(buf)) } if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] { dst = dst[:len(dst)+len(buf)] diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index eb8c450fb..9cfc0be24 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -1,7 +1,6 @@ package vmimport import ( - "flag" "fmt" "io" "net/http" @@ -9,11 +8,12 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) -var maxLineLen = flag.Int("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import") +var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import") // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // @@ -46,7 +46,7 @@ func (ctx *streamContext) Read(r io.Reader) bool { if ctx.err != nil { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, *maxLineLen) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, maxLineLen.N) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc()