lib/uint64set: allow reusing bucket16 structs inside uint64set.Set via uint64set.Release method

This reduces the load on the memory allocator in the Go runtime under production workloads.
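The intended usage pattern: once a temporary Set (for example, an intermediate result of an index search) is no longer needed, pass it to uint64set.Release so its bucket16 structs are returned to a pool and reused by later allocations. A minimal sketch of the pattern; the caller code below is hypothetical and not part of this commit:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
	// Build a short-lived set.
	var s uint64set.Set
	for i := uint64(0); i < 1000; i++ {
		s.Add(i)
	}
	fmt.Println(s.Len())

	// Return the underlying bucket16 structs to the pool for reuse.
	// The set mustn't be used after Release.
	uint64set.Release(&s)
}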
Aliaksandr Valialkin 2021-07-06 15:33:43 +03:00
parent 78c9174682
commit 7c6d3981bf
4 changed files with 116 additions and 90 deletions

lib/storage/index_db.go

@@ -1996,6 +1996,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
 	if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
 		return nil, metricIDsForTimeRange, nil
 	}
+	uint64set.Release(metricIDsForTimeRange)
 	return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
 		maxMetrics, tr.String())
 }
@@ -2288,6 +2289,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
 	}
 	sortedMetricIDs := metricIDs.AppendTo(nil)
+	uint64set.Release(metricIDs)
 
 	// Filter out deleted metricIDs.
 	dmis := is.db.s.getDeletedMetricIDs()
@@ -2315,9 +2317,11 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
 			}
 		}
 		if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
+			uint64set.Release(metricIDs)
 			return nil, err
 		}
 		if metricIDs.Len() > maxMetrics {
+			uint64set.Release(metricIDs)
 			return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
 		}
 	}
@@ -2355,9 +2359,11 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		if err != nil {
 			return err
 		}
+		uint64set.Release(minMetricIDs)
 		minMetricIDs = mIDs
 	}
-	metricIDs.UnionMayOwn(minMetricIDs)
+	metricIDs.Union(minMetricIDs)
+	uint64set.Release(minMetricIDs)
 	return nil
 }
@@ -2709,8 +2715,9 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(m)
+				metricIDs.Union(m)
 			}
+			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2738,7 +2745,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 	if err != nil {
 		return err
 	}
-	metricIDs.UnionMayOwn(m)
+	metricIDs.Union(m)
+	uint64set.Release(m)
 	atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
 	return nil
 }
@@ -2765,8 +2773,9 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(m)
+				metricIDs.Union(m)
 			}
+			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2953,6 +2962,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		} else {
 			metricIDs.Intersect(m)
 		}
+		uint64set.Release(m)
 	}
 	if metricIDs.Len() == 0 {
 		// There is no need in applying tfsPostponed, since the result is empty.

lib/uint64set/uint64set.go

@@ -6,6 +6,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )
 
 // Set is a fast set for uint64.
@@ -35,6 +37,18 @@ func (s *bucket32Sorter) Swap(i, j int) {
 	a[i], a[j] = a[j], a[i]
 }
 
+// Release releases resources occupied by s.
+//
+// s mustn't be used after Release call.
+func Release(s *Set) {
+	if s == nil {
+		return
+	}
+	for i := range s.buckets {
+		releaseBucket32(&s.buckets[i])
+	}
+}
+
 // Clone returns an independent copy of s.
 func (s *Set) Clone() *Set {
 	if s == nil || s.itemsCount == 0 {
@@ -247,34 +261,18 @@ func (s *Set) sort() {
 // Union adds all the items from a to s.
 func (s *Set) Union(a *Set) {
-	s.union(a, false)
-}
-
-// UnionMayOwn adds all the items from a to s.
-//
-// It may own a if s is empty. This means that `a` cannot be used
-// after the call to UnionMayOwn.
-func (s *Set) UnionMayOwn(a *Set) {
-	s.union(a, true)
-}
-
-func (s *Set) union(a *Set, mayOwn bool) {
 	if a.Len() == 0 {
 		// Fast path - nothing to union.
 		return
 	}
 	if s.Len() == 0 {
 		// Fast path - copy `a` to `s`.
-		if !mayOwn {
-			a = a.Clone()
-		}
+		a = a.Clone()
 		*s = *a
 		return
 	}
 	// Make shallow copy of `a`, since it can be modified by a.sort().
-	if !mayOwn {
-		a = a.cloneShallow()
-	}
+	a = a.cloneShallow()
 	a.sort()
 	s.sort()
 	i := 0
@@ -301,7 +299,7 @@ func (s *Set) union(a *Set, mayOwn bool) {
 			break
 		}
 		if s.buckets[i].hi == a.buckets[j].hi {
-			s.buckets[i].union(&a.buckets[j], mayOwn)
+			s.buckets[i].union(&a.buckets[j])
 			i++
 			j++
 		}
@@ -412,6 +410,12 @@ type bucket32 struct {
 	buckets []*bucket16
 }
 
+func releaseBucket32(b32 *bucket32) {
+	for _, b16 := range b32.buckets {
+		putBucket16(b16)
+	}
+}
+
 func (b *bucket32) getLen() int {
 	n := 0
 	for i := range b.buckets {
@@ -420,7 +424,7 @@ func (b *bucket32) getLen() int {
 	return n
 }
 
-func (b *bucket32) union(a *bucket32, mayOwn bool) {
+func (b *bucket32) union(a *bucket32) {
 	i := 0
 	j := 0
 	bBucketsLen := len(b.buckets)
@@ -431,22 +435,14 @@ func (b *bucket32) union(a *bucket32, mayOwn bool) {
 		if i >= bBucketsLen {
 			for j < len(a.b16his) {
 				b16 := b.addBucket16(a.b16his[j])
-				if mayOwn {
-					*b16 = *a.buckets[j]
-				} else {
-					a.buckets[j].copyTo(b16)
-				}
+				a.buckets[j].copyTo(b16)
 				j++
 			}
 			break
 		}
 		for j < len(a.b16his) && a.b16his[j] < b.b16his[i] {
 			b16 := b.addBucket16(a.b16his[j])
-			if mayOwn {
-				*b16 = *a.buckets[j]
-			} else {
-				a.buckets[j].copyTo(b16)
-			}
+			a.buckets[j].copyTo(b16)
 			j++
 		}
 		if j >= len(a.b16his) {
@@ -558,7 +554,7 @@ func (b *bucket32) copyTo(dst *bucket32) {
 	if len(b.buckets) > 0 {
 		dst.buckets = make([]*bucket16, len(b.buckets))
 		for i, b16 := range b.buckets {
-			b16Dst := &bucket16{}
+			b16Dst := getBucket16()
 			b16.copyTo(b16Dst)
 			dst.buckets[i] = b16Dst
 		}
@@ -632,7 +628,8 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
 func (b *bucket32) addBucket16(hi uint16) *bucket16 {
 	b.b16his = append(b.b16his, hi)
-	b.buckets = append(b.buckets, &bucket16{})
+	b16 := getBucket16()
+	b.buckets = append(b.buckets, b16)
 	return b.buckets[len(b.buckets)-1]
 }
@@ -647,7 +644,7 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
 	b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
 	b.b16his[pos] = hi
 	b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
-	b16 := &bucket16{}
+	b16 := getBucket16()
 	b.buckets[pos] = b16
 	return b16
 }
@@ -710,6 +707,40 @@ type bucket16 struct {
 
 const smallPoolSize = 56
 
+func getBucket16() *bucket16 {
+	select {
+	case b16 := <-bucket16Pool:
+		return b16
+	default:
+		return &bucket16{}
+	}
+}
+
+func putBucket16(b16 *bucket16) {
+	b16.reset()
+	select {
+	case bucket16Pool <- b16:
+	default:
+		// Drop b16 in order to reduce memory usage
+	}
+}
+
+// Use a chan instead of sync.Pool for reducing memory usage on systems with big number of CPU cores,
+// since sync.Pool holds per-CPU pools of potentially big bucket16 structs (more than 64KB each).
+var bucket16Pool = make(chan *bucket16, 100*cgroup.AvailableCPUs())
+
+func (b *bucket16) reset() {
+	if bits := b.bits; bits != nil {
+		for i := range bits {
+			bits[i] = 0
+		}
+	}
+	for i := range b.smallPool {
+		b.smallPool[i] = 0
+	}
+	b.smallPoolLen = 0
+}
+
 func (b *bucket16) isZero() bool {
 	return b.bits == nil && b.smallPoolLen == 0
 }

lib/uint64set/uint64set_test.go

@@ -46,6 +46,7 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Union(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
+		Release(sa)
 
 		// Verify sb.Union(sa)
 		sa = saOrig.Clone()
@@ -56,27 +57,7 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Union(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-
-		// Verify sa.UnionMayOwn(sb)
-		sa = saOrig.Clone()
-		sb = sbOrig.Clone()
-		sa.UnionMayOwn(sb)
-		if err := expectEqual(sa, mUnion); err != nil {
-			t.Fatalf("invalid sa.UnionMayOwn(sb): %s", err)
-		}
-		if !sbOrig.Equal(sb) {
-			t.Fatalf("sbOrig must be equal to sb after sa.UnionMayOwn(sb); got\n%v\nvs\n%v", sbOrig, sb)
-		}
-
-		// Verify sb.UnionMayOwn(sa)
-		sa = saOrig.Clone()
-		sb.UnionMayOwn(sa)
-		if err := expectEqual(sb, mUnion); err != nil {
-			t.Fatalf("invalid sb.UnionMayOwn(sa): %s", err)
-		}
-		if !saOrig.Equal(sa) {
-			t.Fatalf("saOrig must be equal to sa after sb.UnionMayOwn(sa); got\n%v\nvs\n%v", saOrig, sa)
-		}
+		Release(sb)
 
 		// Verify sa.Intersect(sb)
 		sa = saOrig.Clone()
@@ -88,6 +69,7 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Intersect(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
+		Release(sa)
 
 		// Verify sb.Intersect(sa)
 		sa = saOrig.Clone()
@@ -98,6 +80,8 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Intersect(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
+		Release(sa)
+		Release(sb)
 
 		// Verify sa.Subtract(sb)
 		mSubtractAB := make(map[uint64]bool)
@@ -116,6 +100,7 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Subtract(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
+		Release(sa)
 
 		// Verify sb.Subtract(sa)
 		mSubtractBA := make(map[uint64]bool)
@@ -133,6 +118,8 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Subtract(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
+		Release(sa)
+		Release(sb)
 	}
 
 	f(nil, nil)
@@ -277,6 +264,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := sCopy.Len(); n != 0 {
 			t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
 		}
+		Release(sNil)
 	}
 
 	// Verify forward Add
@@ -410,6 +398,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 	if itemsCount > 0 && calls != 1 {
 		t.Fatalf("Unexpected number of ForEach callback calls; got %d; want %d", calls, 1)
 	}
+	Release(&s)
 
 	// Verify ForEach on nil set.
 	var s1 *Set
@@ -449,38 +438,10 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s3.Len(); n != expectedLen {
 			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
 		}
-	}
-
-	// Verify UnionMayOwn
-	{
-		const unionOffset = 12345
-		var s1, s2 Set
-		for i := 0; i < itemsCount; i++ {
-			s1.Add(uint64(i) + offset)
-			s2.Add(uint64(i) + offset + unionOffset)
-		}
-		s1.UnionMayOwn(&s2)
-		expectedLen := 2 * itemsCount
-		if itemsCount > unionOffset {
-			expectedLen = itemsCount + unionOffset
-		}
-		if n := s1.Len(); n != expectedLen {
-			t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
-		}
-
-		// Verify union on empty set.
-		var s3 Set
-		expectedLen = s1.Len()
-		s3.UnionMayOwn(&s1)
-		if n := s3.Len(); n != expectedLen {
-			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
-		}
-		var s4 Set
-		expectedLen = s3.Len()
-		s3.UnionMayOwn(&s4)
-		if n := s3.Len(); n != expectedLen {
-			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
-		}
+		Release(&s1)
+		Release(&s2)
+		Release(&s3)
+		Release(&s4)
 	}
 
 	// Verify intersect
@@ -521,6 +482,10 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s4.Len(); n != expectedLen {
 			t.Fatalf("unexpected s4.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
 		}
+		Release(&s1)
+		Release(&s2)
+		Release(&s3)
+		Release(&s4)
 	}
 
 	// Verify subtract
@@ -547,6 +512,9 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s3.Len(); n != 0 {
 			t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
 		}
+		Release(&s1)
+		Release(&s2)
+		Release(&s3)
 	}
 
 	// Verify Del
@@ -597,6 +565,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 			t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
 		}
 	}
+	Release(sCopy)
 }
 
 func TestSetSparseItems(t *testing.T) {

lib/uint64set/uint64set_timing_test.go

@@ -13,6 +13,7 @@ func BenchmarkAddMulti(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
+		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAddMulti(b, a)
 		})
@@ -24,6 +25,7 @@ func BenchmarkAdd(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
+		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAdd(b, a)
 		})
@@ -38,6 +40,8 @@ func BenchmarkUnionNoOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
+		Release(sa)
+		Release(sb)
 	}
 }
@@ -49,6 +53,8 @@ func BenchmarkUnionPartialOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
+		Release(sa)
+		Release(sb)
 	}
 }
@@ -60,6 +66,8 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
+		Release(sa)
+		Release(sb)
 	}
 }
@@ -72,6 +80,7 @@ func benchmarkAdd(b *testing.B, a []uint64) {
 			for _, x := range a {
 				s.Add(x)
 			}
+			Release(&s)
 		}
 	})
 }
@@ -91,6 +100,7 @@ func benchmarkAddMulti(b *testing.B, a []uint64) {
 				s.AddMulti(a[n:m])
 				n = m
 			}
+			Release(&s)
 		}
 	})
 }
@@ -104,6 +114,8 @@ func benchmarkUnion(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Union(sb)
 			sbCopy.Union(sa)
+			Release(saCopy)
+			Release(sbCopy)
 		}
 	})
 }
@@ -150,6 +162,8 @@ func benchmarkIntersect(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Intersect(sb)
 			sbCopy.Intersect(sa)
+			Release(saCopy)
+			Release(sbCopy)
 		}
 	})
 }
@@ -379,6 +393,7 @@ func BenchmarkSetHasHit(b *testing.B) {
 					}
 				}
 			})
+			Release(&s)
 		})
 	}
 }
@@ -440,6 +455,7 @@ func BenchmarkSetHasMiss(b *testing.B) {
 					}
 				}
 			})
+			Release(&s)
 		})
 	}
 }
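The pooling added in lib/uint64set/uint64set.go caps the free list with a buffered channel rather than sync.Pool: sync.Pool keeps per-P caches that are only cleared on GC, so with ~64KB bucket16 structs and many cores the steady-state cache can grow large, while a fixed-capacity channel makes the worst case explicit (here, 100 structs per available CPU). A standalone sketch of the same pattern; buffer, freeList, getBuffer, and putBuffer are illustrative names, not the repository's code:

package bucketpool

// Bounded free list built on a buffered channel. Unlike sync.Pool, it never
// caches more than cap(freeList) objects, which keeps the cached memory
// bounded for large structs on hosts with many CPU cores.

type buffer struct {
	data [64 * 1024]byte // stand-in for a big reusable struct such as bucket16
}

var freeList = make(chan *buffer, 100)

// getBuffer returns a pooled buffer if one is available, or allocates a fresh one.
func getBuffer() *buffer {
	select {
	case b := <-freeList:
		return b
	default:
		return &buffer{}
	}
}

// putBuffer resets b and offers it back to the pool, dropping it if the pool is full.
func putBuffer(b *buffer) {
	b.data = [64 * 1024]byte{} // clear before reuse
	select {
	case freeList <- b:
	default:
		// Pool is full; drop b so the garbage collector reclaims it.
	}
}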