lib/uint64set: reduce memory usage in Union, Intersect and Subtract methods

Iterate items with newly added Set.ForEach method instead of allocating `[]uint64`
slice for all the items before the iteration.
This commit is contained in:
Aliaksandr Valialkin 2020-01-15 12:12:46 +02:00
parent 7483deccca
commit 605d588ba6
4 changed files with 129 additions and 34 deletions

View file

@ -553,9 +553,12 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
// Marshal hm.m // Marshal hm.m
dst = encoding.MarshalUint64(dst, uint64(hm.m.Len())) dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
for _, metricID := range hm.m.AppendTo(nil) { hm.m.ForEach(func(part []uint64) bool {
dst = encoding.MarshalUint64(dst, metricID) for _, metricID := range part {
} dst = encoding.MarshalUint64(dst, metricID)
}
return true
})
if err := ioutil.WriteFile(path, dst, 0644); err != nil { if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)

View file

@ -245,10 +245,12 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
return return
} }
m := pendingHourEntries.Clone() m := pendingHourEntries.Clone()
origMetricIDs := hmOrig.m.AppendTo(nil) hmOrig.m.ForEach(func(part []uint64) bool {
for _, metricID := range origMetricIDs { for _, metricID := range part {
m.Add(metricID) m.Add(metricID)
} }
return true
})
if !hmCurr.m.Equal(m) { if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
} }

View file

@ -3,6 +3,7 @@ package uint64set
import ( import (
"math/bits" "math/bits"
"sort" "sort"
"sync"
"unsafe" "unsafe"
) )
@ -155,45 +156,53 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
// Union adds all the items from a to s. // Union adds all the items from a to s.
func (s *Set) Union(a *Set) { func (s *Set) Union(a *Set) {
// Clone a, since AppendTo may mutate it below.
aCopy := a.Clone()
if s.Len() == 0 { if s.Len() == 0 {
// Fast path if the initial set is empty. // Fast path - just copy a.
aCopy := a.Clone()
*s = *aCopy *s = *aCopy
return return
} }
// TODO: optimize it if a.Len() == 0 {
for _, x := range aCopy.AppendTo(nil) { // Fast path - nothing to union.
s.Add(x) return
} }
a.ForEach(func(part []uint64) bool {
for _, x := range part {
s.Add(x)
}
return true
})
} }
// Intersect removes all the items missing in a from s. // Intersect removes all the items missing in a from s.
func (s *Set) Intersect(a *Set) { func (s *Set) Intersect(a *Set) {
if a.Len() == 0 { if s.Len() == 0 || a.Len() == 0 {
// Fast path // Fast path - the result is empty.
*s = Set{} *s = Set{}
return return
} }
// TODO: optimize it s.ForEach(func(part []uint64) bool {
for _, x := range s.AppendTo(nil) { for _, x := range part {
if !a.Has(x) { if !a.Has(x) {
s.Del(x) s.Del(x)
}
} }
} return true
})
} }
// Subtract removes from s all the shared items between s and a. // Subtract removes from s all the shared items between s and a.
func (s *Set) Subtract(a *Set) { func (s *Set) Subtract(a *Set) {
if s.Len() == 0 { if s.Len() == 0 || a.Len() == 0 {
// Fast path - nothing to subtract.
return return
} }
// Copy a because AppendTo below can mutate a. a.ForEach(func(part []uint64) bool {
aCopy := a.Clone() for _, x := range part {
// TODO: optimize it s.Del(x)
for _, x := range aCopy.AppendTo(nil) { }
s.Del(x) return true
} })
} }
// Equal returns true if s contains the same items as a. // Equal returns true if s contains the same items as a.
@ -201,15 +210,32 @@ func (s *Set) Equal(a *Set) bool {
if s.Len() != a.Len() { if s.Len() != a.Len() {
return false return false
} }
// Copy a because AppendTo below can mutate a equal := true
aCopy := a.Clone() a.ForEach(func(part []uint64) bool {
// TODO: optimize it for _, x := range part {
for _, x := range aCopy.AppendTo(nil) { if !s.Has(x) {
if !s.Has(x) { equal = false
return false return false
}
}
return true
})
return equal
}
// ForEach calls f for all the items stored in s.
//
// Each call to f contains part with arbitrary part of items stored in the set.
// The iteration is stopped if f returns false.
func (s *Set) ForEach(f func(part []uint64) bool) {
if s == nil {
return
}
for i := range s.buckets {
if !s.buckets[i].forEach(f) {
return
} }
} }
return true
} }
type bucket32 struct { type bucket32 struct {
@ -218,6 +244,28 @@ type bucket32 struct {
buckets []bucket16 buckets []bucket16
} }
func (b *bucket32) forEach(f func(part []uint64) bool) bool {
xbuf := partBufPool.Get().(*[]uint64)
buf := *xbuf
for i := range b.buckets {
hi16 := b.b16his[i]
buf = b.buckets[i].appendTo(buf[:0], b.hi, hi16)
if !f(buf) {
return false
}
}
*xbuf = buf
partBufPool.Put(xbuf)
return true
}
var partBufPool = &sync.Pool{
New: func() interface{} {
buf := make([]uint64, 0, bitsPerBucket)
return &buf
},
}
func (b *bucket32) sizeBytes() uint64 { func (b *bucket32) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*b)) n := uint64(unsafe.Sizeof(*b))
n += 2 * uint64(len(b.b16his)) n += 2 * uint64(len(b.b16his))

View file

@ -166,6 +166,48 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
} }
} }
// Verify ForEach
{
var s Set
m := make(map[uint64]bool)
for i := 0; i < itemsCount; i++ {
v := uint64(i) + offset
s.Add(v)
m[v] = true
}
// Verify visiting all the items.
s.ForEach(func(part []uint64) bool {
for _, v := range part {
if !m[v] {
t.Fatalf("unexpected value v=%d passed to ForEach", v)
}
delete(m, v)
}
return true
})
if len(m) != 0 {
t.Fatalf("ForEach didn't visit %d items; items: %v", len(m), m)
}
// Verify fast stop
calls := 0
s.ForEach(func(part []uint64) bool {
calls++
return false
})
if itemsCount > 0 && calls != 1 {
t.Fatalf("Unexpected number of ForEach callback calls; got %d; want %d", calls, 1)
}
// Verify ForEach on nil set.
var s1 *Set
s1.ForEach(func(part []uint64) bool {
t.Fatalf("callback shouldn't be called on empty set")
return true
})
}
// Verify union // Verify union
{ {
const unionOffset = 12345 const unionOffset = 12345