mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/flagutil/bytes.go: properly handle values bigger than 2GiB on 32-bit architectures
This fixes handling of values bigger than 2GiB for the following command-line flags: - -storage.minFreeDiskSpaceBytes - -remoteWrite.maxDiskUsagePerURL
This commit is contained in:
parent
3a28a52667
commit
1a88fe5b1f
19 changed files with 56 additions and 42 deletions
|
@ -195,7 +195,7 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
|
|||
}
|
||||
bb := writeRequestBufPool.Get()
|
||||
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
|
||||
if len(bb.B) <= maxUnpackedBlockSize.N {
|
||||
if len(bb.B) <= maxUnpackedBlockSize.IntN() {
|
||||
zb := snappyBufPool.Get()
|
||||
zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
|
||||
writeRequestBufPool.Put(bb)
|
||||
|
|
|
@ -145,7 +145,7 @@ func newSrcFS() (*fslocal.FS, error) {
|
|||
|
||||
fs := &fslocal.FS{
|
||||
Dir: snapshotPath,
|
||||
MaxBytesPerSecond: maxBytesPerSecond.N,
|
||||
MaxBytesPerSecond: maxBytesPerSecond.IntN(),
|
||||
}
|
||||
if err := fs.Init(); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize fs: %w", err)
|
||||
|
|
|
@ -83,7 +83,7 @@ func newDstFS() (*fslocal.FS, error) {
|
|||
}
|
||||
fs := &fslocal.FS{
|
||||
Dir: *storageDataPath,
|
||||
MaxBytesPerSecond: maxBytesPerSecond.N,
|
||||
MaxBytesPerSecond: maxBytesPerSecond.IntN(),
|
||||
}
|
||||
if err := fs.Init(); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize local fs: %w", err)
|
||||
|
|
|
@ -776,7 +776,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
|
|||
step = defaultStep
|
||||
}
|
||||
|
||||
if len(query) > maxQueryLen.N {
|
||||
if len(query) > maxQueryLen.IntN() {
|
||||
return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
|
||||
}
|
||||
etfs, err := searchutils.GetExtraTagFilters(r)
|
||||
|
@ -927,7 +927,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
|||
}
|
||||
|
||||
// Validate input args.
|
||||
if len(query) > maxQueryLen.N {
|
||||
if len(query) > maxQueryLen.IntN() {
|
||||
return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
|
||||
}
|
||||
if start > end {
|
||||
|
|
|
@ -83,10 +83,10 @@ func main() {
|
|||
storage.SetMergeWorkersCount(*smallMergeConcurrency)
|
||||
storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
|
||||
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
|
||||
storage.SetTSIDCacheSize(cacheSizeStorageTSID.N)
|
||||
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.N)
|
||||
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N)
|
||||
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N)
|
||||
storage.SetTSIDCacheSize(cacheSizeStorageTSID.IntN())
|
||||
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.IntN())
|
||||
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
|
||||
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
|
||||
|
||||
if retentionPeriod.Msecs < 24*3600*1000 {
|
||||
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)
|
||||
|
|
|
@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
## tip
|
||||
|
||||
* BUGFIX: allow specifying values bigger than 2GiB to the following command-line flag values on 32-bit architectures (`386` and `arm`): `-storage.minFreeDiskSpaceBytes` and `-remoteWrite.maxDiskUsagePerURL`. Previously values bigger than 2GiB were incorrectly truncated on these architectures.
|
||||
|
||||
|
||||
## [v1.85.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.1)
|
||||
|
||||
|
|
|
@ -308,7 +308,7 @@ func (a *ArrayBytes) Set(value string) error {
|
|||
}
|
||||
|
||||
// GetOptionalArgOrDefault returns optional arg under the given argIdx.
|
||||
func (a *ArrayBytes) GetOptionalArgOrDefault(argIdx, defaultValue int) int {
|
||||
func (a *ArrayBytes) GetOptionalArgOrDefault(argIdx int, defaultValue int64) int64 {
|
||||
x := *a
|
||||
if argIdx < len(x) {
|
||||
return x[argIdx].N
|
||||
|
|
|
@ -265,8 +265,8 @@ func TestArrayInt_String(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestArrayBytes(t *testing.T) {
|
||||
expected := []int{10000000, 23, 10240}
|
||||
result := make([]int, len(fooFlagBytes))
|
||||
expected := []int64{10000000, 23, 10240}
|
||||
result := make([]int64, len(fooFlagBytes))
|
||||
for i, b := range fooFlagBytes {
|
||||
result[i] = b.N
|
||||
}
|
||||
|
@ -276,11 +276,11 @@ func TestArrayBytes(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestArrayBytes_Set(t *testing.T) {
|
||||
f := func(s string, expectedValues []int) {
|
||||
f := func(s string, expectedValues []int64) {
|
||||
t.Helper()
|
||||
var a ArrayBytes
|
||||
_ = a.Set(s)
|
||||
values := make([]int, len(a))
|
||||
values := make([]int64, len(a))
|
||||
for i, v := range a {
|
||||
values[i] = v.N
|
||||
}
|
||||
|
@ -288,13 +288,13 @@ func TestArrayBytes_Set(t *testing.T) {
|
|||
t.Fatalf("unexpected values parsed;\ngot\n%d\nwant\n%d", values, expectedValues)
|
||||
}
|
||||
}
|
||||
f("", []int{})
|
||||
f(`1`, []int{1})
|
||||
f(`-2,3,10kb`, []int{-2, 3, 10000})
|
||||
f("", []int64{})
|
||||
f(`1`, []int64{1})
|
||||
f(`-2,3,10kb`, []int64{-2, 3, 10000})
|
||||
}
|
||||
|
||||
func TestArrayBytes_GetOptionalArg(t *testing.T) {
|
||||
f := func(s string, argIdx, defaultValue, expectedValue int) {
|
||||
f := func(s string, argIdx int, defaultValue, expectedValue int64) {
|
||||
t.Helper()
|
||||
var a ArrayBytes
|
||||
_ = a.Set(s)
|
||||
|
|
|
@ -3,12 +3,13 @@ package flagutil
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// NewBytes returns new `bytes` flag with the given name, defaultValue and description.
|
||||
func NewBytes(name string, defaultValue int, description string) *Bytes {
|
||||
func NewBytes(name string, defaultValue int64, description string) *Bytes {
|
||||
description += "\nSupports the following optional suffixes for `size` values: KB, MB, GB, TB, KiB, MiB, GiB, TiB"
|
||||
b := Bytes{
|
||||
N: defaultValue,
|
||||
|
@ -23,11 +24,22 @@ func NewBytes(name string, defaultValue int, description string) *Bytes {
|
|||
// It supports the following optional suffixes for values: KB, MB, GB, TB, KiB, MiB, GiB, TiB.
|
||||
type Bytes struct {
|
||||
// N contains parsed value for the given flag.
|
||||
N int
|
||||
N int64
|
||||
|
||||
valueString string
|
||||
}
|
||||
|
||||
// IntN returns the stored value capped by int type.
|
||||
func (b *Bytes) IntN() int {
|
||||
if b.N > math.MaxInt {
|
||||
return math.MaxInt
|
||||
}
|
||||
if b.N < math.MinInt {
|
||||
return math.MinInt
|
||||
}
|
||||
return int(b.N)
|
||||
}
|
||||
|
||||
// String implements flag.Value interface
|
||||
func (b *Bytes) String() string {
|
||||
return b.valueString
|
||||
|
@ -42,7 +54,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1000)
|
||||
b.N = int64(f * 1000)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "MB"):
|
||||
|
@ -50,7 +62,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1000 * 1000)
|
||||
b.N = int64(f * 1000 * 1000)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "GB"):
|
||||
|
@ -58,7 +70,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1000 * 1000 * 1000)
|
||||
b.N = int64(f * 1000 * 1000 * 1000)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "TB"):
|
||||
|
@ -66,7 +78,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1000 * 1000 * 1000 * 1000)
|
||||
b.N = int64(f * 1000 * 1000 * 1000 * 1000)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "KiB"):
|
||||
|
@ -74,7 +86,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1024)
|
||||
b.N = int64(f * 1024)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "MiB"):
|
||||
|
@ -82,7 +94,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1024 * 1024)
|
||||
b.N = int64(f * 1024 * 1024)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "GiB"):
|
||||
|
@ -90,7 +102,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1024 * 1024 * 1024)
|
||||
b.N = int64(f * 1024 * 1024 * 1024)
|
||||
b.valueString = value
|
||||
return nil
|
||||
case strings.HasSuffix(value, "TiB"):
|
||||
|
@ -98,7 +110,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f * 1024 * 1024 * 1024 * 1024)
|
||||
b.N = int64(f * 1024 * 1024 * 1024 * 1024)
|
||||
b.valueString = value
|
||||
return nil
|
||||
default:
|
||||
|
@ -106,7 +118,7 @@ func (b *Bytes) Set(value string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.N = int(f)
|
||||
b.N = int64(f)
|
||||
b.valueString = value
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ func TestBytesSetFailure(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBytesSetSuccess(t *testing.T) {
|
||||
f := func(value string, expectedResult int) {
|
||||
f := func(value string, expectedResult int64) {
|
||||
t.Helper()
|
||||
var b Bytes
|
||||
if err := b.Set(value); err != nil {
|
||||
|
|
|
@ -34,14 +34,14 @@ func initOnce() {
|
|||
memoryLimit = sysTotalMemory()
|
||||
if allowedBytes.N <= 0 {
|
||||
if *allowedPercent < 1 || *allowedPercent > 200 {
|
||||
logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent)
|
||||
logger.Fatalf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent)
|
||||
}
|
||||
percent := *allowedPercent / 100
|
||||
allowedMemory = int(float64(memoryLimit) * percent)
|
||||
remainingMemory = memoryLimit - allowedMemory
|
||||
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent)
|
||||
} else {
|
||||
allowedMemory = allowedBytes.N
|
||||
allowedMemory = allowedBytes.IntN()
|
||||
remainingMemory = memoryLimit - allowedMemory
|
||||
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ type FastQueue struct {
|
|||
// if maxPendingBytes is 0, then the queue size is unlimited.
|
||||
// Otherwise its size is limited by maxPendingBytes. The oldest data is dropped when the queue
|
||||
// reaches maxPendingSize.
|
||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int) *FastQueue {
|
||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64) *FastQueue {
|
||||
pq := mustOpen(path, name, maxPendingBytes)
|
||||
fq := &FastQueue{
|
||||
pq: pq,
|
||||
|
|
|
@ -123,7 +123,7 @@ func (q *queue) GetPendingBytes() uint64 {
|
|||
//
|
||||
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
|
||||
// The oldest data is deleted when queue size exceeds maxPendingBytes.
|
||||
func mustOpen(path, name string, maxPendingBytes int) *queue {
|
||||
func mustOpen(path, name string, maxPendingBytes int64) *queue {
|
||||
if maxPendingBytes < 0 {
|
||||
maxPendingBytes = 0
|
||||
}
|
||||
|
|
|
@ -128,9 +128,9 @@ func newClient(sw *ScrapeWork) *client {
|
|||
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
|
||||
ReadTimeout: sw.ScrapeTimeout,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
MaxResponseBodySize: maxScrapeSize.N,
|
||||
MaxResponseBodySize: maxScrapeSize.IntN(),
|
||||
MaxIdempotentRequestAttempts: 1,
|
||||
ReadBufferSize: maxResponseHeadersSize.N,
|
||||
ReadBufferSize: maxResponseHeadersSize.IntN(),
|
||||
}
|
||||
var sc *http.Client
|
||||
var proxyURLFunc func(*http.Request) (*url.URL, error)
|
||||
|
|
|
@ -235,7 +235,7 @@ func (sw *scrapeWork) loadLastScrape() string {
|
|||
}
|
||||
|
||||
func (sw *scrapeWork) storeLastScrape(lastScrape []byte) {
|
||||
mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.N
|
||||
mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.IntN()
|
||||
if mustCompress {
|
||||
sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1)
|
||||
sw.lastScrape = nil
|
||||
|
@ -384,7 +384,7 @@ func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
|
|||
if minResponseSizeForStreamParse.N <= 0 {
|
||||
return false
|
||||
}
|
||||
return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.N
|
||||
return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.IntN()
|
||||
}
|
||||
|
||||
// getTargetResponse() fetches response from sw target in the same way as when scraping the target.
|
||||
|
|
|
@ -76,7 +76,7 @@ func (ctx *streamContext) Read() bool {
|
|||
if ctx.err != nil || ctx.hasCallbackError() {
|
||||
return false
|
||||
}
|
||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.N)
|
||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN())
|
||||
if ctx.err != nil {
|
||||
if ctx.err != io.EOF {
|
||||
readErrors.Inc()
|
||||
|
|
|
@ -37,7 +37,7 @@ func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) erro
|
|||
if err != nil {
|
||||
return fmt.Errorf("cannot decompress request with length %d: %w", len(ctx.reqBuf.B), err)
|
||||
}
|
||||
if len(bb.B) > maxInsertRequestSize.N {
|
||||
if int64(len(bb.B)) > maxInsertRequestSize.N {
|
||||
return fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
|
||||
}
|
||||
wr := getWriteRequest()
|
||||
|
|
|
@ -52,7 +52,7 @@ func (ctx *streamContext) Read() bool {
|
|||
if ctx.err != nil || ctx.hasCallbackError() {
|
||||
return false
|
||||
}
|
||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.N)
|
||||
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN())
|
||||
if ctx.err != nil {
|
||||
if ctx.err != io.EOF {
|
||||
readErrors.Inc()
|
||||
|
|
|
@ -628,7 +628,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
|
||||
//
|
||||
// The function must be called before opening or creating any storage.
|
||||
func SetFreeDiskSpaceLimit(bytes int) {
|
||||
func SetFreeDiskSpaceLimit(bytes int64) {
|
||||
freeDiskSpaceLimitBytes = uint64(bytes)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue