lib/storage: add inmemory inverted index for the last hour

It should improve performance for `last N hours` dashboards with update intervals smaller than 1 hour.
Aliaksandr Valialkin 2019-11-08 13:16:40 +02:00
parent 1e46961d68
commit d888b21657
8 changed files with 662 additions and 49 deletions
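In rough terms, the commit keeps an extra in-memory index for each of the two most recent hours: a map from tag pairs to the set of metricIDs observed during that hour. Queries whose time range fits entirely into these hours can then be answered from memory instead of the on-disk inverted index. A standalone, simplified sketch of that idea (names and types here are illustrative, not the ones added by this commit):

package main

import "fmt"

// metricSet is a simplified stand-in for uint64set.Set.
type metricSet map[uint64]struct{}

// hourIndex is a toy model of the per-hour inverted index: it maps
// "tagName=tagValue" keys to the metricIDs seen during that hour.
type hourIndex struct {
	hour uint64
	m    map[string]metricSet
}

func (hi *hourIndex) add(metricID uint64, tags map[string]string) {
	for name, value := range tags {
		key := name + "=" + value
		s := hi.m[key]
		if s == nil {
			s = make(metricSet)
			hi.m[key] = s
		}
		s[metricID] = struct{}{}
	}
}

// lookup returns the metricIDs registered for a single tag pair.
func (hi *hourIndex) lookup(name, value string) metricSet {
	return hi.m[name+"="+value]
}

func main() {
	idx := &hourIndex{hour: 437000, m: map[string]metricSet{}}
	idx.add(1, map[string]string{"job": "node", "instance": "a"})
	idx.add(2, map[string]string{"job": "node", "instance": "b"})
	fmt.Println(len(idx.lookup("job", "node"))) // 2
}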

View file

@ -404,6 +404,22 @@ func registerStorageMetrics() {
return float64(idbm().ItemsCount)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 {
return float64(m().RecentHourInvertedIndexSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 {
return float64(m().RecentHourInvertedIndexUniqueTagPairsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 {
return float64(m().RecentHourInvertedIndexPendingMetricIDsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchCalls)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchHits)
})
metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheSize)
})

View file

@ -89,6 +89,12 @@ type indexDB struct {
// The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64
// The number of calls for recent hour searches over the inverted index.
recentHourInvertedIndexSearchCalls uint64
// The number of hits for recent hour searches over the inverted index.
recentHourInvertedIndexSearchHits uint64
mustDrop uint64
name string
@ -201,6 +207,9 @@ type IndexDBMetrics struct {
DateMetricIDsSearchCalls uint64
DateMetricIDsSearchHits uint64
RecentHourInvertedIndexSearchCalls uint64
RecentHourInvertedIndexSearchHits uint64
IndexBlocksWithMetricIDsProcessed uint64
IndexBlocksWithMetricIDsIncorrectOrder uint64
@ -226,7 +235,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
db.uselessTagFiltersCache.UpdateStats(&cs)
m.UselessTagFiltersCacheSize += cs.EntriesCount
m.UselessTagFiltersCacheSizeBytes += cs.BytesSize
m.UselessTagFiltersCacheRequests += cs.GetBigCalls
m.UselessTagFiltersCacheRequests += cs.GetCalls
m.UselessTagFiltersCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len())
@ -238,6 +247,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
@ -395,7 +407,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
if versioned {
prefix = atomic.LoadUint64(&tagFiltersKeyGen)
}
const cacheGranularityMs = 1000 * 60 * 5
const cacheGranularityMs = 1000 * 10
startTime := (uint64(tr.MinTimestamp) / cacheGranularityMs) * cacheGranularityMs
endTime := (uint64(tr.MaxTimestamp) / cacheGranularityMs) * cacheGranularityMs
dst = encoding.MarshalUint64(dst, prefix)
@ -548,6 +560,7 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
if err := db.generateTSID(dst, metricName, mn); err != nil {
return fmt.Errorf("cannot generate TSID: %s", err)
}
db.putMetricNameToCache(dst.MetricID, metricName)
if err := db.createIndexes(dst, mn); err != nil {
return fmt.Errorf("cannot create indexes: %s", err)
}
@ -1572,6 +1585,13 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
return bytes.Compare(a.prefix, b.prefix) < 0
})
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
if is.tryUpdatingMetricIDsForLastHourTimeRange(metricIDs, tfs, tr) {
// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return nil
}
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
if err != nil {
return err
@ -1943,6 +1963,31 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
return nil, false
}
func (is *indexSearch) tryUpdatingMetricIDsForLastHourTimeRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
return true
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
return true
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
return true
}
return false
}
func (db *indexDB) storeDateMetricID(date, metricID uint64) error {
is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID)
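The fast path above only fires when the queried time range is fully covered by the in-memory data. A self-contained sketch of the three cases checked by tryUpdatingMetricIDsForLastHourTimeRange (the hour values in main are made up for illustration):

package main

import "fmt"

const msecPerHour = 3600 * 1000

// lastHourCase mirrors the checks above: the queried time range must be fully
// covered by the current hour, by the previous hour, or span exactly both.
func lastHourCase(minTimestamp, maxTimestamp int64, prevHour, currHour uint64) string {
	minHour := uint64(minTimestamp) / msecPerHour
	maxHour := uint64(maxTimestamp) / msecPerHour
	switch {
	case minHour == maxHour && maxHour == currHour:
		return "current hour only"
	case minHour == maxHour && maxHour == prevHour:
		return "previous hour only"
	case minHour == prevHour && maxHour == currHour:
		return "spans previous and current hours"
	default:
		return "falls outside the in-memory index; use the on-disk index"
	}
}

func main() {
	// A tiny range inside the current hour, assuming currHour = prevHour+1.
	fmt.Println(lastHourCase(437000*msecPerHour+100, 437000*msecPerHour+200, 436999, 437000))
}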

View file

@ -0,0 +1,197 @@
package storage
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
type inmemoryInvertedIndex struct {
mu sync.RWMutex
m map[string]*uint64set.Set
pendingMetricIDs []uint64
}
func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.m)
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetEntriesCount() int {
if iidx == nil {
return 0
}
n := 0
iidx.mu.RLock()
for _, v := range iidx.m {
n += v.Len()
}
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.pendingMetricIDs)
iidx.mu.RUnlock()
return n
}
func newInmemoryInvertedIndex() *inmemoryInvertedIndex {
return &inmemoryInvertedIndex{
m: make(map[string]*uint64set.Set),
}
}
func (iidx *inmemoryInvertedIndex) Clone() *inmemoryInvertedIndex {
if iidx == nil {
return newInmemoryInvertedIndex()
}
iidx.mu.RLock()
mCopy := make(map[string]*uint64set.Set, len(iidx.m))
for k, v := range iidx.m {
mCopy[k] = v.Clone()
}
pendingMetricIDs := append([]uint64{}, iidx.pendingMetricIDs...)
iidx.mu.RUnlock()
return &inmemoryInvertedIndex{
m: mCopy,
pendingMetricIDs: pendingMetricIDs,
}
}
func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, src *uint64set.Set) {
metricIDs := src.AppendTo(nil)
iidx.mu.Lock()
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricIDs...)
if err := iidx.updateLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, metricID uint64) {
iidx.mu.Lock()
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
if err := iidx.updateLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) {
if iidx == nil {
return
}
var result *uint64set.Set
var tfFirst *tagFilter
for i := range tfs.tfs {
if tfs.tfs[i].isNegative {
continue
}
tfFirst = &tfs.tfs[i]
}
iidx.mu.RLock()
defer iidx.mu.RUnlock()
if tfFirst == nil {
result = allMetricIDs.Clone()
} else {
result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix)
}
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == tfFirst {
continue
}
m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix)
if tf.isNegative {
result.Subtract(m)
} else {
result.Intersect(m)
}
if result.Len() == 0 {
return
}
}
metricIDs.Union(result)
}
func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set {
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix)
}
prefix := tf.prefix[len(commonPrefix):]
var m uint64set.Set
kb := kbPool.Get()
defer kbPool.Put(kb)
for k, v := range iidx.m {
if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) {
continue
}
kb.B = append(kb.B[:0], k[len(prefix):]...)
ok, err := tf.matchSuffix(kb.B)
if err != nil {
logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err)
}
if !ok {
continue
}
m.Union(v)
}
return &m
}
func (iidx *inmemoryInvertedIndex) updateLocked(idb *indexDB) error {
metricIDs := iidx.pendingMetricIDs
iidx.pendingMetricIDs = iidx.pendingMetricIDs[:0]
kb := kbPool.Get()
defer kbPool.Put(kb)
var mn MetricName
for _, metricID := range metricIDs {
var err error
kb.B, err = idb.searchMetricName(kb.B[:0], metricID)
if err != nil {
if err == io.EOF {
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
continue
}
return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
}
if err = mn.Unmarshal(kb.B); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err)
}
kb.B = marshalTagValue(kb.B[:0], nil)
kb.B = marshalTagValue(kb.B, mn.MetricGroup)
iidx.addMetricIDLocked(kb.B, metricID)
for i := range mn.Tags {
kb.B = mn.Tags[i].Marshal(kb.B[:0])
iidx.addMetricIDLocked(kb.B, metricID)
}
}
return nil
}
func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) {
v := iidx.m[string(key)]
if v == nil {
v = &uint64set.Set{}
iidx.m[string(key)] = v
}
v.Add(metricID)
}
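UpdateMetricIDsForTagFilters above seeds the result from the first positive filter (or from all metricIDs of the hour when every filter is negative), then intersects the remaining positive filters and subtracts the negative ones. A simplified, self-contained model of that evaluation — it skips the key-prefix handling and tf.matchSuffix matching done by the real code:

package main

import "fmt"

type set map[uint64]struct{}

func intersect(a, b set) set {
	r := make(set)
	for x := range a {
		if _, ok := b[x]; ok {
			r[x] = struct{}{}
		}
	}
	return r
}

func subtract(a, b set) set {
	r := make(set)
	for x := range a {
		if _, ok := b[x]; !ok {
			r[x] = struct{}{}
		}
	}
	return r
}

// filterMetricIDs mimics the shape of UpdateMetricIDsForTagFilters.
func filterMetricIDs(all set, positive, negative []set) set {
	var result set
	if len(positive) == 0 {
		result = all
	} else {
		result = positive[0]
		for _, p := range positive[1:] {
			result = intersect(result, p)
		}
	}
	for _, n := range negative {
		result = subtract(result, n)
	}
	return result
}

func main() {
	all := set{1: {}, 2: {}, 3: {}}
	jobNode := set{1: {}, 2: {}}
	instanceA := set{1: {}}
	fmt.Println(len(filterMetricIDs(all, []set{jobNode}, []set{instanceA}))) // 1
}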

View file

@ -96,7 +96,7 @@ func TestSearch(t *testing.T) {
{[]byte("instance"), []byte("8.8.8.8:1234")},
}
startTimestamp := timestampFromTime(time.Now())
startTimestamp -= startTimestamp % (1e3 * 3600 * 24)
startTimestamp -= startTimestamp % (1e3 * 60 * 30)
blockRowsCount := 0
for i := 0; i < rowsCount; i++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i%metricGroupsCount))

View file

@ -140,6 +140,11 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
idbCurr.SetExtDB(idbPrev)
s.idbCurr.Store(idbCurr)
// Initialize iidx. hmCurr shouldn't have been used until now,
// so it is safe to initialize it in place.
hmCurr.iidx = newInmemoryInvertedIndex()
hmCurr.iidx.MustUpdate(s.idb(), hmCurr.m)
// Load data
tablePath := path + "/data"
tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs)
@ -313,6 +318,10 @@ type Metrics struct {
HourMetricIDCacheSize uint64
RecentHourInvertedIndexSize uint64
RecentHourInvertedIndexUniqueTagPairsSize uint64
RecentHourInvertedIndexPendingMetricIDsSize uint64
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
@ -373,6 +382,15 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
}
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount())
m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount())
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen())
s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics)
}
@ -412,6 +430,7 @@ func (s *Storage) currHourMetricIDsUpdater() {
for {
select {
case <-s.stop:
s.updateCurrHourMetricIDs()
return
case <-t.C:
s.updateCurrHourMetricIDs()
@ -486,7 +505,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
startTime := time.Now()
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return &hourMetricIDs{}
return &hourMetricIDs{
hour: hour,
}
}
src, err := ioutil.ReadFile(path)
if err != nil {
@ -495,21 +516,27 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
srcOrigLen := len(src)
if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return &hourMetricIDs{}
return &hourMetricIDs{
hour: hour,
}
}
isFull := encoding.UnmarshalUint64(src)
src = src[8:]
hourLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if hourLoaded != hour {
logger.Infof("discarding %s, since it is outdated", name)
return &hourMetricIDs{}
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
return &hourMetricIDs{
hour: hour,
}
}
hmLen := encoding.UnmarshalUint64(src)
src = src[8:]
if uint64(len(src)) != 8*hmLen {
logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{}
return &hourMetricIDs{
hour: hour,
}
}
m := &uint64set.Set{}
for i := uint64(0); i < hmLen; i++ {
@ -772,9 +799,6 @@ var (
)
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
// Return only the last error, since there is no sense in returning all errors.
var lastWarn error
var is *indexSearch
var mn *MetricName
var kb *bytesutil.ByteBuffer
@ -788,6 +812,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
rows = rows[:rowsLen+len(mrs)]
j := 0
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
// Return only the last error, since there is no sense in returning all errors.
var lastWarn error
for i := range mrs {
mr := &mrs[i]
if math.IsNaN(mr.Value) {
@ -878,6 +904,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
keyBuf := kb.B
a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0]))
idb := s.idb()
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
for i := range rows {
r := &rows[i]
if r.Timestamp != prevTimestamp {
@ -886,7 +913,6 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
prevTimestamp = r.Timestamp
}
metricID := r.TSID.MetricID
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
@ -896,6 +922,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
s.pendingHourMetricIDsLock.Lock()
s.pendingHourMetricIDs.Add(metricID)
s.pendingHourMetricIDsLock.Unlock()
hm.iidx.AddMetricID(idb, metricID)
}
// Slower path: check global cache for (date, metricID) entry.
@ -920,31 +947,31 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourMetricIDsLock.Lock()
newMetricIDsLen := s.pendingHourMetricIDs.Len()
newMetricIDs := s.pendingHourMetricIDs
s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourMetricIDsLock.Unlock()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
if newMetricIDsLen == 0 && hm.hour == hour {
if newMetricIDs.Len() == 0 && hm.hour == hour {
// Fast path: nothing to update.
return
}
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
var m *uint64set.Set
var iidx *inmemoryInvertedIndex
isFull := hm.isFull
if hm.hour == hour {
m = hm.m.Clone()
iidx = hm.iidx.Clone()
} else {
m = &uint64set.Set{}
iidx = newInmemoryInvertedIndex()
isFull = true
}
s.pendingHourMetricIDsLock.Lock()
newMetricIDs := s.pendingHourMetricIDs
s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourMetricIDsLock.Unlock()
m.Union(newMetricIDs)
hmNew := &hourMetricIDs{
m: m,
iidx: iidx,
hour: hour,
isFull: isFull,
}
@ -956,6 +983,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
type hourMetricIDs struct {
m *uint64set.Set
iidx *inmemoryInvertedIndex
hour uint64
isFull bool
}
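updateCurrHourMetricIDs rotates the hourly snapshot: while the wall clock stays within the tracked hour, pending metricIDs extend a clone of the existing set and inverted index; once the hour changes, a fresh snapshot is started and marked isFull, since it covers the new hour from its beginning. A standalone sketch of that rotation (a plain map stands in for uint64set.Set and the in-memory index):

package main

import (
	"fmt"
	"time"
)

// rotateHour is a simplified model of updateCurrHourMetricIDs.
func rotateHour(trackedHour uint64, trackedIDs map[uint64]struct{}, wasFull bool,
	pending []uint64, now time.Time) (hour uint64, ids map[uint64]struct{}, isFull bool) {

	hour = uint64(now.Unix()) / 3600
	if hour == trackedHour {
		// Still the same hour: extend a copy of the current snapshot.
		ids = make(map[uint64]struct{}, len(trackedIDs)+len(pending))
		for id := range trackedIDs {
			ids[id] = struct{}{}
		}
		isFull = wasFull
	} else {
		// New hour: start from scratch; the snapshot covers the whole hour.
		ids = make(map[uint64]struct{}, len(pending))
		isFull = true
	}
	for _, id := range pending {
		ids[id] = struct{}{}
	}
	return hour, ids, isFull
}

func main() {
	hour, ids, full := rotateHour(0, nil, false, []uint64{42}, time.Now())
	fmt.Println(hour, len(ids), full) // the current hour, 1 id, isFull=true
}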

View file

@ -26,6 +26,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123,
}
hmOrig.m.Add(12)
@ -61,6 +62,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
hmOrig.m.Add(12)
@ -105,6 +107,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123,
}
hmOrig.m.Add(12)
@ -119,8 +122,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
}
if !reflect.DeepEqual(hmCurr.m, pendingHourMetricIDs) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs)
if !hmCurr.m.Equal(pendingHourMetricIDs) {
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs)
}
if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
@ -146,6 +149,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
hmOrig.m.Add(12)
@ -167,7 +171,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
for _, metricID := range origMetricIDs {
m.Add(metricID)
}
if !reflect.DeepEqual(hmCurr.m, m) {
if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
}
if hmCurr.isFull {

View file

@ -12,8 +12,10 @@ import (
//
// It is unsafe calling Set methods from concurrent goroutines.
type Set struct {
itemsCount int
buckets bucket32Sorter
skipSmallPool bool
itemsCount int
buckets bucket32Sorter
smallPool [5]uint64
}
type bucket32Sorter []*bucket32
@ -35,8 +37,10 @@ func (s *Set) Clone() *Set {
return &Set{}
}
var dst Set
dst.skipSmallPool = s.skipSmallPool
dst.itemsCount = s.itemsCount
dst.buckets = make([]*bucket32, len(s.buckets))
dst.smallPool = s.smallPool
for i, b32 := range s.buckets {
dst.buckets[i] = b32.clone()
}
@ -53,6 +57,10 @@ func (s *Set) Len() int {
// Add adds x to s.
func (s *Set) Add(x uint64) {
if !s.skipSmallPool {
s.addToSmallPool(x)
return
}
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
@ -66,6 +74,23 @@ func (s *Set) Add(x uint64) {
s.addAlloc(hi, lo)
}
func (s *Set) addToSmallPool(x uint64) {
if s.hasInSmallPool(x) {
return
}
if s.itemsCount < len(s.smallPool) {
s.smallPool[s.itemsCount] = x
s.itemsCount++
return
}
s.skipSmallPool = true
s.itemsCount = 0
for _, v := range s.smallPool[:] {
s.Add(v)
}
s.Add(x)
}
func (s *Set) addAlloc(hi, lo uint32) {
var b32 bucket32
b32.hi = hi
@ -76,11 +101,14 @@ func (s *Set) addAlloc(hi, lo uint32) {
// Has verifies whether x exists in s.
func (s *Set) Has(x uint64) bool {
hi := uint32(x >> 32)
lo := uint32(x)
if s == nil {
return false
}
if !s.skipSmallPool {
return s.hasInSmallPool(x)
}
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
if b32.hi == hi {
return b32.has(lo)
@ -89,8 +117,21 @@ func (s *Set) Has(x uint64) bool {
return false
}
func (s *Set) hasInSmallPool(x uint64) bool {
for _, v := range s.smallPool[:s.itemsCount] {
if v == x {
return true
}
}
return false
}
// Del deletes x from s.
func (s *Set) Del(x uint64) {
if !s.skipSmallPool {
s.delFromSmallPool(x)
return
}
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
@ -103,13 +144,37 @@ func (s *Set) Del(x uint64) {
}
}
func (s *Set) delFromSmallPool(x uint64) {
idx := -1
for i, v := range s.smallPool[:s.itemsCount] {
if v == x {
idx = i
}
}
if idx < 0 {
return
}
copy(s.smallPool[idx:], s.smallPool[idx+1:])
s.itemsCount--
}
// AppendTo appends all the items from the set to dst and returns the result.
//
// The returned items are sorted.
//
// AppendTo can mutate s.
func (s *Set) AppendTo(dst []uint64) []uint64 {
if s == nil {
return dst
}
if !s.skipSmallPool {
a := s.smallPool[:s.itemsCount]
if len(a) > 1 {
sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
}
return append(dst, a...)
}
// pre-allocate memory for dst
dstLen := len(dst)
if n := s.Len() - cap(dst) + dstLen; n > 0 {
@ -128,23 +193,82 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
// Union adds all the items from a to s.
func (s *Set) Union(a *Set) {
// Clone a, since AppendTo may mutate it below.
aCopy := a.Clone()
if s.Len() == 0 {
// Fast path if the initial set is empty.
*s = *aCopy
return
}
// TODO: optimize it
for _, x := range a.AppendTo(nil) {
for _, x := range aCopy.AppendTo(nil) {
s.Add(x)
}
}
// Intersect removes all the items missing in a from s.
func (s *Set) Intersect(a *Set) {
if a.Len() == 0 {
// Fast path
*s = Set{}
return
}
// TODO: optimize it
for _, x := range s.AppendTo(nil) {
if !a.Has(x) {
s.Del(x)
}
}
}
// Subtract removes from s all the shared items between s and a.
func (s *Set) Subtract(a *Set) {
if s.Len() == 0 {
return
}
// Copy a because AppendTo below can mutate a.
aCopy := a.Clone()
// TODO: optimize it
for _, x := range aCopy.AppendTo(nil) {
if s.Has(x) {
s.Del(x)
}
}
}
// Equal returns true if s contains the same items as a.
func (s *Set) Equal(a *Set) bool {
if s.Len() != a.Len() {
return false
}
// Copy a because AppendTo below can mutate a.
aCopy := a.Clone()
// TODO: optimize it
for _, x := range aCopy.AppendTo(nil) {
if !s.Has(x) {
return false
}
}
return true
}
type bucket32 struct {
hi uint32
b16his []uint16
buckets []*bucket16
skipSmallPool bool
smallPoolLen int
hi uint32
b16his []uint16
buckets []*bucket16
smallPool [14]uint32
}
func (b *bucket32) clone() *bucket32 {
var dst bucket32
dst.skipSmallPool = b.skipSmallPool
dst.smallPoolLen = b.smallPoolLen
dst.hi = b.hi
dst.b16his = append(dst.b16his[:0], b.b16his...)
dst.buckets = make([]*bucket16, len(b.buckets))
dst.smallPool = b.smallPool
for i, b16 := range b.buckets {
dst.buckets[i] = b16.clone()
}
@ -164,6 +288,9 @@ func (b *bucket32) Swap(i, j int) {
const maxUnsortedBuckets = 32
func (b *bucket32) add(x uint32) bool {
if !b.skipSmallPool {
return b.addToSmallPool(x)
}
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
@ -178,6 +305,23 @@ func (b *bucket32) add(x uint32) bool {
return true
}
func (b *bucket32) addToSmallPool(x uint32) bool {
if b.hasInSmallPool(x) {
return false
}
if b.smallPoolLen < len(b.smallPool) {
b.smallPool[b.smallPoolLen] = x
b.smallPoolLen++
return true
}
b.skipSmallPool = true
b.smallPoolLen = 0
for _, v := range b.smallPool[:] {
b.add(v)
}
return b.add(x)
}
func (b *bucket32) addAllocSmall(hi, lo uint16) {
var b16 bucket16
_ = b16.add(lo)
@ -199,6 +343,7 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
if n < 0 {
// This is a hint to the Go compiler to remove automatic bounds checks below.
return
}
var b16 bucket16
@ -215,6 +360,9 @@ func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
}
func (b *bucket32) has(x uint32) bool {
if !b.skipSmallPool {
return b.hasInSmallPool(x)
}
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
@ -228,6 +376,15 @@ func (b *bucket32) has(x uint32) bool {
return false
}
func (b *bucket32) hasInSmallPool(x uint32) bool {
for _, v := range b.smallPool[:b.smallPoolLen] {
if v == x {
return true
}
}
return false
}
func (b *bucket32) hasSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
@ -237,6 +394,9 @@ func (b *bucket32) hasSlow(hi, lo uint16) bool {
}
func (b *bucket32) del(x uint32) bool {
if !b.skipSmallPool {
return b.delFromSmallPool(x)
}
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
@ -250,6 +410,21 @@ func (b *bucket32) del(x uint32) bool {
return false
}
func (b *bucket32) delFromSmallPool(x uint32) bool {
idx := -1
for i, v := range b.smallPool[:b.smallPoolLen] {
if v == x {
idx = i
}
}
if idx < 0 {
return false
}
copy(b.smallPool[idx:], b.smallPool[idx+1:])
b.smallPoolLen--
return true
}
func (b *bucket32) delSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
@ -259,6 +434,18 @@ func (b *bucket32) delSlow(hi, lo uint16) bool {
}
func (b *bucket32) appendTo(dst []uint64) []uint64 {
if !b.skipSmallPool {
a := b.smallPool[:b.smallPoolLen]
if len(a) > 1 {
sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
}
hi := uint64(b.hi) << 32
for _, lo32 := range a {
v := hi | uint64(lo32)
dst = append(dst, v)
}
return dst
}
if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) {
sort.Sort(b)
}
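The uint64set changes above add a small-pool fast path: the first few items are stored inline in a fixed-size array, and the bucketed representation is only built when the pool overflows, which keeps tiny per-tag-pair sets allocation-free. A self-contained sketch of that pattern (a plain map stands in for the bucket structures):

package main

import "fmt"

// smallSet keeps up to 5 items in a fixed array before falling back to a map.
type smallSet struct {
	small    [5]uint64
	smallLen int
	big      map[uint64]struct{} // nil until the small pool overflows
}

func (s *smallSet) Add(x uint64) {
	if s.big != nil {
		s.big[x] = struct{}{}
		return
	}
	for _, v := range s.small[:s.smallLen] {
		if v == x {
			return
		}
	}
	if s.smallLen < len(s.small) {
		s.small[s.smallLen] = x
		s.smallLen++
		return
	}
	// Overflow: move everything into the big representation.
	s.big = make(map[uint64]struct{}, len(s.small)+1)
	for _, v := range s.small {
		s.big[v] = struct{}{}
	}
	s.big[x] = struct{}{}
}

func (s *smallSet) Len() int {
	if s.big != nil {
		return len(s.big)
	}
	return s.smallLen
}

func main() {
	var s smallSet
	for i := uint64(0); i < 7; i++ {
		s.Add(i)
	}
	fmt.Println(s.Len()) // 7, after spilling out of the small pool
}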

View file

@ -3,13 +3,14 @@ package uint64set
import (
"fmt"
"math/rand"
"reflect"
"sort"
"testing"
"time"
)
func TestSetBasicOps(t *testing.T) {
for _, itemsCount := range []int{1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
for _, itemsCount := range []int{1, 2, 3, 4, 5, 6, 1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) {
testSetBasicOps(t, itemsCount)
})
@ -21,7 +22,51 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
offset := uint64(time.Now().UnixNano())
// Verify operations on nil set
{
var sNil *Set
if sNil.Has(123) {
t.Fatalf("sNil shouldn't contain any item; found 123")
}
if n := sNil.Len(); n != 0 {
t.Fatalf("unexpected sNil.Len(); got %d; want 0", n)
}
result := sNil.AppendTo(nil)
if result != nil {
t.Fatalf("sNil.AppendTo(nil) must return nil")
}
buf := []uint64{1, 2, 3}
result = sNil.AppendTo(buf)
if !reflect.DeepEqual(result, buf) {
t.Fatalf("sNil.AppendTo(buf) must return buf")
}
sCopy := sNil.Clone()
if n := sCopy.Len(); n != 0 {
t.Fatalf("unexpected sCopy.Len() from nil set; got %d; want 0", n)
}
sCopy.Add(123)
if n := sCopy.Len(); n != 1 {
t.Fatalf("unexpected sCopy.Len() after adding an item; got %d; want 1", n)
}
sCopy.Add(123)
if n := sCopy.Len(); n != 1 {
t.Fatalf("unexpected sCopy.Len() after adding an item twice; got %d; want 1", n)
}
if !sCopy.Has(123) {
t.Fatalf("sCopy must contain 123")
}
sCopy.Del(123)
if n := sCopy.Len(); n != 0 {
t.Fatalf("unexpected sCopy.Len() after deleting the item; got %d; want 0", n)
}
sCopy.Del(123)
if n := sCopy.Len(); n != 0 {
t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
}
}
// Verify forward Add
itemsCount = (itemsCount / 2) * 2
for i := 0; i < itemsCount/2; i++ {
s.Add(uint64(i) + offset)
}
@ -59,7 +104,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
}
}
// Verify Clone
// Verify Clone and Equal
sCopy := s.Clone()
if n := sCopy.Len(); n != itemsCount {
t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
@ -69,6 +114,33 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
}
}
if !sCopy.Equal(&s) {
t.Fatalf("s must equal to sCopy")
}
if !s.Equal(sCopy) {
t.Fatalf("sCopy must equal to s")
}
if s.Len() > 0 {
var sEmpty Set
if s.Equal(&sEmpty) {
t.Fatalf("s mustn't equal to sEmpty")
}
sNew := s.Clone()
sNew.Del(offset)
if sNew.Equal(&s) {
t.Fatalf("sNew mustn't equal to s")
}
if s.Equal(sNew) {
t.Fatalf("s mustn't equal to sNew")
}
sNew.Add(offset - 123)
if sNew.Equal(&s) {
t.Fatalf("sNew mustn't equal to s")
}
if s.Equal(sNew) {
t.Fatalf("s mustn't equal to sNew")
}
}
// Verify AppendTo
a := s.AppendTo(nil)
@ -89,31 +161,95 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
}
// Verify union
const unionOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + unionOffset)
{
const unionOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + unionOffset)
}
s1.Union(&s2)
expectedLen := 2 * itemsCount
if itemsCount > unionOffset {
expectedLen = itemsCount + unionOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
}
// Verify union on empty set.
var s3 Set
s3.Union(&s1)
expectedLen = s1.Len()
if n := s3.Len(); n != expectedLen {
t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
}
}
s1.Union(&s2)
expectedLen := 2 * itemsCount
if itemsCount > unionOffset {
expectedLen = itemsCount + unionOffset
// Verify intersect
{
const intersectOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + intersectOffset)
}
s1.Intersect(&s2)
expectedLen := 0
if itemsCount > intersectOffset {
expectedLen = itemsCount - intersectOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after intersect; got %d; want %d", n, expectedLen)
}
// Verify intersect on empty set.
var s3 Set
s2.Intersect(&s3)
expectedLen = 0
if n := s2.Len(); n != 0 {
t.Fatalf("unexpected s3.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
}
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
// Verify subtract
{
const subtractOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + subtractOffset)
}
s1.Subtract(&s2)
expectedLen := itemsCount
if itemsCount > subtractOffset {
expectedLen = subtractOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after subtract; got %d; want %d", n, expectedLen)
}
// Verify subtract from empty set.
var s3 Set
s3.Subtract(&s2)
expectedLen = 0
if n := s3.Len(); n != 0 {
t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
}
}
// Verify Del
itemsDeleted := 0
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
s.Del(uint64(i) + offset)
itemsDeleted++
}
if n := s.Len(); n != itemsCount-itemsCount/4 {
t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsCount/4)
if n := s.Len(); n != itemsCount-itemsDeleted {
t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsDeleted)
}
a = s.AppendTo(a[:0])
if len(a) != itemsCount-itemsCount/4 {
t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsCount/4)
if len(a) != itemsCount-itemsDeleted {
t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsDeleted)
}
m = make(map[uint64]bool)
for _, x := range a {
@ -137,8 +273,8 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
s.Del(uint64(i) + offset)
s.Del(uint64(i) + offset + uint64(itemsCount))
}
if n := s.Len(); n != itemsCount-itemsCount/4 {
t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsCount/4)
if n := s.Len(); n != itemsCount-itemsDeleted {
t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsDeleted)
}
// Verify sCopy has the original data