From a7694092b8a6ff0366aeba63455bd78481a0acf6 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 6 Jul 2021 18:21:35 +0300
Subject: [PATCH] Revert "lib/uint64set: allow reusing bucket16 structs inside
 uint64set.Set via uint64set.Release method"

This reverts commit 7c6d3981bf88e162b039c169a22dc34e05fae216.

Reason for revert: high contention at bucket16Pool on systems with a large
number of CPU cores. This slows down query processing significantly.
---
 lib/storage/index_db.go                | 18 ++---
 lib/uint64set/uint64set.go             | 97 +++++++++-----------------
 lib/uint64set/uint64set_test.go        | 75 ++++++++++++++------
 lib/uint64set/uint64set_timing_test.go | 16 -----
 4 files changed, 90 insertions(+), 116 deletions(-)

diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 3015ac408a..00af9b9297 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -1996,7 +1996,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
 	if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
 		return nil, metricIDsForTimeRange, nil
 	}
-	uint64set.Release(metricIDsForTimeRange)
 	return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
 		maxMetrics, tr.String())
 }
@@ -2289,7 +2288,6 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
 	}
 
 	sortedMetricIDs := metricIDs.AppendTo(nil)
-	uint64set.Release(metricIDs)
 
 	// Filter out deleted metricIDs.
 	dmis := is.db.s.getDeletedMetricIDs()
@@ -2317,11 +2315,9 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
 			}
 		}
 		if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
-			uint64set.Release(metricIDs)
 			return nil, err
 		}
 		if metricIDs.Len() > maxMetrics {
-			uint64set.Release(metricIDs)
 			return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
 		}
 	}
@@ -2359,11 +2355,9 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		if err != nil {
 			return err
 		}
-		uint64set.Release(minMetricIDs)
 		minMetricIDs = mIDs
 	}
-	metricIDs.Union(minMetricIDs)
-	uint64set.Release(minMetricIDs)
+	metricIDs.UnionMayOwn(minMetricIDs)
 	return nil
 }
 
@@ -2715,9 +2709,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.Union(m)
+				metricIDs.UnionMayOwn(m)
 			}
-			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2745,8 +2738,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 		if err != nil {
 			return err
 		}
-		metricIDs.Union(m)
-		uint64set.Release(m)
+		metricIDs.UnionMayOwn(m)
 		atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
 		return nil
 	}
@@ -2773,9 +2765,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.Union(m)
+				metricIDs.UnionMayOwn(m)
 			}
-			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2962,7 +2953,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		} else {
 			metricIDs.Intersect(m)
 		}
-		uint64set.Release(m)
 	}
 	if metricIDs.Len() == 0 {
 		// There is no need in applying tfsPostponed, since the result is empty.
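Note: the bucket16Pool named in the revert reason is the channel-based free list removed from lib/uint64set/uint64set.go below. The following standalone sketch is illustrative only and is not part of the patch: it reproduces the removed pool shape so the contention point is visible. runtime.NumCPU() stands in for the cgroup.AvailableCPUs() helper used by the reverted code, and the main() stress loop is hypothetical.

    package main

    import (
    	"runtime"
    	"sync"
    )

    // bucket16 mirrors the pooled struct: a large bitmap plus a small pool,
    // more than 64KB once the bitmap is allocated.
    type bucket16 struct {
    	bits         *[1024]uint64
    	smallPool    [56]uint16
    	smallPoolLen int
    }

    // A single buffered channel shared by all goroutines. Cheap on memory,
    // but every send and receive synchronizes on the same channel lock.
    var bucket16Pool = make(chan *bucket16, 100*runtime.NumCPU())

    func getBucket16() *bucket16 {
    	select {
    	case b16 := <-bucket16Pool: // all cores compete here
    		return b16
    	default:
    		return &bucket16{}
    	}
    }

    func putBucket16(b16 *bucket16) {
    	// The reverted code also zeroed b16 via reset() before pooling it.
    	select {
    	case bucket16Pool <- b16: // and here
    	default: // drop b16 in order to reduce memory usage
    	}
    }

    func main() {
    	// Many cores allocating and releasing buckets concurrently serialize
    	// on bucket16Pool; this is the slowdown the revert removes.
    	var wg sync.WaitGroup
    	for i := 0; i < runtime.NumCPU(); i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			for n := 0; n < 1000000; n++ {
    				putBucket16(getBucket16())
    			}
    		}()
    	}
    	wg.Wait()
    }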
diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go
index 12c64c3440..37c5417ac7 100644
--- a/lib/uint64set/uint64set.go
+++ b/lib/uint64set/uint64set.go
@@ -6,8 +6,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"unsafe"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )
 
 // Set is a fast set for uint64.
@@ -37,18 +35,6 @@ func (s *bucket32Sorter) Swap(i, j int) {
 	a[i], a[j] = a[j], a[i]
 }
 
-// Release releases resources occupied by s.
-//
-// s mustn't be used after Release call.
-func Release(s *Set) {
-	if s == nil {
-		return
-	}
-	for i := range s.buckets {
-		releaseBucket32(&s.buckets[i])
-	}
-}
-
 // Clone returns an independent copy of s.
 func (s *Set) Clone() *Set {
 	if s == nil || s.itemsCount == 0 {
@@ -261,18 +247,34 @@ func (s *Set) sort() {
 
 // Union adds all the items from a to s.
 func (s *Set) Union(a *Set) {
+	s.union(a, false)
+}
+
+// UnionMayOwn adds all the items from a to s.
+//
+// It may own `a` if s is empty. This means that `a` cannot be used
+// after the call to UnionMayOwn.
+func (s *Set) UnionMayOwn(a *Set) {
+	s.union(a, true)
+}
+
+func (s *Set) union(a *Set, mayOwn bool) {
 	if a.Len() == 0 {
 		// Fast path - nothing to union.
 		return
 	}
 	if s.Len() == 0 {
 		// Fast path - copy `a` to `s`.
-		a = a.Clone()
+		if !mayOwn {
+			a = a.Clone()
+		}
 		*s = *a
 		return
 	}
 	// Make shallow copy of `a`, since it can be modified by a.sort().
-	a = a.cloneShallow()
+	if !mayOwn {
+		a = a.cloneShallow()
+	}
 	a.sort()
 	s.sort()
 	i := 0
@@ -299,7 +301,7 @@
 			break
 		}
 		if s.buckets[i].hi == a.buckets[j].hi {
-			s.buckets[i].union(&a.buckets[j])
+			s.buckets[i].union(&a.buckets[j], mayOwn)
 			i++
 			j++
 		}
@@ -410,12 +412,6 @@ type bucket32 struct {
 	buckets []*bucket16
 }
 
-func releaseBucket32(b32 *bucket32) {
-	for _, b16 := range b32.buckets {
-		putBucket16(b16)
-	}
-}
-
 func (b *bucket32) getLen() int {
 	n := 0
 	for i := range b.buckets {
@@ -424,7 +420,7 @@
 	return n
 }
 
-func (b *bucket32) union(a *bucket32) {
+func (b *bucket32) union(a *bucket32, mayOwn bool) {
 	i := 0
 	j := 0
 	bBucketsLen := len(b.buckets)
@@ -435,14 +431,22 @@
 		if i >= bBucketsLen {
 			for j < len(a.b16his) {
 				b16 := b.addBucket16(a.b16his[j])
-				a.buckets[j].copyTo(b16)
+				if mayOwn {
+					*b16 = *a.buckets[j]
+				} else {
+					a.buckets[j].copyTo(b16)
+				}
 				j++
 			}
 			break
 		}
 		for j < len(a.b16his) && a.b16his[j] < b.b16his[i] {
 			b16 := b.addBucket16(a.b16his[j])
-			a.buckets[j].copyTo(b16)
+			if mayOwn {
+				*b16 = *a.buckets[j]
+			} else {
+				a.buckets[j].copyTo(b16)
+			}
 			j++
 		}
 		if j >= len(a.b16his) {
@@ -554,7 +558,7 @@ func (b *bucket32) copyTo(dst *bucket32) {
 	if len(b.buckets) > 0 {
 		dst.buckets = make([]*bucket16, len(b.buckets))
 		for i, b16 := range b.buckets {
-			b16Dst := getBucket16()
+			b16Dst := &bucket16{}
 			b16.copyTo(b16Dst)
 			dst.buckets[i] = b16Dst
 		}
@@ -628,8 +632,7 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
 
 func (b *bucket32) addBucket16(hi uint16) *bucket16 {
 	b.b16his = append(b.b16his, hi)
-	b16 := getBucket16()
-	b.buckets = append(b.buckets, b16)
+	b.buckets = append(b.buckets, &bucket16{})
 	return b.buckets[len(b.buckets)-1]
 }
 
@@ -644,7 +647,7 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
 	b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
 	b.b16his[pos] = hi
 	b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
-	b16 := getBucket16()
+	b16 := &bucket16{}
 	b.buckets[pos] = b16
 	return b16
 }
@@ -707,40 +710,6 @@ type bucket16 struct {
 
 const smallPoolSize = 56
 
-func getBucket16() *bucket16 {
-	select {
-	case b16 := <-bucket16Pool:
-		return b16
-	default:
-		return &bucket16{}
-	}
-}
-
-func putBucket16(b16 *bucket16) {
-	b16.reset()
-	select {
-	case bucket16Pool <- b16:
-	default:
-		// Drop b16 in order to reduce memory usage
-	}
-}
-
-// Use a chan instead of sync.Pool for reducing memory usage on systems with big number of CPU cores,
-// since sync.Pool holds per-CPU pools of potentially big bucket16 structs (more than 64KB each).
-var bucket16Pool = make(chan *bucket16, 100*cgroup.AvailableCPUs())
-
-func (b *bucket16) reset() {
-	if bits := b.bits; bits != nil {
-		for i := range bits {
-			bits[i] = 0
-		}
-	}
-	for i := range b.smallPool {
-		b.smallPool[i] = 0
-	}
-	b.smallPoolLen = 0
-}
-
 func (b *bucket16) isZero() bool {
 	return b.bits == nil && b.smallPoolLen == 0
 }
diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go
index a9391ae312..5cd97cf950 100644
--- a/lib/uint64set/uint64set_test.go
+++ b/lib/uint64set/uint64set_test.go
@@ -46,7 +46,6 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Union(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
-		Release(sa)
 
 		// Verify sb.Union(sa)
 		sa = saOrig.Clone()
@@ -57,7 +56,27 @@
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Union(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-		Release(sb)
+
+		// Verify sa.UnionMayOwn(sb)
+		sa = saOrig.Clone()
+		sb = sbOrig.Clone()
+		sa.UnionMayOwn(sb)
+		if err := expectEqual(sa, mUnion); err != nil {
+			t.Fatalf("invalid sa.UnionMayOwn(sb): %s", err)
+		}
+		if !sbOrig.Equal(sb) {
+			t.Fatalf("sbOrig must be equal to sb after sa.UnionMayOwn(sb); got\n%v\nvs\n%v", sbOrig, sb)
+		}
+
+		// Verify sb.UnionMayOwn(sa)
+		sa = saOrig.Clone()
+		sb.UnionMayOwn(sa)
+		if err := expectEqual(sb, mUnion); err != nil {
+			t.Fatalf("invalid sb.UnionMayOwn(sa): %s", err)
+		}
+		if !saOrig.Equal(sa) {
+			t.Fatalf("saOrig must be equal to sa after sb.UnionMayOwn(sa); got\n%v\nvs\n%v", saOrig, sa)
+		}
 
 		// Verify sa.Intersect(sb)
 		sa = saOrig.Clone()
@@ -69,7 +88,6 @@
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Intersect(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
-		Release(sa)
 
 		// Verify sb.Intersect(sa)
 		sa = saOrig.Clone()
@@ -80,8 +98,6 @@
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Intersect(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-		Release(sa)
-		Release(sb)
 
 		// Verify sa.Subtract(sb)
 		mSubtractAB := make(map[uint64]bool)
@@ -100,7 +116,6 @@
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Subtract(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
-		Release(sa)
 
 		// Verify sb.Subtract(sa)
 		mSubtractBA := make(map[uint64]bool)
@@ -118,8 +133,6 @@
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Subtract(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-		Release(sa)
-		Release(sb)
 	}
 
 	f(nil, nil)
@@ -264,7 +277,6 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := sCopy.Len(); n != 0 {
 			t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
 		}
-		Release(sNil)
 	}
 
 	// Verify forward Add
@@ -398,7 +410,6 @@
 	if itemsCount > 0 && calls != 1 {
 		t.Fatalf("Unexpected number of ForEach callback calls; got %d; want %d", calls, 1)
 	}
-	Release(&s)
 
 	// Verify ForEach on nil set.
 	var s1 *Set
@@ -438,10 +449,38 @@
 		if n := s3.Len(); n != expectedLen {
 			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
 		}
-		Release(&s1)
-		Release(&s2)
-		Release(&s3)
-		Release(&s4)
+	}
+
+	// Verify UnionMayOwn
+	{
+		const unionOffset = 12345
+		var s1, s2 Set
+		for i := 0; i < itemsCount; i++ {
+			s1.Add(uint64(i) + offset)
+			s2.Add(uint64(i) + offset + unionOffset)
+		}
+		s1.UnionMayOwn(&s2)
+		expectedLen := 2 * itemsCount
+		if itemsCount > unionOffset {
+			expectedLen = itemsCount + unionOffset
+		}
+		if n := s1.Len(); n != expectedLen {
+			t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
+		}
+
+		// Verify union on empty set.
+		var s3 Set
+		expectedLen = s1.Len()
+		s3.UnionMayOwn(&s1)
+		if n := s3.Len(); n != expectedLen {
+			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
+		}
+		var s4 Set
+		expectedLen = s3.Len()
+		s3.UnionMayOwn(&s4)
+		if n := s3.Len(); n != expectedLen {
+			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
+		}
 	}
 
 	// Verify intersect
@@ -482,10 +521,6 @@
 		if n := s4.Len(); n != expectedLen {
 			t.Fatalf("unexpected s4.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
 		}
-		Release(&s1)
-		Release(&s2)
-		Release(&s3)
-		Release(&s4)
 	}
 
 	// Verify subtract
@@ -512,9 +547,6 @@
 		if n := s3.Len(); n != 0 {
 			t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
 		}
-		Release(&s1)
-		Release(&s2)
-		Release(&s3)
 	}
 
 	// Verify Del
@@ -565,7 +597,6 @@
 			t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
 		}
 	}
-	Release(sCopy)
 }
 
 func TestSetSparseItems(t *testing.T) {
diff --git a/lib/uint64set/uint64set_timing_test.go b/lib/uint64set/uint64set_timing_test.go
index ea870dcf1a..ebd981d011 100644
--- a/lib/uint64set/uint64set_timing_test.go
+++ b/lib/uint64set/uint64set_timing_test.go
@@ -13,7 +13,6 @@ func BenchmarkAddMulti(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
-		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAddMulti(b, a)
 		})
@@ -25,7 +24,6 @@ func BenchmarkAdd(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
-		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAdd(b, a)
 		})
@@ -40,8 +38,6 @@ func BenchmarkUnionNoOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
-		Release(sa)
-		Release(sb)
 	}
 }
 
@@ -53,8 +49,6 @@ func BenchmarkUnionPartialOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
-		Release(sa)
-		Release(sb)
 	}
 }
 
@@ -66,8 +60,6 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
-		Release(sa)
-		Release(sb)
 	}
 }
 
@@ -80,7 +72,6 @@ func benchmarkAdd(b *testing.B, a []uint64) {
 			for _, x := range a {
 				s.Add(x)
 			}
-			Release(&s)
 		}
 	})
 }
@@ -100,7 +91,6 @@ func benchmarkAddMulti(b *testing.B, a []uint64) {
 				s.AddMulti(a[n:m])
 				n = m
 			}
-			Release(&s)
 		}
 	})
 }
@@ -114,8 +104,6 @@ func benchmarkUnion(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Union(sb)
 			sbCopy.Union(sa)
-			Release(saCopy)
-			Release(sbCopy)
 		}
 	})
 }
@@ -162,8 +150,6 @@ func benchmarkIntersect(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Intersect(sb)
 			sbCopy.Intersect(sa)
-			Release(saCopy)
-			Release(sbCopy)
 		}
 	})
 }
@@ -393,7 +379,6 @@ func BenchmarkSetHasHit(b *testing.B) {
 					}
 				}
 			})
-			Release(&s)
 		})
 	}
 }
@@ -455,7 +440,6 @@ func BenchmarkSetHasMiss(b *testing.B) {
 					}
 				}
 			})
-			Release(&s)
 		})
 	}
 }
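Note: for reference, a short usage sketch of the Union vs UnionMayOwn contract restored by this patch. It is illustrative only and not part of the diff; it relies solely on the exported API shown above (Set, Add, Len, Union, UnionMayOwn).

    package main

    import (
    	"fmt"

    	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
    )

    func main() {
    	var a, b uint64set.Set
    	a.Add(1)
    	b.Add(2)

    	// Union copies the items out of its argument; b stays usable.
    	a.Union(&b)
    	fmt.Println(a.Len(), b.Len()) // 2 1

    	// UnionMayOwn may take ownership of its argument (it does when the
    	// receiver is empty), so the argument must not be used afterwards.
    	var c uint64set.Set
    	c.UnionMayOwn(&a) // a is consumed from here on
    	fmt.Println(c.Len()) // 2
    }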