From 4bdd10ab9039d8ab0b4fe2d7a7dee064536da002 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 1 Feb 2022 00:18:39 +0200
Subject: [PATCH] lib/bytesutil: split Resize* funcs to MayOverallocate and
 NoOverallocate for more fine-grained control over memory allocations

Follow-up for f4989edd962c8d9b1e80e7013451a92d7c229fc1

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
---
 app/vmselect/netstorage/tmp_blocks_file.go |   2 +-
 app/vmselect/promql/timeseries.go          |   2 +-
 lib/bytesutil/bytebuffer.go                |   2 +-
 lib/bytesutil/bytesutil.go                 |  45 +++++-
 lib/bytesutil/bytesutil_test.go            | 161 ++++++++++++++++++---
 lib/ingestserver/graphite/server.go        |   2 +-
 lib/ingestserver/influx/server.go          |   2 +-
 lib/ingestserver/opentsdb/server.go        |   2 +-
 lib/mergeset/block_stream_reader.go        |   6 +-
 lib/mergeset/encoding.go                   |  20 ++-
 lib/mergeset/part_search.go                |   6 +-
 lib/persistentqueue/persistentqueue.go     |   4 +-
 lib/protoparser/common/lines_reader.go     |   4 +-
 lib/protoparser/native/streamparser.go     |   4 +-
 lib/storage/block_stream_reader.go         |   6 +-
 lib/storage/metric_name.go                 |   4 +-
 lib/storage/part_search.go                 |   2 +-
 lib/storage/search.go                      |   4 +-
 18 files changed, 220 insertions(+), 58 deletions(-)

diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go
index 8c8c44acd..1cc05abc5 100644
--- a/app/vmselect/netstorage/tmp_blocks_file.go
+++ b/app/vmselect/netstorage/tmp_blocks_file.go
@@ -149,7 +149,7 @@ func (tbf *tmpBlocksFile) MustReadBlockRefAt(partRef storage.PartRef, addr tmpBl
 	} else {
 		bb := tmpBufPool.Get()
 		defer tmpBufPool.Put(bb)
-		bb.B = bytesutil.ResizeNoCopy(bb.B, addr.size)
+		bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, addr.size)
 		tbf.r.MustReadAt(bb.B, int64(addr.offset))
 		buf = bb.B
 	}
diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go
index 718162b03..0ec0ee89f 100644
--- a/app/vmselect/promql/timeseries.go
+++ b/app/vmselect/promql/timeseries.go
@@ -98,7 +98,7 @@ func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int6
 
 	// Allocate the buffer for the marshaled tss before its marshaling.
 	// This should reduce memory fragmentation and memory usage.
-	dst = bytesutil.ResizeNoCopy(dst, size)
+	dst = bytesutil.ResizeNoCopyMayOverallocate(dst, size)
 	dst = marshalFastTimestamps(dst[:0], tss[0].Timestamps)
 	for _, ts := range tss {
 		dst = ts.marshalFastNoTimestamps(dst)
diff --git a/lib/bytesutil/bytebuffer.go b/lib/bytesutil/bytebuffer.go
index 3aec86498..7a1351fb3 100644
--- a/lib/bytesutil/bytebuffer.go
+++ b/lib/bytesutil/bytebuffer.go
@@ -53,7 +53,7 @@ func (bb *ByteBuffer) MustReadAt(p []byte, offset int64) {
 func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
 	b := bb.B
 	bLen := len(b)
-	b = ResizeWithCopy(b, 4*1024)
+	b = ResizeWithCopyMayOverallocate(b, 4*1024)
 	b = b[:cap(b)]
 	offset := bLen
 	for {
diff --git a/lib/bytesutil/bytesutil.go b/lib/bytesutil/bytesutil.go
index 1ead40f5c..daa01232d 100644
--- a/lib/bytesutil/bytesutil.go
+++ b/lib/bytesutil/bytesutil.go
@@ -1,35 +1,66 @@
 package bytesutil
 
 import (
+	"math/bits"
 	"reflect"
 	"unsafe"
 )
 
-// ResizeWithCopy resizes b to n bytes and returns the resized buffer (which may be newly allocated).
+// ResizeWithCopyMayOverallocate resizes b to at least n bytes and returns the resized buffer (which may be newly allocated).
 //
 // If a newly allocated buffer is returned then the contents of b are copied to it.
-func ResizeWithCopy(b []byte, n int) []byte {
+func ResizeWithCopyMayOverallocate(b []byte, n int) []byte {
+	if n <= cap(b) {
+		return b[:n]
+	}
+	nNew := roundToNearestPow2(n)
+	bNew := make([]byte, nNew)
+	copy(bNew, b)
+	return bNew[:n]
+}
+
+// ResizeWithCopyNoOverallocate resizes b to exactly n bytes and returns the resized buffer (which may be newly allocated).
+//
+// If a newly allocated buffer is returned then the contents of b are copied to it.
+func ResizeWithCopyNoOverallocate(b []byte, n int) []byte {
 	if n <= cap(b) {
 		return b[:n]
 	}
-	// Allocate the exact number of bytes instead of using `b = append(b[:cap(b)], make([]byte, nn)...)`,
-	// since `append()` may allocate more than the requested bytes for additional capacity.
-	// Using make() instead of append() should save RAM when the resized slice is cached somewhere.
 	bNew := make([]byte, n)
 	copy(bNew, b)
 	return bNew
 }
 
-// ResizeNoCopy resizes b to n bytes and returns the resized buffer (which may be newly allocated).
+// ResizeNoCopyMayOverallocate resizes b to at least n bytes and returns the resized buffer (which may be newly allocated).
 //
 // If a newly allocated buffer is returned then the contents of b aren't copied to it.
-func ResizeNoCopy(b []byte, n int) []byte {
+func ResizeNoCopyMayOverallocate(b []byte, n int) []byte {
+	if n <= cap(b) {
+		return b[:n]
+	}
+	nNew := roundToNearestPow2(n)
+	bNew := make([]byte, nNew)
+	return bNew[:n]
+}
+
+// ResizeNoCopyNoOverallocate resizes b to exactly n bytes and returns the resized buffer (which may be newly allocated).
+//
+// If a newly allocated buffer is returned then the contents of b aren't copied to it.
+func ResizeNoCopyNoOverallocate(b []byte, n int) []byte {
 	if n <= cap(b) {
 		return b[:n]
 	}
 	return make([]byte, n)
 }
 
+// roundToNearestPow2 rounds n up to the nearest power of 2.
+//
+// It is expected that n > 0.
+func roundToNearestPow2(n int) int {
+	pow2 := uint8(bits.Len(uint(n - 1)))
+	return 1 << pow2
+}
+
 // ToUnsafeString converts b to string without memory allocations.
 //
 // The returned string is valid only until b is reachable and unmodified.
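
The combined contract of the four functions above can be seen in a standalone sketch (illustrative only, not part of the diff; it assumes nothing beyond this repository's lib/bytesutil import path and the roundToNearestPow2 behavior shown above):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

func main() {
	// MayOverallocate rounds new capacity up to the next power of 2,
	// so the returned buffer can absorb further small resizes without reallocating.
	b := bytesutil.ResizeNoCopyMayOverallocate(nil, 100)
	fmt.Println(len(b), cap(b)) // 100 128

	// Growing within the rounded-up capacity reuses the same allocation.
	b2 := bytesutil.ResizeNoCopyMayOverallocate(b, 128)
	fmt.Println(&b2[0] == &b[0]) // true

	// NoOverallocate allocates exactly n bytes - preferable when the resized
	// buffer is cached long-term, since unused capacity would be wasted RAM.
	c := bytesutil.ResizeWithCopyNoOverallocate([]byte("abc"), 10)
	fmt.Println(len(c), cap(c), string(c[:3])) // 10 10 abc
}

Power-of-2 rounding keeps worst-case overallocation below 2x the requested size, while a buffer repeatedly resized to nearby sizes is reallocated only O(log(maxSize)) times.
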
diff --git a/lib/bytesutil/bytesutil_test.go b/lib/bytesutil/bytesutil_test.go
index c59ff4e86..a3ab29a92 100644
--- a/lib/bytesutil/bytesutil_test.go
+++ b/lib/bytesutil/bytesutil_test.go
@@ -5,26 +5,63 @@ import (
 	"testing"
 )
 
-func TestResizeNoCopy(t *testing.T) {
+func TestRoundToNearestPow2(t *testing.T) {
+	f := func(n, resultExpected int) {
+		t.Helper()
+		result := roundToNearestPow2(n)
+		if result != resultExpected {
+			t.Fatalf("unexpected roundToNearestPow2(%d); got %d; want %d", n, result, resultExpected)
+		}
+	}
+	f(1, 1)
+	f(2, 2)
+	f(3, 4)
+	f(4, 4)
+	f(5, 8)
+	f(6, 8)
+	f(7, 8)
+	f(8, 8)
+	f(9, 16)
+	f(10, 16)
+	f(16, 16)
+	f(17, 32)
+	f(32, 32)
+	f(33, 64)
+	f(64, 64)
+}
+
+func TestResizeNoCopyNoOverallocate(t *testing.T) {
 	for i := 0; i < 1000; i++ {
-		b := ResizeNoCopy(nil, i)
+		b := ResizeNoCopyNoOverallocate(nil, i)
 		if len(b) != i {
-			t.Fatalf("invalid b size; got %d; expecting %d", len(b), i)
+			t.Fatalf("invalid b size; got %d; want %d", len(b), i)
 		}
-		b1 := ResizeNoCopy(b, i)
+		if cap(b) != i {
+			t.Fatalf("invalid cap(b); got %d; want %d", cap(b), i)
+		}
+		b1 := ResizeNoCopyNoOverallocate(b, i)
 		if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
-			t.Fatalf("invalid b1; got %x; expecting %x", &b1[0], &b[0])
+			t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
 		}
-		b2 := ResizeNoCopy(b[:0], i)
+		if cap(b1) != i {
+			t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), i)
+		}
+		b2 := ResizeNoCopyNoOverallocate(b[:0], i)
 		if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
-			t.Fatalf("invalid b2; got %x; expecting %x", &b2[0], &b[0])
+			t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+		}
+		if cap(b2) != i {
+			t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), i)
 		}
 		if i > 0 {
 			b[0] = 123
-			b3 := ResizeNoCopy(b, i+1)
+			b3 := ResizeNoCopyNoOverallocate(b, i+1)
 			if len(b3) != i+1 {
 				t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
 			}
+			if cap(b3) != i+1 {
+				t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), i+1)
+			}
 			if &b3[0] == &b[0] {
 				t.Fatalf("b3 must be newly allocated")
 			}
@@ -35,26 +72,75 @@
 	}
 }
 
-func TestResizeWithCopy(t *testing.T) {
+func TestResizeNoCopyMayOverallocate(t *testing.T) {
 	for i := 0; i < 1000; i++ {
-		b := ResizeWithCopy(nil, i)
+		b := ResizeNoCopyMayOverallocate(nil, i)
 		if len(b) != i {
-			t.Fatalf("invalid b size; got %d; expecting %d", len(b), i)
+			t.Fatalf("invalid b size; got %d; want %d", len(b), i)
 		}
-		b1 := ResizeWithCopy(b, i)
+		capExpected := roundToNearestPow2(i)
+		if cap(b) != capExpected {
+			t.Fatalf("invalid cap(b); got %d; want %d", cap(b), capExpected)
+		}
+		b1 := ResizeNoCopyMayOverallocate(b, i)
 		if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
-			t.Fatalf("invalid b1; got %x; expecting %x", &b1[0], &b[0])
+			t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
 		}
-		b2 := ResizeWithCopy(b[:0], i)
+		if cap(b1) != capExpected {
+			t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), capExpected)
+		}
+		b2 := ResizeNoCopyMayOverallocate(b[:0], i)
 		if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
-			t.Fatalf("invalid b2; got %x; expecting %x", &b2[0], &b[0])
+			t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+		}
+		if cap(b2) != capExpected {
+			t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), capExpected)
 		}
 		if i > 0 {
-			b[0] = 123
-			b3 := ResizeWithCopy(b, i+1)
+			b3 := ResizeNoCopyMayOverallocate(b, i+1)
 			if len(b3) != i+1 {
 				t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
 			}
+			capExpected = roundToNearestPow2(i + 1)
+			if cap(b3) != capExpected {
+				t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), capExpected)
+			}
+		}
+	}
+}
+
+func TestResizeWithCopyNoOverallocate(t *testing.T) {
+	for i := 0; i < 1000; i++ {
+		b := ResizeWithCopyNoOverallocate(nil, i)
+		if len(b) != i {
+			t.Fatalf("invalid b size; got %d; want %d", len(b), i)
+		}
+		if cap(b) != i {
+			t.Fatalf("invalid cap(b); got %d; want %d", cap(b), i)
+		}
+		b1 := ResizeWithCopyNoOverallocate(b, i)
+		if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
+			t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
+		}
+		if cap(b1) != i {
+			t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), i)
+		}
+		b2 := ResizeWithCopyNoOverallocate(b[:0], i)
+		if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
+			t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+		}
+		if cap(b2) != i {
+			t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), i)
+		}
+		if i > 0 {
+			b[0] = 123
+			b3 := ResizeWithCopyNoOverallocate(b, i+1)
+			if len(b3) != i+1 {
+				t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
+			}
+			if cap(b3) != i+1 {
+				t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), i+1)
+			}
 			if &b3[0] == &b[0] {
 				t.Fatalf("b3 must be newly allocated for i=%d", i)
 			}
@@ -65,6 +151,47 @@
 	}
 }
 
+func TestResizeWithCopyMayOverallocate(t *testing.T) {
+	for i := 0; i < 1000; i++ {
+		b := ResizeWithCopyMayOverallocate(nil, i)
+		if len(b) != i {
+			t.Fatalf("invalid b size; got %d; want %d", len(b), i)
+		}
+		capExpected := roundToNearestPow2(i)
+		if cap(b) != capExpected {
+			t.Fatalf("invalid cap(b); got %d; want %d", cap(b), capExpected)
+		}
+		b1 := ResizeWithCopyMayOverallocate(b, i)
+		if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
+			t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
+		}
+		if cap(b1) != capExpected {
+			t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), capExpected)
+		}
+		b2 := ResizeWithCopyMayOverallocate(b[:0], i)
+		if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
+			t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+		}
+		if cap(b2) != capExpected {
+			t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), capExpected)
+		}
+		if i > 0 {
+			b[0] = 123
+			b3 := ResizeWithCopyMayOverallocate(b, i+1)
+			if len(b3) != i+1 {
+				t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
+			}
+			capExpected = roundToNearestPow2(i + 1)
+			if cap(b3) != capExpected {
+				t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), capExpected)
+			}
+			if b3[0] != b[0] || b3[0] != 123 {
+				t.Fatalf("b3[0] must equal b[0]; got %d; want %d", b3[0], b[0])
+			}
+		}
+	}
+}
+
 func TestToUnsafeString(t *testing.T) {
 	s := "str"
 	if !bytes.Equal([]byte("str"), ToUnsafeBytes(s)) {
diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go
index 6069f2def..3474200ee 100644
--- a/lib/ingestserver/graphite/server.go
+++ b/lib/ingestserver/graphite/server.go
@@ -135,7 +135,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
 		go func() {
 			defer wg.Done()
 			var bb bytesutil.ByteBuffer
-			bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024)
+			bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
 			for {
 				bb.Reset()
 				bb.B = bb.B[:cap(bb.B)]
diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go
index f6fa873cd..9fab5cb02 100644
--- a/lib/ingestserver/influx/server.go
+++ b/lib/ingestserver/influx/server.go
@@ -135,7 +135,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
 		go func() {
 			defer wg.Done()
 			var bb bytesutil.ByteBuffer
-			bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024)
+			bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
 			for {
 				bb.Reset()
 				bb.B = bb.B[:cap(bb.B)]
diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go
index ee5056f8c..a00a32267 100644
--- a/lib/ingestserver/opentsdb/server.go
+++ b/lib/ingestserver/opentsdb/server.go
@@ -153,7 +153,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
 		go func() {
 			defer wg.Done()
 			var bb bytesutil.ByteBuffer
-			bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024)
+			bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
 			for {
 				bb.Reset()
 				bb.B = bb.B[:cap(bb.B)]
diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go
index ea2f1b8ee..83d0fc5ed 100644
--- a/lib/mergeset/block_stream_reader.go
+++ b/lib/mergeset/block_stream_reader.go
@@ -211,13 +211,13 @@ func (bsr *blockStreamReader) Next() bool {
 	bsr.bh = &bsr.bhs[bsr.bhIdx]
 	bsr.bhIdx++
 
-	bsr.sb.itemsData = bytesutil.ResizeNoCopy(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize))
+	bsr.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize))
 	if err := fs.ReadFullData(bsr.itemsReader, bsr.sb.itemsData); err != nil {
 		bsr.err = fmt.Errorf("cannot read compressed items block with size %d: %w", bsr.bh.itemsBlockSize, err)
 		return false
 	}
 
-	bsr.sb.lensData = bytesutil.ResizeNoCopy(bsr.sb.lensData, int(bsr.bh.lensBlockSize))
+	bsr.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.lensData, int(bsr.bh.lensBlockSize))
 	if err := fs.ReadFullData(bsr.lensReader, bsr.sb.lensData); err != nil {
 		bsr.err = fmt.Errorf("cannot read compressed lens block with size %d: %w", bsr.bh.lensBlockSize, err)
 		return false
@@ -260,7 +260,7 @@ func (bsr *blockStreamReader) readNextBHS() error {
 	bsr.mrIdx++
 
 	// Read compressed index block.
-	bsr.packedBuf = bytesutil.ResizeNoCopy(bsr.packedBuf, int(mr.indexBlockSize))
+	bsr.packedBuf = bytesutil.ResizeNoCopyMayOverallocate(bsr.packedBuf, int(mr.indexBlockSize))
 	if err := fs.ReadFullData(bsr.indexReader, bsr.packedBuf); err != nil {
 		return fmt.Errorf("cannot read compressed index block with size %d: %w", mr.indexBlockSize, err)
 	}
diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go
index 2e9643a60..02fd61f87 100644
--- a/lib/mergeset/encoding.go
+++ b/lib/mergeset/encoding.go
@@ -119,7 +119,7 @@ func (ib *inmemoryBlock) Add(x []byte) bool {
 	}
 	if cap(data) < maxInmemoryBlockSize {
 		dataLen := len(data)
-		data = bytesutil.ResizeWithCopy(data, maxInmemoryBlockSize)[:dataLen]
+		data = bytesutil.ResizeWithCopyNoOverallocate(data, maxInmemoryBlockSize)[:dataLen]
 	}
 	dataLen := len(data)
 	data = append(data, x...)
@@ -141,7 +141,7 @@ func (ib *inmemoryBlock) sort() {
 	data := ib.data
 	items := ib.items
 	bb := bbPool.Get()
-	b := bytesutil.ResizeNoCopy(bb.B, len(data))
+	b := bytesutil.ResizeNoCopyMayOverallocate(bb.B, len(data))
 	b = b[:0]
 	for i, it := range items {
 		bLen := len(b)
@@ -394,7 +394,7 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
 	// Resize ib.data to dataLen instead of maxInmemoryBlockSize,
 	// since the data isn't going to be resized after unmarshaling.
 	// This may save memory for caching the unmarshaled block.
-	data := bytesutil.ResizeNoCopy(ib.data, dataLen)
+	data := bytesutil.ResizeNoCopyNoOverallocate(ib.data, dataLen)
 	if n := int(itemsCount) - cap(ib.items); n > 0 {
 		ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...)
 	}
@@ -492,7 +492,8 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte,
 	// Unmarshal items data.
 	data := ib.data
 	items := ib.items
-	data = bytesutil.ResizeNoCopy(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
+	dataLen := len(firstItem) + len(sb.itemsData) + len(commonPrefix)*(int(itemsCount)-1)
+	data = bytesutil.ResizeNoCopyNoOverallocate(data, dataLen)
 	data = append(data[:0], firstItem...)
 	items = append(items[:0], Item{
 		Start: 0,
@@ -504,20 +505,23 @@
 		if uint64(len(b)) < itemLen {
 			return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b))
 		}
-		dataLen := len(data)
+		dataStart := len(data)
 		data = append(data, commonPrefix...)
 		data = append(data, b[:itemLen]...)
 		items = append(items, Item{
-			Start: uint32(dataLen),
+			Start: uint32(dataStart),
 			End:   uint32(len(data)),
 		})
 		b = b[itemLen:]
 	}
-	ib.data = data
-	ib.items = items
 	if len(b) > 0 {
 		return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
 	}
+	if len(data) != dataLen {
+		return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
+	}
+	ib.data = data
+	ib.items = items
 	return nil
 }
diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go
index 2e7979bf2..2d9bb8b82 100644
--- a/lib/mergeset/part_search.go
+++ b/lib/mergeset/part_search.go
@@ -274,7 +274,7 @@ func (ps *partSearch) nextBHS() error {
 }
 
 func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
-	ps.compressedIndexBuf = bytesutil.ResizeNoCopy(ps.compressedIndexBuf, int(mr.indexBlockSize))
+	ps.compressedIndexBuf = bytesutil.ResizeNoCopyMayOverallocate(ps.compressedIndexBuf, int(mr.indexBlockSize))
 	ps.p.indexFile.MustReadAt(ps.compressedIndexBuf, int64(mr.indexBlockOffset))
 
 	var err error
@@ -311,10 +311,10 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
 func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) {
 	ps.sb.Reset()
 
-	ps.sb.itemsData = bytesutil.ResizeNoCopy(ps.sb.itemsData, int(bh.itemsBlockSize))
+	ps.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(ps.sb.itemsData, int(bh.itemsBlockSize))
 	ps.p.itemsFile.MustReadAt(ps.sb.itemsData, int64(bh.itemsBlockOffset))
 
-	ps.sb.lensData = bytesutil.ResizeNoCopy(ps.sb.lensData, int(bh.lensBlockSize))
+	ps.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(ps.sb.lensData, int(bh.lensBlockSize))
 	ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset))
 
 	ib := getInmemoryBlock()
diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go
index 66ad247f9..ea412177f 100644
--- a/lib/persistentqueue/persistentqueue.go
+++ b/lib/persistentqueue/persistentqueue.go
@@ -499,7 +499,7 @@ func (q *queue) readBlock(dst []byte) ([]byte, error) {
again:
 	// Read block len.
 	header := headerBufPool.Get()
-	header.B = bytesutil.ResizeNoCopy(header.B, 8)
+	header.B = bytesutil.ResizeNoCopyMayOverallocate(header.B, 8)
 	err := q.readFull(header.B)
 	blockLen := encoding.UnmarshalUint64(header.B)
 	headerBufPool.Put(header)
@@ -520,7 +520,7 @@ again:
 	// Read block contents.
 	dstLen := len(dst)
-	dst = bytesutil.ResizeWithCopy(dst, dstLen+int(blockLen))
+	dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstLen+int(blockLen))
 	if err := q.readFull(dst[dstLen:]); err != nil {
 		logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err)
 		if err := q.skipBrokenChunkFile(); err != nil {
diff --git a/lib/protoparser/common/lines_reader.go b/lib/protoparser/common/lines_reader.go
index fc8f52780..1549a2079 100644
--- a/lib/protoparser/common/lines_reader.go
+++ b/lib/protoparser/common/lines_reader.go
@@ -40,7 +40,7 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error)
 func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) {
 	startTime := time.Now()
 	if cap(dstBuf) < defaultBlockSize {
-		dstBuf = bytesutil.ResizeNoCopy(dstBuf, defaultBlockSize)
+		dstBuf = bytesutil.ResizeNoCopyNoOverallocate(dstBuf, defaultBlockSize)
 	}
 	dstBuf = append(dstBuf[:0], tailBuf...)
 	tailBuf = tailBuf[:0]
@@ -79,7 +79,7 @@ again:
 	if cap(dstBuf) < 2*len(dstBuf) {
 		// Increase dstBuf capacity, so more data could be read into it.
 		dstBufLen := len(dstBuf)
-		dstBuf = bytesutil.ResizeWithCopy(dstBuf, 2*cap(dstBuf))
+		dstBuf = bytesutil.ResizeWithCopyNoOverallocate(dstBuf, 2*cap(dstBuf))
 		dstBuf = dstBuf[:dstBufLen]
 	}
 	goto again
diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go
index 80f89581e..0cf43d6db 100644
--- a/lib/protoparser/native/streamparser.go
+++ b/lib/protoparser/native/streamparser.go
@@ -84,7 +84,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
 			wg.Wait()
 			return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
 		}
-		uw.metricNameBuf = bytesutil.ResizeNoCopy(uw.metricNameBuf, int(bufSize))
+		uw.metricNameBuf = bytesutil.ResizeNoCopyMayOverallocate(uw.metricNameBuf, int(bufSize))
 		if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil {
 			readErrors.Inc()
 			wg.Wait()
@@ -105,7 +105,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
 			wg.Wait()
 			return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
 		}
-		uw.blockBuf = bytesutil.ResizeNoCopy(uw.blockBuf, int(bufSize))
+		uw.blockBuf = bytesutil.ResizeNoCopyMayOverallocate(uw.blockBuf, int(bufSize))
 		if _, err := io.ReadFull(br, uw.blockBuf); err != nil {
 			readErrors.Inc()
 			wg.Wait()
diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go
index 9d9361c47..aa3149d5b 100644
--- a/lib/storage/block_stream_reader.go
+++ b/lib/storage/block_stream_reader.go
@@ -308,7 +308,7 @@ func (bsr *blockStreamReader) readBlock() error {
 	if usePrevTimestamps {
 		bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...)
 	} else {
-		bsr.Block.timestampsData = bytesutil.ResizeNoCopy(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
+		bsr.Block.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
 		if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
 			return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
 		}
@@ -317,7 +317,7 @@
 	}
 
 	// Read values data.
-	bsr.Block.valuesData = bytesutil.ResizeNoCopy(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
+	bsr.Block.valuesData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
 	if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil {
 		return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err)
 	}
@@ -352,7 +352,7 @@ func (bsr *blockStreamReader) readIndexBlock() error {
 	}
 
 	// Read index block.
-	bsr.compressedIndexData = bytesutil.ResizeNoCopy(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
+	bsr.compressedIndexData = bytesutil.ResizeNoCopyMayOverallocate(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
 	if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil {
 		return fmt.Errorf("cannot read index block from index data at offset %d: %w", bsr.indexBlockOffset, err)
 	}
diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go
index 6ec6fa18b..0d8d73ed2 100644
--- a/lib/storage/metric_name.go
+++ b/lib/storage/metric_name.go
@@ -370,7 +370,7 @@ func (mn *MetricName) Marshal(dst []byte) []byte {
 		tag := &mn.Tags[i]
 		requiredSize += len(tag.Key) + len(tag.Value) + 2
 	}
-	dst = bytesutil.ResizeWithCopy(dst, requiredSize)[:dstLen]
+	dst = bytesutil.ResizeWithCopyMayOverallocate(dst, requiredSize)[:dstLen]
 
 	// Marshal MetricGroup
 	dst = marshalTagValue(dst, mn.MetricGroup)
@@ -477,7 +477,7 @@ func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte {
 		dstSize += len(label.Value)
 		dstSize += 4
 	}
-	dst = bytesutil.ResizeWithCopy(dst, dstSize)[:dstLen]
+	dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstSize)[:dstLen]
 
 	// Marshal labels to dst.
 	for i := range labels {
diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go
index b2542f663..1ceacd828 100644
--- a/lib/storage/part_search.go
+++ b/lib/storage/part_search.go
@@ -211,7 +211,7 @@ func skipSmallMetaindexRows(metaindex []metaindexRow, tsid *TSID) []metaindexRow
 }
 
 func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
-	ps.compressedIndexBuf = bytesutil.ResizeNoCopy(ps.compressedIndexBuf, int(mr.IndexBlockSize))
+	ps.compressedIndexBuf = bytesutil.ResizeNoCopyMayOverallocate(ps.compressedIndexBuf, int(mr.IndexBlockSize))
 	ps.p.indexFile.MustReadAt(ps.compressedIndexBuf, int64(mr.IndexBlockOffset))
 
 	var err error
diff --git a/lib/storage/search.go b/lib/storage/search.go
index 9f399a79c..7b9e15dc1 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -75,10 +75,10 @@ func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) {
 		return
 	}
 
-	dst.timestampsData = bytesutil.ResizeNoCopy(dst.timestampsData, int(br.bh.TimestampsBlockSize))
+	dst.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(dst.timestampsData, int(br.bh.TimestampsBlockSize))
 	br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset))
 
-	dst.valuesData = bytesutil.ResizeNoCopy(dst.valuesData, int(br.bh.ValuesBlockSize))
+	dst.valuesData = bytesutil.ResizeNoCopyMayOverallocate(dst.valuesData, int(br.bh.ValuesBlockSize))
 	br.p.valuesFile.MustReadAt(dst.valuesData, int64(br.bh.ValuesBlockOffset))
 }
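
The call-site changes above follow one rule: scratch buffers that are reused across blocks of varying size move to the MayOverallocate variants, while buffers that are allocated once at a fixed size or cached long-term (the 64*1024-byte UDP read buffers, unmarshaled inmemoryBlock data, ReadLinesBlockExt's dstBuf) keep exact sizing via the NoOverallocate variants. A sketch of the first pattern (blockReader is a hypothetical stand-in for readers such as partSearch and blockStreamReader, not code from this patch):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

// blockReader is a hypothetical stand-in for the patched readers:
// one scratch buffer is reused for blocks of varying size.
type blockReader struct {
	buf []byte
}

// readBlock resizes the scratch buffer for the next block. With the
// MayOverallocate variant, capacity grows in power-of-2 steps, so a stream
// of nearby block sizes triggers only a handful of allocations instead of
// one per differing size.
func (br *blockReader) readBlock(blockSize int) {
	br.buf = bytesutil.ResizeNoCopyMayOverallocate(br.buf, blockSize)
	// ... the real readers fill br.buf from disk here ...
}

func main() {
	var br blockReader
	allocs, prevCap := 0, 0
	for _, size := range []int{1000, 1500, 900, 2000, 1800} {
		br.readBlock(size)
		if cap(br.buf) != prevCap {
			allocs++
			prevCap = cap(br.buf)
		}
	}
	fmt.Println(allocs) // 2: one buffer with cap 1024, then one with cap 2048
}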