From 7c6d3981bf88e162b039c169a22dc34e05fae216 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 6 Jul 2021 15:33:43 +0300
Subject: [PATCH] lib/uint64set: allow reusing bucket16 structs inside
 uint64set.Set via uint64set.Release method

This reduces the load on the memory allocator in the Go runtime under
production workloads.
---
 lib/storage/index_db.go                | 18 +++--
 lib/uint64set/uint64set.go             | 97 +++++++++++++++++---------
 lib/uint64set/uint64set_test.go        | 75 ++++++--------------
 lib/uint64set/uint64set_timing_test.go | 16 +++++
 4 files changed, 116 insertions(+), 90 deletions(-)

diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 00af9b9297..3015ac408a 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -1996,6 +1996,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
 	if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
 		return nil, metricIDsForTimeRange, nil
 	}
+	uint64set.Release(metricIDsForTimeRange)
 	return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
 		maxMetrics, tr.String())
 }
@@ -2288,6 +2289,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
 	}
 
 	sortedMetricIDs := metricIDs.AppendTo(nil)
+	uint64set.Release(metricIDs)
 
 	// Filter out deleted metricIDs.
 	dmis := is.db.s.getDeletedMetricIDs()
@@ -2315,9 +2317,11 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
 			}
 		}
 		if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
+			uint64set.Release(metricIDs)
 			return nil, err
 		}
 		if metricIDs.Len() > maxMetrics {
+			uint64set.Release(metricIDs)
 			return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
 		}
 	}
@@ -2355,9 +2359,11 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		if err != nil {
 			return err
 		}
+		uint64set.Release(minMetricIDs)
 		minMetricIDs = mIDs
 	}
-	metricIDs.UnionMayOwn(minMetricIDs)
+	metricIDs.Union(minMetricIDs)
+	uint64set.Release(minMetricIDs)
 	return nil
 }
 
@@ -2709,8 +2715,9 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(m)
+				metricIDs.Union(m)
 			}
+			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2738,7 +2745,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 		if err != nil {
 			return err
 		}
-		metricIDs.UnionMayOwn(m)
+		metricIDs.Union(m)
+		uint64set.Release(m)
 		atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
 		return nil
 	}
@@ -2765,8 +2773,9 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 				return
 			}
 			if metricIDs.Len() < maxMetrics {
-				metricIDs.UnionMayOwn(m)
+				metricIDs.Union(m)
 			}
+			uint64set.Release(m)
 		}(minDate)
 		minDate++
 	}
@@ -2953,6 +2962,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		} else {
 			metricIDs.Intersect(m)
 		}
+		uint64set.Release(m)
 	}
 	if metricIDs.Len() == 0 {
 		// There is no need in applying tfsPostponed, since the result is empty.
diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go
index 37c5417ac7..12c64c3440 100644
--- a/lib/uint64set/uint64set.go
+++ b/lib/uint64set/uint64set.go
@@ -6,6 +6,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )
 
 // Set is a fast set for uint64.
@@ -35,6 +37,18 @@ func (s *bucket32Sorter) Swap(i, j int) {
 	a[i], a[j] = a[j], a[i]
 }
 
+// Release releases resources occupied by s.
+//
+// s mustn't be used after Release call.
+func Release(s *Set) {
+	if s == nil {
+		return
+	}
+	for i := range s.buckets {
+		releaseBucket32(&s.buckets[i])
+	}
+}
+
 // Clone returns an independent copy of s.
 func (s *Set) Clone() *Set {
 	if s == nil || s.itemsCount == 0 {
@@ -247,34 +261,18 @@ func (s *Set) sort() {
 
 // Union adds all the items from a to s.
 func (s *Set) Union(a *Set) {
-	s.union(a, false)
-}
-
-// UnionMayOwn adds all the items from a to s.
-//
-// It may own a if s is empty. This means that `a` cannot be used
-// after the call to UnionMayOwn.
-func (s *Set) UnionMayOwn(a *Set) {
-	s.union(a, true)
-}
-
-func (s *Set) union(a *Set, mayOwn bool) {
 	if a.Len() == 0 {
 		// Fast path - nothing to union.
 		return
 	}
 	if s.Len() == 0 {
 		// Fast path - copy `a` to `s`.
-		if !mayOwn {
-			a = a.Clone()
-		}
+		a = a.Clone()
 		*s = *a
 		return
 	}
 	// Make shallow copy of `a`, since it can be modified by a.sort().
-	if !mayOwn {
-		a = a.cloneShallow()
-	}
+	a = a.cloneShallow()
 	a.sort()
 	s.sort()
 	i := 0
@@ -301,7 +299,7 @@
 			break
 		}
 		if s.buckets[i].hi == a.buckets[j].hi {
-			s.buckets[i].union(&a.buckets[j], mayOwn)
+			s.buckets[i].union(&a.buckets[j])
 			i++
 			j++
 		}
@@ -412,6 +410,12 @@ type bucket32 struct {
 	buckets []*bucket16
 }
 
+func releaseBucket32(b32 *bucket32) {
+	for _, b16 := range b32.buckets {
+		putBucket16(b16)
+	}
+}
+
 func (b *bucket32) getLen() int {
 	n := 0
 	for i := range b.buckets {
@@ -420,7 +424,7 @@
 	return n
 }
 
-func (b *bucket32) union(a *bucket32, mayOwn bool) {
+func (b *bucket32) union(a *bucket32) {
 	i := 0
 	j := 0
 	bBucketsLen := len(b.buckets)
@@ -431,22 +435,14 @@
 		if i >= bBucketsLen {
 			for j < len(a.b16his) {
 				b16 := b.addBucket16(a.b16his[j])
-				if mayOwn {
-					*b16 = *a.buckets[j]
-				} else {
-					a.buckets[j].copyTo(b16)
-				}
+				a.buckets[j].copyTo(b16)
 				j++
 			}
 			break
 		}
 		for j < len(a.b16his) && a.b16his[j] < b.b16his[i] {
 			b16 := b.addBucket16(a.b16his[j])
-			if mayOwn {
-				*b16 = *a.buckets[j]
-			} else {
-				a.buckets[j].copyTo(b16)
-			}
+			a.buckets[j].copyTo(b16)
 			j++
 		}
 		if j >= len(a.b16his) {
@@ -558,7 +554,7 @@ func (b *bucket32) copyTo(dst *bucket32) {
 	if len(b.buckets) > 0 {
 		dst.buckets = make([]*bucket16, len(b.buckets))
 		for i, b16 := range b.buckets {
-			b16Dst := &bucket16{}
+			b16Dst := getBucket16()
 			b16.copyTo(b16Dst)
 			dst.buckets[i] = b16Dst
 		}
@@ -632,7 +628,8 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
 
 func (b *bucket32) addBucket16(hi uint16) *bucket16 {
 	b.b16his = append(b.b16his, hi)
-	b.buckets = append(b.buckets, &bucket16{})
+	b16 := getBucket16()
+	b.buckets = append(b.buckets, b16)
 	return b.buckets[len(b.buckets)-1]
 }
 
@@ -647,7 +644,7 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
 	b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
 	b.b16his[pos] = hi
 	b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
-	b16 := &bucket16{}
+	b16 := getBucket16()
 	b.buckets[pos] = b16
 	return b16
 }
@@ -710,6 +707,40 @@ type bucket16 struct {
 
 const smallPoolSize = 56
 
+func getBucket16() *bucket16 {
+	select {
+	case b16 := <-bucket16Pool:
+		return b16
+	default:
+		return &bucket16{}
+	}
+}
+
+func putBucket16(b16 *bucket16) {
+	b16.reset()
+	select {
+	case bucket16Pool <- b16:
+	default:
+		// Drop b16 in order to reduce memory usage
+	}
+}
+
+// Use a chan instead of sync.Pool for reducing memory usage on systems with big number of CPU cores,
+// since sync.Pool holds per-CPU pools of potentially big bucket16 structs (more than 64KB each).
+var bucket16Pool = make(chan *bucket16, 100*cgroup.AvailableCPUs())
+
+func (b *bucket16) reset() {
+	if bits := b.bits; bits != nil {
+		for i := range bits {
+			bits[i] = 0
+		}
+	}
+	for i := range b.smallPool {
+		b.smallPool[i] = 0
+	}
+	b.smallPoolLen = 0
+}
+
 func (b *bucket16) isZero() bool {
 	return b.bits == nil && b.smallPoolLen == 0
 }
diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go
index 5cd97cf950..a9391ae312 100644
--- a/lib/uint64set/uint64set_test.go
+++ b/lib/uint64set/uint64set_test.go
@@ -46,6 +46,7 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Union(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
+		Release(sa)
 
 		// Verify sb.Union(sa)
 		sa = saOrig.Clone()
@@ -56,27 +57,7 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Union(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
-
-		// Verify sa.UnionMayOwn(sb)
-		sa = saOrig.Clone()
-		sb = sbOrig.Clone()
-		sa.UnionMayOwn(sb)
-		if err := expectEqual(sa, mUnion); err != nil {
-			t.Fatalf("invalid sa.UnionMayOwn(sb): %s", err)
-		}
-		if !sbOrig.Equal(sb) {
-			t.Fatalf("sbOrig must be equal to sb after sa.UnionMayOwn(sb); got\n%v\nvs\n%v", sbOrig, sb)
-		}
-
-		// Verify sb.UnionMayOwn(sa)
-		sa = saOrig.Clone()
-		sb.UnionMayOwn(sa)
-		if err := expectEqual(sb, mUnion); err != nil {
-			t.Fatalf("invalid sb.UnionMayOwn(sa): %s", err)
-		}
-		if !saOrig.Equal(sa) {
-			t.Fatalf("saOrig must be equal to sa after sb.UnionMayOwn(sa); got\n%v\nvs\n%v", saOrig, sa)
-		}
+		Release(sb)
 
 		// Verify sa.Intersect(sb)
 		sa = saOrig.Clone()
@@ -88,6 +69,7 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Intersect(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
+		Release(sa)
 
 		// Verify sb.Intersect(sa)
 		sa = saOrig.Clone()
@@ -98,6 +80,8 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Intersect(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
+		Release(sa)
+		Release(sb)
 
 		// Verify sa.Subtract(sb)
 		mSubtractAB := make(map[uint64]bool)
@@ -116,6 +100,7 @@ func TestSetOps(t *testing.T) {
 		if !sbOrig.Equal(sb) {
 			t.Fatalf("sbOrig must be equal to sb after sa.Subtract(sb); got\n%v\nvs\n%v", sbOrig, sb)
 		}
+		Release(sa)
 
 		// Verify sb.Subtract(sa)
 		mSubtractBA := make(map[uint64]bool)
@@ -133,6 +118,8 @@ func TestSetOps(t *testing.T) {
 		if !saOrig.Equal(sa) {
 			t.Fatalf("saOrig must be equal to sa after sb.Subtract(sa); got\n%v\nvs\n%v", saOrig, sa)
 		}
+		Release(sa)
+		Release(sb)
 	}
 
 	f(nil, nil)
@@ -277,6 +264,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := sCopy.Len(); n != 0 {
 			t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
 		}
+		Release(sNil)
 	}
 
 	// Verify forward Add
@@ -410,6 +398,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 	if itemsCount > 0 && calls != 1 {
 		t.Fatalf("Unexpected number of ForEach callback calls; got %d; want %d", calls, 1)
 	}
+	Release(&s)
 
 	// Verify ForEach on nil set.
 	var s1 *Set
@@ -449,38 +438,10 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s3.Len(); n != expectedLen {
 			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
 		}
-	}
-
-	// Verify UnionMayOwn
-	{
-		const unionOffset = 12345
-		var s1, s2 Set
-		for i := 0; i < itemsCount; i++ {
-			s1.Add(uint64(i) + offset)
-			s2.Add(uint64(i) + offset + unionOffset)
-		}
-		s1.UnionMayOwn(&s2)
-		expectedLen := 2 * itemsCount
-		if itemsCount > unionOffset {
-			expectedLen = itemsCount + unionOffset
-		}
-		if n := s1.Len(); n != expectedLen {
-			t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
-		}
-
-		// Verify union on empty set.
-		var s3 Set
-		expectedLen = s1.Len()
-		s3.UnionMayOwn(&s1)
-		if n := s3.Len(); n != expectedLen {
-			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
-		}
-		var s4 Set
-		expectedLen = s3.Len()
-		s3.UnionMayOwn(&s4)
-		if n := s3.Len(); n != expectedLen {
-			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
-		}
+		Release(&s1)
+		Release(&s2)
+		Release(&s3)
+		Release(&s4)
 	}
 
 	// Verify intersect
@@ -521,6 +482,10 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s4.Len(); n != expectedLen {
 			t.Fatalf("unexpected s4.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
 		}
+		Release(&s1)
+		Release(&s2)
+		Release(&s3)
+		Release(&s4)
 	}
 
 	// Verify subtract
@@ -547,6 +512,9 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		if n := s3.Len(); n != 0 {
 			t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
 		}
+		Release(&s1)
+		Release(&s2)
+		Release(&s3)
 	}
 
 	// Verify Del
@@ -597,6 +565,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 			t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
 		}
 	}
+	Release(sCopy)
 }
 
 func TestSetSparseItems(t *testing.T) {
diff --git a/lib/uint64set/uint64set_timing_test.go b/lib/uint64set/uint64set_timing_test.go
index ebd981d011..ea870dcf1a 100644
--- a/lib/uint64set/uint64set_timing_test.go
+++ b/lib/uint64set/uint64set_timing_test.go
@@ -13,6 +13,7 @@ func BenchmarkAddMulti(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
+		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAddMulti(b, a)
 		})
@@ -24,6 +25,7 @@ func BenchmarkAdd(b *testing.B) {
 		start := uint64(time.Now().UnixNano())
 		sa := createRangeSet(start, itemsCount)
 		a := sa.AppendTo(nil)
+		Release(sa)
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkAdd(b, a)
 		})
@@ -38,6 +40,8 @@ func BenchmarkUnionNoOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
+		Release(sa)
+		Release(sb)
 	}
 }
 
@@ -49,6 +53,8 @@ func BenchmarkUnionPartialOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
+		Release(sa)
+		Release(sb)
 	}
 }
 
@@ -60,6 +66,8 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
 		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
 			benchmarkUnion(b, sa, sb)
 		})
+		Release(sa)
+		Release(sb)
 	}
 }
 
@@ -72,6 +80,7 @@ func benchmarkAdd(b *testing.B, a []uint64) {
 			for _, x := range a {
 				s.Add(x)
 			}
+			Release(&s)
 		}
 	})
 }
@@ -91,6 +100,7 @@ func benchmarkAddMulti(b *testing.B, a []uint64) {
 				s.AddMulti(a[n:m])
 				n = m
 			}
+			Release(&s)
 		}
 	})
 }
@@ -104,6 +114,8 @@ func benchmarkUnion(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Union(sb)
 			sbCopy.Union(sa)
+			Release(saCopy)
+			Release(sbCopy)
 		}
 	})
 }
@@ -150,6 +162,8 @@ func benchmarkIntersect(b *testing.B, sa, sb *Set) {
 			sbCopy := sb.Clone()
 			saCopy.Intersect(sb)
 			sbCopy.Intersect(sa)
+			Release(saCopy)
+			Release(sbCopy)
 		}
 	})
 }
@@ -379,6 +393,7 @@ func BenchmarkSetHasHit(b *testing.B) {
 					}
 				}
 			})
+			Release(&s)
 		})
 	}
 }
@@ -440,6 +455,7 @@ func BenchmarkSetHasMiss(b *testing.B) {
 					}
 				}
 			})
+			Release(&s)
 		})
 	}
 }
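
Usage note: the lifecycle this patch establishes at call sites is "build a
temporary Set, consume its contents, then hand its bucket16 structs back to
the pool with uint64set.Release". The standalone sketch below only
illustrates that calling convention; collectSortedIDs is a hypothetical
helper written for this note, not code from the repository, while Set.Add,
Set.AppendTo and Release are the exported API touched by the patch.

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// collectSortedIDs is a hypothetical helper: it builds a temporary set,
// flattens it to a sorted, deduplicated slice, then returns the set's
// bucket16 structs to the shared pool via uint64set.Release.
func collectSortedIDs(ids []uint64) []uint64 {
	var s uint64set.Set
	for _, id := range ids {
		s.Add(id)
	}
	sorted := s.AppendTo(nil)
	// s mustn't be used after Release: its buckets may be handed out
	// to other sets through getBucket16.
	uint64set.Release(&s)
	return sorted
}

func main() {
	fmt.Println(collectSortedIDs([]uint64{42, 7, 7, 1})) // [1 7 42]
}

Since Release carries an explicit nil check, Release(nil) is a no-op, so
callers don't need to guard the call themselves.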