From b986516fbe97a607f9cbbd78e6bf93665fa4825f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 24 Sep 2019 21:10:22 +0300 Subject: [PATCH] lib/storage: create and use `lib/uint64set` instead of `map[uint64]struct{}` This should improve inverted index search performance for filters matching a big number of time series, since `lib/uint64set.Set` is faster than `map[uint64]struct{}` for both `Add` and `Has` calls. See the corresponding benchmarks in `lib/uint64set`. --- go.mod | 1 + lib/mergeset/part_search.go | 2 +- lib/storage/index_db.go | 189 ++++++-------- lib/storage/merge.go | 9 +- lib/storage/partition.go | 9 +- lib/storage/partition_search_test.go | 4 +- lib/storage/storage.go | 54 ++-- lib/storage/storage_test.go | 77 +++--- lib/storage/table.go | 7 +- lib/uint64set/uint64set.go | 332 +++++++++++++++++++++++++ lib/uint64set/uint64set_test.go | 224 +++++++++++++++++ lib/uint64set/uint64set_timing_test.go | 321 ++++++++++++++++++++++++ 12 files changed, 1041 insertions(+), 188 deletions(-) create mode 100644 lib/uint64set/uint64set.go create mode 100644 lib/uint64set/uint64set_test.go create mode 100644 lib/uint64set/uint64set_timing_test.go diff --git a/go.mod b/go.mod index c1950e7ca5..152f015816 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/google/go-cmp v0.3.0 // indirect github.com/klauspost/compress v1.8.3 github.com/valyala/fastjson v1.4.1 + github.com/valyala/fastrand v1.0.0 github.com/valyala/gozstd v1.6.2 github.com/valyala/histogram v1.0.1 github.com/valyala/quicktemplate v1.2.0 diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 2acb6bfb28..a28f583a15 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -371,7 +371,7 @@ func binarySearchKey(items [][]byte, key []byte) int { i, j := uint(0), n for i < j { h := uint(i+j) >> 1 - if string(key) > string(items[h]) { + if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) { // h is always in range; the extra checks are a hint for the compiler to eliminate the bounds check in items[h] i = h + 1 } else { j = h diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 807d470ff5..3e508ebd4e 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" xxhash "github.com/cespare/xxhash/v2" @@ -67,12 +68,12 @@ type indexDB struct { indexSearchPool sync.Pool - // An inmemory map[uint64]struct{} of deleted metricIDs. + // An inmemory set of deleted metricIDs. // - // The map holds deleted metricIDs for the current db and for the extDB. + // The set holds deleted metricIDs for the current db and for the extDB. // - // It is safe to keep the map in memory even for big number of deleted - // metricIDs, since it occupies only 8 bytes per deleted metricID. + // It is safe to keep the set in memory even for a big number of deleted + // metricIDs, since it usually requires about 1 bit per deleted metricID.
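The field published here is an atomic.Value updated with a copy-on-write scheme: updateDeletedMetricIDs further down in this patch clones the currently published set under deletedMetricIDsUpdateLock, adds the new IDs to the clone, and atomically stores the result, so readers never take a lock. A standalone sketch of that pattern using the uint64set API introduced by this patch (the db type below is an illustrative stand-in for indexDB, not code from the patch):

package main

import (
	"sync"
	"sync/atomic"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// db models only the deleted-metricIDs handling of indexDB.
type db struct {
	deletedMetricIDs           atomic.Value // holds a *uint64set.Set
	deletedMetricIDsUpdateLock sync.Mutex   // serializes writers; readers stay lock-free
}

func newDB() *db {
	var d db
	d.deletedMetricIDs.Store(&uint64set.Set{})
	return &d
}

func (d *db) getDeletedMetricIDs() *uint64set.Set {
	return d.deletedMetricIDs.Load().(*uint64set.Set)
}

// updateDeletedMetricIDs clones the published set, extends the clone and
// atomically publishes it, so concurrent readers always see a consistent
// snapshot.
func (d *db) updateDeletedMetricIDs(metricIDs []uint64) {
	d.deletedMetricIDsUpdateLock.Lock()
	dmisNew := d.getDeletedMetricIDs().Clone()
	for _, metricID := range metricIDs {
		dmisNew.Add(metricID)
	}
	d.deletedMetricIDs.Store(dmisNew)
	d.deletedMetricIDsUpdateLock.Unlock()
}

func main() {
	d := newDB()
	d.updateDeletedMetricIDs([]uint64{1, 2, 3})
	_ = d.getDeletedMetricIDs().Has(2) // true
}

Cloning on every update is affordable here because deletions are rare compared to the searches that call getDeletedMetricIDs.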
deletedMetricIDs atomic.Value deletedMetricIDsUpdateLock sync.Mutex @@ -199,7 +200,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.UselessTagFiltersCacheRequests += cs.GetBigCalls m.UselessTagFiltersCacheMisses += cs.Misses - m.DeletedMetricsCount += uint64(len(db.getDeletedMetricIDs())) + m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len()) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) @@ -237,7 +238,7 @@ func (db *indexDB) SetExtDB(extDB *indexDB) { // Add deleted metricIDs from extDB to db. if extDB != nil { dmisExt := extDB.getDeletedMetricIDs() - metricIDs := getSortedMetricIDs(dmisExt) + metricIDs := dmisExt.AppendTo(nil) db.updateDeletedMetricIDs(metricIDs) } @@ -879,30 +880,27 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { return deletedCount, nil } -func (db *indexDB) getDeletedMetricIDs() map[uint64]struct{} { - return db.deletedMetricIDs.Load().(map[uint64]struct{}) +func (db *indexDB) getDeletedMetricIDs() *uint64set.Set { + return db.deletedMetricIDs.Load().(*uint64set.Set) } -func (db *indexDB) setDeletedMetricIDs(dmis map[uint64]struct{}) { +func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) { db.deletedMetricIDs.Store(dmis) } func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) { db.deletedMetricIDsUpdateLock.Lock() dmisOld := db.getDeletedMetricIDs() - dmisNew := make(map[uint64]struct{}, len(dmisOld)+len(metricIDs)) - for metricID := range dmisOld { - dmisNew[metricID] = struct{}{} - } + dmisNew := dmisOld.Clone() for _, metricID := range metricIDs { - dmisNew[metricID] = struct{}{} + dmisNew.Add(metricID) } db.setDeletedMetricIDs(dmisNew) db.deletedMetricIDsUpdateLock.Unlock() } -func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) { - dmis := make(map[uint64]struct{}) +func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { + dmis := &uint64set.Set{} ts := &is.ts kb := &is.kb kb.B = append(kb.B[:0], nsPrefixDeteletedMetricID) @@ -917,7 +915,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) { return nil, fmt.Errorf("unexpected item len; got %d bytes; want %d bytes", len(item), 8) } metricID := encoding.UnmarshalUint64(item) - dmis[metricID] = struct{}{} + dmis.Add(metricID) } if err := ts.Error(); err != nil { return nil, err @@ -1009,9 +1007,9 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { if len(tail) > 0 { return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail) } - if len(dmis) > 0 { + if dmis.Len() > 0 { // Verify whether the dst is marked as deleted. - if _, deleted := dmis[dst.MetricID]; deleted { + if dmis.Has(dst.MetricID) { // The dst is deleted. Continue searching. continue } @@ -1175,9 +1173,9 @@ func (is *indexSearch) getSeriesCount() (uint64, error) { // updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs // and adds matching metrics to metricIDs. -func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs map[uint64]struct{}, tfs []*tagFilter) error { +func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter) error { // sort srcMetricIDs in order to speed up Seek below. 
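The replacement on the next line relies on Set.AppendTo always returning its items in ascending order (it keeps the internal buckets sorted), so it subsumes the old getSortedMetricIDs helper and keeps the subsequent ts.Seek calls moving forward monotonically through the index. A minimal usage sketch of the zero value and AppendTo:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
	var s uint64set.Set // the zero value is ready to use
	s.Add(42)
	s.Add(7)
	s.Add(42) // re-adding an existing item is a no-op

	a := s.AppendTo(nil)
	fmt.Println(a, s.Len()) // [7 42] 2
}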
- sortedMetricIDs := getSortedMetricIDs(srcMetricIDs) + sortedMetricIDs := srcMetricIDs.AppendTo(nil) metricName := kbPool.Get() defer kbPool.Put(metricName) @@ -1201,12 +1199,12 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs if !ok { continue } - metricIDs[metricID] = struct{}{} + metricIDs.Add(metricID) } return nil } -func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { +func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) { // Try fast path with the minimized number of maxMetrics. maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics) minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted) @@ -1243,7 +1241,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter if err != nil { return nil, nil, err } - if len(metricIDsForTimeRange) <= maxTimeRangeMetrics { + if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics { return nil, metricIDsForTimeRange, nil } @@ -1273,7 +1271,7 @@ func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) in if !hmPrev.isFull { return maxMetrics } - hourMetrics := len(hmPrev.m) + hourMetrics := hmPrev.m.Len() if hourMetrics >= 256 && maxMetrics > hourMetrics/4 { // It is cheaper to filter on the hour or day metrics if the minimum // number of matching metrics across tfs exceeds hourMetrics / 4. @@ -1282,7 +1280,7 @@ func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) in return maxMetrics } -func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { +func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) { kb := &is.kb kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix) kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) @@ -1304,7 +1302,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters if err != nil { return nil, nil, err } - if len(minMetricIDs) < maxAllowedMetrics { + if minMetricIDs.Len() < maxAllowedMetrics { // Found the tag filter with the minimum number of metrics. return minTf, minMetricIDs, nil } @@ -1330,8 +1328,8 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters var errTooManyMetrics = errors.New("all the tag filters match too many metrics") -func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { - var minMetricIDs map[uint64]struct{} +func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) { + var minMetricIDs *uint64set.Set var minTf *tagFilter kb := &is.kb uselessTagFilters := 0 @@ -1364,7 +1362,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet } return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %s", tf, err) } - if len(metricIDs) >= maxMetrics { + if metricIDs.Len() >= maxMetrics { // The tf matches at least maxMetrics. 
Skip it kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix) kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) @@ -1376,7 +1374,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet minMetricIDs = metricIDs minTf = tf - maxMetrics = len(minMetricIDs) + maxMetrics = minMetricIDs.Len() if maxMetrics <= 1 { // There is no need in inspecting other filters, since minTf // already matches 0 or 1 metric. @@ -1399,11 +1397,11 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 { return nil, nil, errTooManyMetrics } - metricIDs := make(map[uint64]struct{}) + metricIDs := &uint64set.Set{} if err := is.updateMetricIDsAll(metricIDs, maxMetrics); err != nil { return nil, nil, err } - if len(metricIDs) >= maxMetrics { + if metricIDs.Len() >= maxMetrics { kb.B = append(kb.B[:0], uselessNegativeTagFilterKeyPrefix) kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) kb.B = tfs.marshal(kb.B) @@ -1475,14 +1473,14 @@ func matchTagFilter(b []byte, tf *tagFilter) (bool, error) { } func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { - metricIDs := make(map[uint64]struct{}) + metricIDs := &uint64set.Set{} for _, tfs := range tfss { if len(tfs.tfs) == 0 { // Return all the metric ids if err := is.updateMetricIDsAll(metricIDs, maxMetrics+1); err != nil { return nil, err } - if len(metricIDs) > maxMetrics { + if metricIDs.Len() > maxMetrics { return nil, fmt.Errorf("the number or unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics) } // Stop the iteration, since we cannot find more metric ids with the remaining tfss. @@ -1491,23 +1489,23 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil { return nil, err } - if len(metricIDs) > maxMetrics { + if metricIDs.Len() > maxMetrics { return nil, fmt.Errorf("the number or matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics) } } - if len(metricIDs) == 0 { + if metricIDs.Len() == 0 { // Nothing found return nil, nil } - sortedMetricIDs := getSortedMetricIDs(metricIDs) + sortedMetricIDs := metricIDs.AppendTo(nil) // Filter out deleted metricIDs. dmis := is.db.getDeletedMetricIDs() - if len(dmis) > 0 { + if dmis.Len() > 0 { metricIDsFiltered := sortedMetricIDs[:0] for _, metricID := range sortedMetricIDs { - if _, deleted := dmis[metricID]; !deleted { + if !dmis.Has(metricID) { metricIDsFiltered = append(metricIDsFiltered, metricID) } } @@ -1517,7 +1515,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr return sortedMetricIDs, nil } -func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs map[uint64]struct{}, tfs *TagFilters, tr TimeRange, maxMetrics int) error { +func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { // Sort tag filters for faster ts.Seek below. 
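The helpers above implement a most-selective-first strategy: getTagFilterWithMinMetricIDsCount finds the tag filter matching the fewest metricIDs, and the remaining filters are then applied against that small candidate set; the sort below additionally orders filters by prefix so index seeks advance monotonically. A simplified model of the selectivity-first idea, assuming (unlike the real code) that each filter's candidate set is already materialized:

package main

import (
	"fmt"
	"sort"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// intersectSmallestFirst orders the candidate sets by size and shrinks the
// smallest one against the rest, so the loop cost is bounded by the most
// selective filter. Illustrative only; the patch never materializes all
// candidate sets like this.
func intersectSmallestFirst(candidates []*uint64set.Set) *uint64set.Set {
	if len(candidates) == 0 {
		return &uint64set.Set{}
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].Len() < candidates[j].Len()
	})
	result := candidates[0].Clone()
	for _, c := range candidates[1:] {
		for _, x := range result.AppendTo(nil) {
			if !c.Has(x) {
				result.Del(x)
			}
		}
	}
	return result
}

func main() {
	a, b := &uint64set.Set{}, &uint64set.Set{}
	a.Add(1)
	a.Add(2)
	b.Add(2)
	b.Add(3)
	fmt.Println(intersectSmallestFirst([]*uint64set.Set{a, b}).AppendTo(nil)) // [2]
}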
sort.Slice(tfs.tfs, func(i, j int) bool { return bytes.Compare(tfs.tfs[i].prefix, tfs.tfs[j].prefix) < 0 }) @@ -1560,8 +1558,8 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs map[uint64]struct{ } minMetricIDs = mIDs } - for metricID := range minMetricIDs { - metricIDs[metricID] = struct{}{} + for _, metricID := range minMetricIDs.AppendTo(nil) { + metricIDs.Add(metricID) } return nil } @@ -1574,11 +1572,11 @@ const ( var uselessTagFilterCacheValue = []byte("1") -func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (map[uint64]struct{}, error) { +func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") } - metricIDs := make(map[uint64]struct{}, maxMetrics) + metricIDs := &uint64set.Set{} if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffxies. if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil { @@ -1593,8 +1591,8 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) ( // Slow path - scan for all the rows with the given prefix. maxLoops := maxMetrics * maxIndexScanLoopsPerMetric err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool { - metricIDs[metricID] = struct{}{} - return len(metricIDs) < maxMetrics + metricIDs.Add(metricID) + return metricIDs.Len() < maxMetrics }) if err != nil { if err == errFallbackToMetricNameMatch { @@ -1689,7 +1687,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, return nil } -func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs map[uint64]struct{}) error { +func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) error { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") } @@ -1702,15 +1700,15 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil { return err } - if len(metricIDs) >= maxMetrics { + if metricIDs.Len() >= maxMetrics { return nil } } return nil } -func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter map[uint64]struct{}) error { - sortedFilter := getSortedMetricIDs(filter) +func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error { + sortedFilter := filter.AppendTo(nil) kb := kbPool.Get() defer kbPool.Put(kb) for _, orSuffix := range tf.orSuffixes { @@ -1724,14 +1722,14 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met return nil } -func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs map[uint64]struct{}) error { +func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) error { ts := &is.ts mp := &is.mp mp.Reset() maxLoops := maxMetrics * maxIndexScanLoopsPerMetric loops := 0 ts.Seek(prefix) - for len(metricIDs) < maxMetrics && ts.NextItem() { + for metricIDs.Len() < maxMetrics && ts.NextItem() { item := ts.Item if !bytes.HasPrefix(item, prefix) { return nil @@ -1745,7 +1743,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr } mp.ParseMetricIDs() for _, metricID := range mp.MetricIDs { - metricIDs[metricID] = 
struct{}{} + metricIDs.Add(metricID) } } if err := ts.Error(); err != nil { @@ -1754,7 +1752,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr return nil } -func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs map[uint64]struct{}, sortedFilter []uint64, isNegative bool) error { +func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error { if len(sortedFilter) == 0 { return nil } @@ -1810,9 +1808,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri continue } if isNegative { - delete(metricIDs, metricID) + metricIDs.Del(metricID) } else { - metricIDs[metricID] = struct{}{} + metricIDs.Add(metricID) } sf = sf[1:] } @@ -1827,7 +1825,7 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet var errMissingMetricIDsForDate = errors.New("missing metricIDs for date") -func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (map[uint64]struct{}, error) { +func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) { if tr.isZero() { return nil, errMissingMetricIDsForDate } @@ -1847,7 +1845,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m // Too much dates must be covered. Give up. return nil, errMissingMetricIDsForDate } - metricIDs := make(map[uint64]struct{}, maxMetrics) + metricIDs := &uint64set.Set{} for minDate <= maxDate { if err := is.getMetricIDsForDate(minDate, metricIDs, maxMetrics); err != nil { return nil, err @@ -1858,7 +1856,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m return metricIDs, nil } -func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (map[uint64]struct{}, bool) { +func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) { minHour := uint64(tr.MinTimestamp) / msecPerHour maxHour := uint64(tr.MaxTimestamp) / msecPerHour hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) @@ -1866,46 +1864,35 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) // The tr fits the current hour. // Return a copy of hmCurr.m, because the caller may modify // the returned map. - if len(hmCurr.m) > maxMetrics { + if hmCurr.m.Len() > maxMetrics { return nil, false } - return getMetricIDsCopy(hmCurr.m), true + return hmCurr.m.Clone(), true } hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { // The tr fits the previous hour. // Return a copy of hmPrev.m, because the caller may modify // the returned map. - if len(hmPrev.m) > maxMetrics { + if hmPrev.m.Len() > maxMetrics { return nil, false } - return getMetricIDsCopy(hmPrev.m), true + return hmPrev.m.Clone(), true } if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { // The tr spans the previous and the current hours. 
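The Set type introduced here has no dedicated union operation, so the branch below merges the two hourly sets with Clone plus an Add loop over AppendTo. If this came up in more places, a helper along these lines could factor it out (hypothetical, not part of the patch):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// unionSets returns a new set holding all items from a and b.
// Hypothetical helper; the patch inlines the same Clone-plus-Add loop
// because it is needed in this one place only.
func unionSets(a, b *uint64set.Set) *uint64set.Set {
	u := a.Clone()
	if u == nil { // Clone of a nil set yields nil
		u = &uint64set.Set{}
	}
	for _, x := range b.AppendTo(nil) {
		u.Add(x)
	}
	return u
}

func main() {
	a, b := &uint64set.Set{}, &uint64set.Set{}
	a.Add(1)
	b.Add(2)
	fmt.Println(unionSets(a, b).AppendTo(nil)) // [1 2]
}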
- if len(hmCurr.m)+len(hmPrev.m) > maxMetrics { + if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics { return nil, false } - metricIDs := make(map[uint64]struct{}, len(hmCurr.m)+len(hmPrev.m)) - for metricID := range hmCurr.m { - metricIDs[metricID] = struct{}{} - } - for metricID := range hmPrev.m { - metricIDs[metricID] = struct{}{} + metricIDs := hmCurr.m.Clone() + for _, metricID := range hmPrev.m.AppendTo(nil) { + metricIDs.Add(metricID) } return metricIDs, true } return nil, false } -func getMetricIDsCopy(src map[uint64]struct{}) map[uint64]struct{} { - dst := make(map[uint64]struct{}, len(src)) - for metricID := range src { - dst[metricID] = struct{}{} - } - return dst -} - func (db *indexDB) storeDateMetricID(date, metricID uint64) error { is := db.getIndexSearch() ok, err := is.hasDateMetricID(date, metricID) @@ -1947,14 +1934,14 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { return true, nil } -func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs map[uint64]struct{}, maxMetrics int) error { +func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int) error { ts := &is.ts kb := &is.kb kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) kb.B = encoding.MarshalUint64(kb.B, date) ts.Seek(kb.B) items := 0 - for len(metricIDs) < maxMetrics && ts.NextItem() { + for metricIDs.Len() < maxMetrics && ts.NextItem() { if !bytes.HasPrefix(ts.Item, kb.B) { break } @@ -1964,7 +1951,7 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs map[uint64]str return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v)) } metricID := encoding.UnmarshalUint64(v) - metricIDs[metricID] = struct{}{} + metricIDs.Add(metricID) items++ } if err := ts.Error(); err != nil { @@ -2000,7 +1987,7 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) { return true, nil } -func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, maxMetrics int) error { +func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error { ts := &is.ts kb := &is.kb kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID) @@ -2016,8 +2003,8 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, maxMetr return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail) } metricID := encoding.UnmarshalUint64(tail) - metricIDs[metricID] = struct{}{} - if len(metricIDs) >= maxMetrics { + metricIDs.Add(metricID) + if metricIDs.Len() >= maxMetrics { return nil } } @@ -2032,13 +2019,13 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, maxMetr // over the found metrics. const maxIndexScanLoopsPerMetric = 400 -func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map[uint64]struct{}) (map[uint64]struct{}, error) { - if len(filter) == 0 { +func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) { + if filter.Len() == 0 { return nil, nil } metricIDs := filter if !tf.isNegative { - metricIDs = make(map[uint64]struct{}, len(filter)) + metricIDs = &uint64set.Set{} } if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffixes. @@ -2052,15 +2039,15 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map } // Slow path - scan for all the rows with the given prefix. 
- maxLoops := len(filter) * maxIndexScanLoopsPerMetric + maxLoops := filter.Len() * maxIndexScanLoopsPerMetric err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool { if tf.isNegative { // filter must be equal to metricIDs - delete(metricIDs, metricID) + metricIDs.Del(metricID) return true } - if _, ok := filter[metricID]; ok { - metricIDs[metricID] = struct{}{} + if filter.Has(metricID) { + metricIDs.Add(metricID) } return true }) @@ -2101,18 +2088,6 @@ func unmarshalCommonPrefix(src []byte) ([]byte, byte, error) { // 1 byte for prefix const commonPrefixLen = 1 -func getSortedMetricIDs(m map[uint64]struct{}) []uint64 { - a := make(uint64Sorter, len(m)) - i := 0 - for metricID := range m { - a[i] = metricID - i++ - } - // Use sort.Sort instead of sort.Slice in order to reduce memory allocations - sort.Sort(a) - return a -} - type tagToMetricIDsRowParser struct { // MetricIDs contains parsed MetricIDs after ParseMetricIDs call MetricIDs []uint64 @@ -2216,13 +2191,13 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() { // IsDeletedTag verifies whether the tag from mp is deleted according to dmis. // // dmis must contain deleted MetricIDs. -func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis map[uint64]struct{}) bool { - if len(dmis) == 0 { +func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool { + if dmis.Len() == 0 { return false } mp.ParseMetricIDs() for _, metricID := range mp.MetricIDs { - if _, ok := dmis[metricID]; !ok { + if !dmis.Has(metricID) { return false } } diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 524b8a4d13..8d696cb277 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) // mergeBlockStreams merges bsrs into bsw and updates ph. @@ -14,7 +15,7 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, rowsMerged *uint64, - deletedMetricIDs map[uint64]struct{}, rowsDeleted *uint64) error { + deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) @@ -41,7 +42,7 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, rowsMerged *uint64, - deletedMetricIDs map[uint64]struct{}, rowsDeleted *uint64) error { + deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error { // Search for the first block to merge var pendingBlock *Block for bsm.NextBlock() { @@ -50,7 +51,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc return errForciblyStopped default: } - if _, deleted := deletedMetricIDs[bsm.Block.bh.TSID.MetricID]; deleted { + if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. *rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue @@ -72,7 +73,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc return errForciblyStopped default: } - if _, deleted := deletedMetricIDs[bsm.Block.bh.TSID.MetricID]; deleted { + if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. 
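This merge.go hunk is where DeleteTSIDs turns into physical deletion: background merges drop every block whose TSID.MetricID is in the deleted set and only account for it in rowsDeleted. A reduced model of that filtering step (names are illustrative, and real blocks carry whole row ranges rather than single IDs):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// dropDeleted models the merge loop: rows belonging to a deleted metricID
// are counted and skipped instead of being rewritten into the destination
// part.
func dropDeleted(metricIDs []uint64, deleted *uint64set.Set) (kept []uint64, dropped uint64) {
	for _, id := range metricIDs {
		if deleted.Has(id) {
			dropped++
			continue
		}
		kept = append(kept, id)
	}
	return kept, dropped
}

func main() {
	deleted := &uint64set.Set{}
	deleted.Add(2)
	kept, dropped := dropDeleted([]uint64{1, 2, 3}, deleted)
	fmt.Println(kept, dropped) // [1 3] 1
}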
*rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue diff --git a/lib/storage/partition.go b/lib/storage/partition.go index b9fe8f6cbf..404bc28a9c 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) func maxRowsPerSmallPart() uint64 { @@ -93,7 +94,7 @@ type partition struct { bigPartsPath string // The callack that returns deleted metric ids which must be skipped during merge. - getDeletedMetricIDs func() map[uint64]struct{} + getDeletedMetricIDs func() *uint64set.Set // Name is the name of the partition in the form YYYY_MM. name string @@ -183,7 +184,7 @@ func (pw *partWrapper) decRef() { // createPartition creates new partition for the given timestamp and the given paths // to small and big partitions. -func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() map[uint64]struct{}) (*partition, error) { +func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) { name := timestampToPartitionName(timestamp) smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name @@ -218,7 +219,7 @@ func (pt *partition) Drop() { } // openPartition opens the existing partition from the given paths. -func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() map[uint64]struct{}) (*partition, error) { +func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) @@ -255,7 +256,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return pt, nil } -func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() map[uint64]struct{}) *partition { +func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition { return &partition{ name: name, smallPartsPath: smallPartsPath, diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index fba8785907..7089653e7c 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -7,6 +7,8 @@ import ( "sort" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) func TestPartitionSearch(t *testing.T) { @@ -284,6 +286,6 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp return nil } -func nilGetDeletedMetricIDs() map[uint64]struct{} { +func nilGetDeletedMetricIDs() *uint64set.Set { return nil } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 53e7edd1f7..3fd293b1e2 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" ) @@ -59,7 +60,7 @@ type Storage struct { // Pending MetricID values to be added to 
currHourMetricIDs. pendingHourMetricIDsLock sync.Mutex - pendingHourMetricIDs map[uint64]struct{} + pendingHourMetricIDs *uint64set.Set stop chan struct{} @@ -122,7 +123,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) - s.pendingHourMetricIDs = make(map[uint64]struct{}) + s.pendingHourMetricIDs = &uint64set.Set{} // Load indexdb idbPath := path + "/indexdb" @@ -158,7 +159,7 @@ func (s *Storage) debugFlush() { s.idb().tb.DebugFlush() } -func (s *Storage) getDeletedMetricIDs() map[uint64]struct{} { +func (s *Storage) getDeletedMetricIDs() *uint64set.Set { return s.idb().getDeletedMetricIDs() } @@ -364,9 +365,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) { hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) - hourMetricIDsLen := len(hmPrev.m) - if len(hmCurr.m) > hourMetricIDsLen { - hourMetricIDsLen = len(hmCurr.m) + hourMetricIDsLen := hmPrev.m.Len() + if hmCurr.m.Len() > hourMetricIDsLen { + hourMetricIDsLen = hmCurr.m.Len() } m.HourMetricIDCacheSize += uint64(hourMetricIDsLen) @@ -508,11 +509,11 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen) return &hourMetricIDs{} } - m := make(map[uint64]struct{}, hmLen) + m := &uint64set.Set{} for i := uint64(0); i < hmLen; i++ { metricID := encoding.UnmarshalUint64(src) src = src[8:] - m[metricID] = struct{}{} + m.Add(metricID) } logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) return &hourMetricIDs{ @@ -526,21 +527,21 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { path := s.cachePath + "/" + name logger.Infof("saving %s to %q...", name, path) startTime := time.Now() - dst := make([]byte, 0, len(hm.m)*8+24) + dst := make([]byte, 0, hm.m.Len()*8+24) isFull := uint64(0) if hm.isFull { isFull = 1 } dst = encoding.MarshalUint64(dst, isFull) dst = encoding.MarshalUint64(dst, hm.hour) - dst = encoding.MarshalUint64(dst, uint64(len(hm.m))) - for metricID := range hm.m { + dst = encoding.MarshalUint64(dst, uint64(hm.m.Len())) + for _, metricID := range hm.m.AppendTo(nil) { dst = encoding.MarshalUint64(dst, metricID) } if err := ioutil.WriteFile(path, dst, 0644); err != nil { logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) } - logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), len(hm.m), len(dst)) + logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hm.m.Len(), len(dst)) } func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache { @@ -810,11 +811,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra r.Value = mr.Value r.PrecisionBits = precisionBits if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) { - if len(dmis) == 0 { + if dmis.Len() == 0 { // Fast path - the TSID for the given MetricName has been found in cache and isn't deleted. continue } - if _, deleted := dmis[r.TSID.MetricID]; !deleted { + if !dmis.Has(r.TSID.MetricID) { // Fast path - the TSID for the given MetricName has been found in cache and isn't deleted. 
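A note on mustSaveHourMetricIDs above: the hourly sets are persisted in a simple fixed layout, three uint64 header fields (isFull flag, hour, item count) followed by one uint64 per metricID. A sketch of the same layout using only the standard library; binary.BigEndian.AppendUint64 stands in here for lib/encoding.MarshalUint64, which likewise writes big-endian values:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// encodeHourMetricIDs mirrors the on-disk layout written by
// mustSaveHourMetricIDs: isFull, hour and item count as uint64,
// followed by one uint64 per metricID in sorted order.
func encodeHourMetricIDs(m *uint64set.Set, hour uint64, isFull bool) []byte {
	dst := make([]byte, 0, m.Len()*8+24)
	flag := uint64(0)
	if isFull {
		flag = 1
	}
	dst = binary.BigEndian.AppendUint64(dst, flag)
	dst = binary.BigEndian.AppendUint64(dst, hour)
	dst = binary.BigEndian.AppendUint64(dst, uint64(m.Len()))
	for _, metricID := range m.AppendTo(nil) {
		dst = binary.BigEndian.AppendUint64(dst, metricID)
	}
	return dst
}

func main() {
	m := &uint64set.Set{}
	m.Add(123)
	fmt.Println(len(encodeHourMetricIDs(m, 435120, true))) // 32: 24-byte header + one item
}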
continue } @@ -884,12 +885,12 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error hm := s.currHourMetricIDs.Load().(*hourMetricIDs) if hour == hm.hour { // The r belongs to the current hour. Check for the current hour cache. - if _, ok := hm.m[metricID]; ok { + if hm.m.Has(metricID) { // Fast path: the metricID is in the current hour cache. continue } s.pendingHourMetricIDsLock.Lock() - s.pendingHourMetricIDs[metricID] = struct{}{} + s.pendingHourMetricIDs.Add(metricID) s.pendingHourMetricIDsLock.Unlock() } @@ -915,7 +916,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error func (s *Storage) updateCurrHourMetricIDs() { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourMetricIDsLock.Lock() - newMetricIDsLen := len(s.pendingHourMetricIDs) + newMetricIDsLen := s.pendingHourMetricIDs.Len() s.pendingHourMetricIDsLock.Unlock() hour := uint64(timestampFromTime(time.Now())) / msecPerHour if newMetricIDsLen == 0 && hm.hour == hour { @@ -924,23 +925,20 @@ func (s *Storage) updateCurrHourMetricIDs() { } // Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs. - var m map[uint64]struct{} + var m *uint64set.Set isFull := hm.isFull if hm.hour == hour { - m = make(map[uint64]struct{}, len(hm.m)+newMetricIDsLen) - for metricID := range hm.m { - m[metricID] = struct{}{} - } + m = hm.m.Clone() } else { - m = make(map[uint64]struct{}, newMetricIDsLen) + m = &uint64set.Set{} isFull = true } s.pendingHourMetricIDsLock.Lock() - newMetricIDs := s.pendingHourMetricIDs - s.pendingHourMetricIDs = make(map[uint64]struct{}, len(newMetricIDs)) + newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil) + s.pendingHourMetricIDs = &uint64set.Set{} s.pendingHourMetricIDsLock.Unlock() - for metricID := range newMetricIDs { - m[metricID] = struct{}{} + for _, metricID := range newMetricIDs { + m.Add(metricID) } hmNew := &hourMetricIDs{ @@ -955,7 +953,7 @@ func (s *Storage) updateCurrHourMetricIDs() { } type hourMetricIDs struct { - m map[uint64]struct{} + m *uint64set.Set hour uint64 isFull bool } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index dca77da27f..2334994b93 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -9,6 +9,8 @@ import ( "testing" "testing/quick" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) func TestUpdateCurrHourMetricIDs(t *testing.T) { @@ -16,19 +18,18 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { var s Storage s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) - s.pendingHourMetricIDs = make(map[uint64]struct{}) + s.pendingHourMetricIDs = &uint64set.Set{} return &s } t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ - m: map[uint64]struct{}{ - 12: {}, - 34: {}, - }, + m: &uint64set.Set{}, hour: 123, } + hmOrig.m.Add(12) + hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs() hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) @@ -39,8 +40,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } } - if len(hmCurr.m) != 0 { - t.Fatalf("unexpected length of hm.m; got %d; want %d", len(hmCurr.m), 0) + if hmCurr.m.Len() != 0 { + t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0) } if !hmCurr.isFull { t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", 
hmCurr.isFull, true) @@ -51,20 +52,19 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - if len(s.pendingHourMetricIDs) != 0 { - t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + if s.pendingHourMetricIDs.Len() != 0 { + t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) } }) t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ - m: map[uint64]struct{}{ - 12: {}, - 34: {}, - }, + m: &uint64set.Set{}, hour: hour, } + hmOrig.m.Add(12) + hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs() hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) @@ -90,27 +90,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } - if len(s.pendingHourMetricIDs) != 0 { - t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + if s.pendingHourMetricIDs.Len() != 0 { + t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) } }) t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - pendingHourMetricIDs := map[uint64]struct{}{ - 343: {}, - 32424: {}, - 8293432: {}, - } + pendingHourMetricIDs := &uint64set.Set{} + pendingHourMetricIDs.Add(343) + pendingHourMetricIDs.Add(32424) + pendingHourMetricIDs.Add(8293432) s.pendingHourMetricIDs = pendingHourMetricIDs hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ - m: map[uint64]struct{}{ - 12: {}, - 34: {}, - }, + m: &uint64set.Set{}, hour: 123, } + hmOrig.m.Add(12) + hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs() hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) @@ -133,27 +131,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - if len(s.pendingHourMetricIDs) != 0 { - t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + if s.pendingHourMetricIDs.Len() != 0 { + t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) } }) t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - pendingHourMetricIDs := map[uint64]struct{}{ - 343: {}, - 32424: {}, - 8293432: {}, - } + pendingHourMetricIDs := &uint64set.Set{} + pendingHourMetricIDs.Add(343) + pendingHourMetricIDs.Add(32424) + pendingHourMetricIDs.Add(8293432) s.pendingHourMetricIDs = pendingHourMetricIDs hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ - m: map[uint64]struct{}{ - 12: {}, - 34: {}, - }, + m: &uint64set.Set{}, hour: hour, } + hmOrig.m.Add(12) + hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs() hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) @@ -166,9 +162,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { // Do not run other checks, since they may fail. 
return } - m := getMetricIDsCopy(pendingHourMetricIDs) - for metricID := range hmOrig.m { - m[metricID] = struct{}{} + m := pendingHourMetricIDs.Clone() + origMetricIDs := hmOrig.m.AppendTo(nil) + for _, metricID := range origMetricIDs { + m.Add(metricID) } if !reflect.DeepEqual(hmCurr.m, m) { t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) @@ -183,8 +180,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } - if len(s.pendingHourMetricIDs) != 0 { - t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) + if s.pendingHourMetricIDs.Len() != 0 { + t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) } }) } diff --git a/lib/storage/table.go b/lib/storage/table.go index 9a0cc3d6e4..63f82a02f3 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) // table represents a single table with time series data. @@ -18,7 +19,7 @@ type table struct { smallPartitionsPath string bigPartitionsPath string - getDeletedMetricIDs func() map[uint64]struct{} + getDeletedMetricIDs func() *uint64set.Set ptws []*partitionWrapper ptwsLock sync.Mutex @@ -75,7 +76,7 @@ func (ptw *partitionWrapper) scheduleToDrop() { // The table is created if it doesn't exist. // // Data older than the retentionMonths may be dropped at any time. -func openTable(path string, retentionMonths int, getDeletedMetricIDs func() map[uint64]struct{}) (*table, error) { +func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -430,7 +431,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() map[uint64]struct{}) ([]*partition, error) { +func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) { smallD, err := os.Open(smallPartitionsPath) if err != nil { return nil, fmt.Errorf("cannot open directory with small partitions %q: %s", smallPartitionsPath, err) diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go new file mode 100644 index 0000000000..e357c6669c --- /dev/null +++ b/lib/uint64set/uint64set.go @@ -0,0 +1,332 @@ +package uint64set + +import ( + "sort" +) + +// Set is a fast set for uint64. +// +// It should work faster than map[uint64]struct{} for semi-sparse uint64 values +// such as MetricIDs generated by lib/storage. +// +// It is unsafe calling Set methods from concurrent goroutines. +type Set struct { + itemsCount int + buckets bucket32Sorter +} + +type bucket32Sorter []*bucket32 + +func (s *bucket32Sorter) Len() int { return len(*s) } +func (s *bucket32Sorter) Less(i, j int) bool { + a := *s + return a[i].hi < a[j].hi +} +func (s *bucket32Sorter) Swap(i, j int) { + a := *s + a[i], a[j] = a[j], a[i] +} + +// Clone returns an independent copy of s. 
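Clone, like Len, Has and AppendTo below, explicitly tolerates a nil receiver. That is what lets call sites hand out a nil *Set (see nilGetDeletedMetricIDs in partition_search_test.go above) while expressions such as dmis.Len() == 0 keep working; Add and Del, by contrast, require a non-nil Set. A quick demonstration:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
	var s *uint64set.Set // nil acts as a valid empty set for read-only calls
	fmt.Println(s.Len(), s.Has(42), s.Clone(), s.AppendTo(nil)) // 0 false <nil> []
}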
+func (s *Set) Clone() *Set { + if s == nil { + return nil + } + var dst Set + dst.itemsCount = s.itemsCount + dst.buckets = make([]*bucket32, len(s.buckets)) + for i, b32 := range s.buckets { + dst.buckets[i] = b32.clone() + } + return &dst +} + +// Len returns the number of distinct uint64 values in s. +func (s *Set) Len() int { + if s == nil { + return 0 + } + return s.itemsCount +} + +// Add adds x to s. +func (s *Set) Add(x uint64) { + hi := uint32(x >> 32) + lo := uint32(x) + for _, b32 := range s.buckets { + if b32.hi == hi { + if b32.add(lo) { + s.itemsCount++ + } + return + } + } + s.addAlloc(hi, lo) +} + +func (s *Set) addAlloc(hi, lo uint32) { + var b32 bucket32 + b32.hi = hi + _ = b32.add(lo) + s.itemsCount++ + s.buckets = append(s.buckets, &b32) +} + +// Has verifies whether x exists in s. +func (s *Set) Has(x uint64) bool { + hi := uint32(x >> 32) + lo := uint32(x) + if s == nil { + return false + } + for _, b32 := range s.buckets { + if b32.hi == hi { + return b32.has(lo) + } + } + return false +} + +// Del deletes x from s. +func (s *Set) Del(x uint64) { + hi := uint32(x >> 32) + lo := uint32(x) + for _, b32 := range s.buckets { + if b32.hi == hi { + if b32.del(lo) { + s.itemsCount-- + } + return + } + } +} + +// AppendTo appends all the items from the set to dst and returns the result. +// +// The returned items are sorted. +func (s *Set) AppendTo(dst []uint64) []uint64 { + if s == nil { + return dst + } + // pre-allocate memory for dst + dstLen := len(dst) + if n := s.Len() - cap(dst) + dstLen; n > 0 { + dst = append(dst[:cap(dst)], make([]uint64, n)...) + dst = dst[:dstLen] + } + // sort s.buckets if it isn't sorted yet + if !sort.IsSorted(&s.buckets) { + sort.Sort(&s.buckets) + } + for _, b32 := range s.buckets { + dst = b32.appendTo(dst) + } + return dst +} + +type bucket32 struct { + hi uint32 + b16his []uint16 + buckets []*bucket16 +} + +func (b *bucket32) clone() *bucket32 { + var dst bucket32 + dst.hi = b.hi + dst.b16his = append(dst.b16his[:0], b.b16his...) 
+ dst.buckets = make([]*bucket16, len(b.buckets)) + for i, b16 := range b.buckets { + dst.buckets[i] = b16.clone() + } + return &dst +} + +// This is for sort.Interface +func (b *bucket32) Len() int { return len(b.b16his) } +func (b *bucket32) Less(i, j int) bool { return b.b16his[i] < b.b16his[j] } +func (b *bucket32) Swap(i, j int) { + his := b.b16his + buckets := b.buckets + his[i], his[j] = his[j], his[i] + buckets[i], buckets[j] = buckets[j], buckets[i] +} + +const maxUnsortedBuckets = 32 + +func (b *bucket32) add(x uint32) bool { + hi := uint16(x >> 16) + lo := uint16(x) + if len(b.buckets) > maxUnsortedBuckets { + return b.addSlow(hi, lo) + } + for i, hi16 := range b.b16his { + if hi16 == hi { + return i < len(b.buckets) && b.buckets[i].add(lo) + } + } + b.addAllocSmall(hi, lo) + return true +} + +func (b *bucket32) addAllocSmall(hi, lo uint16) { + var b16 bucket16 + _ = b16.add(lo) + b.b16his = append(b.b16his, hi) + b.buckets = append(b.buckets, &b16) + if len(b.buckets) > maxUnsortedBuckets { + sort.Sort(b) + } +} + +func (b *bucket32) addSlow(hi, lo uint16) bool { + n := binarySearch16(b.b16his, hi) + if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { + b.addAllocBig(hi, lo, n) + return true + } + return n < len(b.buckets) && b.buckets[n].add(lo) +} + +func (b *bucket32) addAllocBig(hi, lo uint16, n int) { + if n < 0 { + return + } + var b16 bucket16 + _ = b16.add(lo) + if n >= len(b.b16his) { + b.b16his = append(b.b16his, hi) + b.buckets = append(b.buckets, &b16) + return + } + b.b16his = append(b.b16his[:n+1], b.b16his[n:]...) + b.b16his[n] = hi + b.buckets = append(b.buckets[:n+1], b.buckets[n:]...) + b.buckets[n] = &b16 +} + +func (b *bucket32) has(x uint32) bool { + hi := uint16(x >> 16) + lo := uint16(x) + if len(b.buckets) > maxUnsortedBuckets { + return b.hasSlow(hi, lo) + } + for i, hi16 := range b.b16his { + if hi16 == hi { + return i < len(b.buckets) && b.buckets[i].has(lo) + } + } + return false +} + +func (b *bucket32) hasSlow(hi, lo uint16) bool { + n := binarySearch16(b.b16his, hi) + if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { + return false + } + return n < len(b.buckets) && b.buckets[n].has(lo) +} + +func (b *bucket32) del(x uint32) bool { + hi := uint16(x >> 16) + lo := uint16(x) + if len(b.buckets) > maxUnsortedBuckets { + return b.delSlow(hi, lo) + } + for i, hi16 := range b.b16his { + if hi16 == hi { + return i < len(b.buckets) && b.buckets[i].del(lo) + } + } + return false +} + +func (b *bucket32) delSlow(hi, lo uint16) bool { + n := binarySearch16(b.b16his, hi) + if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi { + return false + } + return n < len(b.buckets) && b.buckets[n].del(lo) +} + +func (b *bucket32) appendTo(dst []uint64) []uint64 { + if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) { + sort.Sort(b) + } + for i, b16 := range b.buckets { + hi16 := b.b16his[i] + dst = b16.appendTo(dst, b.hi, hi16) + } + return dst +} + +const ( + bitsPerBucket = 1 << 16 + wordsPerBucket = bitsPerBucket / 64 +) + +type bucket16 struct { + bits [wordsPerBucket]uint64 +} + +func (b *bucket16) clone() *bucket16 { + var dst bucket16 + copy(dst.bits[:], b.bits[:]) + return &dst +} + +func (b *bucket16) add(x uint16) bool { + wordNum, bitMask := getWordNumBitMask(x) + word := &b.bits[wordNum] + ok := *word&bitMask == 0 + *word |= bitMask + return ok +} + +func (b *bucket16) has(x uint16) bool { + wordNum, bitMask := getWordNumBitMask(x) + return b.bits[wordNum]&bitMask != 0 +} + +func (b *bucket16) del(x uint16) bool { + wordNum, bitMask := 
getWordNumBitMask(x) + word := &b.bits[wordNum] + ok := *word&bitMask != 0 + *word &^= bitMask + return ok +} + +func (b *bucket16) appendTo(dst []uint64, hi uint32, hi16 uint16) []uint64 { + hi64 := uint64(hi)<<32 | uint64(hi16)<<16 + var wordNum uint64 + for _, word := range b.bits { + for bitNum := uint64(0); bitNum < 64; bitNum++ { + if word&(uint64(1)<<bitNum) != 0 { + x := hi64 | (wordNum * 64) | bitNum + dst = append(dst, x) + } + } + wordNum++ + } + return dst +} + +func getWordNumBitMask(x uint16) (uint16, uint64) { + wordNum := x / 64 + bitMask := uint64(1) << (x & 63) + return wordNum, bitMask +} + +func binarySearch16(u16 []uint16, x uint16) int { + n := len(u16) + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) + if h >= 0 && h < len(u16) && u16[h] < x { + i = h + 1 + } else { + j = h + } + } + return i +} diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go new file mode 100644 index 0000000000..a6a60d2754 --- /dev/null +++ b/lib/uint64set/uint64set_test.go @@ -0,0 +1,224 @@ +package uint64set + +import ( + "fmt" + "math/rand" + "sort" + "testing" + "time" +) + +func TestSetBasicOps(t *testing.T) { + for _, itemsCount := range []int{1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} { + t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) { + testSetBasicOps(t, itemsCount) + }) + } +} + +func testSetBasicOps(t *testing.T, itemsCount int) { + var s Set + + offset := uint64(time.Now().UnixNano()) + + // Verify forward Add + for i := 0; i < itemsCount/2; i++ { + s.Add(uint64(i) + offset) + } + if n := s.Len(); n != itemsCount/2 { + t.Fatalf("unexpected s.Len() after forward Add; got %d; want %d", n, itemsCount/2) + } + + // Verify backward Add + for i := 0; i < itemsCount/2; i++ { + s.Add(uint64(itemsCount-i-1) + offset) + } + if n := s.Len(); n != itemsCount { + t.Fatalf("unexpected s.Len() after backward Add; got %d; want %d", n, itemsCount) + } + + // Verify repeated Add + for i := 0; i < itemsCount/2; i++ { + s.Add(uint64(i) + offset) + } + if n := s.Len(); n != itemsCount { + t.Fatalf("unexpected s.Len() after repeated Add; got %d; want %d", n, itemsCount) + } + + // Verify Has on existing bits + for i := 0; i < itemsCount; i++ { + if !s.Has(uint64(i) + offset) { + t.Fatalf("missing bit %d", i) + } + } + + // Verify Has on missing bits + for i := itemsCount; i < 2*itemsCount; i++ { + if s.Has(uint64(i) + offset) { + t.Fatalf("unexpected bit found: %d", i) + } + } + + // Verify Clone + sCopy := s.Clone() + if n := sCopy.Len(); n != itemsCount { + t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount) + } + for i := 0; i < itemsCount; i++ { + if !sCopy.Has(uint64(i) + offset) { + t.Fatalf("missing bit %d on sCopy", i) + } + } + + // Verify AppendTo + a := s.AppendTo(nil) + if len(a) != itemsCount { + t.Fatalf("unexpected len of exported array; got %d; want %d; array:\n%d", len(a), itemsCount, a) + } + if !sort.SliceIsSorted(a, func(i, j int) bool { return a[i] < a[j] }) { + t.Fatalf("unsorted result returned from AppendTo: %d", a) + } + m := make(map[uint64]bool) + for _, x := range a { + m[x] = true + } + for i := 0; i < itemsCount; i++ { + if !m[uint64(i)+offset] { + t.Fatalf("missing bit %d in the exported bits; array:\n%d", i, a) + } + } + + // Verify Del + for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ { + s.Del(uint64(i) + offset) + } + if n := s.Len(); n != itemsCount-itemsCount/4 { + t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsCount/4) + } + a = s.AppendTo(a[:0]) + if len(a) != itemsCount-itemsCount/4 { + t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsCount/4) + } + m = make(map[uint64]bool) + for _, x := range a { + m[x] = true + } + for i := 0; i < itemsCount; i++ { + if i >= itemsCount/2 && i < itemsCount-itemsCount/4 { + if m[uint64(i)+offset] { + t.Fatalf("unexpected bit found after 
deleting: %d", i) + } + } else { + if !m[uint64(i)+offset] { + t.Fatalf("missing bit %d in the exported bits after deleting", i) + } + } + } + + // Try Del for non-existing items + for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ { + s.Del(uint64(i) + offset) + s.Del(uint64(i) + offset) + s.Del(uint64(i) + offset + uint64(itemsCount)) + } + if n := s.Len(); n != itemsCount-itemsCount/4 { + t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsCount/4) + } + + // Verify sCopy has the original data + if n := sCopy.Len(); n != itemsCount { + t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount) + } + for i := 0; i < itemsCount; i++ { + if !sCopy.Has(uint64(i) + offset) { + t.Fatalf("missing bit %d on sCopy", i) + } + } +} + +func TestSetSparseItems(t *testing.T) { + for _, itemsCount := range []int{1e2, 1e3, 1e4} { + t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) { + testSetSparseItems(t, itemsCount) + }) + } +} + +func testSetSparseItems(t *testing.T, itemsCount int) { + var s Set + m := make(map[uint64]bool) + for i := 0; i < itemsCount; i++ { + x := rand.Uint64() + s.Add(x) + m[x] = true + } + if n := s.Len(); n != len(m) { + t.Fatalf("unexpected Len(); got %d; want %d", n, len(m)) + } + + // Check Has + for x := range m { + if !s.Has(x) { + t.Fatalf("missing item %d", x) + } + } + for i := 0; i < itemsCount; i++ { + x := uint64(i) + if m[x] { + continue + } + if s.Has(x) { + t.Fatalf("unexpected item found %d", x) + } + } + + // Check Clone + sCopy := s.Clone() + if n := sCopy.Len(); n != len(m) { + t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, len(m)) + } + for x := range m { + if !sCopy.Has(x) { + t.Fatalf("missing item %d on sCopy", x) + } + } + + // Check AppendTo + a := s.AppendTo(nil) + if len(a) != len(m) { + t.Fatalf("unexpected len for AppendTo result; got %d; want %d", len(a), len(m)) + } + if !sort.SliceIsSorted(a, func(i, j int) bool { return a[i] < a[j] }) { + t.Fatalf("unsorted result returned from AppendTo: %d", a) + } + for _, x := range a { + if !m[x] { + t.Fatalf("unexpected item found in AppendTo result: %d", x) + } + } + + // Check Del + for x := range m { + s.Del(x) + s.Del(x) + s.Del(x + 1) + s.Del(x - 1) + } + if n := s.Len(); n != 0 { + t.Fatalf("unexpected number of items left after Del; got %d; want 0", n) + } + a = s.AppendTo(a[:0]) + if len(a) != 0 { + t.Fatalf("unexpected number of items returned from AppendTo after Del; got %d; want 0; items\n%d", len(a), a) + } + + // Check items in sCopy + if n := sCopy.Len(); n != len(m) { + t.Fatalf("unexpected sCopy.Len() after Del; got %d; want %d", n, len(m)) + } + for x := range m { + if !sCopy.Has(x) { + t.Fatalf("missing item %d on sCopy after Del", x) + } + } +} diff --git a/lib/uint64set/uint64set_timing_test.go b/lib/uint64set/uint64set_timing_test.go new file mode 100644 index 0000000000..ef143a3541 --- /dev/null +++ b/lib/uint64set/uint64set_timing_test.go @@ -0,0 +1,321 @@ +package uint64set + +import ( + "fmt" + "testing" + "time" + + "github.com/valyala/fastrand" +) + +func BenchmarkSetAddRandomLastBits(b *testing.B) { + const itemsCount = 1e5 + for _, lastBits := range []uint64{20, 24, 28, 32} { + mask := (uint64(1) << lastBits) - 1 + b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + start := uint64(time.Now().UnixNano()) + var s Set + var rng fastrand.RNG + for i := 0; i < itemsCount; i++ { + 
n := start | (uint64(rng.Uint32()) & mask) + s.Add(n) + } + } + }) + }) + } +} + +func BenchmarkMapAddRandomLastBits(b *testing.B) { + const itemsCount = 1e5 + for _, lastBits := range []uint64{20, 24, 28, 32} { + mask := (uint64(1) << lastBits) - 1 + b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + start := uint64(time.Now().UnixNano()) + m := make(map[uint64]struct{}) + var rng fastrand.RNG + for i := 0; i < itemsCount; i++ { + n := start | (uint64(rng.Uint32()) & mask) + m[n] = struct{}{} + } + } + }) + }) + } +} + +func BenchmarkSetAddWithAllocs(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + var s Set + n := start + for n < end { + s.Add(n) + n++ + } + } + }) + }) + } +} + +func BenchmarkMapAddWithAllocs(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + m := make(map[uint64]struct{}) + n := start + for n < end { + m[n] = struct{}{} + n++ + } + } + }) + }) + } +} + +func BenchmarkMapAddNoAllocs(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + m := make(map[uint64]struct{}, itemsCount) + n := start + for n < end { + m[n] = struct{}{} + n++ + } + } + }) + }) + } +} + +func BenchmarkMapAddReuse(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + m := make(map[uint64]struct{}, itemsCount) + for pb.Next() { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + for k := range m { + delete(m, k) + } + n := start + for n < end { + m[n] = struct{}{} + n++ + } + } + }) + }) + } +} + +func BenchmarkSetHasHitRandomLastBits(b *testing.B) { + const itemsCount = 1e5 + for _, lastBits := range []uint64{20, 24, 28, 32} { + mask := (uint64(1) << lastBits) - 1 + b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) { + start := uint64(time.Now().UnixNano()) + var s Set + var rng fastrand.RNG + for i := 0; i < itemsCount; i++ { + n := start | (uint64(rng.Uint32()) & mask) + s.Add(n) + } + a := s.AppendTo(nil) + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(len(a))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for _, n := range a { + if !s.Has(n) { + panic("unexpected miss") + } + } + } + }) + }) + } +} + +func BenchmarkMapHasHitRandomLastBits(b *testing.B) { + const itemsCount = 1e5 + for _, lastBits := range []uint64{20, 24, 28, 32} { + mask := (uint64(1) << lastBits) - 1 + b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) { + start := uint64(time.Now().UnixNano()) + m := make(map[uint64]struct{}) + var rng fastrand.RNG + 
for i := 0; i < itemsCount; i++ { + n := start | (uint64(rng.Uint32()) & mask) + m[n] = struct{}{} + } + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(len(m))) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for n := range m { + if _, ok := m[n]; !ok { + panic("unexpected miss") + } + } + } + }) + }) + } +} + +func BenchmarkSetHasHit(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + var s Set + n := start + for n < end { + s.Add(n) + n++ + } + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n := start + for n < end { + if !s.Has(n) { + panic("unexpected miss") + } + n++ + } + } + }) + }) + } +} + +func BenchmarkMapHasHit(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + m := make(map[uint64]struct{}, itemsCount) + n := start + for n < end { + m[n] = struct{}{} + n++ + } + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n := start + for n < end { + if _, ok := m[n]; !ok { + panic("unexpected miss") + } + n++ + } + } + }) + }) + } +} + +func BenchmarkSetHasMiss(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + var s Set + n := start + for n < end { + s.Add(n) + n++ + } + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n := end + nEnd := end + itemsCount + for n < nEnd { + if s.Has(n) { + panic("unexpected hit") + } + n++ + } + } + }) + }) + } +} + +func BenchmarkMapHasMiss(b *testing.B) { + for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) { + start := uint64(time.Now().UnixNano()) + end := start + itemsCount + m := make(map[uint64]struct{}, itemsCount) + n := start + for n < end { + m[n] = struct{}{} + n++ + } + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(itemsCount)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n := end + nEnd := end + itemsCount + for n < nEnd { + if _, ok := m[n]; ok { + panic("unexpected hit") + } + n++ + } + } + }) + }) + } +}
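Taken together, the new package is a two-level radix structure over bitmaps: a Set shards values by their high 32 bits into bucket32 values, each bucket32 shards the next 16 bits into bucket16 values, and a bucket16 is a flat 65536-bit bitmap (1024 uint64 words, i.e. 8 KiB). For dense runs of metricIDs, which is how lib/storage generates them, that works out to roughly 1 bit per item, whereas map[uint64]struct{} spends 8 bytes per key plus Go map bucket overhead, plausibly tens of bytes per entry. The helper below is illustrative only; it applies the same arithmetic as Add and getWordNumBitMask to show where each part of a value lands:

package main

import "fmt"

// decompose splits x the same way the set does: the high 32 bits pick a
// bucket32, the next 16 bits pick a bucket16 inside it, and the low 16
// bits address one bit in that bucket's 1024-word bitmap.
func decompose(x uint64) (hi32 uint32, hi16, lo16, wordNum uint16, bitMask uint64) {
	hi32 = uint32(x >> 32)
	lo32 := uint32(x)
	hi16 = uint16(lo32 >> 16)
	lo16 = uint16(lo32)
	wordNum = lo16 / 64                // index into bits[1024]
	bitMask = uint64(1) << (lo16 & 63) // bit within that word
	return hi32, hi16, lo16, wordNum, bitMask
}

func main() {
	hi32, hi16, lo16, wordNum, bitMask := decompose(0x1122334455667788)
	fmt.Printf("bucket32=%#x bucket16=%#x lo16=%#x word=%d mask=%#x\n",
		hi32, hi16, lo16, wordNum, bitMask)
	// Output: bucket32=0x11223344 bucket16=0x5566 lo16=0x7788 word=478 mask=0x100
}

This layout also explains the benchmark split above: the random-last-bits benchmarks stress sparse values that touch many bucket16s, while the sequential Add/Has benchmarks hit the dense case where the bitmap shines.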