Revert "lib/uint64set: allow reusing bucket16 structs inside uint64set.Set via uint64set.Release method"

This reverts commit 7c6d3981bf.

Reason for revert: high contention on bucket16Pool on systems with a large number of CPU cores,
which slows down query processing significantly.
Aliaksandr Valialkin 2021-07-06 18:21:35 +03:00
parent 8aa9bba9bd
commit a7694092b8
4 changed files with 90 additions and 116 deletions
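
For context on the revert reason: the reverted change handed bucket16 structs through a single buffered channel shared by all goroutines. Every channel receive or send acquires that one channel's internal lock, so on machines with many CPU cores concurrent queries end up serializing on it. A condensed copy of the removed pooling code (it appears in full in the lib/uint64set/uint64set.go diff below):

	// One channel shared by every goroutine in the process: each receive/send
	// acquires the channel's single internal mutex, which becomes a hot spot
	// on machines with many CPU cores.
	var bucket16Pool = make(chan *bucket16, 100*cgroup.AvailableCPUs())

	func getBucket16() *bucket16 {
		select {
		case b16 := <-bucket16Pool: // lock-contended path
			return b16
		default:
			return &bucket16{} // pool empty: fall back to allocation
		}
	}

	func putBucket16(b16 *bucket16) {
		b16.reset() // zero the contents before reuse
		select {
		case bucket16Pool <- b16: // lock-contended path
		default:
			// Pool full: drop b16 to bound memory usage.
		}
	}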

lib/storage/index_db.go

@@ -1996,7 +1996,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
 	if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
 		return nil, metricIDsForTimeRange, nil
 	}
-	uint64set.Release(metricIDsForTimeRange)
 	return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
 		maxMetrics, tr.String())
 }
@@ -2289,7 +2288,6 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
 	}
 	sortedMetricIDs := metricIDs.AppendTo(nil)
-	uint64set.Release(metricIDs)

 	// Filter out deleted metricIDs.
 	dmis := is.db.s.getDeletedMetricIDs()
@@ -2317,11 +2315,9 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
 			}
 		}
 		if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
-			uint64set.Release(metricIDs)
 			return nil, err
 		}
 		if metricIDs.Len() > maxMetrics {
-			uint64set.Release(metricIDs)
 			return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
 		}
 	}
@@ -2359,11 +2355,9 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		if err != nil {
 			return err
 		}
-		uint64set.Release(minMetricIDs)
 		minMetricIDs = mIDs
 	}
-	metricIDs.Union(minMetricIDs)
-	uint64set.Release(minMetricIDs)
+	metricIDs.UnionMayOwn(minMetricIDs)
 	return nil
 }
@@ -2715,9 +2709,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.Union(m)
+				metricIDs.UnionMayOwn(m)
 			}
-			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2745,8 +2738,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 		if err != nil {
 			return err
 		}
-		metricIDs.Union(m)
-		uint64set.Release(m)
+		metricIDs.UnionMayOwn(m)
 		atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
 		return nil
 	}
@@ -2773,9 +2765,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.Union(m)
+				metricIDs.UnionMayOwn(m)
 			}
-			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2962,7 +2953,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		} else {
 			metricIDs.Intersect(m)
 		}
-		uint64set.Release(m)
 	}
 	if metricIDs.Len() == 0 {
 		// There is no need in applying tfsPostponed, since the result is empty.

lib/uint64set/uint64set.go

@@ -6,8 +6,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"unsafe"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )

 // Set is a fast set for uint64.
@@ -37,18 +35,6 @@ func (s *bucket32Sorter) Swap(i, j int) {
 	a[i], a[j] = a[j], a[i]
 }

-// Release releases resources occupied by s.
-//
-// s mustn't be used after Release call.
-func Release(s *Set) {
-	if s == nil {
-		return
-	}
-	for i := range s.buckets {
-		releaseBucket32(&s.buckets[i])
-	}
-}
-
 // Clone returns an independent copy of s.
 func (s *Set) Clone() *Set {
 	if s == nil || s.itemsCount == 0 {
@@ -261,18 +247,34 @@ func (s *Set) sort() {
 // Union adds all the items from a to s.
 func (s *Set) Union(a *Set) {
+	s.union(a, false)
+}
+
+// UnionMayOwn adds all the items from a to s.
+//
+// It may own a if s is empty. This means that `a` cannot be used
+// after the call to UnionMayOwn.
+func (s *Set) UnionMayOwn(a *Set) {
+	s.union(a, true)
+}
+
+func (s *Set) union(a *Set, mayOwn bool) {
 	if a.Len() == 0 {
 		// Fast path - nothing to union.
 		return
 	}
 	if s.Len() == 0 {
 		// Fast path - copy `a` to `s`.
-		a = a.Clone()
+		if !mayOwn {
+			a = a.Clone()
+		}
 		*s = *a
 		return
 	}
 	// Make shallow copy of `a`, since it can be modified by a.sort().
-	a = a.cloneShallow()
+	if !mayOwn {
+		a = a.cloneShallow()
+	}
 	a.sort()
 	s.sort()
 	i := 0
@@ -299,7 +301,7 @@ func (s *Set) Union(a *Set) {
 			break
 		}
 		if s.buckets[i].hi == a.buckets[j].hi {
-			s.buckets[i].union(&a.buckets[j])
+			s.buckets[i].union(&a.buckets[j], mayOwn)
 			i++
 			j++
 		}
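
The caller-visible difference between the two entry points is ownership: Union always leaves `a` intact, while UnionMayOwn may steal `a`'s internal buckets (when `s` is empty) or sort `a` in place, so `a` must be treated as consumed afterwards. A small self-contained usage sketch (the loop shape mirrors the per-date merging in index_db.go above; the concrete numbers are illustrative):

	package main

	import (
		"fmt"

		"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
	)

	func main() {
		var total uint64set.Set
		for part := 0; part < 4; part++ {
			// Build a throwaway per-part set, as the index search does per date.
			m := &uint64set.Set{}
			for i := 0; i < 1000; i++ {
				m.Add(uint64(part*1000 + i))
			}
			// total may take ownership of m's buckets, so m must not be
			// used after this call.
			total.UnionMayOwn(m)
		}
		fmt.Println(total.Len()) // 4000
	}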
@@ -410,12 +412,6 @@ type bucket32 struct {
 	buckets []*bucket16
 }

-func releaseBucket32(b32 *bucket32) {
-	for _, b16 := range b32.buckets {
-		putBucket16(b16)
-	}
-}
-
 func (b *bucket32) getLen() int {
 	n := 0
 	for i := range b.buckets {
@@ -424,7 +420,7 @@ func (b *bucket32) getLen() int {
 	return n
 }

-func (b *bucket32) union(a *bucket32) {
+func (b *bucket32) union(a *bucket32, mayOwn bool) {
 	i := 0
 	j := 0
 	bBucketsLen := len(b.buckets)
@@ -435,14 +431,22 @@
 		if i >= bBucketsLen {
 			for j < len(a.b16his) {
 				b16 := b.addBucket16(a.b16his[j])
-				a.buckets[j].copyTo(b16)
+				if mayOwn {
+					*b16 = *a.buckets[j]
+				} else {
+					a.buckets[j].copyTo(b16)
+				}
 				j++
 			}
 			break
 		}
 		for j < len(a.b16his) && a.b16his[j] < b.b16his[i] {
 			b16 := b.addBucket16(a.b16his[j])
-			a.buckets[j].copyTo(b16)
+			if mayOwn {
+				*b16 = *a.buckets[j]
+			} else {
+				a.buckets[j].copyTo(b16)
+			}
 			j++
 		}
 		if j >= len(a.b16his) {
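
The two branches above are the whole trick: with mayOwn the 64KB-plus bucket16 payload is transferred by a plain struct assignment (`*b16 = *a.buckets[j]`) that aliases the donor's storage in O(1), whereas copyTo allocates and duplicates it. A standalone toy model of that difference (the `bucket` type here is a simplified stand-in, not the real bucket16):

	package main

	import "fmt"

	// bucket models bucket16: a struct owning a pointer to a large payload.
	type bucket struct {
		bits *[8]uint64
	}

	// copyTo performs a deep copy of the payload, like bucket16.copyTo.
	func (b *bucket) copyTo(dst *bucket) {
		bits := *b.bits
		dst.bits = &bits
	}

	func main() {
		donor := &bucket{bits: new([8]uint64)}
		donor.bits[0] = 42

		stolen := &bucket{}
		*stolen = *donor // mayOwn path: O(1), aliases donor's payload

		copied := &bucket{}
		donor.copyTo(copied) // !mayOwn path: O(payload), independent copy

		donor.bits[0] = 7
		fmt.Println(stolen.bits[0], copied.bits[0]) // prints "7 42"
	}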
@@ -554,7 +558,7 @@ func (b *bucket32) copyTo(dst *bucket32) {
 	if len(b.buckets) > 0 {
 		dst.buckets = make([]*bucket16, len(b.buckets))
 		for i, b16 := range b.buckets {
-			b16Dst := getBucket16()
+			b16Dst := &bucket16{}
 			b16.copyTo(b16Dst)
 			dst.buckets[i] = b16Dst
 		}
@@ -628,8 +632,7 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
 func (b *bucket32) addBucket16(hi uint16) *bucket16 {
 	b.b16his = append(b.b16his, hi)
-	b16 := getBucket16()
-	b.buckets = append(b.buckets, b16)
+	b.buckets = append(b.buckets, &bucket16{})
 	return b.buckets[len(b.buckets)-1]
 }
@@ -644,7 +647,7 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
 	b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
 	b.b16his[pos] = hi
 	b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
-	b16 := getBucket16()
+	b16 := &bucket16{}
 	b.buckets[pos] = b16
 	return b16
 }
@@ -707,40 +710,6 @@ type bucket16 struct {
 const smallPoolSize = 56

-func getBucket16() *bucket16 {
-	select {
-	case b16 := <-bucket16Pool:
-		return b16
-	default:
-		return &bucket16{}
-	}
-}
-
-func putBucket16(b16 *bucket16) {
-	b16.reset()
-	select {
-	case bucket16Pool <- b16:
-	default:
-		// Drop b16 in order to reduce memory usage
-	}
-}
-
-// Use a chan instead of sync.Pool for reducing memory usage on systems with big number of CPU cores,
-// since sync.Pool holds per-CPU pools of potentially big bucket16 structs (more than 64KB each).
-var bucket16Pool = make(chan *bucket16, 100*cgroup.AvailableCPUs())
-
-func (b *bucket16) reset() {
-	if bits := b.bits; bits != nil {
-		for i := range bits {
-			bits[i] = 0
-		}
-	}
-	for i := range b.smallPool {
-		b.smallPool[i] = 0
-	}
-	b.smallPoolLen = 0
-}
-
 func (b *bucket16) isZero() bool {
 	return b.bits == nil && b.smallPoolLen == 0
 }
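
The removed comment above records the original trade-off: sync.Pool was avoided because it keeps per-P caches of bucket16 structs that each exceed 64KB, inflating memory on many-core machines, while the channel kept memory bounded but, as this revert shows, at the cost of lock contention. For comparison, a minimal sketch of the sync.Pool variant that comment argues against (assuming the bucket16 type and the reset method from the removed code; the names are illustrative, not part of the library):

	var bucket16SyncPool = sync.Pool{
		New: func() interface{} {
			return &bucket16{}
		},
	}

	func getBucket16Pooled() *bucket16 {
		// Usually served from the current P's local cache: no shared lock.
		return bucket16SyncPool.Get().(*bucket16)
	}

	func putBucket16Pooled(b16 *bucket16) {
		b16.reset() // zero the contents before reuse
		bucket16SyncPool.Put(b16)
	}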

lib/uint64set/uint64set_test.go

@@ -46,7 +46,6 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Union(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
-		Release(sa)

 		// Verify sb.Union(sa)
 		sa = saOrig.Clone()
@@ -57,7 +56,27 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Union(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-		Release(sb)
+
+		// Verify sa.UnionMayOwn(sb)
+		sa = saOrig.Clone()
+		sb = sbOrig.Clone()
+		sa.UnionMayOwn(sb)
+		if err := expectEqual(sa, mUnion); err != nil {
+			t.Fatalf("invalid sa.UnionMayOwn(sb): %s", err)
+		}
+		if !sbOrig.Equal(sb) {
+			t.Fatalf("sbOrig must be equal to sb after sa.UnionMayOwn(sb); got\n%v\nvs\n%v", sbOrig, sb)
+		}
+
+		// Verify sb.UnionMayOwn(sa)
+		sa = saOrig.Clone()
+		sb.UnionMayOwn(sa)
+		if err := expectEqual(sb, mUnion); err != nil {
+			t.Fatalf("invalid sb.UnionMayOwn(sa): %s", err)
+		}
+		if !saOrig.Equal(sa) {
+			t.Fatalf("saOrig must be equal to sa after sb.UnionMayOwn(sa); got\n%v\nvs\n%v", saOrig, sa)
+		}

 		// Verify sa.Intersect(sb)
 		sa = saOrig.Clone()
@@ -69,7 +88,6 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Intersect(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
-		Release(sa)

 		// Verify sb.Intersect(sa)
 		sa = saOrig.Clone()
@@ -80,8 +98,6 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Intersect(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-		Release(sa)
-		Release(sb)

 		// Verify sa.Subtract(sb)
 		mSubtractAB := make(map[uint64]bool)
@@ -100,7 +116,6 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Subtract(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
-		Release(sa)

 		// Verify sb.Subtract(sa)
 		mSubtractBA := make(map[uint64]bool)
@@ -118,8 +133,6 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Subtract(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-		Release(sa)
-		Release(sb)
 	}

 	f(nil, nil)
@@ -264,7 +277,6 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := sCopy.Len(); n != 0 {
 			t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
 		}
-		Release(sNil)
 	}

 	// Verify forward Add
@@ -398,7 +410,6 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 	if itemsCount > 0 && calls != 1 {
 		t.Fatalf("Unexpected number of ForEach callback calls; got %d; want %d", calls, 1)
 	}
-	Release(&s)

 	// Verify ForEach on nil set.
 	var s1 *Set
@@ -438,10 +449,38 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s3.Len(); n != expectedLen {
 			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
 		}
-		Release(&s1)
-		Release(&s2)
-		Release(&s3)
-		Release(&s4)
 	}

+	// Verify UnionMayOwn
+	{
+		const unionOffset = 12345
+		var s1, s2 Set
+		for i := 0; i < itemsCount; i++ {
+			s1.Add(uint64(i) + offset)
+			s2.Add(uint64(i) + offset + unionOffset)
+		}
+		s1.UnionMayOwn(&s2)
+		expectedLen := 2 * itemsCount
+		if itemsCount > unionOffset {
+			expectedLen = itemsCount + unionOffset
+		}
+		if n := s1.Len(); n != expectedLen {
+			t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
+		}
+
+		// Verify union on empty set.
+		var s3 Set
+		expectedLen = s1.Len()
+		s3.UnionMayOwn(&s1)
+		if n := s3.Len(); n != expectedLen {
+			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
+		}
+		var s4 Set
+		expectedLen = s3.Len()
+		s3.UnionMayOwn(&s4)
+		if n := s3.Len(); n != expectedLen {
+			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
+		}
+	}

 	// Verify intersect
@@ -482,10 +521,6 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s4.Len(); n != expectedLen {
 			t.Fatalf("unexpected s4.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
 		}
-		Release(&s1)
-		Release(&s2)
-		Release(&s3)
-		Release(&s4)
 	}

 	// Verify subtract
@@ -512,9 +547,6 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s3.Len(); n != 0 {
 			t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
 		}
-		Release(&s1)
-		Release(&s2)
-		Release(&s3)
 	}

 	// Verify Del
@@ -565,7 +597,6 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 			t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
 		}
 	}
-	Release(sCopy)
 }

 func TestSetSparseItems(t *testing.T) {

lib/uint64set/uint64set_timing_test.go

@@ -13,7 +13,6 @@ func BenchmarkAddMulti(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
-		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAddMulti(b, a)
 		})
@@ -25,7 +24,6 @@ func BenchmarkAdd(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
-		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAdd(b, a)
 		})
@@ -40,8 +38,6 @@ func BenchmarkUnionNoOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
-		Release(sa)
-		Release(sb)
 	}
 }
@@ -53,8 +49,6 @@ func BenchmarkUnionPartialOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
-		Release(sa)
-		Release(sb)
 	}
 }
@@ -66,8 +60,6 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
-		Release(sa)
-		Release(sb)
 	}
 }
@@ -80,7 +72,6 @@ func benchmarkAdd(b *testing.B, a []uint64) {
 			for _, x := range a {
 				s.Add(x)
 			}
-			Release(&s)
 		}
 	})
 }
@@ -100,7 +91,6 @@ func benchmarkAddMulti(b *testing.B, a []uint64) {
 				s.AddMulti(a[n:m])
 				n = m
 			}
-			Release(&s)
 		}
 	})
 }
@@ -114,8 +104,6 @@ func benchmarkUnion(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Union(sb)
 			sbCopy.Union(sa)
-			Release(saCopy)
-			Release(sbCopy)
 		}
 	})
 }
@@ -162,8 +150,6 @@ func benchmarkIntersect(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Intersect(sb)
 			sbCopy.Intersect(sa)
-			Release(saCopy)
-			Release(sbCopy)
 		}
 	})
 }
@@ -393,7 +379,6 @@ func BenchmarkSetHasHit(b *testing.B) {
 					}
 				}
 			})
-			Release(&s)
 		})
 	}
 }
@@ -455,7 +440,6 @@ func BenchmarkSetHasMiss(b *testing.B) {
 					}
 				}
 			})
-			Release(&s)
 		})
 	}
 }