From 6232eaa9384bd2577743db9cecae8fd56dd716a2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 25 Jan 2022 15:16:24 +0200 Subject: [PATCH] lib/bytesutil: split Resize() into ResizeNoCopy() and ResizeWithCopy() functions Previously bytesutil.Resize() was copying the original byte slice contents to a newly allocated slice. This wasted CPU cycles and memory bandwidth in some places, where the original slice contents wasn't needed after slize resizing. Switch such places to bytesutil.ResizeNoCopy(). Rename the original bytesutil.Resize() function to bytesutil.ResizeWithCopy() for the sake of improved readability. Additionally, allocate new slice with `make()` instead of `append()`. This guarantees that the capacity of the allocated slice exactly matches the requested size. The `append()` could return a slice with bigger capacity as an optimization for further `append()` calls. This could result in excess memory usage when the returned byte slice was cached (for instance, in lib/blockcache). Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007 --- app/vmselect/netstorage/netstorage.go | 4 +- app/vmselect/netstorage/tmp_blocks_file.go | 2 +- app/vmselect/promql/timeseries.go | 2 +- app/vmstorage/transport/server.go | 12 ++-- lib/bytesutil/bytebuffer.go | 2 +- lib/bytesutil/bytesutil.go | 27 +++++++-- lib/bytesutil/bytesutil_test.go | 55 +++++++++++++++++-- lib/ingestserver/graphite/server.go | 2 +- lib/ingestserver/influx/server.go | 2 +- lib/ingestserver/opentsdb/server.go | 2 +- lib/mergeset/block_stream_reader.go | 6 +- lib/mergeset/encoding.go | 8 +-- lib/mergeset/part_search.go | 6 +- lib/persistentqueue/persistentqueue.go | 4 +- lib/protoparser/clusternative/streamparser.go | 4 +- lib/protoparser/common/lines_reader.go | 4 +- lib/protoparser/native/streamparser.go | 4 +- lib/storage/block_stream_reader.go | 6 +- lib/storage/metric_name.go | 8 +-- lib/storage/part_search.go | 2 +- lib/storage/search.go | 4 +- 21 files changed, 112 insertions(+), 54 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 2ba37228ad..ba520f6e17 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -2537,7 +2537,7 @@ func sendAccountIDProjectID(bc *handshake.BufferedConn, accountID, projectID uin } func readBytes(buf []byte, bc *handshake.BufferedConn, maxDataSize int) ([]byte, error) { - buf = bytesutil.Resize(buf, 8) + buf = bytesutil.ResizeNoCopy(buf, 8) if n, err := io.ReadFull(bc, buf); err != nil { return buf, fmt.Errorf("cannot read %d bytes with data size: %w; read only %d bytes", len(buf), err, n) } @@ -2545,7 +2545,7 @@ func readBytes(buf []byte, bc *handshake.BufferedConn, maxDataSize int) ([]byte, if dataSize > uint64(maxDataSize) { return buf, fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize) } - buf = bytesutil.Resize(buf, int(dataSize)) + buf = bytesutil.ResizeNoCopy(buf, int(dataSize)) if dataSize == 0 { return buf, nil } diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index 4b17be7801..fb96f44b50 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -149,7 +149,7 @@ func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) } else { bb := tmpBufPool.Get() defer tmpBufPool.Put(bb) - bb.B = bytesutil.Resize(bb.B, addr.size) + bb.B = bytesutil.ResizeNoCopy(bb.B, addr.size) tbf.r.MustReadAt(bb.B, int64(addr.offset)) buf = bb.B } diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go index 36233fbffe..cc7fc664c1 100644 --- a/app/vmselect/promql/timeseries.go +++ b/app/vmselect/promql/timeseries.go @@ -98,7 +98,7 @@ func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int6 // Allocate the buffer for the marshaled tss before its' marshaling. // This should reduce memory fragmentation and memory usage. - dst = bytesutil.Resize(dst, size) + dst = bytesutil.ResizeNoCopy(dst, size) dst = marshalFastTimestamps(dst[:0], tss[0].Timestamps) for _, ts := range tss { dst = ts.marshalFastNoTimestamps(dst) diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 9cb19829dd..ce779bf7c8 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -321,7 +321,7 @@ func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) { } func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { - ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 4) + ctx.sizeBuf = bytesutil.ResizeNoCopy(ctx.sizeBuf, 4) if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { if err == io.EOF { return 0, err @@ -333,7 +333,7 @@ func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { } func (ctx *vmselectRequestCtx) readUint64() (uint64, error) { - ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8) + ctx.sizeBuf = bytesutil.ResizeNoCopy(ctx.sizeBuf, 8) if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { if err == io.EOF { return 0, err @@ -371,7 +371,7 @@ func (ctx *vmselectRequestCtx) readSearchQuery() error { } func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { - ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8) + ctx.sizeBuf = bytesutil.ResizeNoCopy(ctx.sizeBuf, 8) if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { if err == io.EOF { return err @@ -382,7 +382,7 @@ func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { if dataSize > uint64(maxDataSize) { return fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize) } - ctx.dataBuf = bytesutil.Resize(ctx.dataBuf, int(dataSize)) + ctx.dataBuf = bytesutil.ResizeNoCopy(ctx.dataBuf, int(dataSize)) if dataSize == 0 { return nil } @@ -393,7 +393,7 @@ func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { } func (ctx *vmselectRequestCtx) readBool() (bool, error) { - ctx.dataBuf = bytesutil.Resize(ctx.dataBuf, 1) + ctx.dataBuf = bytesutil.ResizeNoCopy(ctx.dataBuf, 1) if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { if err == io.EOF { return false, err @@ -405,7 +405,7 @@ func (ctx *vmselectRequestCtx) readBool() (bool, error) { } func (ctx *vmselectRequestCtx) readByte() (byte, error) { - ctx.dataBuf = bytesutil.Resize(ctx.dataBuf, 1) + ctx.dataBuf = bytesutil.ResizeNoCopy(ctx.dataBuf, 1) if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { if err == io.EOF { return 0, err diff --git a/lib/bytesutil/bytebuffer.go b/lib/bytesutil/bytebuffer.go index 6bdd9d921e..3aec86498b 100644 --- a/lib/bytesutil/bytebuffer.go +++ b/lib/bytesutil/bytebuffer.go @@ -53,7 +53,7 @@ func (bb *ByteBuffer) MustReadAt(p []byte, offset int64) { func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) { b := bb.B bLen := len(b) - b = Resize(b, 4*1024) + b = ResizeWithCopy(b, 4*1024) b = b[:cap(b)] offset := bLen for { diff --git a/lib/bytesutil/bytesutil.go b/lib/bytesutil/bytesutil.go index 4dd26840ba..1ead40f5ca 100644 --- a/lib/bytesutil/bytesutil.go +++ b/lib/bytesutil/bytesutil.go @@ -5,12 +5,29 @@ import ( "unsafe" ) -// Resize resizes b to n bytes and returns b (which may be newly allocated). -func Resize(b []byte, n int) []byte { - if nn := n - cap(b); nn > 0 { - b = append(b[:cap(b)], make([]byte, nn)...) +// ResizeWithCopy resizes b to n bytes and returns the resized buffer (which may be newly allocated). +// +// If newly allocated buffer is returned then b contents is copied to it. +func ResizeWithCopy(b []byte, n int) []byte { + if n <= cap(b) { + return b[:n] } - return b[:n] + // Allocate the exact number of bytes instead of using `b = append(b[:cap(b)], make([]byte, nn)...)`, + // since `append()` may allocate more than the requested bytes for additional capacity. + // Using make() instead of append() should save RAM when the resized slice is cached somewhere. + bNew := make([]byte, n) + copy(bNew, b) + return bNew +} + +// ResizeNoCopy resizes b to n bytes and returns the resized buffer (which may be newly allocated). +// +// If newly allocated buffer is returned then b contents isn't copied to it. +func ResizeNoCopy(b []byte, n int) []byte { + if n <= cap(b) { + return b[:n] + } + return make([]byte, n) } // ToUnsafeString converts b to string without memory allocations. diff --git a/lib/bytesutil/bytesutil_test.go b/lib/bytesutil/bytesutil_test.go index 721b5c2eda..c59ff4e863 100644 --- a/lib/bytesutil/bytesutil_test.go +++ b/lib/bytesutil/bytesutil_test.go @@ -5,19 +5,62 @@ import ( "testing" ) -func TestResize(t *testing.T) { +func TestResizeNoCopy(t *testing.T) { for i := 0; i < 1000; i++ { - b := Resize(nil, i) + b := ResizeNoCopy(nil, i) if len(b) != i { t.Fatalf("invalid b size; got %d; expecting %d", len(b), i) } - b1 := Resize(b, i) + b1 := ResizeNoCopy(b, i) if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) { - t.Fatalf("invalid b1; got %x; expecting %x", b1, b) + t.Fatalf("invalid b1; got %x; expecting %x", &b1[0], &b[0]) } - b2 := Resize(b[:0], i) + b2 := ResizeNoCopy(b[:0], i) if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) { - t.Fatalf("invalid b2; got %x; expecting %x", b2, b) + t.Fatalf("invalid b2; got %x; expecting %x", &b2[0], &b[0]) + } + if i > 0 { + b[0] = 123 + b3 := ResizeNoCopy(b, i+1) + if len(b3) != i+1 { + t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1) + } + if &b3[0] == &b[0] { + t.Fatalf("b3 must be newly allocated") + } + if b3[0] != 0 { + t.Fatalf("b3[0] must be zeroed; got %d", b3[0]) + } + } + } +} + +func TestResizeWithCopy(t *testing.T) { + for i := 0; i < 1000; i++ { + b := ResizeWithCopy(nil, i) + if len(b) != i { + t.Fatalf("invalid b size; got %d; expecting %d", len(b), i) + } + b1 := ResizeWithCopy(b, i) + if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) { + t.Fatalf("invalid b1; got %x; expecting %x", &b1[0], &b[0]) + } + b2 := ResizeWithCopy(b[:0], i) + if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) { + t.Fatalf("invalid b2; got %x; expecting %x", &b2[0], &b[0]) + } + if i > 0 { + b[0] = 123 + b3 := ResizeWithCopy(b, i+1) + if len(b3) != i+1 { + t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1) + } + if &b3[0] == &b[0] { + t.Fatalf("b3 must be newly allocated for i=%d", i) + } + if b3[0] != b[0] || b3[0] != 123 { + t.Fatalf("b3[0] must equal b[0]; got %d; want %d", b3[0], b[0]) + } } } } diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index bdc949f0c1..6069f2def9 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -135,7 +135,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { go func() { defer wg.Done() var bb bytesutil.ByteBuffer - bb.B = bytesutil.Resize(bb.B, 64*1024) + bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024) for { bb.Reset() bb.B = bb.B[:cap(bb.B)] diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go index 8a412e2396..f6fa873cdf 100644 --- a/lib/ingestserver/influx/server.go +++ b/lib/ingestserver/influx/server.go @@ -135,7 +135,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { go func() { defer wg.Done() var bb bytesutil.ByteBuffer - bb.B = bytesutil.Resize(bb.B, 64*1024) + bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024) for { bb.Reset() bb.B = bb.B[:cap(bb.B)] diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index a16267a66e..ee5056f8cb 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -153,7 +153,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { go func() { defer wg.Done() var bb bytesutil.ByteBuffer - bb.B = bytesutil.Resize(bb.B, 64*1024) + bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024) for { bb.Reset() bb.B = bb.B[:cap(bb.B)] diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index 06a28b2381..ea2f1b8eee 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -211,13 +211,13 @@ func (bsr *blockStreamReader) Next() bool { bsr.bh = &bsr.bhs[bsr.bhIdx] bsr.bhIdx++ - bsr.sb.itemsData = bytesutil.Resize(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize)) + bsr.sb.itemsData = bytesutil.ResizeNoCopy(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize)) if err := fs.ReadFullData(bsr.itemsReader, bsr.sb.itemsData); err != nil { bsr.err = fmt.Errorf("cannot read compressed items block with size %d: %w", bsr.bh.itemsBlockSize, err) return false } - bsr.sb.lensData = bytesutil.Resize(bsr.sb.lensData, int(bsr.bh.lensBlockSize)) + bsr.sb.lensData = bytesutil.ResizeNoCopy(bsr.sb.lensData, int(bsr.bh.lensBlockSize)) if err := fs.ReadFullData(bsr.lensReader, bsr.sb.lensData); err != nil { bsr.err = fmt.Errorf("cannot read compressed lens block with size %d: %w", bsr.bh.lensBlockSize, err) return false @@ -260,7 +260,7 @@ func (bsr *blockStreamReader) readNextBHS() error { bsr.mrIdx++ // Read compressed index block. - bsr.packedBuf = bytesutil.Resize(bsr.packedBuf, int(mr.indexBlockSize)) + bsr.packedBuf = bytesutil.ResizeNoCopy(bsr.packedBuf, int(mr.indexBlockSize)) if err := fs.ReadFullData(bsr.indexReader, bsr.packedBuf); err != nil { return fmt.Errorf("cannot read compressed index block with size %d: %w", mr.indexBlockSize, err) } diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 99a2197ceb..2e9643a60d 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -119,7 +119,7 @@ func (ib *inmemoryBlock) Add(x []byte) bool { } if cap(data) < maxInmemoryBlockSize { dataLen := len(data) - data = bytesutil.Resize(data, maxInmemoryBlockSize)[:dataLen] + data = bytesutil.ResizeWithCopy(data, maxInmemoryBlockSize)[:dataLen] } dataLen := len(data) data = append(data, x...) @@ -141,7 +141,7 @@ func (ib *inmemoryBlock) sort() { data := ib.data items := ib.items bb := bbPool.Get() - b := bytesutil.Resize(bb.B, len(data)) + b := bytesutil.ResizeNoCopy(bb.B, len(data)) b = b[:0] for i, it := range items { bLen := len(b) @@ -394,7 +394,7 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix // Resize ib.data to dataLen instead of maxInmemoryBlockSize, // since the data isn't going to be resized after unmarshaling. // This may save memory for caching the unmarshaled block. - data := bytesutil.Resize(ib.data, dataLen) + data := bytesutil.ResizeNoCopy(ib.data, dataLen) if n := int(itemsCount) - cap(ib.items); n > 0 { ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...) } @@ -492,7 +492,7 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte, // Unmarshal items data. data := ib.data items := ib.items - data = bytesutil.Resize(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount)) + data = bytesutil.ResizeNoCopy(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount)) data = append(data[:0], firstItem...) items = append(items[:0], Item{ Start: 0, diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 479070a006..2e7979bf25 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -274,7 +274,7 @@ func (ps *partSearch) nextBHS() error { } func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) { - ps.compressedIndexBuf = bytesutil.Resize(ps.compressedIndexBuf, int(mr.indexBlockSize)) + ps.compressedIndexBuf = bytesutil.ResizeNoCopy(ps.compressedIndexBuf, int(mr.indexBlockSize)) ps.p.indexFile.MustReadAt(ps.compressedIndexBuf, int64(mr.indexBlockOffset)) var err error @@ -311,10 +311,10 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) { ps.sb.Reset() - ps.sb.itemsData = bytesutil.Resize(ps.sb.itemsData, int(bh.itemsBlockSize)) + ps.sb.itemsData = bytesutil.ResizeNoCopy(ps.sb.itemsData, int(bh.itemsBlockSize)) ps.p.itemsFile.MustReadAt(ps.sb.itemsData, int64(bh.itemsBlockOffset)) - ps.sb.lensData = bytesutil.Resize(ps.sb.lensData, int(bh.lensBlockSize)) + ps.sb.lensData = bytesutil.ResizeNoCopy(ps.sb.lensData, int(bh.lensBlockSize)) ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset)) ib := getInmemoryBlock() diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 3bbb9c519a..66ad247f9f 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -499,7 +499,7 @@ func (q *queue) readBlock(dst []byte) ([]byte, error) { again: // Read block len. header := headerBufPool.Get() - header.B = bytesutil.Resize(header.B, 8) + header.B = bytesutil.ResizeNoCopy(header.B, 8) err := q.readFull(header.B) blockLen := encoding.UnmarshalUint64(header.B) headerBufPool.Put(header) @@ -520,7 +520,7 @@ again: // Read block contents. dstLen := len(dst) - dst = bytesutil.Resize(dst, dstLen+int(blockLen)) + dst = bytesutil.ResizeWithCopy(dst, dstLen+int(blockLen)) if err := q.readFull(dst[dstLen:]); err != nil { logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err) if err := q.skipBrokenChunkFile(); err != nil { diff --git a/lib/protoparser/clusternative/streamparser.go b/lib/protoparser/clusternative/streamparser.go index 47d1f3c27a..613214ed27 100644 --- a/lib/protoparser/clusternative/streamparser.go +++ b/lib/protoparser/clusternative/streamparser.go @@ -65,7 +65,7 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric func readBlock(dst []byte, bc *handshake.BufferedConn, isReadOnly func() bool) ([]byte, error) { sizeBuf := auxBufPool.Get() defer auxBufPool.Put(sizeBuf) - sizeBuf.B = bytesutil.Resize(sizeBuf.B, 8) + sizeBuf.B = bytesutil.ResizeNoCopy(sizeBuf.B, 8) if _, err := io.ReadFull(bc, sizeBuf.B); err != nil { if err != io.EOF { readErrors.Inc() @@ -79,7 +79,7 @@ func readBlock(dst []byte, bc *handshake.BufferedConn, isReadOnly func() bool) ( return dst, fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize) } dstLen := len(dst) - dst = bytesutil.Resize(dst, dstLen+int(packetSize)) + dst = bytesutil.ResizeWithCopy(dst, dstLen+int(packetSize)) if n, err := io.ReadFull(bc, dst[dstLen:]); err != nil { readErrors.Inc() return dst, fmt.Errorf("cannot read packet with size %d bytes: %w; read only %d bytes", packetSize, err, n) diff --git a/lib/protoparser/common/lines_reader.go b/lib/protoparser/common/lines_reader.go index 6e9ed3fb21..fc8f52780c 100644 --- a/lib/protoparser/common/lines_reader.go +++ b/lib/protoparser/common/lines_reader.go @@ -40,7 +40,7 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) { startTime := time.Now() if cap(dstBuf) < defaultBlockSize { - dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize) + dstBuf = bytesutil.ResizeNoCopy(dstBuf, defaultBlockSize) } dstBuf = append(dstBuf[:0], tailBuf...) tailBuf = tailBuf[:0] @@ -79,7 +79,7 @@ again: if cap(dstBuf) < 2*len(dstBuf) { // Increase dsbBuf capacity, so more data could be read into it. dstBufLen := len(dstBuf) - dstBuf = bytesutil.Resize(dstBuf, 2*cap(dstBuf)) + dstBuf = bytesutil.ResizeWithCopy(dstBuf, 2*cap(dstBuf)) dstBuf = dstBuf[:dstBufLen] } goto again diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go index fa9c8af7a3..b4e0605e8b 100644 --- a/lib/protoparser/native/streamparser.go +++ b/lib/protoparser/native/streamparser.go @@ -84,7 +84,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error { wg.Wait() return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024) } - uw.metricNameBuf = bytesutil.Resize(uw.metricNameBuf, int(bufSize)) + uw.metricNameBuf = bytesutil.ResizeNoCopy(uw.metricNameBuf, int(bufSize)) if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil { readErrors.Inc() wg.Wait() @@ -105,7 +105,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error { wg.Wait() return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024) } - uw.blockBuf = bytesutil.Resize(uw.blockBuf, int(bufSize)) + uw.blockBuf = bytesutil.ResizeNoCopy(uw.blockBuf, int(bufSize)) if _, err := io.ReadFull(br, uw.blockBuf); err != nil { readErrors.Inc() wg.Wait() diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go index 72b1966c7f..6398adf226 100644 --- a/lib/storage/block_stream_reader.go +++ b/lib/storage/block_stream_reader.go @@ -303,7 +303,7 @@ func (bsr *blockStreamReader) readBlock() error { if usePrevTimestamps { bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...) } else { - bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) + bsr.Block.timestampsData = bytesutil.ResizeNoCopy(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil { return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err) } @@ -312,7 +312,7 @@ func (bsr *blockStreamReader) readBlock() error { } // Read values data. - bsr.Block.valuesData = bytesutil.Resize(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize)) + bsr.Block.valuesData = bytesutil.ResizeNoCopy(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize)) if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil { return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err) } @@ -347,7 +347,7 @@ func (bsr *blockStreamReader) readIndexBlock() error { } // Read index block. - bsr.compressedIndexData = bytesutil.Resize(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize)) + bsr.compressedIndexData = bytesutil.ResizeNoCopy(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize)) if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil { return fmt.Errorf("cannot read index block from index data at offset %d: %w", bsr.indexBlockOffset, err) } diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index 67c82c961b..d1283fce30 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -377,8 +377,7 @@ func (mn *MetricName) Marshal(dst []byte) []byte { tag := &mn.Tags[i] requiredSize += len(tag.Key) + len(tag.Value) + 2 } - dst = bytesutil.Resize(dst, requiredSize) - dst = dst[:dstLen] + dst = bytesutil.ResizeWithCopy(dst, requiredSize)[:dstLen] dst = encoding.MarshalUint32(dst, mn.AccountID) dst = encoding.MarshalUint32(dst, mn.ProjectID) @@ -439,8 +438,7 @@ func (mn *MetricName) MarshalNoAccountIDProjectID(dst []byte) []byte { tag := &mn.Tags[i] requiredSize += len(tag.Key) + len(tag.Value) + 2 } - dst = bytesutil.Resize(dst, requiredSize) - dst = dst[:dstLen] + dst = bytesutil.ResizeWithCopy(dst, requiredSize)[:dstLen] dst = marshalTagValue(dst, mn.MetricGroup) tags := mn.Tags @@ -546,7 +544,7 @@ func MarshalMetricNameRaw(dst []byte, accountID, projectID uint32, labels []prom dstSize += len(label.Value) dstSize += 4 } - dst = bytesutil.Resize(dst, dstSize)[:dstLen] + dst = bytesutil.ResizeWithCopy(dst, dstSize)[:dstLen] // Marshal labels to dst. dst = encoding.MarshalUint32(dst, accountID) diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go index 6e7af19510..b2542f6631 100644 --- a/lib/storage/part_search.go +++ b/lib/storage/part_search.go @@ -211,7 +211,7 @@ func skipSmallMetaindexRows(metaindex []metaindexRow, tsid *TSID) []metaindexRow } func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) { - ps.compressedIndexBuf = bytesutil.Resize(ps.compressedIndexBuf[:0], int(mr.IndexBlockSize)) + ps.compressedIndexBuf = bytesutil.ResizeNoCopy(ps.compressedIndexBuf, int(mr.IndexBlockSize)) ps.p.indexFile.MustReadAt(ps.compressedIndexBuf, int64(mr.IndexBlockOffset)) var err error diff --git a/lib/storage/search.go b/lib/storage/search.go index 09468e648f..1554aa1a96 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -40,10 +40,10 @@ func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) { return } - dst.timestampsData = bytesutil.Resize(dst.timestampsData[:0], int(br.bh.TimestampsBlockSize)) + dst.timestampsData = bytesutil.ResizeNoCopy(dst.timestampsData, int(br.bh.TimestampsBlockSize)) br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset)) - dst.valuesData = bytesutil.Resize(dst.valuesData[:0], int(br.bh.ValuesBlockSize)) + dst.valuesData = bytesutil.ResizeNoCopy(dst.valuesData, int(br.bh.ValuesBlockSize)) br.p.valuesFile.MustReadAt(dst.valuesData, int64(br.bh.ValuesBlockOffset)) }