lib/flagutil/bytes.go: properly handle values bigger than 2GiB on 32-bit architectures

This fixes handling of values bigger than 2GiB for the following command-line flags:

- -storage.minFreeDiskSpaceBytes
- -remoteWrite.maxDiskUsagePerURL
This commit is contained in:
Aliaksandr Valialkin 2022-12-14 19:26:24 -08:00
parent 13bad64268
commit 9f66f84aef
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
17 changed files with 61 additions and 32 deletions

View file

@ -195,7 +195,7 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
}
bb := writeRequestBufPool.Get()
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
if len(bb.B) <= maxUnpackedBlockSize.N {
if len(bb.B) <= maxUnpackedBlockSize.IntN() {
zb := snappyBufPool.Get()
zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
writeRequestBufPool.Put(bb)

View file

@ -143,7 +143,7 @@ func newSrcFS() (*fslocal.FS, error) {
fs := &fslocal.FS{
Dir: snapshotPath,
MaxBytesPerSecond: maxBytesPerSecond.N,
MaxBytesPerSecond: maxBytesPerSecond.IntN(),
}
if err := fs.Init(); err != nil {
return nil, fmt.Errorf("cannot initialize fs: %w", err)

View file

@ -81,7 +81,7 @@ func newDstFS() (*fslocal.FS, error) {
}
fs := &fslocal.FS{
Dir: *storageDataPath,
MaxBytesPerSecond: maxBytesPerSecond.N,
MaxBytesPerSecond: maxBytesPerSecond.IntN(),
}
if err := fs.Init(); err != nil {
return nil, fmt.Errorf("cannot initialize local fs: %w", err)

View file

@ -665,7 +665,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
step = defaultStep
}
if len(query) > maxQueryLen.N {
if len(query) > maxQueryLen.IntN() {
return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
}
etfs, err := searchutils.GetExtraTagFilters(r)
@ -812,7 +812,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
}
// Validate input args.
if len(query) > maxQueryLen.N {
if len(query) > maxQueryLen.IntN() {
return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
}
if start > end {

View file

@ -97,10 +97,10 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.N)
storage.SetTagFilterCacheSize(cacheSizeIndexDBTagFilters.N)
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N)
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.IntN())
storage.SetTagFilterCacheSize(cacheSizeIndexDBTagFilters.IntN())
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
if retentionPeriod.Msecs < 24*3600*1000 {
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)

View file

@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## v1.79.x long-time support release (LTS)
* BUGFIX: allow specifying values bigger than 2GiB to the following command-line flag values on 32-bit architectures (`386` and `arm`): `-storage.minFreeDiskSpaceBytes` and `-remoteWrite.maxDiskUsagePerURL`. Previously values bigger than 2GiB were incorrectly truncated on these architectures.
* BUGFIX: [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html): expose proper values for `vm_downsampling_partitions_scheduled` and `vm_downsampling_partitions_scheduled_size_bytes` metrics, which were added at [v1.78.0](https://docs.victoriametrics.com/CHANGELOG.html#v1780). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2612).
* BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432).

View file

@ -3,13 +3,14 @@ package flagutil
import (
"flag"
"fmt"
"math"
"strconv"
"strings"
)
// NewBytes returns new `bytes` flag with the given name, defaultValue and description.
func NewBytes(name string, defaultValue int, description string) *Bytes {
description += "\nSupports the following optional suffixes for `size` values: KB, MB, GB, KiB, MiB, GiB"
func NewBytes(name string, defaultValue int64, description string) *Bytes {
description += "\nSupports the following optional suffixes for `size` values: KB, MB, GB, KiB, MiB, GiB, TiB"
b := Bytes{
N: defaultValue,
valueString: fmt.Sprintf("%d", defaultValue),
@ -23,11 +24,22 @@ func NewBytes(name string, defaultValue int, description string) *Bytes {
// It supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB.
type Bytes struct {
// N contains parsed value for the given flag.
N int
N int64
valueString string
}
// IntN returns the stored value capped by int type.
func (b *Bytes) IntN() int {
if b.N > math.MaxInt {
return math.MaxInt
}
if b.N < math.MinInt {
return math.MinInt
}
return int(b.N)
}
// String implements flag.Value interface
func (b *Bytes) String() string {
return b.valueString
@ -42,7 +54,7 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f * 1000)
b.N = int64(f * 1000)
b.valueString = value
return nil
case strings.HasSuffix(value, "MB"):
@ -50,7 +62,7 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f * 1000 * 1000)
b.N = int64(f * 1000 * 1000)
b.valueString = value
return nil
case strings.HasSuffix(value, "GB"):
@ -58,7 +70,15 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f * 1000 * 1000 * 1000)
b.N = int64(f * 1000 * 1000 * 1000)
b.valueString = value
return nil
case strings.HasSuffix(value, "TB"):
f, err := strconv.ParseFloat(value[:len(value)-2], 64)
if err != nil {
return err
}
b.N = int64(f * 1000 * 1000 * 1000 * 1000)
b.valueString = value
return nil
case strings.HasSuffix(value, "KiB"):
@ -66,7 +86,7 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f * 1024)
b.N = int64(f * 1024)
b.valueString = value
return nil
case strings.HasSuffix(value, "MiB"):
@ -74,7 +94,7 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f * 1024 * 1024)
b.N = int64(f * 1024 * 1024)
b.valueString = value
return nil
case strings.HasSuffix(value, "GiB"):
@ -82,7 +102,15 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f * 1024 * 1024 * 1024)
b.N = int64(f * 1024 * 1024 * 1024)
b.valueString = value
return nil
case strings.HasSuffix(value, "TiB"):
f, err := strconv.ParseFloat(value[:len(value)-3], 64)
if err != nil {
return err
}
b.N = int64(f * 1024 * 1024 * 1024 * 1024)
b.valueString = value
return nil
default:
@ -90,7 +118,7 @@ func (b *Bytes) Set(value string) error {
if err != nil {
return err
}
b.N = int(f)
b.N = int64(f)
b.valueString = value
return nil
}

View file

@ -24,7 +24,7 @@ func TestBytesSetFailure(t *testing.T) {
}
func TestBytesSetSuccess(t *testing.T) {
f := func(value string, expectedResult int) {
f := func(value string, expectedResult int64) {
t.Helper()
var b Bytes
if err := b.Set(value); err != nil {

View file

@ -34,14 +34,14 @@ func initOnce() {
memoryLimit = sysTotalMemory()
if allowedBytes.N <= 0 {
if *allowedPercent < 1 || *allowedPercent > 200 {
logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent)
logger.Fatalf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent)
}
percent := *allowedPercent / 100
allowedMemory = int(float64(memoryLimit) * percent)
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent)
} else {
allowedMemory = allowedBytes.N
allowedMemory = allowedBytes.IntN()
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
}

View file

@ -41,7 +41,7 @@ type FastQueue struct {
// if maxPendingBytes is 0, then the queue size is unlimited.
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
// reaches maxPendingSize.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue {
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64) *FastQueue {
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,

View file

@ -124,7 +124,7 @@ func (q *queue) GetPendingBytes() uint64 {
//
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
// The oldest data is deleted when queue size exceeds maxPendingBytes.
func mustOpen(path, name string, maxPendingBytes int) *queue {
func mustOpen(path, name string, maxPendingBytes int64) *queue {
if maxPendingBytes < 0 {
maxPendingBytes = 0
}

View file

@ -116,9 +116,9 @@ func newClient(sw *ScrapeWork) *client {
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
ReadTimeout: sw.ScrapeTimeout,
WriteTimeout: 10 * time.Second,
MaxResponseBodySize: maxScrapeSize.N,
MaxResponseBodySize: maxScrapeSize.IntN(),
MaxIdempotentRequestAttempts: 1,
ReadBufferSize: maxResponseHeadersSize.N,
ReadBufferSize: maxResponseHeadersSize.IntN(),
}
var sc *http.Client
var proxyURLFunc func(*http.Request) (*url.URL, error)

View file

@ -245,7 +245,7 @@ func (sw *scrapeWork) loadLastScrape() string {
}
func (sw *scrapeWork) storeLastScrape(lastScrape []byte) {
mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.N
mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.IntN()
if mustCompress {
sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1)
sw.lastScrape = nil
@ -400,7 +400,7 @@ func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
if minResponseSizeForStreamParse.N <= 0 {
return false
}
return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.N
return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.IntN()
}
// getTargetResponse() fetches response from sw target in the same way as when scraping the target.

View file

@ -76,7 +76,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.N)
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN())
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()

View file

@ -37,7 +37,7 @@ func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) erro
if err != nil {
return fmt.Errorf("cannot decompress request with length %d: %w", len(ctx.reqBuf.B), err)
}
if len(bb.B) > maxInsertRequestSize.N {
if int64(len(bb.B)) > maxInsertRequestSize.N {
return fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
}
wr := getWriteRequest()

View file

@ -52,7 +52,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.N)
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN())
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()

View file

@ -612,7 +612,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
//
// The function must be called before opening or creating any storage.
func SetFreeDiskSpaceLimit(bytes int) {
func SetFreeDiskSpaceLimit(bytes int64) {
freeDiskSpaceLimitBytes = uint64(bytes)
}