lib/storage: add inmemory inverted index for the last hour

It should improve performance for `last N hours` dashboards with refresh intervals shorter than 1 hour.
Aliaksandr Valialkin 2019-11-08 13:16:40 +02:00
parent 33abbec6b4
commit 0063c857f5
8 changed files with 709 additions and 83 deletions
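The core idea of the commit: keep, for the most recent hour, a map from each tag pair to the set of metricIDs carrying it, so tag-filter queries over recent data can be answered without touching the on-disk index. A minimal standalone sketch of that structure (simplified types and names; the real code uses uint64set.Set, per-tenant keys and marshaled tag-pair prefixes):

```go
package main

import "fmt"

// invertedIndex is a simplified model of the commit's inmemoryInvertedIndex:
// it maps a marshaled tag pair to the set of metricIDs containing that pair.
type invertedIndex struct {
	m map[string]map[uint64]struct{}
}

// add registers metricID under the given tag pair.
func (ii *invertedIndex) add(tagPair string, metricID uint64) {
	set := ii.m[tagPair]
	if set == nil {
		set = make(map[uint64]struct{})
		ii.m[tagPair] = set
	}
	set[metricID] = struct{}{}
}

// lookup returns the metricIDs registered for a tag pair.
func (ii *invertedIndex) lookup(tagPair string) []uint64 {
	ids := make([]uint64, 0, len(ii.m[tagPair]))
	for id := range ii.m[tagPair] {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	ii := &invertedIndex{m: make(map[string]map[uint64]struct{})}
	ii.add(`job="node_exporter"`, 42)
	ii.add(`job="node_exporter"`, 43)
	fmt.Println(ii.lookup(`job="node_exporter"`)) // [42 43] in some order
}
```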


@@ -349,6 +349,22 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(idbm().ItemsCount)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 {
return float64(m().RecentHourInvertedIndexSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 {
return float64(m().RecentHourInvertedIndexUniqueTagPairsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 {
return float64(m().RecentHourInvertedIndexPendingMetricIDsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchCalls)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchHits)
})
metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 {
return float64(m().TSIDCacheSize)
})
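With the two counters above, the fast-path effectiveness of the new index can be monitored as the ratio of hits to calls, e.g. a query along the lines of `rate(vm_recent_hour_inverted_index_search_hits_total[5m]) / rate(vm_recent_hour_inverted_index_search_calls_total[5m])` (an illustrative expression, not part of this commit).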


@@ -89,6 +89,12 @@ type indexDB struct {
// The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64
// The number of calls for recent hour searches over inverted index.
recentHourInvertedIndexSearchCalls uint64
// The number of hits for recent hour searches over inverted index.
recentHourInvertedIndexSearchHits uint64
mustDrop uint64
name string
@@ -201,6 +207,9 @@ type IndexDBMetrics struct {
DateMetricIDsSearchCalls uint64
DateMetricIDsSearchHits uint64
RecentHourInvertedIndexSearchCalls uint64
RecentHourInvertedIndexSearchHits uint64
IndexBlocksWithMetricIDsProcessed uint64
IndexBlocksWithMetricIDsIncorrectOrder uint64
@@ -226,7 +235,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
db.uselessTagFiltersCache.UpdateStats(&cs)
m.UselessTagFiltersCacheSize += cs.EntriesCount
m.UselessTagFiltersCacheSizeBytes += cs.BytesSize
m.UselessTagFiltersCacheRequests += cs.GetBigCalls
m.UselessTagFiltersCacheRequests += cs.GetCalls
m.UselessTagFiltersCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len())
@@ -238,6 +247,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
@@ -403,7 +415,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
if versioned {
prefix = atomic.LoadUint64(&tagFiltersKeyGen)
}
const cacheGranularityMs = 1000 * 60 * 5
const cacheGranularityMs = 1000 * 10
startTime := (uint64(tr.MinTimestamp) / cacheGranularityMs) * cacheGranularityMs
endTime := (uint64(tr.MaxTimestamp) / cacheGranularityMs) * cacheGranularityMs
dst = encoding.MarshalUint64(dst, prefix)
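Shrinking the cache-key granularity from 5 minutes to 10 seconds makes tag-filter cache keys roll over every 10 seconds, so frequently refreshed dashboards pick up fresh data much sooner, while queries landing in the same window still share cache entries. A quick illustration of the quantization (arbitrary example timestamp):

```go
package main

import "fmt"

func main() {
	const cacheGranularityMs = 1000 * 10  // 10 seconds, as in the new code
	minTimestamp := uint64(1573206123456) // arbitrary example, in milliseconds
	startTime := (minTimestamp / cacheGranularityMs) * cacheGranularityMs
	// Every timestamp inside the same 10-second window quantizes to the same
	// value and therefore contributes the same component to the cache key.
	fmt.Println(startTime) // 1573206120000
}
```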
@@ -561,6 +573,7 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
if err := db.generateTSID(dst, metricName, mn); err != nil {
return fmt.Errorf("cannot generate TSID: %s", err)
}
db.putMetricNameToCache(dst.MetricID, metricName)
if err := db.createIndexes(dst, mn); err != nil {
return fmt.Errorf("cannot create indexes: %s", err)
}
@@ -1589,6 +1602,13 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
return bytes.Compare(a.prefix, b.prefix) < 0
})
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
if is.tryUpdatingMetricIDsForLastHourTimeRange(metricIDs, tfs, tr) {
// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return nil
}
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
if err != nil {
return err
@@ -1929,17 +1949,18 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool) {
// Return all the metricIDs for all the (AccountID, ProjectID) entries.
// The caller is responsible for proper filtering later.
k := accountProjectKey{
AccountID: accountID,
ProjectID: projectID,
}
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
// Return a copy of hmCurr.m, because the caller may modify
// the returned map.
k := accountProjectKey{
AccountID: accountID,
ProjectID: projectID,
}
m := hmCurr.byTenant[k]
if m.Len() > maxMetrics {
return nil, false
@@ -1951,10 +1972,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
// The tr fits the previous hour.
// Return a copy of hmPrev.m, because the caller may modify
// the returned map.
k := accountProjectKey{
AccountID: accountID,
ProjectID: projectID,
}
m := hmPrev.byTenant[k]
if m.Len() > maxMetrics {
return nil, false
@@ -1963,10 +1980,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
k := accountProjectKey{
AccountID: accountID,
ProjectID: projectID,
}
mCurr := hmCurr.byTenant[k]
mPrev := hmPrev.byTenant[k]
if mCurr.Len()+mPrev.Len() > maxMetrics {
@@ -1979,6 +1992,35 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
return nil, false
}
func (is *indexSearch) tryUpdatingMetricIDsForLastHourTimeRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
k := accountProjectKey{
AccountID: tfs.accountID,
ProjectID: tfs.projectID,
}
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs)
return true
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs)
return true
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs)
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs)
return true
}
return false
}
func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error {
is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID, accountID, projectID)
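The fast path added above fires only when the query's time range maps exactly onto fully populated hour buckets: entirely inside the current hour, entirely inside the previous hour, or spanning exactly the two. A simplified sketch of that check (illustrative function and parameter names, not the commit's API):

```go
package main

import "fmt"

const msecPerHour = 3600 * 1000

// fitsRecentHours reports whether [minTs, maxTs] (in milliseconds) is fully
// covered by the current and/or previous hour buckets, mirroring the three
// cases handled by tryUpdatingMetricIDsForLastHourTimeRange.
func fitsRecentHours(minTs, maxTs int64, currHour, prevHour uint64, currFull, prevFull bool) bool {
	minHour := uint64(minTs) / msecPerHour
	maxHour := uint64(maxTs) / msecPerHour
	switch {
	case currFull && minHour == maxHour && maxHour == currHour:
		return true // tr fits the current hour
	case prevFull && minHour == maxHour && maxHour == prevHour:
		return true // tr fits the previous hour
	case currFull && prevFull && minHour == prevHour && maxHour == currHour:
		return true // tr spans the previous and the current hours
	}
	return false
}

func main() {
	curr, prev := uint64(437001), uint64(437000) // example hour numbers
	minTs := int64(prev * msecPerHour)
	maxTs := int64(curr*msecPerHour + 30*60*1000) // 30 minutes into curr
	fmt.Println(fitsRecentHours(minTs, maxTs, curr, prev, true, true)) // true
}
```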


@@ -0,0 +1,211 @@
package storage
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
type inmemoryInvertedIndex struct {
mu sync.RWMutex
m map[string]*uint64set.Set
pendingEntries []pendingHourMetricIDEntry
}
func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.m)
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetEntriesCount() int {
if iidx == nil {
return 0
}
n := 0
iidx.mu.RLock()
for _, v := range iidx.m {
n += v.Len()
}
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.pendingEntries)
iidx.mu.RUnlock()
return n
}
func newInmemoryInvertedIndex() *inmemoryInvertedIndex {
return &inmemoryInvertedIndex{
m: make(map[string]*uint64set.Set),
}
}
func (iidx *inmemoryInvertedIndex) Clone() *inmemoryInvertedIndex {
if iidx == nil {
return newInmemoryInvertedIndex()
}
iidx.mu.RLock()
mCopy := make(map[string]*uint64set.Set, len(iidx.m))
for k, v := range iidx.m {
mCopy[k] = v.Clone()
}
pendingEntries := append([]pendingHourMetricIDEntry{}, iidx.pendingEntries...)
iidx.mu.RUnlock()
return &inmemoryInvertedIndex{
m: mCopy,
pendingEntries: pendingEntries,
}
}
func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, byTenant map[accountProjectKey]*uint64set.Set) {
var entries []pendingHourMetricIDEntry
var metricIDs []uint64
for k, v := range byTenant {
var e pendingHourMetricIDEntry
e.AccountID = k.AccountID
e.ProjectID = k.ProjectID
metricIDs = v.AppendTo(metricIDs[:0])
for _, metricID := range metricIDs {
e.MetricID = metricID
entries = append(entries, e)
}
}
iidx.mu.Lock()
iidx.pendingEntries = append(iidx.pendingEntries, entries...)
if err := iidx.updateLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, e pendingHourMetricIDEntry) {
iidx.mu.Lock()
iidx.pendingEntries = append(iidx.pendingEntries, e)
if err := iidx.updateLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) {
if iidx == nil {
return
}
var result *uint64set.Set
var tfFirst *tagFilter
for i := range tfs.tfs {
if tfs.tfs[i].isNegative {
continue
}
tfFirst = &tfs.tfs[i]
}
iidx.mu.RLock()
defer iidx.mu.RUnlock()
if tfFirst == nil {
result = allMetricIDs.Clone()
} else {
result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix)
result.Intersect(allMetricIDs) // This line is required for filtering metrics by (accountID, projectID)
}
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == tfFirst {
continue
}
m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix)
if tf.isNegative {
result.Subtract(m)
} else {
result.Intersect(m)
}
if result.Len() == 0 {
return
}
}
metricIDs.Union(result)
}
func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set {
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix)
}
prefix := tf.prefix[len(commonPrefix):]
var m uint64set.Set
kb := kbPool.Get()
defer kbPool.Put(kb)
for k, v := range iidx.m {
if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) {
continue
}
kb.B = append(kb.B[:0], k[len(prefix):]...)
ok, err := tf.matchSuffix(kb.B)
if err != nil {
logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err)
}
if !ok {
continue
}
m.Union(v)
}
return &m
}
func (iidx *inmemoryInvertedIndex) updateLocked(idb *indexDB) error {
entries := iidx.pendingEntries
iidx.pendingEntries = iidx.pendingEntries[:0]
kb := kbPool.Get()
defer kbPool.Put(kb)
var mn MetricName
for _, e := range entries {
var err error
metricID := e.MetricID
kb.B, err = idb.searchMetricName(kb.B[:0], metricID, e.AccountID, e.ProjectID)
if err != nil {
if err == io.EOF {
iidx.pendingEntries = append(iidx.pendingEntries, e)
continue
}
return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
}
if err = mn.Unmarshal(kb.B); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err)
}
kb.B = marshalTagValue(kb.B[:0], nil)
kb.B = marshalTagValue(kb.B, mn.MetricGroup)
iidx.addMetricIDLocked(kb.B, metricID)
for i := range mn.Tags {
kb.B = mn.Tags[i].Marshal(kb.B[:0])
iidx.addMetricIDLocked(kb.B, metricID)
}
}
return nil
}
func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) {
v := iidx.m[string(key)]
if v == nil {
v = &uint64set.Set{}
iidx.m[string(key)] = v
}
v.Add(metricID)
}
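UpdateMetricIDsForTagFilters above follows a standard inverted-index evaluation order: seed the candidate set from a positive filter (or from all of the tenant's metricIDs when every filter is negative), then intersect with the remaining positive filters and subtract the negative ones, bailing out early once the set becomes empty. A standalone sketch of that pattern with a simplified set type:

```go
package main

import "fmt"

type set map[uint64]struct{}

// intersect keeps in dst only the ids that are also present in src.
func intersect(dst, src set) {
	for id := range dst {
		if _, ok := src[id]; !ok {
			delete(dst, id)
		}
	}
}

// subtract removes from dst every id present in src.
func subtract(dst, src set) {
	for id := range src {
		delete(dst, id)
	}
}

func main() {
	// Seeded from a positive filter, e.g. {job="node_exporter"}.
	candidates := set{1: {}, 2: {}, 3: {}}
	positive := set{2: {}, 3: {}} // another positive filter
	negative := set{3: {}}        // a negative filter, e.g. {instance!="host1"}
	intersect(candidates, positive)
	subtract(candidates, negative)
	fmt.Println(candidates) // map[2:{}]
}
```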


@@ -102,7 +102,7 @@ func TestSearch(t *testing.T) {
{[]byte("instance"), []byte("8.8.8.8:1234")},
}
startTimestamp := timestampFromTime(time.Now())
startTimestamp -= startTimestamp % (1e3 * 3600 * 24)
startTimestamp -= startTimestamp % (1e3 * 60 * 30)
blockRowsCount := 0
for i := 0; i < rowsCount; i++ {
mn.AccountID = uint32(i % accountsCount)


@@ -68,8 +68,8 @@ type Storage struct {
prevHourMetricIDs atomic.Value
// Pending MetricID values to be added to currHourMetricIDs.
pendingHourMetricIDsLock sync.Mutex
pendingHourMetricIDs []pendingHourMetricIDEntry
pendingHourEntriesLock sync.Mutex
pendingHourEntries []pendingHourMetricIDEntry
stop chan struct{}
@@ -150,6 +150,13 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
idbCurr.SetExtDB(idbPrev)
s.idbCurr.Store(idbCurr)
// Initialize iidx. hmCurr and hmPrev shouldn't have been used till now,
// so it should be safe to initialize them in place.
hmPrev.iidx = newInmemoryInvertedIndex()
hmPrev.iidx.MustUpdate(s.idb(), hmPrev.byTenant)
hmCurr.iidx = newInmemoryInvertedIndex()
hmCurr.iidx.MustUpdate(s.idb(), hmCurr.byTenant)
// Load data
tablePath := path + "/data"
tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs)
@@ -323,6 +330,10 @@ type Metrics struct {
HourMetricIDCacheSize uint64
RecentHourInvertedIndexSize uint64
RecentHourInvertedIndexUniqueTagPairsSize uint64
RecentHourInvertedIndexPendingMetricIDsSize uint64
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
@@ -383,6 +394,15 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
}
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount())
m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount())
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen())
s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics)
}
@@ -422,6 +442,7 @@ func (s *Storage) currHourMetricIDsUpdater() {
for {
select {
case <-s.stop:
s.updateCurrHourMetricIDs()
return
case <-t.C:
s.updateCurrHourMetricIDs()
@@ -496,7 +517,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
startTime := time.Now()
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return &hourMetricIDs{}
return &hourMetricIDs{
hour: hour,
}
}
src, err := ioutil.ReadFile(path)
if err != nil {
@@ -505,7 +528,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
srcOrigLen := len(src)
if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return &hourMetricIDs{}
return &hourMetricIDs{
hour: hour,
}
}
// Unmarshal header
@@ -514,8 +539,10 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
hourLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if hourLoaded != hour {
logger.Infof("discarding %s, since it is outdated", name)
return &hourMetricIDs{}
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
return &hourMetricIDs{
hour: hour,
}
}
// Unmarshal hm.m
@@ -523,7 +550,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
src = src[8:]
if uint64(len(src)) < 8*hmLen {
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{}
return &hourMetricIDs{
hour: hour,
}
}
m := &uint64set.Set{}
for i := uint64(0); i < hmLen; i++ {
@@ -845,9 +874,6 @@
)
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
// Return only the last error, since returning all of them makes no sense.
var lastWarn error
var is *indexSearch
var mn *MetricName
var kb *bytesutil.ByteBuffer
@@ -861,6 +887,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
rows = rows[:rowsLen+len(mrs)]
j := 0
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
// Return only the last error, since returning all of them makes no sense.
var lastWarn error
for i := range mrs {
mr := &mrs[i]
if math.IsNaN(mr.Value) {
@@ -951,6 +979,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
keyBuf := kb.B
a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0]))
idb := s.idb()
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
for i := range rows {
r := &rows[i]
if r.Timestamp != prevTimestamp {
@@ -959,21 +988,21 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
prevTimestamp = r.Timestamp
}
metricID := r.TSID.MetricID
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
continue
}
s.pendingHourMetricIDsLock.Lock()
s.pendingHourEntriesLock.Lock()
e := pendingHourMetricIDEntry{
AccountID: r.TSID.AccountID,
ProjectID: r.TSID.ProjectID,
MetricID: metricID,
}
s.pendingHourMetricIDs = append(s.pendingHourMetricIDs, e)
s.pendingHourMetricIDsLock.Unlock()
s.pendingHourEntries = append(s.pendingHourEntries, e)
s.pendingHourEntriesLock.Unlock()
hm.iidx.AddMetricID(idb, e)
}
// Slower path: check global cache for (date, metricID) entry.
@@ -997,37 +1026,36 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourMetricIDsLock.Lock()
newMetricIDsLen := len(s.pendingHourMetricIDs)
s.pendingHourMetricIDsLock.Unlock()
s.pendingHourEntriesLock.Lock()
newEntries := append([]pendingHourMetricIDEntry{}, s.pendingHourEntries...)
s.pendingHourEntries = s.pendingHourEntries[:0]
s.pendingHourEntriesLock.Unlock()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
if newMetricIDsLen == 0 && hm.hour == hour {
if len(newEntries) == 0 && hm.hour == hour {
// Fast path: nothing to update.
return
}
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
var m *uint64set.Set
var iidx *inmemoryInvertedIndex
var byTenant map[accountProjectKey]*uint64set.Set
isFull := hm.isFull
if hm.hour == hour {
m = hm.m.Clone()
iidx = hm.iidx.Clone()
byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant))
for k, e := range hm.byTenant {
byTenant[k] = e.Clone()
}
} else {
m = &uint64set.Set{}
iidx = newInmemoryInvertedIndex()
byTenant = make(map[accountProjectKey]*uint64set.Set)
isFull = true
}
s.pendingHourMetricIDsLock.Lock()
a := append([]pendingHourMetricIDEntry{}, s.pendingHourMetricIDs...)
s.pendingHourMetricIDs = s.pendingHourMetricIDs[:0]
s.pendingHourMetricIDsLock.Unlock()
for _, x := range a {
for _, x := range newEntries {
m.Add(x.MetricID)
k := accountProjectKey{
AccountID: x.AccountID,
@@ -1043,6 +1071,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
hmNew := &hourMetricIDs{
m: m,
iidx: iidx,
byTenant: byTenant,
hour: hour,
isFull: isFull,
@@ -1055,6 +1084,7 @@
type hourMetricIDs struct {
m *uint64set.Set
iidx *inmemoryInvertedIndex
byTenant map[accountProjectKey]*uint64set.Set
hour uint64
isFull bool


@@ -25,6 +25,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123,
}
hmOrig.m.Add(12)
@@ -51,8 +52,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if len(s.pendingHourEntries) != 0 {
t.Fatalf("unexpected len(s.pendingHourEntries); got %d; want %d", len(s.pendingHourEntries), 0)
}
})
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
@@ -60,6 +61,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
hmOrig.m.Add(12)
@@ -88,23 +90,23 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if len(s.pendingHourEntries) != 0 {
t.Fatalf("unexpected len(s.pendingHourEntries); got %d; want %d", len(s.pendingHourEntries), 0)
}
})
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
s.pendingHourMetricIDs = []pendingHourMetricIDEntry{
s.pendingHourEntries = []pendingHourMetricIDEntry{
{AccountID: 123, ProjectID: 431, MetricID: 343},
{AccountID: 123, ProjectID: 431, MetricID: 32424},
{AccountID: 1, ProjectID: 2, MetricID: 8293432},
}
mExpected := &uint64set.Set{}
for _, e := range s.pendingHourMetricIDs {
for _, e := range s.pendingHourEntries {
mExpected.Add(e.MetricID)
}
byTenantExpected := make(map[accountProjectKey]*uint64set.Set)
for _, e := range s.pendingHourMetricIDs {
for _, e := range s.pendingHourEntries {
k := accountProjectKey{
AccountID: e.AccountID,
ProjectID: e.ProjectID,
@@ -120,6 +122,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123,
}
hmOrig.m.Add(12)
@@ -134,7 +137,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
}
if !reflect.DeepEqual(hmCurr.m, mExpected) {
if !hmCurr.m.Equal(mExpected) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, mExpected)
}
if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) {
@@ -148,23 +151,23 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if len(s.pendingHourEntries) != 0 {
t.Fatalf("unexpected len(s.pendingHourEntries); got %d; want %d", len(s.pendingHourEntries), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
s.pendingHourMetricIDs = []pendingHourMetricIDEntry{
s.pendingHourEntries = []pendingHourMetricIDEntry{
{AccountID: 123, ProjectID: 431, MetricID: 343},
{AccountID: 123, ProjectID: 431, MetricID: 32424},
{AccountID: 1, ProjectID: 2, MetricID: 8293432},
}
mExpected := &uint64set.Set{}
for _, e := range s.pendingHourMetricIDs {
for _, e := range s.pendingHourEntries {
mExpected.Add(e.MetricID)
}
byTenantExpected := make(map[accountProjectKey]*uint64set.Set)
for _, e := range s.pendingHourMetricIDs {
for _, e := range s.pendingHourEntries {
k := accountProjectKey{
AccountID: e.AccountID,
ProjectID: e.ProjectID,
@@ -180,6 +183,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
hmOrig.m.Add(12)
@@ -201,7 +205,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
for _, metricID := range origMetricIDs {
m.Add(metricID)
}
if !reflect.DeepEqual(hmCurr.m, m) {
if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
}
if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) {
@@ -216,8 +220,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if len(s.pendingHourEntries) != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0)
}
})
}


@@ -12,8 +12,10 @@ import (
//
// It is unsafe calling Set methods from concurrent goroutines.
type Set struct {
itemsCount int
buckets bucket32Sorter
skipSmallPool bool
itemsCount int
buckets bucket32Sorter
smallPool [5]uint64
}
type bucket32Sorter []*bucket32
@@ -35,8 +37,10 @@ func (s *Set) Clone() *Set {
return &Set{}
}
var dst Set
dst.skipSmallPool = s.skipSmallPool
dst.itemsCount = s.itemsCount
dst.buckets = make([]*bucket32, len(s.buckets))
dst.smallPool = s.smallPool
for i, b32 := range s.buckets {
dst.buckets[i] = b32.clone()
}
@@ -53,6 +57,10 @@ func (s *Set) Len() int {
// Add adds x to s.
func (s *Set) Add(x uint64) {
if !s.skipSmallPool {
s.addToSmallPool(x)
return
}
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
@@ -66,6 +74,23 @@ func (s *Set) Add(x uint64) {
s.addAlloc(hi, lo)
}
func (s *Set) addToSmallPool(x uint64) {
if s.hasInSmallPool(x) {
return
}
if s.itemsCount < len(s.smallPool) {
s.smallPool[s.itemsCount] = x
s.itemsCount++
return
}
s.skipSmallPool = true
s.itemsCount = 0
for _, v := range s.smallPool[:] {
s.Add(v)
}
s.Add(x)
}
func (s *Set) addAlloc(hi, lo uint32) {
var b32 bucket32
b32.hi = hi
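The small-pool scheme above avoids heap allocations for tiny sets: the first five items live in a fixed-size array inside Set itself, and only the sixth Add migrates everything into the bucketed representation (note that itemsCount is reset to 0 before the re-adds, since each Add increments it again). A usage sketch, assuming this repo's uint64set import path:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
	var s uint64set.Set
	for i := uint64(0); i < 5; i++ {
		s.Add(i) // all five fit into the small pool; no bucket allocations
	}
	s.Add(5) // overflows the pool: values migrate into 32-bit buckets
	fmt.Println(s.Len(), s.Has(3)) // 6 true
}
```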
@@ -76,11 +101,14 @@ func (s *Set) addAlloc(hi, lo uint32) {
// Has verifies whether x exists in s.
func (s *Set) Has(x uint64) bool {
hi := uint32(x >> 32)
lo := uint32(x)
if s == nil {
return false
}
if !s.skipSmallPool {
return s.hasInSmallPool(x)
}
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
if b32.hi == hi {
return b32.has(lo)
@@ -89,8 +117,21 @@ func (s *Set) Has(x uint64) bool {
return false
}
func (s *Set) hasInSmallPool(x uint64) bool {
for _, v := range s.smallPool[:s.itemsCount] {
if v == x {
return true
}
}
return false
}
// Del deletes x from s.
func (s *Set) Del(x uint64) {
if !s.skipSmallPool {
s.delFromSmallPool(x)
return
}
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
@@ -103,13 +144,37 @@ func (s *Set) Del(x uint64) {
}
}
func (s *Set) delFromSmallPool(x uint64) {
idx := -1
for i, v := range s.smallPool[:s.itemsCount] {
if v == x {
idx = i
}
}
if idx < 0 {
return
}
copy(s.smallPool[idx:], s.smallPool[idx+1:])
s.itemsCount--
}
// AppendTo appends all the items from the set to dst and returns the result.
//
// The returned items are sorted.
//
// AppendTo can mutate s.
func (s *Set) AppendTo(dst []uint64) []uint64 {
if s == nil {
return dst
}
if !s.skipSmallPool {
a := s.smallPool[:s.itemsCount]
if len(a) > 1 {
sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
}
return append(dst, a...)
}
// pre-allocate memory for dst
dstLen := len(dst)
if n := s.Len() - cap(dst) + dstLen; n > 0 {
@@ -128,23 +193,82 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
// Union adds all the items from a to s.
func (s *Set) Union(a *Set) {
// Clone a, since AppendTo may mutate it below.
aCopy := a.Clone()
if s.Len() == 0 {
// Fast path if the initial set is empty.
*s = *aCopy
return
}
// TODO: optimize it
for _, x := range a.AppendTo(nil) {
for _, x := range aCopy.AppendTo(nil) {
s.Add(x)
}
}
// Intersect removes all the items missing in a from s.
func (s *Set) Intersect(a *Set) {
if a.Len() == 0 {
// Fast path
*s = Set{}
return
}
// TODO: optimize it
for _, x := range s.AppendTo(nil) {
if !a.Has(x) {
s.Del(x)
}
}
}
// Subtract removes from s all the shared items between s and a.
func (s *Set) Subtract(a *Set) {
if s.Len() == 0 {
return
}
// Copy a because AppendTo below can mutate a.
aCopy := a.Clone()
// TODO: optimize it
for _, x := range aCopy.AppendTo(nil) {
if s.Has(x) {
s.Del(x)
}
}
}
// Equal returns true if s contains the same items as a.
func (s *Set) Equal(a *Set) bool {
if s.Len() != a.Len() {
return false
}
// Copy a because AppendTo below can mutate a
aCopy := a.Clone()
// TODO: optimize it
for _, x := range aCopy.AppendTo(nil) {
if !s.Has(x) {
return false
}
}
return true
}
type bucket32 struct {
hi uint32
b16his []uint16
buckets []*bucket16
skipSmallPool bool
smallPoolLen int
hi uint32
b16his []uint16
buckets []*bucket16
smallPool [14]uint32
}
func (b *bucket32) clone() *bucket32 {
var dst bucket32
dst.skipSmallPool = b.skipSmallPool
dst.smallPoolLen = b.smallPoolLen
dst.hi = b.hi
dst.b16his = append(dst.b16his[:0], b.b16his...)
dst.buckets = make([]*bucket16, len(b.buckets))
dst.smallPool = b.smallPool
for i, b16 := range b.buckets {
dst.buckets[i] = b16.clone()
}
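The new Union, Intersect, Subtract and Equal methods give Set the set algebra the in-memory inverted index needs; several of them clone their argument first because AppendTo may mutate the set it is called on. A usage sketch, assuming this repo's uint64set import path:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
	var a, b uint64set.Set
	for _, x := range []uint64{1, 2, 3} {
		a.Add(x)
	}
	for _, x := range []uint64{2, 3, 4} {
		b.Add(x)
	}
	u := a.Clone()
	u.Union(&b) // u is now {1, 2, 3, 4}
	i := a.Clone()
	i.Intersect(&b) // i is now {2, 3}
	d := a.Clone()
	d.Subtract(&b) // d is now {1}
	fmt.Println(u.Len(), i.Len(), d.Len(), a.Equal(a.Clone())) // 4 2 1 true
}
```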
@@ -164,6 +288,9 @@ func (b *bucket32) Swap(i, j int) {
const maxUnsortedBuckets = 32
func (b *bucket32) add(x uint32) bool {
if !b.skipSmallPool {
return b.addToSmallPool(x)
}
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
@@ -178,6 +305,23 @@
return true
}
func (b *bucket32) addToSmallPool(x uint32) bool {
if b.hasInSmallPool(x) {
return false
}
if b.smallPoolLen < len(b.smallPool) {
b.smallPool[b.smallPoolLen] = x
b.smallPoolLen++
return true
}
b.skipSmallPool = true
b.smallPoolLen = 0
for _, v := range b.smallPool[:] {
b.add(v)
}
return b.add(x)
}
func (b *bucket32) addAllocSmall(hi, lo uint16) {
var b16 bucket16
_ = b16.add(lo)
@@ -199,6 +343,7 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
if n < 0 {
// This is a hint to the Go compiler to remove automatic bounds checks below.
return
}
var b16 bucket16
@@ -215,6 +360,9 @@
}
func (b *bucket32) has(x uint32) bool {
if !b.skipSmallPool {
return b.hasInSmallPool(x)
}
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
@@ -228,6 +376,15 @@
return false
}
func (b *bucket32) hasInSmallPool(x uint32) bool {
for _, v := range b.smallPool[:b.smallPoolLen] {
if v == x {
return true
}
}
return false
}
func (b *bucket32) hasSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
@@ -237,6 +394,9 @@
}
func (b *bucket32) del(x uint32) bool {
if !b.skipSmallPool {
return b.delFromSmallPool(x)
}
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
@@ -250,6 +410,21 @@
return false
}
func (b *bucket32) delFromSmallPool(x uint32) bool {
idx := -1
for i, v := range b.smallPool[:b.smallPoolLen] {
if v == x {
idx = i
}
}
if idx < 0 {
return false
}
copy(b.smallPool[idx:], b.smallPool[idx+1:])
b.smallPoolLen--
return true
}
func (b *bucket32) delSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
@@ -259,6 +434,18 @@
}
func (b *bucket32) appendTo(dst []uint64) []uint64 {
if !b.skipSmallPool {
a := b.smallPool[:b.smallPoolLen]
if len(a) > 1 {
sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
}
hi := uint64(b.hi) << 32
for _, lo32 := range a {
v := hi | uint64(lo32)
dst = append(dst, v)
}
return dst
}
if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) {
sort.Sort(b)
}


@@ -3,13 +3,14 @@ package uint64set
import (
"fmt"
"math/rand"
"reflect"
"sort"
"testing"
"time"
)
func TestSetBasicOps(t *testing.T) {
for _, itemsCount := range []int{1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
for _, itemsCount := range []int{1, 2, 3, 4, 5, 6, 1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) {
testSetBasicOps(t, itemsCount)
})
@@ -21,7 +22,51 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
offset := uint64(time.Now().UnixNano())
// Verify operations on nil set
{
var sNil *Set
if sNil.Has(123) {
t.Fatalf("sNil shouldn't contain any item; found 123")
}
if n := sNil.Len(); n != 0 {
t.Fatalf("unexpected sNil.Len(); got %d; want 0", n)
}
result := sNil.AppendTo(nil)
if result != nil {
t.Fatalf("sNil.AppendTo(nil) must return nil")
}
buf := []uint64{1, 2, 3}
result = sNil.AppendTo(buf)
if !reflect.DeepEqual(result, buf) {
t.Fatalf("sNil.AppendTo(buf) must return buf")
}
sCopy := sNil.Clone()
if n := sCopy.Len(); n != 0 {
t.Fatalf("unexpected sCopy.Len() from nil set; got %d; want 0", n)
}
sCopy.Add(123)
if n := sCopy.Len(); n != 1 {
t.Fatalf("unexpected sCopy.Len() after adding an item; got %d; want 1", n)
}
sCopy.Add(123)
if n := sCopy.Len(); n != 1 {
t.Fatalf("unexpected sCopy.Len() after adding an item twice; got %d; want 1", n)
}
if !sCopy.Has(123) {
t.Fatalf("sCopy must contain 123")
}
sCopy.Del(123)
if n := sCopy.Len(); n != 0 {
t.Fatalf("unexpected sCopy.Len() after deleting the item; got %d; want 0", n)
}
sCopy.Del(123)
if n := sCopy.Len(); n != 0 {
t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
}
}
// Verify forward Add
itemsCount = (itemsCount / 2) * 2
for i := 0; i < itemsCount/2; i++ {
s.Add(uint64(i) + offset)
}
@@ -59,7 +104,7 @@
}
}
// Verify Clone
// Verify Clone and Equal
sCopy := s.Clone()
if n := sCopy.Len(); n != itemsCount {
t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
@@ -69,6 +114,33 @@
t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
}
}
if !sCopy.Equal(&s) {
t.Fatalf("s must equal to sCopy")
}
if !s.Equal(sCopy) {
t.Fatalf("sCopy must equal to s")
}
if s.Len() > 0 {
var sEmpty Set
if s.Equal(&sEmpty) {
t.Fatalf("s mustn't equal to sEmpty")
}
sNew := s.Clone()
sNew.Del(offset)
if sNew.Equal(&s) {
t.Fatalf("sNew mustn't equal to s")
}
if s.Equal(sNew) {
t.Fatalf("s mustn't equal to sNew")
}
sNew.Add(offset - 123)
if sNew.Equal(&s) {
t.Fatalf("sNew mustn't equal to s")
}
if s.Equal(sNew) {
t.Fatalf("s mustn't equal to sNew")
}
}
// Verify AppendTo
a := s.AppendTo(nil)
@@ -89,31 +161,95 @@
}
// Verify union
const unionOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + unionOffset)
{
const unionOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + unionOffset)
}
s1.Union(&s2)
expectedLen := 2 * itemsCount
if itemsCount > unionOffset {
expectedLen = itemsCount + unionOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
}
// Verify union on empty set.
var s3 Set
s3.Union(&s1)
expectedLen = s1.Len()
if n := s3.Len(); n != expectedLen {
t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
}
}
s1.Union(&s2)
expectedLen := 2 * itemsCount
if itemsCount > unionOffset {
expectedLen = itemsCount + unionOffset
// Verify intersect
{
const intersectOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + intersectOffset)
}
s1.Intersect(&s2)
expectedLen := 0
if itemsCount > intersectOffset {
expectedLen = itemsCount - intersectOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after intersect; got %d; want %d", n, expectedLen)
}
// Verify intersect on empty set.
var s3 Set
s2.Intersect(&s3)
expectedLen = 0
if n := s2.Len(); n != expectedLen {
t.Fatalf("unexpected s2.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
}
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
// Verify subtract
{
const subtractOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + subtractOffset)
}
s1.Subtract(&s2)
expectedLen := itemsCount
if itemsCount > subtractOffset {
expectedLen = subtractOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after subtract; got %d; want %d", n, expectedLen)
}
// Verify subtract from empty set.
var s3 Set
s3.Subtract(&s2)
expectedLen = 0
if n := s3.Len(); n != 0 {
t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
}
}
// Verify Del
itemsDeleted := 0
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
s.Del(uint64(i) + offset)
itemsDeleted++
}
if n := s.Len(); n != itemsCount-itemsCount/4 {
t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsCount/4)
if n := s.Len(); n != itemsCount-itemsDeleted {
t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsDeleted)
}
a = s.AppendTo(a[:0])
if len(a) != itemsCount-itemsCount/4 {
t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsCount/4)
if len(a) != itemsCount-itemsDeleted {
t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsDeleted)
}
m = make(map[uint64]bool)
for _, x := range a {
@@ -137,8 +273,8 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
s.Del(uint64(i) + offset)
s.Del(uint64(i) + offset + uint64(itemsCount))
}
if n := s.Len(); n != itemsCount-itemsCount/4 {
t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsCount/4)
if n := s.Len(); n != itemsCount-itemsDeleted {
t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsDeleted)
}
// Verify sCopy has the original data