From 9f027ec176a5efde34a3cc6c673e81c0dfd7f488 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 17 Jan 2020 16:11:46 +0200 Subject: [PATCH] lib/uint64set: optimize Intersect, Subtract and Union functions This should improve performance for queries over big number of time series. --- lib/uint64set/uint64set.go | 207 ++++++++++++++++++++++++++++++------- 1 file changed, 169 insertions(+), 38 deletions(-) diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go index 75b418142d..27f019ed21 100644 --- a/lib/uint64set/uint64set.go +++ b/lib/uint64set/uint64set.go @@ -32,21 +32,26 @@ func (s *bucket32Sorter) Swap(i, j int) { // Clone returns an independent copy of s. func (s *Set) Clone() *Set { - if s == nil { + if s == nil || s.itemsCount == 0 { // Return an empty set, so data could be added into it later. return &Set{} } var dst Set dst.itemsCount = s.itemsCount - if len(s.buckets) > 0 { - dst.buckets = make([]bucket32, len(s.buckets)) - for i := range s.buckets { - s.buckets[i].copyTo(&dst.buckets[i]) - } + dst.buckets = make([]bucket32, len(s.buckets)) + for i := range s.buckets { + s.buckets[i].copyTo(&dst.buckets[i]) } return &dst } +func (s *Set) cloneShallow() *Set { + var dst Set + dst.itemsCount = s.itemsCount + dst.buckets = append(dst.buckets[:0], s.buckets...) + return &dst +} + // SizeBytes returns an estimate size of s in RAM. func (s *Set) SizeBytes() uint64 { if s == nil { @@ -144,16 +149,20 @@ func (s *Set) AppendTo(dst []uint64) []uint64 { dst = append(dst[:cap(dst)], make([]uint64, n)...) dst = dst[:dstLen] } - // sort s.buckets if it isn't sorted yet - if !sort.IsSorted(&s.buckets) { - sort.Sort(&s.buckets) - } + s.sort() for i := range s.buckets { dst = s.buckets[i].appendTo(dst) } return dst } +func (s *Set) sort() { + // sort s.buckets if it isn't sorted yet + if !sort.IsSorted(&s.buckets) { + sort.Sort(&s.buckets) + } +} + // Union adds all the items from a to s. func (s *Set) Union(a *Set) { if s.Len() == 0 { @@ -181,14 +190,38 @@ func (s *Set) Intersect(a *Set) { *s = Set{} return } - s.ForEach(func(part []uint64) bool { - for _, x := range part { - if !a.Has(x) { - s.Del(x) - } + // Make shallow copy of `a`, since it can be modified below. + a = a.cloneShallow() + a.sort() + s.sort() + itemsCount := 0 + i := 0 + j := 0 + for { + for i < len(s.buckets) && j <= len(a.buckets) && s.buckets[i].hi < a.buckets[j].hi { + s.buckets[i] = bucket32{} + i++ } - return true - }) + if i >= len(s.buckets) { + break + } + for j < len(a.buckets) && a.buckets[j].hi < s.buckets[i].hi { + j++ + } + if j >= len(a.buckets) { + for i < len(s.buckets) { + s.buckets[i] = bucket32{} + i++ + } + break + } + if s.buckets[i].hi == a.buckets[j].hi { + itemsCount += s.buckets[i].intersect(&a.buckets[j]) + i++ + j++ + } + } + s.itemsCount = itemsCount } // Subtract removes from s all the shared items between s and a. @@ -242,6 +275,53 @@ type bucket32 struct { hi uint32 b16his []uint16 buckets []bucket16 + + // hint may contain bucket index for the last successful add or del operation. + // This allows saving CPU time on subsequent calls to the same bucket. + hint int +} + +func (b *bucket32) cloneShallow() *bucket32 { + var dst bucket32 + dst.hi = b.hi + dst.b16his = append(dst.b16his[:0], b.b16his...) + dst.buckets = append(dst.buckets[:0], b.buckets...) + dst.hint = b.hint + return &dst +} + +func (b *bucket32) intersect(a *bucket32) int { + a = a.cloneShallow() // clone a, since is is sorted below. + a.sort() + b.sort() + itemsCount := 0 + i := 0 + j := 0 + for { + for i < len(b.b16his) && j < len(a.b16his) && b.b16his[i] < a.b16his[j] { + b.buckets[i] = bucket16{} + i++ + } + if i >= len(b.b16his) { + break + } + for j < len(a.b16his) && a.b16his[j] < b.b16his[i] { + j++ + } + if j >= len(a.b16his) { + for i < len(b.b16his) { + b.buckets[i] = bucket16{} + i++ + } + break + } + if b.b16his[i] == a.b16his[j] { + itemsCount += b.buckets[i].intersect(&a.buckets[j]) + i++ + j++ + } + } + return itemsCount } func (b *bucket32) forEach(f func(part []uint64) bool) bool { @@ -288,6 +368,7 @@ func (b *bucket32) copyTo(dst *bucket32) { b.buckets[i].copyTo(&dst.buckets[i]) } } + dst.hint = b.hint } // This is for sort.Interface @@ -305,11 +386,26 @@ const maxUnsortedBuckets = 32 func (b *bucket32) add(x uint32) bool { hi := uint16(x >> 16) lo := uint16(x) + if n := b.hint; n < len(b.b16his) && b.b16his[n] == hi { + // Fast path - add to the previously used bucket. + return n < len(b.buckets) && b.buckets[n].add(lo) + } + return b.addSlow(hi, lo) +} + +func (b *bucket32) addSlow(hi, lo uint16) bool { if len(b.buckets) > maxUnsortedBuckets { - return b.addSlow(hi, lo) + n := binarySearch16(b.b16his, hi) + b.hint = n + if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { + b.addAllocBig(hi, lo, n) + return true + } + return n < len(b.buckets) && b.buckets[n].add(lo) } for i, hi16 := range b.b16his { if hi16 == hi { + b.hint = i return i < len(b.buckets) && b.buckets[i].add(lo) } } @@ -331,15 +427,6 @@ func (b *bucket32) addBucket16() *bucket16 { return &b.buckets[len(b.buckets)-1] } -func (b *bucket32) addSlow(hi, lo uint16) bool { - n := binarySearch16(b.b16his, hi) - if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { - b.addAllocBig(hi, lo, n) - return true - } - return n < len(b.buckets) && b.buckets[n].add(lo) -} - func (b *bucket32) addAllocBig(hi, lo uint16, n int) { if n < 0 { // This is a hint to Go compiler to remove automatic bounds checks below. @@ -384,28 +471,34 @@ func (b *bucket32) hasSlow(hi, lo uint16) bool { func (b *bucket32) del(x uint32) bool { hi := uint16(x >> 16) lo := uint16(x) + if n := b.hint; n < len(b.b16his) && b.b16his[n] == hi { + // Fast path - use the bucket from the previous operation. + return n < len(b.buckets) && b.buckets[n].del(lo) + } + return b.delSlow(hi, lo) +} + +func (b *bucket32) delSlow(hi, lo uint16) bool { if len(b.buckets) > maxUnsortedBuckets { - return b.delSlow(hi, lo) + n := binarySearch16(b.b16his, hi) + b.hint = n + if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { + return false + } + return n < len(b.buckets) && b.buckets[n].del(lo) } for i, hi16 := range b.b16his { if hi16 == hi { + b.hint = i return i < len(b.buckets) && b.buckets[i].del(lo) } } return false } -func (b *bucket32) delSlow(hi, lo uint16) bool { - n := binarySearch16(b.b16his, hi) - if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { - return false - } - return n < len(b.buckets) && b.buckets[n].del(lo) -} - func (b *bucket32) appendTo(dst []uint64) []uint64 { - if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) { - sort.Sort(b) + if len(b.buckets) <= maxUnsortedBuckets { + b.sort() } for i := range b.buckets { hi16 := b.b16his[i] @@ -414,6 +507,12 @@ func (b *bucket32) appendTo(dst []uint64) []uint64 { return dst } +func (b *bucket32) sort() { + if !sort.IsSorted(b) { + sort.Sort(b) + } +} + const ( bitsPerBucket = 1 << 16 wordsPerBucket = bitsPerBucket / 64 @@ -425,6 +524,38 @@ type bucket16 struct { smallPool [56]uint16 } +func (b *bucket16) intersect(a *bucket16) int { + itemsCount := 0 + if a.bits != nil && b.bits != nil { + // Fast path - use bitwise ops + for i, ax := range a.bits { + bx := b.bits[i] + bx &= ax + if bx > 0 { + itemsCount += bits.OnesCount64(bx) + } + b.bits[i] = bx + } + return itemsCount + } + + // Slow path + xbuf := partBufPool.Get().(*[]uint64) + buf := *xbuf + buf = b.appendTo(buf[:0], 0, 0) + itemsCount = len(buf) + for _, x := range buf { + x16 := uint16(x) + if !a.has(x16) { + b.del(x16) + itemsCount-- + } + } + *xbuf = buf + partBufPool.Put(xbuf) + return itemsCount +} + func (b *bucket16) sizeBytes() uint64 { return uint64(unsafe.Sizeof(*b)) + uint64(unsafe.Sizeof(*b.bits)) }