From 95608885eaad25625a343d90302109b590037b48 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 12 May 2024 10:10:46 +0200 Subject: [PATCH 1/4] app/vmselect/promql: properly estimate the needed amounts of memory for executing aggregate function over rollup function in incremental mode Incremental aggregation processes only GOMAXPROCS time series at a time, so its' memory usage doesn't depend on the number of input time series. The issue has been introduced in 5138eaeea0791caa34bcfab410e0ca9cd253cd8f Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203 --- app/vmselect/promql/eval.go | 2 +- docs/CHANGELOG.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index e4ab2f42e..611325473 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -1729,7 +1729,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri } } rollupPoints := mulNoOverflow(pointsPerSeries, int64(timeseriesLen*len(rcs))) - rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(rssLen), 1000), mulNoOverflow(rollupPoints, 16)) + rollupMemorySize := sumNoOverflow(mulNoOverflow(int64(timeseriesLen), 1000), mulNoOverflow(rollupPoints, 16)) if maxMemory := int64(logQueryMemoryUsage.N); maxMemory > 0 && rollupMemorySize > maxMemory { memoryIntensiveQueries.Inc() requestURI := ec.GetRequestURI() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0ec5a1665..aee3a8d20 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -44,6 +44,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205). * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix. +* BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830). ## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0) From 92de6ea3407ad35ad816baf9278ae9dc470a3d63 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 12 May 2024 10:20:39 +0200 Subject: [PATCH 2/4] app/vmselect: use strings.EqualFold instead of strings.ToLower where appropriate Strings.EqualFold doesn't allocate memory contrary to strings.ToLower if the input string contains uppercase chars --- app/vmselect/graphiteql/parser.go | 4 ++-- app/vmselect/promql/binary_op.go | 4 ++-- app/vmselect/promql/eval.go | 10 +++++----- app/vmselect/promql/exec.go | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/app/vmselect/graphiteql/parser.go b/app/vmselect/graphiteql/parser.go index b827c0f0e..a56bf74e6 100644 --- a/app/vmselect/graphiteql/parser.go +++ b/app/vmselect/graphiteql/parser.go @@ -213,11 +213,11 @@ func (p *parser) parseMetricExprOrFuncCall() (Expr, error) { // Metric epxression or bool expression or None. if isBool(ident) { be := &BoolExpr{ - B: strings.ToLower(ident) == "true", + B: strings.EqualFold(ident, "true"), } return be, nil } - if strings.ToLower(ident) == "none" { + if strings.EqualFold(ident, "none") { nne := &NoneExpr{} return nne, nil } diff --git a/app/vmselect/promql/binary_op.go b/app/vmselect/promql/binary_op.go index 0cf2f7528..ac347e504 100644 --- a/app/vmselect/promql/binary_op.go +++ b/app/vmselect/promql/binary_op.go @@ -112,7 +112,7 @@ func binaryOpNeqFunc(bfa *binaryOpFuncArg) ([]*timeseries, error) { } func isUnionFunc(e metricsql.Expr) bool { - if fe, ok := e.(*metricsql.FuncExpr); ok && (fe.Name == "" || strings.ToLower(fe.Name) == "union") { + if fe, ok := e.(*metricsql.FuncExpr); ok && (fe.Name == "" || strings.EqualFold(fe.Name, "union")) { return true } return false @@ -303,7 +303,7 @@ func ensureSingleTimeseries(side string, be *metricsql.BinaryOpExpr, tss []*time func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft, rvsRight, tssLeft, tssRight []*timeseries) ([]*timeseries, []*timeseries, error) { joinTags := be.JoinModifier.Args var skipTags []string - if strings.ToLower(be.GroupModifier.Op) == "on" { + if strings.EqualFold(be.GroupModifier.Op, "on") { skipTags = be.GroupModifier.Args } joinPrefix := "" diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 611325473..2f8b32cda 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -542,7 +542,7 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec if err != nil { return nil, nil, err } - if len(tssFirst) == 0 && strings.ToLower(be.Op) != "or" { + if len(tssFirst) == 0 && !strings.EqualFold(be.Op, "or") { // Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result, // since the "exprFirst op exprSecond" would return an empty result in any case. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349 @@ -1168,7 +1168,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, return evalExpr(qt, ec, be) case "rate": if iafc != nil { - if strings.ToLower(iafc.ae.Name) != "sum" { + if !strings.EqualFold(iafc.ae.Name, "sum") { qt.Printf("do not apply instant rollup optimization for incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } @@ -1214,7 +1214,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, return evalExpr(qt, ec, be) case "max_over_time": if iafc != nil { - if strings.ToLower(iafc.ae.Name) != "max" { + if !strings.EqualFold(iafc.ae.Name, "max") { qt.Printf("do not apply instant rollup optimization for non-max incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } @@ -1276,7 +1276,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, return tss, nil case "min_over_time": if iafc != nil { - if strings.ToLower(iafc.ae.Name) != "min" { + if !strings.EqualFold(iafc.ae.Name, "min") { qt.Printf("do not apply instant rollup optimization for non-min incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } @@ -1345,7 +1345,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string, "increase", "increase_pure", "sum_over_time": - if iafc != nil && strings.ToLower(iafc.ae.Name) != "sum" { + if iafc != nil && !strings.EqualFold(iafc.ae.Name, "sum") { qt.Printf("do not apply instant rollup optimization for non-sum incremental aggregate %s()", iafc.ae.Name) return evalAt(qt, timestamp, window) } diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 5414387ab..c38de1293 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -139,7 +139,7 @@ func maySortResults(e metricsql.Expr) bool { return false } case *metricsql.BinaryOpExpr: - if strings.ToLower(v.Op) == "or" { + if strings.EqualFold(v.Op, "or") { // Do not sort results for `a or b` in the same way as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4763 return false From f20d4521963208ceacbb61b2f4226467b5f41618 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 12 May 2024 10:23:53 +0200 Subject: [PATCH 3/4] lib/storage: remove outdated misleading comments --- lib/storage/partition.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 2756fc943..9c30185ea 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -28,14 +28,10 @@ import ( // This time shouldn't exceed a few days. const maxBigPartSize = 1e12 -// The maximum number of inmemory parts per partition. +// The maximum expected number of inmemory parts per partition. // -// This limit allows reducing querying CPU usage under high ingestion rate. -// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 -// -// This number may be reached when the insertion pace outreaches merger pace. -// If this number is reached, then the data ingestion is paused until background -// mergers reduce the number of parts below this number. +// The actual number of inmemory parts may exceed this value if in-memory mergers +// cannot keep up with the rate of creating new in-memory parts. const maxInmemoryParts = 60 // Default number of parts to merge at once. @@ -1275,10 +1271,6 @@ func (pt *partition) getMaxSmallPartSize() uint64 { // Small parts are cached in the OS page cache, // so limit their size by the remaining free RAM. mem := memory.Remaining() - // It is expected no more than defaultPartsToMerge/2 parts exist - // in the OS page cache before they are merged into bigger part. - // Half of the remaining RAM must be left for lib/mergeset parts, - // so the maxItems is calculated using the below code: n := uint64(mem) / defaultPartsToMerge if n < 10e6 { n = 10e6 From 590160ddbb96e61322b8c881ad5d51243521ac5f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 12 May 2024 11:24:48 +0200 Subject: [PATCH 4/4] lib/slicesutil: add helper functions for setting slice length and extending its capacity The added helper functions - SetLength() and ExtendCapacity() - replace error-prone code with simple function calls. --- app/vminsert/common/insert_ctx.go | 6 ++---- app/vmselect/promql/rollup_result_cache.go | 6 ++---- app/vmselect/promql/timeseries.go | 6 ++---- lib/bytesutil/bytebuffer.go | 4 ++-- lib/decimal/decimal.go | 19 ++++-------------- lib/encoding/int.go | 17 +++++----------- lib/leveledbytebufferpool/pool_test.go | 4 +++- lib/logstorage/indexdb.go | 11 ++++------- lib/mergeset/block_header.go | 6 ++---- lib/mergeset/encoding.go | 11 +++-------- lib/mergeset/table_search.go | 6 ++---- lib/prompbmarshal/util.go | 7 +++---- lib/slicesutil/slicesutil.go | 23 ++++++++++++++++++++++ lib/storage/block_header.go | 6 ++---- lib/storage/index_db.go | 8 ++------ lib/storage/metric_name.go | 12 ++++------- lib/storage/partition_search.go | 6 ++---- lib/storage/search.go | 11 +++-------- lib/storage/table_search.go | 6 ++---- lib/uint64set/uint64set.go | 10 +++++----- 20 files changed, 77 insertions(+), 108 deletions(-) create mode 100644 lib/slicesutil/slicesutil.go diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 02be393ea..e5ce6d3ea 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -37,12 +38,9 @@ func (ctx *InsertCtx) Reset(rowsLen int) { for i := range mrs { cleanMetricRow(&mrs[i]) } + mrs = slicesutil.SetLength(mrs, rowsLen) ctx.mrs = mrs[:0] - if n := rowsLen - cap(ctx.mrs); n > 0 { - ctx.mrs = append(ctx.mrs[:cap(ctx.mrs)], make([]storage.MetricRow, n)...) - } - ctx.mrs = ctx.mrs[:0] ctx.metricNamesBuf = ctx.metricNamesBuf[:0] ctx.relabelCtx.Reset() ctx.streamAggrCtx.Reset() diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 7687d13e1..aee2c2fa8 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" @@ -736,10 +737,7 @@ func (mi *rollupResultCacheMetainfo) Unmarshal(src []byte) error { } entriesLen := int(encoding.UnmarshalUint32(src)) src = src[4:] - if n := entriesLen - cap(mi.entries); n > 0 { - mi.entries = append(mi.entries[:cap(mi.entries)], make([]rollupResultCacheMetainfoEntry, n)...) - } - mi.entries = mi.entries[:entriesLen] + mi.entries = slicesutil.SetLength(mi.entries, entriesLen) for i := 0; i < entriesLen; i++ { tail, err := mi.entries[i].Unmarshal(src) if err != nil { diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go index 5cb8cbdd4..ff58cbcbe 100644 --- a/app/vmselect/promql/timeseries.go +++ b/app/vmselect/promql/timeseries.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -246,10 +247,7 @@ func unmarshalMetricNameFast(mn *storage.MetricName, src []byte) ([]byte, error) } tagsLen := encoding.UnmarshalUint16(src) src = src[2:] - if n := int(tagsLen) - cap(mn.Tags); n > 0 { - mn.Tags = append(mn.Tags[:cap(mn.Tags)], make([]storage.Tag, n)...) - } - mn.Tags = mn.Tags[:tagsLen] + mn.Tags = slicesutil.SetLength(mn.Tags, int(tagsLen)) for i := range mn.Tags { tail, key, err := unmarshalBytesFast(src) if err != nil { diff --git a/lib/bytesutil/bytebuffer.go b/lib/bytesutil/bytebuffer.go index 013ab67bc..2060cae89 100644 --- a/lib/bytesutil/bytebuffer.go +++ b/lib/bytesutil/bytebuffer.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) var ( @@ -64,8 +65,7 @@ func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) { offset := bLen for { if free := len(b) - offset; free < offset { - n := len(b) - b = append(b, make([]byte, n)...) + b = slicesutil.SetLength(b, 2*len(b)) } n, err := r.Read(b[offset:]) offset += n diff --git a/lib/decimal/decimal.go b/lib/decimal/decimal.go index d356e6002..24701be9e 100644 --- a/lib/decimal/decimal.go +++ b/lib/decimal/decimal.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // CalibrateScale calibrates a and b with the corresponding exponents ae, be @@ -81,29 +82,17 @@ var decimalMultipliers = []int64{1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e // ExtendFloat64sCapacity extends dst capacity to hold additionalItems // and returns the extended dst. func ExtendFloat64sCapacity(dst []float64, additionalItems int) []float64 { - dstLen := len(dst) - if n := dstLen + additionalItems - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]float64, n)...) - } - return dst[:dstLen] + return slicesutil.ExtendCapacity(dst, additionalItems) } // ExtendInt64sCapacity extends dst capacity to hold additionalItems // and returns the extended dst. func ExtendInt64sCapacity(dst []int64, additionalItems int) []int64 { - dstLen := len(dst) - if n := dstLen + additionalItems - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]int64, n)...) - } - return dst[:dstLen] + return slicesutil.ExtendCapacity(dst, additionalItems) } func extendInt16sCapacity(dst []int16, additionalItems int) []int16 { - dstLen := len(dst) - if n := dstLen + additionalItems - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]int16, n)...) - } - return dst[:dstLen] + return slicesutil.ExtendCapacity(dst, additionalItems) } // AppendDecimalToFloat converts each item in va to f=v*10^e, appends it diff --git a/lib/encoding/int.go b/lib/encoding/int.go index 3295e6ad4..7be990b0c 100644 --- a/lib/encoding/int.go +++ b/lib/encoding/int.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "fmt" "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // MarshalUint16 appends marshaled v to dst and returns the result. @@ -490,10 +492,7 @@ func GetInt64s(size int) *Int64s { } } is := v.(*Int64s) - if n := size - cap(is.A); n > 0 { - is.A = append(is.A[:cap(is.A)], make([]int64, n)...) - } - is.A = is.A[:size] + is.A = slicesutil.SetLength(is.A, size) return is } @@ -519,10 +518,7 @@ func GetUint64s(size int) *Uint64s { } } is := v.(*Uint64s) - if n := size - cap(is.A); n > 0 { - is.A = append(is.A[:cap(is.A)], make([]uint64, n)...) - } - is.A = is.A[:size] + is.A = slicesutil.SetLength(is.A, size) return is } @@ -548,10 +544,7 @@ func GetUint32s(size int) *Uint32s { } } is := v.(*Uint32s) - if n := size - cap(is.A); n > 0 { - is.A = append(is.A[:cap(is.A)], make([]uint32, n)...) - } - is.A = is.A[:size] + is.A = slicesutil.SetLength(is.A, size) return is } diff --git a/lib/leveledbytebufferpool/pool_test.go b/lib/leveledbytebufferpool/pool_test.go index 851513c68..d1aa84ec1 100644 --- a/lib/leveledbytebufferpool/pool_test.go +++ b/lib/leveledbytebufferpool/pool_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) func TestGetPutConcurrent(t *testing.T) { @@ -19,7 +21,7 @@ func TestGetPutConcurrent(t *testing.T) { if capacity < 0 { capacity = 0 } - bb.B = append(bb.B, make([]byte, capacity)...) + bb.B = slicesutil.SetLength(bb.B, len(bb.B)+capacity) Put(bb) } doneCh <- struct{}{} diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index d71775258..1579d871b 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) const ( @@ -853,13 +854,9 @@ func (sp *tagToStreamIDsRowParser) ParseStreamIDs() { } tail := sp.tail n := len(tail) / 16 - streamIDs := sp.StreamIDs[:0] - if n <= cap(streamIDs) { - streamIDs = streamIDs[:n] - } else { - streamIDs = append(streamIDs[:cap(streamIDs)], make([]u128, n-cap(streamIDs))...) - } - sp.StreamIDs = streamIDs + sp.StreamIDs = slicesutil.SetLength(sp.StreamIDs, n) + streamIDs := sp.StreamIDs + _ = streamIDs[n-1] for i := 0; i < n; i++ { var err error tail, err = streamIDs[i].unmarshal(tail) diff --git a/lib/mergeset/block_header.go b/lib/mergeset/block_header.go index a957427ae..817fee7e6 100644 --- a/lib/mergeset/block_header.go +++ b/lib/mergeset/block_header.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) type blockHeader struct { @@ -160,10 +161,7 @@ func unmarshalBlockHeadersNoCopy(dst []blockHeader, src []byte, blockHeadersCoun logger.Panicf("BUG: blockHeadersCount must be greater than 0; got %d", blockHeadersCount) } dstLen := len(dst) - if n := dstLen + blockHeadersCount - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]blockHeader, n)...) - } - dst = dst[:dstLen+blockHeadersCount] + dst = slicesutil.SetLength(dst, dstLen+blockHeadersCount) for i := 0; i < blockHeadersCount; i++ { tail, err := dst[dstLen+i].UnmarshalNoCopy(src) if err != nil { diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 93429744d..7a29a0ca8 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // Item represents a single item for storing in a mergeset. @@ -412,10 +413,7 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix // since the data isn't going to be resized after unmarshaling. // This may save memory for caching the unmarshaled block. data := bytesutil.ResizeNoCopyNoOverallocate(ib.data, dataLen) - if n := int(itemsCount) - cap(ib.items); n > 0 { - ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...) - } - ib.items = ib.items[:itemsCount] + ib.items = slicesutil.SetLength(ib.items, int(itemsCount)) data = append(data[:0], firstItem...) items := ib.items items[0] = Item{ @@ -554,10 +552,7 @@ func getLensBuffer(n int) *lensBuffer { v = &lensBuffer{} } lb := v.(*lensBuffer) - if nn := n - cap(lb.lens); nn > 0 { - lb.lens = append(lb.lens[:cap(lb.lens)], make([]uint64, nn)...) - } - lb.lens = lb.lens[:n] + lb.lens = slicesutil.SetLength(lb.lens, n) return lb } diff --git a/lib/mergeset/table_search.go b/lib/mergeset/table_search.go index fdd03d586..70f741561 100644 --- a/lib/mergeset/table_search.go +++ b/lib/mergeset/table_search.go @@ -7,6 +7,7 @@ import ( "io" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // TableSearch is a reusable cursor used for searching in the Table. @@ -71,10 +72,7 @@ func (ts *TableSearch) Init(tb *Table) { ts.pws = ts.tb.getParts(ts.pws[:0]) // Initialize the psPool. - if n := len(ts.pws) - cap(ts.psPool); n > 0 { - ts.psPool = append(ts.psPool[:cap(ts.psPool)], make([]partSearch, n)...) - } - ts.psPool = ts.psPool[:len(ts.pws)] + ts.psPool = slicesutil.SetLength(ts.psPool, len(ts.pws)) for i, pw := range ts.pws { ts.psPool[i].Init(pw.p) } diff --git a/lib/prompbmarshal/util.go b/lib/prompbmarshal/util.go index e2ccc00fa..688c450a5 100644 --- a/lib/prompbmarshal/util.go +++ b/lib/prompbmarshal/util.go @@ -2,16 +2,15 @@ package prompbmarshal import ( "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // MarshalProtobuf marshals wr to dst and returns the result. func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte { size := wr.Size() dstLen := len(dst) - if n := size - (cap(dst) - dstLen); n > 0 { - dst = append(dst[:cap(dst)], make([]byte, n)...) - } - dst = dst[:dstLen+size] + dst = slicesutil.SetLength(dst, dstLen+size) n, err := wr.MarshalToSizedBuffer(dst[dstLen:]) if err != nil { panic(fmt.Errorf("BUG: unexpected error when marshaling WriteRequest: %w", err)) diff --git a/lib/slicesutil/slicesutil.go b/lib/slicesutil/slicesutil.go new file mode 100644 index 000000000..14cdeb1ae --- /dev/null +++ b/lib/slicesutil/slicesutil.go @@ -0,0 +1,23 @@ +package slicesutil + +// ExtendCapacity returns a with the capacity extended to len(a)+itemsToAdd. +// +// It may allocate new slice if cap(a) is smaller than len(a)+itemsToAdd. +func ExtendCapacity[T any](a []T, itemsToAdd int) []T { + aLen := len(a) + if n := aLen + itemsToAdd - cap(a); n > 0 { + a = append(a[:cap(a)], make([]T, n)...) + return a[:aLen] + } + return a +} + +// SetLength sets len(a) to newLen and returns the result. +// +// It may allocate new slice if cap(a) is smaller than newLen. +func SetLength[T any](a []T, newLen int) []T { + if n := newLen - cap(a); n > 0 { + a = append(a[:cap(a)], make([]T, n)...) + } + return a[:newLen] +} diff --git a/lib/storage/block_header.go b/lib/storage/block_header.go index 6bade6216..f7689f5f1 100644 --- a/lib/storage/block_header.go +++ b/lib/storage/block_header.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // blockHeader is a header for a time series block. @@ -252,10 +253,7 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte, blockHeadersCount int) logger.Panicf("BUG: blockHeadersCount must be greater than zero; got %d", blockHeadersCount) } dstLen := len(dst) - if n := dstLen + blockHeadersCount - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]blockHeader, n)...) - dst = dst[:dstLen] - } + dst = slicesutil.ExtendCapacity(dst, blockHeadersCount) var bh blockHeader for len(src) > 0 { tmp, err := bh.Unmarshal(src) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 01df33fd0..af698e5bd 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -22,6 +22,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" @@ -3153,13 +3154,8 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() { return } tail := mp.tail - mp.MetricIDs = mp.MetricIDs[:0] n := len(tail) / 8 - if n <= cap(mp.MetricIDs) { - mp.MetricIDs = mp.MetricIDs[:n] - } else { - mp.MetricIDs = append(mp.MetricIDs[:cap(mp.MetricIDs)], make([]uint64, n-cap(mp.MetricIDs))...) - } + mp.MetricIDs = slicesutil.SetLength(mp.MetricIDs, n) metricIDs := mp.MetricIDs _ = metricIDs[n-1] for i := 0; i < n; i++ { diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index 903dbd4ce..4f102bd79 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) const ( @@ -702,10 +703,8 @@ func (mn *MetricName) sortTags() { } cts := getCanonicalTags() - if n := len(mn.Tags) - cap(cts.tags); n > 0 { - cts.tags = append(cts.tags[:cap(cts.tags)], make([]canonicalTag, n)...) - } - dst := cts.tags[:len(mn.Tags)] + cts.tags = slicesutil.SetLength(cts.tags, len(mn.Tags)) + dst := cts.tags for i := range mn.Tags { tag := &mn.Tags[i] ct := &dst[i] @@ -775,10 +774,7 @@ func (ts *canonicalTagsSort) Swap(i, j int) { func copyTags(dst, src []Tag) []Tag { dstLen := len(dst) - if n := dstLen + len(src) - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]Tag, n)...) - } - dst = dst[:dstLen+len(src)] + dst = slicesutil.SetLength(dst, dstLen+len(src)) for i := range src { dst[dstLen+i].copyFrom(&src[i]) } diff --git a/lib/storage/partition_search.go b/lib/storage/partition_search.go index 52e530887..cc1bacd82 100644 --- a/lib/storage/partition_search.go +++ b/lib/storage/partition_search.go @@ -6,6 +6,7 @@ import ( "io" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // partitionSearch represents a search in the partition. @@ -83,10 +84,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) { pts.pws = pt.GetParts(pts.pws[:0], true) // Initialize psPool. - if n := len(pts.pws) - cap(pts.psPool); n > 0 { - pts.psPool = append(pts.psPool[:cap(pts.psPool)], make([]partSearch, n)...) - } - pts.psPool = pts.psPool[:len(pts.pws)] + pts.psPool = slicesutil.SetLength(pts.psPool, len(pts.pws)) for i, pw := range pts.pws { pts.psPool[i].Init(pw.p, tsids, tr) } diff --git a/lib/storage/search.go b/lib/storage/search.go index 6dee1c5c0..0e406b344 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) @@ -413,10 +414,7 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { if err != nil { return src, fmt.Errorf("cannot unmarshal the count of TagFilterss: %w", err) } - if n := int(tfssCount) - cap(sq.TagFilterss); n > 0 { - sq.TagFilterss = append(sq.TagFilterss[:cap(sq.TagFilterss)], make([][]TagFilter, n)...) - } - sq.TagFilterss = sq.TagFilterss[:tfssCount] + sq.TagFilterss = slicesutil.SetLength(sq.TagFilterss, int(tfssCount)) src = tail for i := 0; i < int(tfssCount); i++ { @@ -427,10 +425,7 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { src = tail tagFilters := sq.TagFilterss[i] - if n := int(tfsCount) - cap(tagFilters); n > 0 { - tagFilters = append(tagFilters[:cap(tagFilters)], make([]TagFilter, n)...) - } - tagFilters = tagFilters[:tfsCount] + tagFilters = slicesutil.SetLength(tagFilters, int(tfsCount)) for j := 0; j < int(tfsCount); j++ { tail, err := tagFilters[j].Unmarshal(src) if err != nil { diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go index bd03d67ca..1809a8ae1 100644 --- a/lib/storage/table_search.go +++ b/lib/storage/table_search.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // tableSearch performs searches in the table. @@ -84,10 +85,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) { ts.ptws = tb.GetPartitions(ts.ptws[:0]) // Initialize the ptsPool. - if n := len(ts.ptws) - cap(ts.ptsPool); n > 0 { - ts.ptsPool = append(ts.ptsPool[:cap(ts.ptsPool)], make([]partitionSearch, n)...) - } - ts.ptsPool = ts.ptsPool[:len(ts.ptws)] + ts.ptsPool = slicesutil.SetLength(ts.ptsPool, len(ts.ptws)) for i, ptw := range ts.ptws { ts.ptsPool[i].Init(ptw.pt, tsids, tr) } diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go index ff827693d..44204b509 100644 --- a/lib/uint64set/uint64set.go +++ b/lib/uint64set/uint64set.go @@ -6,6 +6,8 @@ import ( "sync" "sync/atomic" "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // Set is a fast set for uint64. @@ -226,11 +228,9 @@ func (s *Set) AppendTo(dst []uint64) []uint64 { } // pre-allocate memory for dst - dstLen := len(dst) - if n := s.Len() - cap(dst) + dstLen; n > 0 { - dst = append(dst[:cap(dst)], make([]uint64, n)...) - dst = dst[:dstLen] - } + sLen := s.Len() + dst = slicesutil.ExtendCapacity(dst, sLen) + s.sort() for i := range s.buckets { dst = s.buckets[i].appendTo(dst)