From 1a88fe5b1f5f5195c20bd5a45810f59567360365 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 14 Dec 2022 19:26:24 -0800 Subject: [PATCH] lib/flagutil/bytes.go: properly handle values bigger than 2GiB on 32-bit architectures This fixes handling of values bigger than 2GiB for the following command-line flags: - -storage.minFreeDiskSpaceBytes - -remoteWrite.maxDiskUsagePerURL --- app/vmagent/remotewrite/pendingseries.go | 2 +- app/vmbackup/main.go | 2 +- app/vmrestore/main.go | 2 +- app/vmselect/prometheus/prometheus.go | 4 +-- app/vmstorage/main.go | 8 ++--- docs/CHANGELOG.md | 2 ++ lib/flagutil/array.go | 2 +- lib/flagutil/array_test.go | 16 ++++----- lib/flagutil/bytes.go | 34 +++++++++++++------ lib/flagutil/bytes_test.go | 2 +- lib/memory/memory.go | 4 +-- lib/persistentqueue/fastqueue.go | 2 +- lib/persistentqueue/persistentqueue.go | 2 +- lib/promscrape/client.go | 4 +-- lib/promscrape/scrapework.go | 4 +-- lib/protoparser/influx/streamparser.go | 2 +- .../promremotewrite/streamparser.go | 2 +- lib/protoparser/vmimport/streamparser.go | 2 +- lib/storage/storage.go | 2 +- 19 files changed, 56 insertions(+), 42 deletions(-) diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 5b9f6548b..37627f44d 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -195,7 +195,7 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt } bb := writeRequestBufPool.Get() bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) - if len(bb.B) <= maxUnpackedBlockSize.N { + if len(bb.B) <= maxUnpackedBlockSize.IntN() { zb := snappyBufPool.Get() zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B) writeRequestBufPool.Put(bb) diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 57771ca64..3759e92ff 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -145,7 +145,7 @@ func newSrcFS() (*fslocal.FS, error) { fs := &fslocal.FS{ Dir: snapshotPath, - MaxBytesPerSecond: maxBytesPerSecond.N, + MaxBytesPerSecond: maxBytesPerSecond.IntN(), } if err := fs.Init(); err != nil { return nil, fmt.Errorf("cannot initialize fs: %w", err) diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index 66f72f335..b7510c762 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -83,7 +83,7 @@ func newDstFS() (*fslocal.FS, error) { } fs := &fslocal.FS{ Dir: *storageDataPath, - MaxBytesPerSecond: maxBytesPerSecond.N, + MaxBytesPerSecond: maxBytesPerSecond.IntN(), } if err := fs.Init(); err != nil { return nil, fmt.Errorf("cannot initialize local fs: %w", err) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index c1c59e2c5..1dc7f03df 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -776,7 +776,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w step = defaultStep } - if len(query) > maxQueryLen.N { + if len(query) > maxQueryLen.IntN() { return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N) } etfs, err := searchutils.GetExtraTagFilters(r) @@ -927,7 +927,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok } // Validate input args. - if len(query) > maxQueryLen.N { + if len(query) > maxQueryLen.IntN() { return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N) } if start > end { diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index d96c90e76..f16f0591f 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -83,10 +83,10 @@ func main() { storage.SetMergeWorkersCount(*smallMergeConcurrency) storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset) storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N) - storage.SetTSIDCacheSize(cacheSizeStorageTSID.N) - storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.N) - mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N) - mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N) + storage.SetTSIDCacheSize(cacheSizeStorageTSID.IntN()) + storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.IntN()) + mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN()) + mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN()) if retentionPeriod.Msecs < 24*3600*1000 { logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b71adbb92..f5f5e399b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: allow specifying values bigger than 2GiB to the following command-line flag values on 32-bit architectures (`386` and `arm`): `-storage.minFreeDiskSpaceBytes` and `-remoteWrite.maxDiskUsagePerURL`. Previously values bigger than 2GiB were incorrectly truncated on these architectures. + ## [v1.85.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.1) diff --git a/lib/flagutil/array.go b/lib/flagutil/array.go index 661fa45ea..be81f09f7 100644 --- a/lib/flagutil/array.go +++ b/lib/flagutil/array.go @@ -308,7 +308,7 @@ func (a *ArrayBytes) Set(value string) error { } // GetOptionalArgOrDefault returns optional arg under the given argIdx. -func (a *ArrayBytes) GetOptionalArgOrDefault(argIdx, defaultValue int) int { +func (a *ArrayBytes) GetOptionalArgOrDefault(argIdx int, defaultValue int64) int64 { x := *a if argIdx < len(x) { return x[argIdx].N diff --git a/lib/flagutil/array_test.go b/lib/flagutil/array_test.go index a4e8dad03..0b0811d92 100644 --- a/lib/flagutil/array_test.go +++ b/lib/flagutil/array_test.go @@ -265,8 +265,8 @@ func TestArrayInt_String(t *testing.T) { } func TestArrayBytes(t *testing.T) { - expected := []int{10000000, 23, 10240} - result := make([]int, len(fooFlagBytes)) + expected := []int64{10000000, 23, 10240} + result := make([]int64, len(fooFlagBytes)) for i, b := range fooFlagBytes { result[i] = b.N } @@ -276,11 +276,11 @@ func TestArrayBytes(t *testing.T) { } func TestArrayBytes_Set(t *testing.T) { - f := func(s string, expectedValues []int) { + f := func(s string, expectedValues []int64) { t.Helper() var a ArrayBytes _ = a.Set(s) - values := make([]int, len(a)) + values := make([]int64, len(a)) for i, v := range a { values[i] = v.N } @@ -288,13 +288,13 @@ func TestArrayBytes_Set(t *testing.T) { t.Fatalf("unexpected values parsed;\ngot\n%d\nwant\n%d", values, expectedValues) } } - f("", []int{}) - f(`1`, []int{1}) - f(`-2,3,10kb`, []int{-2, 3, 10000}) + f("", []int64{}) + f(`1`, []int64{1}) + f(`-2,3,10kb`, []int64{-2, 3, 10000}) } func TestArrayBytes_GetOptionalArg(t *testing.T) { - f := func(s string, argIdx, defaultValue, expectedValue int) { + f := func(s string, argIdx int, defaultValue, expectedValue int64) { t.Helper() var a ArrayBytes _ = a.Set(s) diff --git a/lib/flagutil/bytes.go b/lib/flagutil/bytes.go index 8b3fcdd8a..8b86c9c13 100644 --- a/lib/flagutil/bytes.go +++ b/lib/flagutil/bytes.go @@ -3,12 +3,13 @@ package flagutil import ( "flag" "fmt" + "math" "strconv" "strings" ) // NewBytes returns new `bytes` flag with the given name, defaultValue and description. -func NewBytes(name string, defaultValue int, description string) *Bytes { +func NewBytes(name string, defaultValue int64, description string) *Bytes { description += "\nSupports the following optional suffixes for `size` values: KB, MB, GB, TB, KiB, MiB, GiB, TiB" b := Bytes{ N: defaultValue, @@ -23,11 +24,22 @@ func NewBytes(name string, defaultValue int, description string) *Bytes { // It supports the following optional suffixes for values: KB, MB, GB, TB, KiB, MiB, GiB, TiB. type Bytes struct { // N contains parsed value for the given flag. - N int + N int64 valueString string } +// IntN returns the stored value capped by int type. +func (b *Bytes) IntN() int { + if b.N > math.MaxInt { + return math.MaxInt + } + if b.N < math.MinInt { + return math.MinInt + } + return int(b.N) +} + // String implements flag.Value interface func (b *Bytes) String() string { return b.valueString @@ -42,7 +54,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1000) + b.N = int64(f * 1000) b.valueString = value return nil case strings.HasSuffix(value, "MB"): @@ -50,7 +62,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1000 * 1000) + b.N = int64(f * 1000 * 1000) b.valueString = value return nil case strings.HasSuffix(value, "GB"): @@ -58,7 +70,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1000 * 1000 * 1000) + b.N = int64(f * 1000 * 1000 * 1000) b.valueString = value return nil case strings.HasSuffix(value, "TB"): @@ -66,7 +78,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1000 * 1000 * 1000 * 1000) + b.N = int64(f * 1000 * 1000 * 1000 * 1000) b.valueString = value return nil case strings.HasSuffix(value, "KiB"): @@ -74,7 +86,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1024) + b.N = int64(f * 1024) b.valueString = value return nil case strings.HasSuffix(value, "MiB"): @@ -82,7 +94,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1024 * 1024) + b.N = int64(f * 1024 * 1024) b.valueString = value return nil case strings.HasSuffix(value, "GiB"): @@ -90,7 +102,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1024 * 1024 * 1024) + b.N = int64(f * 1024 * 1024 * 1024) b.valueString = value return nil case strings.HasSuffix(value, "TiB"): @@ -98,7 +110,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f * 1024 * 1024 * 1024 * 1024) + b.N = int64(f * 1024 * 1024 * 1024 * 1024) b.valueString = value return nil default: @@ -106,7 +118,7 @@ func (b *Bytes) Set(value string) error { if err != nil { return err } - b.N = int(f) + b.N = int64(f) b.valueString = value return nil } diff --git a/lib/flagutil/bytes_test.go b/lib/flagutil/bytes_test.go index 2b0fc7e1c..4396b239b 100644 --- a/lib/flagutil/bytes_test.go +++ b/lib/flagutil/bytes_test.go @@ -24,7 +24,7 @@ func TestBytesSetFailure(t *testing.T) { } func TestBytesSetSuccess(t *testing.T) { - f := func(value string, expectedResult int) { + f := func(value string, expectedResult int64) { t.Helper() var b Bytes if err := b.Set(value); err != nil { diff --git a/lib/memory/memory.go b/lib/memory/memory.go index d403d4a48..9c93fac63 100644 --- a/lib/memory/memory.go +++ b/lib/memory/memory.go @@ -34,14 +34,14 @@ func initOnce() { memoryLimit = sysTotalMemory() if allowedBytes.N <= 0 { if *allowedPercent < 1 || *allowedPercent > 200 { - logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent) + logger.Fatalf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent) } percent := *allowedPercent / 100 allowedMemory = int(float64(memoryLimit) * percent) remainingMemory = memoryLimit - allowedMemory logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent) } else { - allowedMemory = allowedBytes.N + allowedMemory = allowedBytes.IntN() remainingMemory = memoryLimit - allowedMemory logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String()) } diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 957e1ad71..1b85b2235 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -41,7 +41,7 @@ type FastQueue struct { // if maxPendingBytes is 0, then the queue size is unlimited. // Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue // reaches maxPendingSize. -func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue { +func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64) *FastQueue { pq := mustOpen(path, name, maxPendingBytes) fq := &FastQueue{ pq: pq, diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index dea3dc216..bcca20d28 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -123,7 +123,7 @@ func (q *queue) GetPendingBytes() uint64 { // // If maxPendingBytes is greater than 0, then the max queue size is limited by this value. // The oldest data is deleted when queue size exceeds maxPendingBytes. -func mustOpen(path, name string, maxPendingBytes int) *queue { +func mustOpen(path, name string, maxPendingBytes int64) *queue { if maxPendingBytes < 0 { maxPendingBytes = 0 } diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index d6d4aac9f..d809bbaab 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -128,9 +128,9 @@ func newClient(sw *ScrapeWork) *client { MaxIdleConnDuration: 2 * sw.ScrapeInterval, ReadTimeout: sw.ScrapeTimeout, WriteTimeout: 10 * time.Second, - MaxResponseBodySize: maxScrapeSize.N, + MaxResponseBodySize: maxScrapeSize.IntN(), MaxIdempotentRequestAttempts: 1, - ReadBufferSize: maxResponseHeadersSize.N, + ReadBufferSize: maxResponseHeadersSize.IntN(), } var sc *http.Client var proxyURLFunc func(*http.Request) (*url.URL, error) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 340afddde..2a5f4dcc8 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -235,7 +235,7 @@ func (sw *scrapeWork) loadLastScrape() string { } func (sw *scrapeWork) storeLastScrape(lastScrape []byte) { - mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.N + mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.IntN() if mustCompress { sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1) sw.lastScrape = nil @@ -384,7 +384,7 @@ func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool { if minResponseSizeForStreamParse.N <= 0 { return false } - return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.N + return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.IntN() } // getTargetResponse() fetches response from sw target in the same way as when scraping the target. diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index a3820f62a..2f21309a5 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -76,7 +76,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.N) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN()) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 573e817db..12c118c0a 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -37,7 +37,7 @@ func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) erro if err != nil { return fmt.Errorf("cannot decompress request with length %d: %w", len(ctx.reqBuf.B), err) } - if len(bb.B) > maxInsertRequestSize.N { + if int64(len(bb.B)) > maxInsertRequestSize.N { return fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B)) } wr := getWriteRequest() diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index dbcd84e56..bc5a1570d 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -52,7 +52,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.N) + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN()) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index a319efb87..550370843 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -628,7 +628,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { // SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path // // The function must be called before opening or creating any storage. -func SetFreeDiskSpaceLimit(bytes int) { +func SetFreeDiskSpaceLimit(bytes int64) { freeDiskSpaceLimitBytes = uint64(bytes) }