lib/bytesutil: split Resize* funcs into MayOverallocate and NoOverallocate variants for more fine-grained control over memory allocations
Follow-up for f4989edd96
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
Parent: e13ce2ee98
Commit: 4bdd10ab90
18 changed files with 220 additions and 58 deletions
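In short: the new *MayOverallocate variants round the requested size up to the next power of two via the roundToNearestPow2 helper added below, which suits buffers that are grown repeatedly; the *NoOverallocate variants allocate exactly n bytes, which suits buffers that are cached long-term. A small usage sketch, not part of the commit; the import path assumes this repository's module, and the printed values follow from the implementations in the diff below:

package main

import (
    "fmt"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

func main() {
    // MayOverallocate: capacity is rounded up to the next power of 2,
    // so a buffer that keeps growing reallocates rarely.
    b := bytesutil.ResizeNoCopyMayOverallocate(nil, 100)
    fmt.Println(len(b), cap(b)) // 100 128

    // NoOverallocate: exactly the requested number of bytes, so cached
    // buffers don't pin unused spare capacity.
    b = bytesutil.ResizeNoCopyNoOverallocate(nil, 100)
    fmt.Println(len(b), cap(b)) // 100 100
}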

@@ -149,7 +149,7 @@ func (tbf *tmpBlocksFile) MustReadBlockRefAt(partRef storage.PartRef, addr tmpBl
     } else {
         bb := tmpBufPool.Get()
         defer tmpBufPool.Put(bb)
-        bb.B = bytesutil.ResizeNoCopy(bb.B, addr.size)
+        bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, addr.size)
         tbf.r.MustReadAt(bb.B, int64(addr.offset))
         buf = bb.B
     }

@@ -98,7 +98,7 @@ func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int6
 
     // Allocate the buffer for the marshaled tss before its marshaling.
     // This should reduce memory fragmentation and memory usage.
-    dst = bytesutil.ResizeNoCopy(dst, size)
+    dst = bytesutil.ResizeNoCopyMayOverallocate(dst, size)
     dst = marshalFastTimestamps(dst[:0], tss[0].Timestamps)
     for _, ts := range tss {
         dst = ts.marshalFastNoTimestamps(dst)

@@ -53,7 +53,7 @@ func (bb *ByteBuffer) MustReadAt(p []byte, offset int64) {
 func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
     b := bb.B
     bLen := len(b)
-    b = ResizeWithCopy(b, 4*1024)
+    b = ResizeWithCopyMayOverallocate(b, 4*1024)
     b = b[:cap(b)]
     offset := bLen
     for {

@@ -1,35 +1,66 @@
 package bytesutil
 
 import (
+    "math/bits"
     "reflect"
     "unsafe"
 )
 
-// ResizeWithCopy resizes b to n bytes and returns the resized buffer (which may be newly allocated).
+// ResizeWithCopyMayOverallocate resizes b to minimum n bytes and returns the resized buffer (which may be newly allocated).
 //
 // If newly allocated buffer is returned then b contents is copied to it.
-func ResizeWithCopy(b []byte, n int) []byte {
+func ResizeWithCopyMayOverallocate(b []byte, n int) []byte {
+    if n <= cap(b) {
+        return b[:n]
+    }
+    nNew := roundToNearestPow2(n)
+    bNew := make([]byte, nNew)
+    copy(bNew, b)
+    return bNew[:n]
+}
+
+// ResizeWithCopyNoOverallocate resizes b to exactly n bytes and returns the resized buffer (which may be newly allocated).
+//
+// If newly allocated buffer is returned then b contents is copied to it.
+func ResizeWithCopyNoOverallocate(b []byte, n int) []byte {
     if n <= cap(b) {
         return b[:n]
     }
-    // Allocate the exact number of bytes instead of using `b = append(b[:cap(b)], make([]byte, nn)...)`,
-    // since `append()` may allocate more than the requested bytes for additional capacity.
-    // Using make() instead of append() should save RAM when the resized slice is cached somewhere.
     bNew := make([]byte, n)
     copy(bNew, b)
     return bNew
 }
 
-// ResizeNoCopy resizes b to n bytes and returns the resized buffer (which may be newly allocated).
+// ResizeNoCopyMayOverallocate resizes b to minimum n bytes and returns the resized buffer (which may be newly allocated).
 //
 // If newly allocated buffer is returned then b contents isn't copied to it.
-func ResizeNoCopy(b []byte, n int) []byte {
+func ResizeNoCopyMayOverallocate(b []byte, n int) []byte {
+    if n <= cap(b) {
+        return b[:n]
+    }
+    nNew := roundToNearestPow2(n)
+    bNew := make([]byte, nNew)
+    return bNew[:n]
+}
+
+// ResizeNoCopyNoOverallocate resizes b to exactly n bytes and returns the resized buffer (which may be newly allocated).
+//
+// If newly allocated buffer is returned then b contents isn't copied to it.
+func ResizeNoCopyNoOverallocate(b []byte, n int) []byte {
     if n <= cap(b) {
         return b[:n]
     }
     return make([]byte, n)
 }
 
+// roundToNearestPow2 rounds n to the nearest power of 2
+//
+// It is expected that n > 0
+func roundToNearestPow2(n int) int {
+    pow2 := uint8(bits.Len(uint(n - 1)))
+    return 1 << pow2
+}
+
 // ToUnsafeString converts b to string without memory allocations.
 //
 // The returned string is valid only until b is reachable and unmodified.
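The roundToNearestPow2 helper added above computes the next power of two at or above n from the bit length of n-1. A standalone sketch of its behavior (the helper is copied from the diff above; the sample values are illustrative):

package main

import (
    "fmt"
    "math/bits"
)

// roundToNearestPow2 mirrors the helper from lib/bytesutil; expects n > 0.
func roundToNearestPow2(n int) int {
    pow2 := uint8(bits.Len(uint(n - 1)))
    return 1 << pow2
}

func main() {
    for _, n := range []int{1, 3, 100, 128} {
        fmt.Println(n, roundToNearestPow2(n)) // 1 1, 3 4, 100 128, 128 128
    }
}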

@@ -5,26 +5,63 @@ import (
     "testing"
 )
 
-func TestResizeNoCopy(t *testing.T) {
+func TestRoundToNearestPow2(t *testing.T) {
+    f := func(n, resultExpected int) {
+        t.Helper()
+        result := roundToNearestPow2(n)
+        if result != resultExpected {
+            t.Fatalf("unexpected roundToNearestPow2(%d); got %d; want %d", n, result, resultExpected)
+        }
+    }
+    f(1, 1)
+    f(2, 2)
+    f(3, 4)
+    f(4, 4)
+    f(5, 8)
+    f(6, 8)
+    f(7, 8)
+    f(8, 8)
+    f(9, 16)
+    f(10, 16)
+    f(16, 16)
+    f(17, 32)
+    f(32, 32)
+    f(33, 64)
+    f(64, 64)
+}
+
+func TestResizeNoCopyNoOverallocate(t *testing.T) {
     for i := 0; i < 1000; i++ {
-        b := ResizeNoCopy(nil, i)
+        b := ResizeNoCopyNoOverallocate(nil, i)
         if len(b) != i {
-            t.Fatalf("invalid b size; got %d; expecting %d", len(b), i)
+            t.Fatalf("invalid b size; got %d; want %d", len(b), i)
         }
-        b1 := ResizeNoCopy(b, i)
+        if cap(b) != i {
+            t.Fatalf("invalid cap(b); got %d; want %d", cap(b), i)
+        }
+        b1 := ResizeNoCopyNoOverallocate(b, i)
         if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
-            t.Fatalf("invalid b1; got %x; expecting %x", &b1[0], &b[0])
+            t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
         }
-        b2 := ResizeNoCopy(b[:0], i)
+        if cap(b1) != i {
+            t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), i)
+        }
+        b2 := ResizeNoCopyNoOverallocate(b[:0], i)
         if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
-            t.Fatalf("invalid b2; got %x; expecting %x", &b2[0], &b[0])
+            t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+        }
+        if cap(b2) != i {
+            t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), i)
         }
         if i > 0 {
             b[0] = 123
-            b3 := ResizeNoCopy(b, i+1)
+            b3 := ResizeNoCopyNoOverallocate(b, i+1)
             if len(b3) != i+1 {
                 t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
             }
+            if cap(b3) != i+1 {
+                t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), i+1)
+            }
             if &b3[0] == &b[0] {
                 t.Fatalf("b3 must be newly allocated")
             }

@@ -35,26 +72,75 @@ func TestResizeNoCopy(t *testing.T) {
     }
 }
 
-func TestResizeWithCopy(t *testing.T) {
+func TestResizeNoCopyMayOverallocate(t *testing.T) {
     for i := 0; i < 1000; i++ {
-        b := ResizeWithCopy(nil, i)
+        b := ResizeNoCopyMayOverallocate(nil, i)
         if len(b) != i {
-            t.Fatalf("invalid b size; got %d; expecting %d", len(b), i)
+            t.Fatalf("invalid b size; got %d; want %d", len(b), i)
         }
-        b1 := ResizeWithCopy(b, i)
+        capExpected := roundToNearestPow2(i)
+        if cap(b) != capExpected {
+            t.Fatalf("invalid cap(b); got %d; want %d", cap(b), capExpected)
+        }
+        b1 := ResizeNoCopyMayOverallocate(b, i)
         if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
-            t.Fatalf("invalid b1; got %x; expecting %x", &b1[0], &b[0])
+            t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
         }
-        b2 := ResizeWithCopy(b[:0], i)
+        if cap(b1) != capExpected {
+            t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), capExpected)
+        }
+        b2 := ResizeNoCopyMayOverallocate(b[:0], i)
         if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
-            t.Fatalf("invalid b2; got %x; expecting %x", &b2[0], &b[0])
+            t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+        }
+        if cap(b2) != capExpected {
+            t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), capExpected)
         }
         if i > 0 {
-            b[0] = 123
-            b3 := ResizeWithCopy(b, i+1)
+            b3 := ResizeNoCopyMayOverallocate(b, i+1)
             if len(b3) != i+1 {
                 t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
             }
+            capExpected = roundToNearestPow2(i + 1)
+            if cap(b3) != capExpected {
+                t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), capExpected)
+            }
+        }
+    }
+}
+
+func TestResizeWithCopyNoOverallocate(t *testing.T) {
+    for i := 0; i < 1000; i++ {
+        b := ResizeWithCopyNoOverallocate(nil, i)
+        if len(b) != i {
+            t.Fatalf("invalid b size; got %d; want %d", len(b), i)
+        }
+        if cap(b) != i {
+            t.Fatalf("invalid cap(b); got %d; want %d", cap(b), i)
+        }
+        b1 := ResizeWithCopyNoOverallocate(b, i)
+        if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
+            t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
+        }
+        if cap(b1) != i {
+            t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), i)
+        }
+        b2 := ResizeWithCopyNoOverallocate(b[:0], i)
+        if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
+            t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+        }
+        if cap(b2) != i {
+            t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), i)
+        }
+        if i > 0 {
+            b[0] = 123
+            b3 := ResizeWithCopyNoOverallocate(b, i+1)
+            if len(b3) != i+1 {
+                t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
+            }
+            if cap(b3) != i+1 {
+                t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), i+1)
+            }
             if &b3[0] == &b[0] {
                 t.Fatalf("b3 must be newly allocated for i=%d", i)
             }

@@ -65,6 +151,47 @@ func TestResizeWithCopy(t *testing.T) {
     }
 }
 
+func TestResizeWithCopyMayOverallocate(t *testing.T) {
+    for i := 0; i < 1000; i++ {
+        b := ResizeWithCopyMayOverallocate(nil, i)
+        if len(b) != i {
+            t.Fatalf("invalid b size; got %d; want %d", len(b), i)
+        }
+        capExpected := roundToNearestPow2(i)
+        if cap(b) != capExpected {
+            t.Fatalf("invalid cap(b); got %d; want %d", cap(b), capExpected)
+        }
+        b1 := ResizeWithCopyMayOverallocate(b, i)
+        if len(b1) != len(b) || (len(b) > 0 && &b1[0] != &b[0]) {
+            t.Fatalf("invalid b1; got %x; want %x", &b1[0], &b[0])
+        }
+        if cap(b1) != capExpected {
+            t.Fatalf("invalid cap(b1); got %d; want %d", cap(b1), capExpected)
+        }
+        b2 := ResizeWithCopyMayOverallocate(b[:0], i)
+        if len(b2) != len(b) || (len(b) > 0 && &b2[0] != &b[0]) {
+            t.Fatalf("invalid b2; got %x; want %x", &b2[0], &b[0])
+        }
+        if cap(b2) != capExpected {
+            t.Fatalf("invalid cap(b2); got %d; want %d", cap(b2), capExpected)
+        }
+        if i > 0 {
+            b[0] = 123
+            b3 := ResizeWithCopyMayOverallocate(b, i+1)
+            if len(b3) != i+1 {
+                t.Fatalf("invalid b3 len; got %d; want %d", len(b3), i+1)
+            }
+            capExpected = roundToNearestPow2(i + 1)
+            if cap(b3) != capExpected {
+                t.Fatalf("invalid cap(b3); got %d; want %d", cap(b3), capExpected)
+            }
+            if b3[0] != b[0] || b3[0] != 123 {
+                t.Fatalf("b3[0] must equal b[0]; got %d; want %d", b3[0], b[0])
+            }
+        }
+    }
+}
+
 func TestToUnsafeString(t *testing.T) {
     s := "str"
     if !bytes.Equal([]byte("str"), ToUnsafeBytes(s)) {

@@ -135,7 +135,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
         go func() {
             defer wg.Done()
             var bb bytesutil.ByteBuffer
-            bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024)
+            bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
             for {
                 bb.Reset()
                 bb.B = bb.B[:cap(bb.B)]

@@ -135,7 +135,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
         go func() {
             defer wg.Done()
             var bb bytesutil.ByteBuffer
-            bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024)
+            bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
             for {
                 bb.Reset()
                 bb.B = bb.B[:cap(bb.B)]

@@ -153,7 +153,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
         go func() {
             defer wg.Done()
             var bb bytesutil.ByteBuffer
-            bb.B = bytesutil.ResizeNoCopy(bb.B, 64*1024)
+            bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
             for {
                 bb.Reset()
                 bb.B = bb.B[:cap(bb.B)]

@@ -211,13 +211,13 @@ func (bsr *blockStreamReader) Next() bool {
         bsr.bh = &bsr.bhs[bsr.bhIdx]
         bsr.bhIdx++
 
-    bsr.sb.itemsData = bytesutil.ResizeNoCopy(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize))
+    bsr.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.itemsData, int(bsr.bh.itemsBlockSize))
     if err := fs.ReadFullData(bsr.itemsReader, bsr.sb.itemsData); err != nil {
         bsr.err = fmt.Errorf("cannot read compressed items block with size %d: %w", bsr.bh.itemsBlockSize, err)
         return false
     }
 
-    bsr.sb.lensData = bytesutil.ResizeNoCopy(bsr.sb.lensData, int(bsr.bh.lensBlockSize))
+    bsr.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(bsr.sb.lensData, int(bsr.bh.lensBlockSize))
     if err := fs.ReadFullData(bsr.lensReader, bsr.sb.lensData); err != nil {
         bsr.err = fmt.Errorf("cannot read compressed lens block with size %d: %w", bsr.bh.lensBlockSize, err)
         return false

@@ -260,7 +260,7 @@ func (bsr *blockStreamReader) readNextBHS() error {
     bsr.mrIdx++
 
     // Read compressed index block.
-    bsr.packedBuf = bytesutil.ResizeNoCopy(bsr.packedBuf, int(mr.indexBlockSize))
+    bsr.packedBuf = bytesutil.ResizeNoCopyMayOverallocate(bsr.packedBuf, int(mr.indexBlockSize))
     if err := fs.ReadFullData(bsr.indexReader, bsr.packedBuf); err != nil {
         return fmt.Errorf("cannot read compressed index block with size %d: %w", mr.indexBlockSize, err)
     }

@@ -119,7 +119,7 @@ func (ib *inmemoryBlock) Add(x []byte) bool {
     }
     if cap(data) < maxInmemoryBlockSize {
         dataLen := len(data)
-        data = bytesutil.ResizeWithCopy(data, maxInmemoryBlockSize)[:dataLen]
+        data = bytesutil.ResizeWithCopyNoOverallocate(data, maxInmemoryBlockSize)[:dataLen]
     }
     dataLen := len(data)
     data = append(data, x...)

@@ -141,7 +141,7 @@ func (ib *inmemoryBlock) sort() {
     data := ib.data
     items := ib.items
     bb := bbPool.Get()
-    b := bytesutil.ResizeNoCopy(bb.B, len(data))
+    b := bytesutil.ResizeNoCopyMayOverallocate(bb.B, len(data))
     b = b[:0]
     for i, it := range items {
         bLen := len(b)

@@ -394,7 +394,7 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix
     // Resize ib.data to dataLen instead of maxInmemoryBlockSize,
     // since the data isn't going to be resized after unmarshaling.
     // This may save memory for caching the unmarshaled block.
-    data := bytesutil.ResizeNoCopy(ib.data, dataLen)
+    data := bytesutil.ResizeNoCopyNoOverallocate(ib.data, dataLen)
     if n := int(itemsCount) - cap(ib.items); n > 0 {
         ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...)
     }

@@ -492,7 +492,8 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte,
     // Unmarshal items data.
     data := ib.data
     items := ib.items
-    data = bytesutil.ResizeNoCopy(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount))
+    dataLen := len(firstItem) + len(sb.itemsData) + len(commonPrefix)*(int(itemsCount)-1)
+    data = bytesutil.ResizeNoCopyNoOverallocate(data, dataLen)
     data = append(data[:0], firstItem...)
     items = append(items[:0], Item{
         Start: 0,

@@ -504,20 +505,23 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte,
         if uint64(len(b)) < itemLen {
             return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b))
         }
-        dataLen := len(data)
+        dataStart := len(data)
         data = append(data, commonPrefix...)
         data = append(data, b[:itemLen]...)
         items = append(items, Item{
-            Start: uint32(dataLen),
+            Start: uint32(dataStart),
             End:   uint32(len(data)),
         })
         b = b[itemLen:]
     }
-    ib.data = data
-    ib.items = items
     if len(b) > 0 {
         return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b)
     }
+    if len(data) != dataLen {
+        return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen)
+    }
+    ib.data = data
+    ib.items = items
     return nil
 }
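The sizing change in unmarshalDataPlain above deserves a note: firstItem is appended verbatim and already carries the common prefix, so only the remaining itemsCount-1 items contribute an extra copy of commonPrefix. The old formula multiplied by itemsCount and overallocated by one prefix length; computing the exact total is what allows switching to ResizeNoCopyNoOverallocate and asserting len(data) afterwards. A worked illustration with invented values:

package main

import "fmt"

func main() {
    // Hypothetical inputs: 3 items sharing the prefix "foo_".
    firstItem := []byte("foo_bar") // stored in full, prefix included
    itemsData := []byte("bazqux")  // suffixes "baz" and "qux"
    commonPrefix := []byte("foo_")
    itemsCount := 3

    dataLen := len(firstItem) + len(itemsData) + len(commonPrefix)*(itemsCount-1)
    fmt.Println(dataLen) // 21: exactly "foo_bar"+"foo_baz"+"foo_qux"
}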

@@ -274,7 +274,7 @@ func (ps *partSearch) nextBHS() error {
 }
 
 func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
-    ps.compressedIndexBuf = bytesutil.ResizeNoCopy(ps.compressedIndexBuf, int(mr.indexBlockSize))
+    ps.compressedIndexBuf = bytesutil.ResizeNoCopyMayOverallocate(ps.compressedIndexBuf, int(mr.indexBlockSize))
     ps.p.indexFile.MustReadAt(ps.compressedIndexBuf, int64(mr.indexBlockOffset))
 
     var err error

@@ -311,10 +311,10 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
 func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) {
     ps.sb.Reset()
 
-    ps.sb.itemsData = bytesutil.ResizeNoCopy(ps.sb.itemsData, int(bh.itemsBlockSize))
+    ps.sb.itemsData = bytesutil.ResizeNoCopyMayOverallocate(ps.sb.itemsData, int(bh.itemsBlockSize))
     ps.p.itemsFile.MustReadAt(ps.sb.itemsData, int64(bh.itemsBlockOffset))
 
-    ps.sb.lensData = bytesutil.ResizeNoCopy(ps.sb.lensData, int(bh.lensBlockSize))
+    ps.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(ps.sb.lensData, int(bh.lensBlockSize))
     ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset))
 
     ib := getInmemoryBlock()

@@ -499,7 +499,7 @@ func (q *queue) readBlock(dst []byte) ([]byte, error) {
 again:
     // Read block len.
     header := headerBufPool.Get()
-    header.B = bytesutil.ResizeNoCopy(header.B, 8)
+    header.B = bytesutil.ResizeNoCopyMayOverallocate(header.B, 8)
     err := q.readFull(header.B)
     blockLen := encoding.UnmarshalUint64(header.B)
     headerBufPool.Put(header)
|
||||||
|
|
||||||
// Read block contents.
|
// Read block contents.
|
||||||
dstLen := len(dst)
|
dstLen := len(dst)
|
||||||
dst = bytesutil.ResizeWithCopy(dst, dstLen+int(blockLen))
|
dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstLen+int(blockLen))
|
||||||
if err := q.readFull(dst[dstLen:]); err != nil {
|
if err := q.readFull(dst[dstLen:]); err != nil {
|
||||||
logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err)
|
logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err)
|
||||||
if err := q.skipBrokenChunkFile(); err != nil {
|
if err := q.skipBrokenChunkFile(); err != nil {
|
||||||
|
|
|

@@ -40,7 +40,7 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error)
 func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) {
     startTime := time.Now()
     if cap(dstBuf) < defaultBlockSize {
-        dstBuf = bytesutil.ResizeNoCopy(dstBuf, defaultBlockSize)
+        dstBuf = bytesutil.ResizeNoCopyNoOverallocate(dstBuf, defaultBlockSize)
     }
     dstBuf = append(dstBuf[:0], tailBuf...)
     tailBuf = tailBuf[:0]

@@ -79,7 +79,7 @@ again:
     if cap(dstBuf) < 2*len(dstBuf) {
         // Increase dstBuf capacity, so more data could be read into it.
         dstBufLen := len(dstBuf)
-        dstBuf = bytesutil.ResizeWithCopy(dstBuf, 2*cap(dstBuf))
+        dstBuf = bytesutil.ResizeWithCopyNoOverallocate(dstBuf, 2*cap(dstBuf))
         dstBuf = dstBuf[:dstBufLen]
     }
     goto again
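ReadLinesBlockExt already implements its own doubling policy, so the exact-allocation variant is used here, presumably to avoid stacking power-of-2 rounding on top of the caller's 2*cap growth. A small sketch with a made-up starting capacity (the import path assumes this repository's module):

package main

import (
    "fmt"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

func main() {
    dstBuf := make([]byte, 0, 6000) // made-up capacity
    dstBuf = bytesutil.ResizeWithCopyNoOverallocate(dstBuf, 2*cap(dstBuf))
    fmt.Println(cap(dstBuf)) // 12000 exactly
    // ResizeWithCopyMayOverallocate would round 12000 up to 16384,
    // compounding the doubling with power-of-2 rounding.
}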

@@ -84,7 +84,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
             wg.Wait()
             return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
         }
-        uw.metricNameBuf = bytesutil.ResizeNoCopy(uw.metricNameBuf, int(bufSize))
+        uw.metricNameBuf = bytesutil.ResizeNoCopyMayOverallocate(uw.metricNameBuf, int(bufSize))
         if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil {
             readErrors.Inc()
             wg.Wait()

@@ -105,7 +105,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
             wg.Wait()
             return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
         }
-        uw.blockBuf = bytesutil.ResizeNoCopy(uw.blockBuf, int(bufSize))
+        uw.blockBuf = bytesutil.ResizeNoCopyMayOverallocate(uw.blockBuf, int(bufSize))
         if _, err := io.ReadFull(br, uw.blockBuf); err != nil {
             readErrors.Inc()
             wg.Wait()

@@ -308,7 +308,7 @@ func (bsr *blockStreamReader) readBlock() error {
     if usePrevTimestamps {
         bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...)
     } else {
-        bsr.Block.timestampsData = bytesutil.ResizeNoCopy(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
+        bsr.Block.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
         if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
             return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
         }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read values data.
|
// Read values data.
|
||||||
bsr.Block.valuesData = bytesutil.ResizeNoCopy(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
|
bsr.Block.valuesData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
|
||||||
if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil {
|
if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil {
|
||||||
return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err)
|
return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err)
|
||||||
}
|
}
|
||||||
|
@ -352,7 +352,7 @@ func (bsr *blockStreamReader) readIndexBlock() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read index block.
|
// Read index block.
|
||||||
bsr.compressedIndexData = bytesutil.ResizeNoCopy(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
|
bsr.compressedIndexData = bytesutil.ResizeNoCopyMayOverallocate(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
|
||||||
if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil {
|
if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil {
|
||||||
return fmt.Errorf("cannot read index block from index data at offset %d: %w", bsr.indexBlockOffset, err)
|
return fmt.Errorf("cannot read index block from index data at offset %d: %w", bsr.indexBlockOffset, err)
|
||||||
}
|
}
|
||||||
|
|
|

@@ -370,7 +370,7 @@ func (mn *MetricName) Marshal(dst []byte) []byte {
         tag := &mn.Tags[i]
         requiredSize += len(tag.Key) + len(tag.Value) + 2
     }
-    dst = bytesutil.ResizeWithCopy(dst, requiredSize)[:dstLen]
+    dst = bytesutil.ResizeWithCopyMayOverallocate(dst, requiredSize)[:dstLen]
 
     // Marshal MetricGroup
     dst = marshalTagValue(dst, mn.MetricGroup)

@@ -477,7 +477,7 @@ func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte {
         dstSize += len(label.Value)
         dstSize += 4
     }
-    dst = bytesutil.ResizeWithCopy(dst, dstSize)[:dstLen]
+    dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstSize)[:dstLen]
 
     // Marshal labels to dst.
     for i := range labels {

@@ -211,7 +211,7 @@ func skipSmallMetaindexRows(metaindex []metaindexRow, tsid *TSID) []metaindexRow
 }
 
 func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
-    ps.compressedIndexBuf = bytesutil.ResizeNoCopy(ps.compressedIndexBuf, int(mr.IndexBlockSize))
+    ps.compressedIndexBuf = bytesutil.ResizeNoCopyMayOverallocate(ps.compressedIndexBuf, int(mr.IndexBlockSize))
     ps.p.indexFile.MustReadAt(ps.compressedIndexBuf, int64(mr.IndexBlockOffset))
 
     var err error

@@ -75,10 +75,10 @@ func (br *BlockRef) MustReadBlock(dst *Block, fetchData bool) {
         return
     }
 
-    dst.timestampsData = bytesutil.ResizeNoCopy(dst.timestampsData, int(br.bh.TimestampsBlockSize))
+    dst.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(dst.timestampsData, int(br.bh.TimestampsBlockSize))
     br.p.timestampsFile.MustReadAt(dst.timestampsData, int64(br.bh.TimestampsBlockOffset))
 
-    dst.valuesData = bytesutil.ResizeNoCopy(dst.valuesData, int(br.bh.ValuesBlockSize))
+    dst.valuesData = bytesutil.ResizeNoCopyMayOverallocate(dst.valuesData, int(br.bh.ValuesBlockSize))
     br.p.valuesFile.MustReadAt(dst.valuesData, int64(br.bh.ValuesBlockOffset))
 }