lib/storage: create and use lib/uint64set instead of map[uint64]struct{}

This should improve inverted index search performance for filters matching big number of time series,
since `lib/uint64set.Set` is faster than `map[uint64]struct{}` for both `Add` and `Has` calls.
See the corresponding benchmarks in `lib/uint64set`.
This commit is contained in:
Aliaksandr Valialkin 2019-09-24 21:10:22 +03:00
parent ef2296e420
commit b986516fbe
12 changed files with 1041 additions and 188 deletions

1
go.mod
View file

@ -8,6 +8,7 @@ require (
github.com/google/go-cmp v0.3.0 // indirect github.com/google/go-cmp v0.3.0 // indirect
github.com/klauspost/compress v1.8.3 github.com/klauspost/compress v1.8.3
github.com/valyala/fastjson v1.4.1 github.com/valyala/fastjson v1.4.1
github.com/valyala/fastrand v1.0.0
github.com/valyala/gozstd v1.6.2 github.com/valyala/gozstd v1.6.2
github.com/valyala/histogram v1.0.1 github.com/valyala/histogram v1.0.1
github.com/valyala/quicktemplate v1.2.0 github.com/valyala/quicktemplate v1.2.0

View file

@ -371,7 +371,7 @@ func binarySearchKey(items [][]byte, key []byte) int {
i, j := uint(0), n i, j := uint(0), n
for i < j { for i < j {
h := uint(i+j) >> 1 h := uint(i+j) >> 1
if string(key) > string(items[h]) { if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) {
i = h + 1 i = h + 1
} else { } else {
j = h j = h

View file

@ -18,6 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
xxhash "github.com/cespare/xxhash/v2" xxhash "github.com/cespare/xxhash/v2"
@ -67,12 +68,12 @@ type indexDB struct {
indexSearchPool sync.Pool indexSearchPool sync.Pool
// An inmemory map[uint64]struct{} of deleted metricIDs. // An inmemory set of deleted metricIDs.
// //
// The map holds deleted metricIDs for the current db and for the extDB. // The set holds deleted metricIDs for the current db and for the extDB.
// //
// It is safe to keep the map in memory even for big number of deleted // It is safe to keep the set in memory even for big number of deleted
// metricIDs, since it occupies only 8 bytes per deleted metricID. // metricIDs, since it usually requires 1 bit per deleted metricID.
deletedMetricIDs atomic.Value deletedMetricIDs atomic.Value
deletedMetricIDsUpdateLock sync.Mutex deletedMetricIDsUpdateLock sync.Mutex
@ -199,7 +200,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.UselessTagFiltersCacheRequests += cs.GetBigCalls m.UselessTagFiltersCacheRequests += cs.GetBigCalls
m.UselessTagFiltersCacheMisses += cs.Misses m.UselessTagFiltersCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(len(db.getDeletedMetricIDs())) m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len())
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
@ -237,7 +238,7 @@ func (db *indexDB) SetExtDB(extDB *indexDB) {
// Add deleted metricIDs from extDB to db. // Add deleted metricIDs from extDB to db.
if extDB != nil { if extDB != nil {
dmisExt := extDB.getDeletedMetricIDs() dmisExt := extDB.getDeletedMetricIDs()
metricIDs := getSortedMetricIDs(dmisExt) metricIDs := dmisExt.AppendTo(nil)
db.updateDeletedMetricIDs(metricIDs) db.updateDeletedMetricIDs(metricIDs)
} }
@ -879,30 +880,27 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
return deletedCount, nil return deletedCount, nil
} }
func (db *indexDB) getDeletedMetricIDs() map[uint64]struct{} { func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
return db.deletedMetricIDs.Load().(map[uint64]struct{}) return db.deletedMetricIDs.Load().(*uint64set.Set)
} }
func (db *indexDB) setDeletedMetricIDs(dmis map[uint64]struct{}) { func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) {
db.deletedMetricIDs.Store(dmis) db.deletedMetricIDs.Store(dmis)
} }
func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) { func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) {
db.deletedMetricIDsUpdateLock.Lock() db.deletedMetricIDsUpdateLock.Lock()
dmisOld := db.getDeletedMetricIDs() dmisOld := db.getDeletedMetricIDs()
dmisNew := make(map[uint64]struct{}, len(dmisOld)+len(metricIDs)) dmisNew := dmisOld.Clone()
for metricID := range dmisOld {
dmisNew[metricID] = struct{}{}
}
for _, metricID := range metricIDs { for _, metricID := range metricIDs {
dmisNew[metricID] = struct{}{} dmisNew.Add(metricID)
} }
db.setDeletedMetricIDs(dmisNew) db.setDeletedMetricIDs(dmisNew)
db.deletedMetricIDsUpdateLock.Unlock() db.deletedMetricIDsUpdateLock.Unlock()
} }
func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) { func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
dmis := make(map[uint64]struct{}) dmis := &uint64set.Set{}
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
kb.B = append(kb.B[:0], nsPrefixDeteletedMetricID) kb.B = append(kb.B[:0], nsPrefixDeteletedMetricID)
@ -917,7 +915,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) {
return nil, fmt.Errorf("unexpected item len; got %d bytes; want %d bytes", len(item), 8) return nil, fmt.Errorf("unexpected item len; got %d bytes; want %d bytes", len(item), 8)
} }
metricID := encoding.UnmarshalUint64(item) metricID := encoding.UnmarshalUint64(item)
dmis[metricID] = struct{}{} dmis.Add(metricID)
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return nil, err return nil, err
@ -1009,9 +1007,9 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
if len(tail) > 0 { if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail) return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
} }
if len(dmis) > 0 { if dmis.Len() > 0 {
// Verify whether the dst is marked as deleted. // Verify whether the dst is marked as deleted.
if _, deleted := dmis[dst.MetricID]; deleted { if dmis.Has(dst.MetricID) {
// The dst is deleted. Continue searching. // The dst is deleted. Continue searching.
continue continue
} }
@ -1175,9 +1173,9 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs // updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
// and adds matching metrics to metricIDs. // and adds matching metrics to metricIDs.
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs map[uint64]struct{}, tfs []*tagFilter) error { func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter) error {
// sort srcMetricIDs in order to speed up Seek below. // sort srcMetricIDs in order to speed up Seek below.
sortedMetricIDs := getSortedMetricIDs(srcMetricIDs) sortedMetricIDs := srcMetricIDs.AppendTo(nil)
metricName := kbPool.Get() metricName := kbPool.Get()
defer kbPool.Put(metricName) defer kbPool.Put(metricName)
@ -1201,12 +1199,12 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
if !ok { if !ok {
continue continue
} }
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
} }
return nil return nil
} }
func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
// Try fast path with the minimized number of maxMetrics. // Try fast path with the minimized number of maxMetrics.
maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics) maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics)
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted) minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted)
@ -1243,7 +1241,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if len(metricIDsForTimeRange) <= maxTimeRangeMetrics { if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
return nil, metricIDsForTimeRange, nil return nil, metricIDsForTimeRange, nil
} }
@ -1273,7 +1271,7 @@ func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) in
if !hmPrev.isFull { if !hmPrev.isFull {
return maxMetrics return maxMetrics
} }
hourMetrics := len(hmPrev.m) hourMetrics := hmPrev.m.Len()
if hourMetrics >= 256 && maxMetrics > hourMetrics/4 { if hourMetrics >= 256 && maxMetrics > hourMetrics/4 {
// It is cheaper to filter on the hour or day metrics if the minimum // It is cheaper to filter on the hour or day metrics if the minimum
// number of matching metrics across tfs exceeds hourMetrics / 4. // number of matching metrics across tfs exceeds hourMetrics / 4.
@ -1282,7 +1280,7 @@ func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) in
return maxMetrics return maxMetrics
} }
func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
kb := &is.kb kb := &is.kb
kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix) kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
@ -1304,7 +1302,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if len(minMetricIDs) < maxAllowedMetrics { if minMetricIDs.Len() < maxAllowedMetrics {
// Found the tag filter with the minimum number of metrics. // Found the tag filter with the minimum number of metrics.
return minTf, minMetricIDs, nil return minTf, minMetricIDs, nil
} }
@ -1330,8 +1328,8 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters
var errTooManyMetrics = errors.New("all the tag filters match too many metrics") var errTooManyMetrics = errors.New("all the tag filters match too many metrics")
func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
var minMetricIDs map[uint64]struct{} var minMetricIDs *uint64set.Set
var minTf *tagFilter var minTf *tagFilter
kb := &is.kb kb := &is.kb
uselessTagFilters := 0 uselessTagFilters := 0
@ -1364,7 +1362,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
} }
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %s", tf, err) return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %s", tf, err)
} }
if len(metricIDs) >= maxMetrics { if metricIDs.Len() >= maxMetrics {
// The tf matches at least maxMetrics. Skip it // The tf matches at least maxMetrics. Skip it
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix) kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
@ -1376,7 +1374,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
minMetricIDs = metricIDs minMetricIDs = metricIDs
minTf = tf minTf = tf
maxMetrics = len(minMetricIDs) maxMetrics = minMetricIDs.Len()
if maxMetrics <= 1 { if maxMetrics <= 1 {
// There is no need in inspecting other filters, since minTf // There is no need in inspecting other filters, since minTf
// already matches 0 or 1 metric. // already matches 0 or 1 metric.
@ -1399,11 +1397,11 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 { if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
return nil, nil, errTooManyMetrics return nil, nil, errTooManyMetrics
} }
metricIDs := make(map[uint64]struct{}) metricIDs := &uint64set.Set{}
if err := is.updateMetricIDsAll(metricIDs, maxMetrics); err != nil { if err := is.updateMetricIDsAll(metricIDs, maxMetrics); err != nil {
return nil, nil, err return nil, nil, err
} }
if len(metricIDs) >= maxMetrics { if metricIDs.Len() >= maxMetrics {
kb.B = append(kb.B[:0], uselessNegativeTagFilterKeyPrefix) kb.B = append(kb.B[:0], uselessNegativeTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tfs.marshal(kb.B) kb.B = tfs.marshal(kb.B)
@ -1475,14 +1473,14 @@ func matchTagFilter(b []byte, tf *tagFilter) (bool, error) {
} }
func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
metricIDs := make(map[uint64]struct{}) metricIDs := &uint64set.Set{}
for _, tfs := range tfss { for _, tfs := range tfss {
if len(tfs.tfs) == 0 { if len(tfs.tfs) == 0 {
// Return all the metric ids // Return all the metric ids
if err := is.updateMetricIDsAll(metricIDs, maxMetrics+1); err != nil { if err := is.updateMetricIDsAll(metricIDs, maxMetrics+1); err != nil {
return nil, err return nil, err
} }
if len(metricIDs) > maxMetrics { if metricIDs.Len() > maxMetrics {
return nil, fmt.Errorf("the number or unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics) return nil, fmt.Errorf("the number or unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
} }
// Stop the iteration, since we cannot find more metric ids with the remaining tfss. // Stop the iteration, since we cannot find more metric ids with the remaining tfss.
@ -1491,23 +1489,23 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil { if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
return nil, err return nil, err
} }
if len(metricIDs) > maxMetrics { if metricIDs.Len() > maxMetrics {
return nil, fmt.Errorf("the number or matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics) return nil, fmt.Errorf("the number or matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
} }
} }
if len(metricIDs) == 0 { if metricIDs.Len() == 0 {
// Nothing found // Nothing found
return nil, nil return nil, nil
} }
sortedMetricIDs := getSortedMetricIDs(metricIDs) sortedMetricIDs := metricIDs.AppendTo(nil)
// Filter out deleted metricIDs. // Filter out deleted metricIDs.
dmis := is.db.getDeletedMetricIDs() dmis := is.db.getDeletedMetricIDs()
if len(dmis) > 0 { if dmis.Len() > 0 {
metricIDsFiltered := sortedMetricIDs[:0] metricIDsFiltered := sortedMetricIDs[:0]
for _, metricID := range sortedMetricIDs { for _, metricID := range sortedMetricIDs {
if _, deleted := dmis[metricID]; !deleted { if !dmis.Has(metricID) {
metricIDsFiltered = append(metricIDsFiltered, metricID) metricIDsFiltered = append(metricIDsFiltered, metricID)
} }
} }
@ -1517,7 +1515,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
return sortedMetricIDs, nil return sortedMetricIDs, nil
} }
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs map[uint64]struct{}, tfs *TagFilters, tr TimeRange, maxMetrics int) error { func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
// Sort tag filters for faster ts.Seek below. // Sort tag filters for faster ts.Seek below.
sort.Slice(tfs.tfs, func(i, j int) bool { return bytes.Compare(tfs.tfs[i].prefix, tfs.tfs[j].prefix) < 0 }) sort.Slice(tfs.tfs, func(i, j int) bool { return bytes.Compare(tfs.tfs[i].prefix, tfs.tfs[j].prefix) < 0 })
@ -1560,8 +1558,8 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs map[uint64]struct{
} }
minMetricIDs = mIDs minMetricIDs = mIDs
} }
for metricID := range minMetricIDs { for _, metricID := range minMetricIDs.AppendTo(nil) {
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
} }
return nil return nil
} }
@ -1574,11 +1572,11 @@ const (
var uselessTagFilterCacheValue = []byte("1") var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (map[uint64]struct{}, error) { func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) {
if tf.isNegative { if tf.isNegative {
logger.Panicf("BUG: isNegative must be false") logger.Panicf("BUG: isNegative must be false")
} }
metricIDs := make(map[uint64]struct{}, maxMetrics) metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 { if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffxies. // Fast path for orSuffixes - seek for rows for each value from orSuffxies.
if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil { if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil {
@ -1593,8 +1591,8 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
// Slow path - scan for all the rows with the given prefix. // Slow path - scan for all the rows with the given prefix.
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool { err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
return len(metricIDs) < maxMetrics return metricIDs.Len() < maxMetrics
}) })
if err != nil { if err != nil {
if err == errFallbackToMetricNameMatch { if err == errFallbackToMetricNameMatch {
@ -1689,7 +1687,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
return nil return nil
} }
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs map[uint64]struct{}) error { func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) error {
if tf.isNegative { if tf.isNegative {
logger.Panicf("BUG: isNegative must be false") logger.Panicf("BUG: isNegative must be false")
} }
@ -1702,15 +1700,15 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe
if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil { if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil {
return err return err
} }
if len(metricIDs) >= maxMetrics { if metricIDs.Len() >= maxMetrics {
return nil return nil
} }
} }
return nil return nil
} }
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter map[uint64]struct{}) error { func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error {
sortedFilter := getSortedMetricIDs(filter) sortedFilter := filter.AppendTo(nil)
kb := kbPool.Get() kb := kbPool.Get()
defer kbPool.Put(kb) defer kbPool.Put(kb)
for _, orSuffix := range tf.orSuffixes { for _, orSuffix := range tf.orSuffixes {
@ -1724,14 +1722,14 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
return nil return nil
} }
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs map[uint64]struct{}) error { func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) error {
ts := &is.ts ts := &is.ts
mp := &is.mp mp := &is.mp
mp.Reset() mp.Reset()
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
loops := 0 loops := 0
ts.Seek(prefix) ts.Seek(prefix)
for len(metricIDs) < maxMetrics && ts.NextItem() { for metricIDs.Len() < maxMetrics && ts.NextItem() {
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, prefix) { if !bytes.HasPrefix(item, prefix) {
return nil return nil
@ -1745,7 +1743,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
} }
mp.ParseMetricIDs() mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs { for _, metricID := range mp.MetricIDs {
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
} }
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
@ -1754,7 +1752,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return nil return nil
} }
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs map[uint64]struct{}, sortedFilter []uint64, isNegative bool) error { func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error {
if len(sortedFilter) == 0 { if len(sortedFilter) == 0 {
return nil return nil
} }
@ -1810,9 +1808,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
continue continue
} }
if isNegative { if isNegative {
delete(metricIDs, metricID) metricIDs.Del(metricID)
} else { } else {
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
} }
sf = sf[1:] sf = sf[1:]
} }
@ -1827,7 +1825,7 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date") var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (map[uint64]struct{}, error) { func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
if tr.isZero() { if tr.isZero() {
return nil, errMissingMetricIDsForDate return nil, errMissingMetricIDsForDate
} }
@ -1847,7 +1845,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m
// Too much dates must be covered. Give up. // Too much dates must be covered. Give up.
return nil, errMissingMetricIDsForDate return nil, errMissingMetricIDsForDate
} }
metricIDs := make(map[uint64]struct{}, maxMetrics) metricIDs := &uint64set.Set{}
for minDate <= maxDate { for minDate <= maxDate {
if err := is.getMetricIDsForDate(minDate, metricIDs, maxMetrics); err != nil { if err := is.getMetricIDsForDate(minDate, metricIDs, maxMetrics); err != nil {
return nil, err return nil, err
@ -1858,7 +1856,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m
return metricIDs, nil return metricIDs, nil
} }
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (map[uint64]struct{}, bool) { func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
minHour := uint64(tr.MinTimestamp) / msecPerHour minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
@ -1866,46 +1864,35 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
// The tr fits the current hour. // The tr fits the current hour.
// Return a copy of hmCurr.m, because the caller may modify // Return a copy of hmCurr.m, because the caller may modify
// the returned map. // the returned map.
if len(hmCurr.m) > maxMetrics { if hmCurr.m.Len() > maxMetrics {
return nil, false return nil, false
} }
return getMetricIDsCopy(hmCurr.m), true return hmCurr.m.Clone(), true
} }
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour. // The tr fits the previous hour.
// Return a copy of hmPrev.m, because the caller may modify // Return a copy of hmPrev.m, because the caller may modify
// the returned map. // the returned map.
if len(hmPrev.m) > maxMetrics { if hmPrev.m.Len() > maxMetrics {
return nil, false return nil, false
} }
return getMetricIDsCopy(hmPrev.m), true return hmPrev.m.Clone(), true
} }
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours. // The tr spans the previous and the current hours.
if len(hmCurr.m)+len(hmPrev.m) > maxMetrics { if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics {
return nil, false return nil, false
} }
metricIDs := make(map[uint64]struct{}, len(hmCurr.m)+len(hmPrev.m)) metricIDs := hmCurr.m.Clone()
for metricID := range hmCurr.m { for _, metricID := range hmPrev.m.AppendTo(nil) {
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
}
for metricID := range hmPrev.m {
metricIDs[metricID] = struct{}{}
} }
return metricIDs, true return metricIDs, true
} }
return nil, false return nil, false
} }
func getMetricIDsCopy(src map[uint64]struct{}) map[uint64]struct{} {
dst := make(map[uint64]struct{}, len(src))
for metricID := range src {
dst[metricID] = struct{}{}
}
return dst
}
func (db *indexDB) storeDateMetricID(date, metricID uint64) error { func (db *indexDB) storeDateMetricID(date, metricID uint64) error {
is := db.getIndexSearch() is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID) ok, err := is.hasDateMetricID(date, metricID)
@ -1947,14 +1934,14 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil return true, nil
} }
func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs map[uint64]struct{}, maxMetrics int) error { func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
kb.B = encoding.MarshalUint64(kb.B, date) kb.B = encoding.MarshalUint64(kb.B, date)
ts.Seek(kb.B) ts.Seek(kb.B)
items := 0 items := 0
for len(metricIDs) < maxMetrics && ts.NextItem() { for metricIDs.Len() < maxMetrics && ts.NextItem() {
if !bytes.HasPrefix(ts.Item, kb.B) { if !bytes.HasPrefix(ts.Item, kb.B) {
break break
} }
@ -1964,7 +1951,7 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs map[uint64]str
return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v)) return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v))
} }
metricID := encoding.UnmarshalUint64(v) metricID := encoding.UnmarshalUint64(v)
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
items++ items++
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
@ -2000,7 +1987,7 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
return true, nil return true, nil
} }
func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, maxMetrics int) error { func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID)
@ -2016,8 +2003,8 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, maxMetr
return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail) return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail)
} }
metricID := encoding.UnmarshalUint64(tail) metricID := encoding.UnmarshalUint64(tail)
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
if len(metricIDs) >= maxMetrics { if metricIDs.Len() >= maxMetrics {
return nil return nil
} }
} }
@ -2032,13 +2019,13 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, maxMetr
// over the found metrics. // over the found metrics.
const maxIndexScanLoopsPerMetric = 400 const maxIndexScanLoopsPerMetric = 400
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map[uint64]struct{}) (map[uint64]struct{}, error) { func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
if len(filter) == 0 { if filter.Len() == 0 {
return nil, nil return nil, nil
} }
metricIDs := filter metricIDs := filter
if !tf.isNegative { if !tf.isNegative {
metricIDs = make(map[uint64]struct{}, len(filter)) metricIDs = &uint64set.Set{}
} }
if len(tf.orSuffixes) > 0 { if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes. // Fast path for orSuffixes - seek for rows for each value from orSuffixes.
@ -2052,15 +2039,15 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map
} }
// Slow path - scan for all the rows with the given prefix. // Slow path - scan for all the rows with the given prefix.
maxLoops := len(filter) * maxIndexScanLoopsPerMetric maxLoops := filter.Len() * maxIndexScanLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool { err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
if tf.isNegative { if tf.isNegative {
// filter must be equal to metricIDs // filter must be equal to metricIDs
delete(metricIDs, metricID) metricIDs.Del(metricID)
return true return true
} }
if _, ok := filter[metricID]; ok { if filter.Has(metricID) {
metricIDs[metricID] = struct{}{} metricIDs.Add(metricID)
} }
return true return true
}) })
@ -2101,18 +2088,6 @@ func unmarshalCommonPrefix(src []byte) ([]byte, byte, error) {
// 1 byte for prefix // 1 byte for prefix
const commonPrefixLen = 1 const commonPrefixLen = 1
func getSortedMetricIDs(m map[uint64]struct{}) []uint64 {
a := make(uint64Sorter, len(m))
i := 0
for metricID := range m {
a[i] = metricID
i++
}
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations
sort.Sort(a)
return a
}
type tagToMetricIDsRowParser struct { type tagToMetricIDsRowParser struct {
// MetricIDs contains parsed MetricIDs after ParseMetricIDs call // MetricIDs contains parsed MetricIDs after ParseMetricIDs call
MetricIDs []uint64 MetricIDs []uint64
@ -2216,13 +2191,13 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
// IsDeletedTag verifies whether the tag from mp is deleted according to dmis. // IsDeletedTag verifies whether the tag from mp is deleted according to dmis.
// //
// dmis must contain deleted MetricIDs. // dmis must contain deleted MetricIDs.
func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis map[uint64]struct{}) bool { func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool {
if len(dmis) == 0 { if dmis.Len() == 0 {
return false return false
} }
mp.ParseMetricIDs() mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs { for _, metricID := range mp.MetricIDs {
if _, ok := dmis[metricID]; !ok { if !dmis.Has(metricID) {
return false return false
} }
} }

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
) )
// mergeBlockStreams merges bsrs into bsw and updates ph. // mergeBlockStreams merges bsrs into bsw and updates ph.
@ -14,7 +15,7 @@ import (
// //
// rowsMerged is atomically updated with the number of merged rows during the merge. // rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, rowsMerged *uint64, func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, rowsMerged *uint64,
deletedMetricIDs map[uint64]struct{}, rowsDeleted *uint64) error { deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error {
ph.Reset() ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger) bsm := bsmPool.Get().(*blockStreamMerger)
@ -41,7 +42,7 @@ var bsmPool = &sync.Pool{
var errForciblyStopped = fmt.Errorf("forcibly stopped") var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, rowsMerged *uint64, func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, rowsMerged *uint64,
deletedMetricIDs map[uint64]struct{}, rowsDeleted *uint64) error { deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error {
// Search for the first block to merge // Search for the first block to merge
var pendingBlock *Block var pendingBlock *Block
for bsm.NextBlock() { for bsm.NextBlock() {
@ -50,7 +51,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
return errForciblyStopped return errForciblyStopped
default: default:
} }
if _, deleted := deletedMetricIDs[bsm.Block.bh.TSID.MetricID]; deleted { if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics. // Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount) *rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue continue
@ -72,7 +73,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
return errForciblyStopped return errForciblyStopped
default: default:
} }
if _, deleted := deletedMetricIDs[bsm.Block.bh.TSID.MetricID]; deleted { if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics. // Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount) *rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue continue

View file

@ -19,6 +19,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
) )
func maxRowsPerSmallPart() uint64 { func maxRowsPerSmallPart() uint64 {
@ -93,7 +94,7 @@ type partition struct {
bigPartsPath string bigPartsPath string
// The callack that returns deleted metric ids which must be skipped during merge. // The callack that returns deleted metric ids which must be skipped during merge.
getDeletedMetricIDs func() map[uint64]struct{} getDeletedMetricIDs func() *uint64set.Set
// Name is the name of the partition in the form YYYY_MM. // Name is the name of the partition in the form YYYY_MM.
name string name string
@ -183,7 +184,7 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths // createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions. // to small and big partitions.
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() map[uint64]struct{}) (*partition, error) { func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
name := timestampToPartitionName(timestamp) name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@ -218,7 +219,7 @@ func (pt *partition) Drop() {
} }
// openPartition opens the existing partition from the given paths. // openPartition opens the existing partition from the given paths.
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() map[uint64]struct{}) (*partition, error) { func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
smallPartsPath = filepath.Clean(smallPartsPath) smallPartsPath = filepath.Clean(smallPartsPath)
bigPartsPath = filepath.Clean(bigPartsPath) bigPartsPath = filepath.Clean(bigPartsPath)
@ -255,7 +256,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return pt, nil return pt, nil
} }
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() map[uint64]struct{}) *partition { func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition {
return &partition{ return &partition{
name: name, name: name,
smallPartsPath: smallPartsPath, smallPartsPath: smallPartsPath,

View file

@ -7,6 +7,8 @@ import (
"sort" "sort"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
) )
func TestPartitionSearch(t *testing.T) { func TestPartitionSearch(t *testing.T) {
@ -284,6 +286,6 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
return nil return nil
} }
func nilGetDeletedMetricIDs() map[uint64]struct{} { func nilGetDeletedMetricIDs() *uint64set.Set {
return nil return nil
} }

View file

@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
) )
@ -59,7 +60,7 @@ type Storage struct {
// Pending MetricID values to be added to currHourMetricIDs. // Pending MetricID values to be added to currHourMetricIDs.
pendingHourMetricIDsLock sync.Mutex pendingHourMetricIDsLock sync.Mutex
pendingHourMetricIDs map[uint64]struct{} pendingHourMetricIDs *uint64set.Set
stop chan struct{} stop chan struct{}
@ -122,7 +123,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
s.currHourMetricIDs.Store(hmCurr) s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev) s.prevHourMetricIDs.Store(hmPrev)
s.pendingHourMetricIDs = make(map[uint64]struct{}) s.pendingHourMetricIDs = &uint64set.Set{}
// Load indexdb // Load indexdb
idbPath := path + "/indexdb" idbPath := path + "/indexdb"
@ -158,7 +159,7 @@ func (s *Storage) debugFlush() {
s.idb().tb.DebugFlush() s.idb().tb.DebugFlush()
} }
func (s *Storage) getDeletedMetricIDs() map[uint64]struct{} { func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
return s.idb().getDeletedMetricIDs() return s.idb().getDeletedMetricIDs()
} }
@ -364,9 +365,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
hourMetricIDsLen := len(hmPrev.m) hourMetricIDsLen := hmPrev.m.Len()
if len(hmCurr.m) > hourMetricIDsLen { if hmCurr.m.Len() > hourMetricIDsLen {
hourMetricIDsLen = len(hmCurr.m) hourMetricIDsLen = hmCurr.m.Len()
} }
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen) m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
@ -508,11 +509,11 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen) logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{} return &hourMetricIDs{}
} }
m := make(map[uint64]struct{}, hmLen) m := &uint64set.Set{}
for i := uint64(0); i < hmLen; i++ { for i := uint64(0); i < hmLen; i++ {
metricID := encoding.UnmarshalUint64(src) metricID := encoding.UnmarshalUint64(src)
src = src[8:] src = src[8:]
m[metricID] = struct{}{} m.Add(metricID)
} }
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
return &hourMetricIDs{ return &hourMetricIDs{
@ -526,21 +527,21 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
path := s.cachePath + "/" + name path := s.cachePath + "/" + name
logger.Infof("saving %s to %q...", name, path) logger.Infof("saving %s to %q...", name, path)
startTime := time.Now() startTime := time.Now()
dst := make([]byte, 0, len(hm.m)*8+24) dst := make([]byte, 0, hm.m.Len()*8+24)
isFull := uint64(0) isFull := uint64(0)
if hm.isFull { if hm.isFull {
isFull = 1 isFull = 1
} }
dst = encoding.MarshalUint64(dst, isFull) dst = encoding.MarshalUint64(dst, isFull)
dst = encoding.MarshalUint64(dst, hm.hour) dst = encoding.MarshalUint64(dst, hm.hour)
dst = encoding.MarshalUint64(dst, uint64(len(hm.m))) dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
for metricID := range hm.m { for _, metricID := range hm.m.AppendTo(nil) {
dst = encoding.MarshalUint64(dst, metricID) dst = encoding.MarshalUint64(dst, metricID)
} }
if err := ioutil.WriteFile(path, dst, 0644); err != nil { if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
} }
logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), len(hm.m), len(dst)) logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hm.m.Len(), len(dst))
} }
func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache { func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache {
@ -810,11 +811,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
r.Value = mr.Value r.Value = mr.Value
r.PrecisionBits = precisionBits r.PrecisionBits = precisionBits
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) { if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
if len(dmis) == 0 { if dmis.Len() == 0 {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted. // Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
continue continue
} }
if _, deleted := dmis[r.TSID.MetricID]; !deleted { if !dmis.Has(r.TSID.MetricID) {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted. // Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
continue continue
} }
@ -884,12 +885,12 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
hm := s.currHourMetricIDs.Load().(*hourMetricIDs) hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hour == hm.hour { if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache. // The r belongs to the current hour. Check for the current hour cache.
if _, ok := hm.m[metricID]; ok { if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache. // Fast path: the metricID is in the current hour cache.
continue continue
} }
s.pendingHourMetricIDsLock.Lock() s.pendingHourMetricIDsLock.Lock()
s.pendingHourMetricIDs[metricID] = struct{}{} s.pendingHourMetricIDs.Add(metricID)
s.pendingHourMetricIDsLock.Unlock() s.pendingHourMetricIDsLock.Unlock()
} }
@ -915,7 +916,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
func (s *Storage) updateCurrHourMetricIDs() { func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs) hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourMetricIDsLock.Lock() s.pendingHourMetricIDsLock.Lock()
newMetricIDsLen := len(s.pendingHourMetricIDs) newMetricIDsLen := s.pendingHourMetricIDs.Len()
s.pendingHourMetricIDsLock.Unlock() s.pendingHourMetricIDsLock.Unlock()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
if newMetricIDsLen == 0 && hm.hour == hour { if newMetricIDsLen == 0 && hm.hour == hour {
@ -924,23 +925,20 @@ func (s *Storage) updateCurrHourMetricIDs() {
} }
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs. // Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
var m map[uint64]struct{} var m *uint64set.Set
isFull := hm.isFull isFull := hm.isFull
if hm.hour == hour { if hm.hour == hour {
m = make(map[uint64]struct{}, len(hm.m)+newMetricIDsLen) m = hm.m.Clone()
for metricID := range hm.m {
m[metricID] = struct{}{}
}
} else { } else {
m = make(map[uint64]struct{}, newMetricIDsLen) m = &uint64set.Set{}
isFull = true isFull = true
} }
s.pendingHourMetricIDsLock.Lock() s.pendingHourMetricIDsLock.Lock()
newMetricIDs := s.pendingHourMetricIDs newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil)
s.pendingHourMetricIDs = make(map[uint64]struct{}, len(newMetricIDs)) s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourMetricIDsLock.Unlock() s.pendingHourMetricIDsLock.Unlock()
for metricID := range newMetricIDs { for _, metricID := range newMetricIDs {
m[metricID] = struct{}{} m.Add(metricID)
} }
hmNew := &hourMetricIDs{ hmNew := &hourMetricIDs{
@ -955,7 +953,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
} }
type hourMetricIDs struct { type hourMetricIDs struct {
m map[uint64]struct{} m *uint64set.Set
hour uint64 hour uint64
isFull bool isFull bool
} }

View file

@ -9,6 +9,8 @@ import (
"testing" "testing"
"testing/quick" "testing/quick"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
) )
func TestUpdateCurrHourMetricIDs(t *testing.T) { func TestUpdateCurrHourMetricIDs(t *testing.T) {
@ -16,19 +18,18 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
var s Storage var s Storage
s.currHourMetricIDs.Store(&hourMetricIDs{}) s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourMetricIDs = make(map[uint64]struct{}) s.pendingHourMetricIDs = &uint64set.Set{}
return &s return &s
} }
t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) { t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage() s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{ m: &uint64set.Set{},
12: {},
34: {},
},
hour: 123, hour: 123,
} }
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig) s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs() s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -39,8 +40,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
} }
} }
if len(hmCurr.m) != 0 { if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected length of hm.m; got %d; want %d", len(hmCurr.m), 0) t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
} }
if !hmCurr.isFull { if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
@ -51,20 +52,19 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
} }
if len(s.pendingHourMetricIDs) != 0 { if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
} }
}) })
t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) { t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage() s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{ m: &uint64set.Set{},
12: {},
34: {},
},
hour: hour, hour: hour,
} }
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig) s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs() s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -90,27 +90,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
} }
if len(s.pendingHourMetricIDs) != 0 { if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
} }
}) })
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage() s := newStorage()
pendingHourMetricIDs := map[uint64]struct{}{ pendingHourMetricIDs := &uint64set.Set{}
343: {}, pendingHourMetricIDs.Add(343)
32424: {}, pendingHourMetricIDs.Add(32424)
8293432: {}, pendingHourMetricIDs.Add(8293432)
}
s.pendingHourMetricIDs = pendingHourMetricIDs s.pendingHourMetricIDs = pendingHourMetricIDs
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{ m: &uint64set.Set{},
12: {},
34: {},
},
hour: 123, hour: 123,
} }
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig) s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs() s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -133,27 +131,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
} }
if len(s.pendingHourMetricIDs) != 0 { if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
} }
}) })
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage() s := newStorage()
pendingHourMetricIDs := map[uint64]struct{}{ pendingHourMetricIDs := &uint64set.Set{}
343: {}, pendingHourMetricIDs.Add(343)
32424: {}, pendingHourMetricIDs.Add(32424)
8293432: {}, pendingHourMetricIDs.Add(8293432)
}
s.pendingHourMetricIDs = pendingHourMetricIDs s.pendingHourMetricIDs = pendingHourMetricIDs
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{ m: &uint64set.Set{},
12: {},
34: {},
},
hour: hour, hour: hour,
} }
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig) s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs() s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -166,9 +162,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
// Do not run other checks, since they may fail. // Do not run other checks, since they may fail.
return return
} }
m := getMetricIDsCopy(pendingHourMetricIDs) m := pendingHourMetricIDs.Clone()
for metricID := range hmOrig.m { origMetricIDs := hmOrig.m.AppendTo(nil)
m[metricID] = struct{}{} for _, metricID := range origMetricIDs {
m.Add(metricID)
} }
if !reflect.DeepEqual(hmCurr.m, m) { if !reflect.DeepEqual(hmCurr.m, m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
@ -183,8 +180,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
} }
if len(s.pendingHourMetricIDs) != 0 { if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
} }
}) })
} }

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
) )
// table represents a single table with time series data. // table represents a single table with time series data.
@ -18,7 +19,7 @@ type table struct {
smallPartitionsPath string smallPartitionsPath string
bigPartitionsPath string bigPartitionsPath string
getDeletedMetricIDs func() map[uint64]struct{} getDeletedMetricIDs func() *uint64set.Set
ptws []*partitionWrapper ptws []*partitionWrapper
ptwsLock sync.Mutex ptwsLock sync.Mutex
@ -75,7 +76,7 @@ func (ptw *partitionWrapper) scheduleToDrop() {
// The table is created if it doesn't exist. // The table is created if it doesn't exist.
// //
// Data older than the retentionMonths may be dropped at any time. // Data older than the retentionMonths may be dropped at any time.
func openTable(path string, retentionMonths int, getDeletedMetricIDs func() map[uint64]struct{}) (*table, error) { func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) {
path = filepath.Clean(path) path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet. // Create a directory for the table if it doesn't exist yet.
@ -430,7 +431,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
} }
} }
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() map[uint64]struct{}) ([]*partition, error) { func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) {
smallD, err := os.Open(smallPartitionsPath) smallD, err := os.Open(smallPartitionsPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot open directory with small partitions %q: %s", smallPartitionsPath, err) return nil, fmt.Errorf("cannot open directory with small partitions %q: %s", smallPartitionsPath, err)

332
lib/uint64set/uint64set.go Normal file
View file

@ -0,0 +1,332 @@
package uint64set
import (
"sort"
)
// Set is a fast set for uint64.
//
// It should work faster than map[uint64]struct{} for semi-sparse uint64 values
// such as MetricIDs generated by lib/storage.
//
// It is unsafe calling Set methods from concurrent goroutines.
type Set struct {
itemsCount int
buckets bucket32Sorter
}
type bucket32Sorter []*bucket32
func (s *bucket32Sorter) Len() int { return len(*s) }
func (s *bucket32Sorter) Less(i, j int) bool {
a := *s
return a[i].hi < a[j].hi
}
func (s *bucket32Sorter) Swap(i, j int) {
a := *s
a[i], a[j] = a[j], a[i]
}
// Clone returns an independent copy of s.
func (s *Set) Clone() *Set {
if s == nil {
return nil
}
var dst Set
dst.itemsCount = s.itemsCount
dst.buckets = make([]*bucket32, len(s.buckets))
for i, b32 := range s.buckets {
dst.buckets[i] = b32.clone()
}
return &dst
}
// Len returns the number of distinct uint64 values in s.
func (s *Set) Len() int {
if s == nil {
return 0
}
return s.itemsCount
}
// Add adds x to s.
func (s *Set) Add(x uint64) {
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
if b32.hi == hi {
if b32.add(lo) {
s.itemsCount++
}
return
}
}
s.addAlloc(hi, lo)
}
func (s *Set) addAlloc(hi, lo uint32) {
var b32 bucket32
b32.hi = hi
_ = b32.add(lo)
s.itemsCount++
s.buckets = append(s.buckets, &b32)
}
// Has verifies whether x exists in s.
func (s *Set) Has(x uint64) bool {
hi := uint32(x >> 32)
lo := uint32(x)
if s == nil {
return false
}
for _, b32 := range s.buckets {
if b32.hi == hi {
return b32.has(lo)
}
}
return false
}
// Del deletes x from s.
func (s *Set) Del(x uint64) {
hi := uint32(x >> 32)
lo := uint32(x)
for _, b32 := range s.buckets {
if b32.hi == hi {
if b32.del(lo) {
s.itemsCount--
}
return
}
}
}
// AppendTo appends all the items from the set to dst and returns the result.
//
// The returned items are sorted.
func (s *Set) AppendTo(dst []uint64) []uint64 {
if s == nil {
return dst
}
// pre-allocate memory for dst
dstLen := len(dst)
if n := s.Len() - cap(dst) + dstLen; n > 0 {
dst = append(dst[:cap(dst)], make([]uint64, n)...)
dst = dst[:dstLen]
}
// sort s.buckets if it isn't sorted yet
if !sort.IsSorted(&s.buckets) {
sort.Sort(&s.buckets)
}
for _, b32 := range s.buckets {
dst = b32.appendTo(dst)
}
return dst
}
type bucket32 struct {
hi uint32
b16his []uint16
buckets []*bucket16
}
func (b *bucket32) clone() *bucket32 {
var dst bucket32
dst.hi = b.hi
dst.b16his = append(dst.b16his[:0], b.b16his...)
dst.buckets = make([]*bucket16, len(b.buckets))
for i, b16 := range b.buckets {
dst.buckets[i] = b16.clone()
}
return &dst
}
// This is for sort.Interface
func (b *bucket32) Len() int { return len(b.b16his) }
func (b *bucket32) Less(i, j int) bool { return b.b16his[i] < b.b16his[j] }
func (b *bucket32) Swap(i, j int) {
his := b.b16his
buckets := b.buckets
his[i], his[j] = his[j], his[i]
buckets[i], buckets[j] = buckets[j], buckets[i]
}
const maxUnsortedBuckets = 32
func (b *bucket32) add(x uint32) bool {
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
return b.addSlow(hi, lo)
}
for i, hi16 := range b.b16his {
if hi16 == hi {
return i < len(b.buckets) && b.buckets[i].add(lo)
}
}
b.addAllocSmall(hi, lo)
return true
}
func (b *bucket32) addAllocSmall(hi, lo uint16) {
var b16 bucket16
_ = b16.add(lo)
b.b16his = append(b.b16his, hi)
b.buckets = append(b.buckets, &b16)
if len(b.buckets) > maxUnsortedBuckets {
sort.Sort(b)
}
}
func (b *bucket32) addSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
b.addAllocBig(hi, lo, n)
return true
}
return n < len(b.buckets) && b.buckets[n].add(lo)
}
func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
if n < 0 {
return
}
var b16 bucket16
_ = b16.add(lo)
if n >= len(b.b16his) {
b.b16his = append(b.b16his, hi)
b.buckets = append(b.buckets, &b16)
return
}
b.b16his = append(b.b16his[:n+1], b.b16his[n:]...)
b.b16his[n] = hi
b.buckets = append(b.buckets[:n+1], b.buckets[n:]...)
b.buckets[n] = &b16
}
func (b *bucket32) has(x uint32) bool {
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
return b.hasSlow(hi, lo)
}
for i, hi16 := range b.b16his {
if hi16 == hi {
return i < len(b.buckets) && b.buckets[i].has(lo)
}
}
return false
}
func (b *bucket32) hasSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
return false
}
return n < len(b.buckets) && b.buckets[n].has(lo)
}
func (b *bucket32) del(x uint32) bool {
hi := uint16(x >> 16)
lo := uint16(x)
if len(b.buckets) > maxUnsortedBuckets {
return b.delSlow(hi, lo)
}
for i, hi16 := range b.b16his {
if hi16 == hi {
return i < len(b.buckets) && b.buckets[i].del(lo)
}
}
return false
}
func (b *bucket32) delSlow(hi, lo uint16) bool {
n := binarySearch16(b.b16his, hi)
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
return false
}
return n < len(b.buckets) && b.buckets[n].del(lo)
}
func (b *bucket32) appendTo(dst []uint64) []uint64 {
if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) {
sort.Sort(b)
}
for i, b16 := range b.buckets {
hi16 := b.b16his[i]
dst = b16.appendTo(dst, b.hi, hi16)
}
return dst
}
const (
bitsPerBucket = 1 << 16
wordsPerBucket = bitsPerBucket / 64
)
type bucket16 struct {
bits [wordsPerBucket]uint64
}
func (b *bucket16) clone() *bucket16 {
var dst bucket16
copy(dst.bits[:], b.bits[:])
return &dst
}
func (b *bucket16) add(x uint16) bool {
wordNum, bitMask := getWordNumBitMask(x)
word := &b.bits[wordNum]
ok := *word&bitMask == 0
*word |= bitMask
return ok
}
func (b *bucket16) has(x uint16) bool {
wordNum, bitMask := getWordNumBitMask(x)
return b.bits[wordNum]&bitMask != 0
}
func (b *bucket16) del(x uint16) bool {
wordNum, bitMask := getWordNumBitMask(x)
word := &b.bits[wordNum]
ok := *word&bitMask != 0
*word &^= bitMask
return ok
}
func (b *bucket16) appendTo(dst []uint64, hi uint32, hi16 uint16) []uint64 {
hi64 := uint64(hi)<<32 | uint64(hi16)<<16
var wordNum uint64
for _, word := range b.bits {
for bitNum := uint64(0); bitNum < 64; bitNum++ {
if word&(uint64(1)<<bitNum) != 0 {
x := hi64 | uint64(wordNum)*64 | bitNum
dst = append(dst, x)
}
}
wordNum++
}
return dst
}
func getWordNumBitMask(x uint16) (uint16, uint64) {
wordNum := x / 64
bitMask := uint64(1) << (x & 63)
return wordNum, bitMask
}
func binarySearch16(u16 []uint16, x uint16) int {
// The code has been adapted from sort.Search.
n := len(u16)
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1)
if h >= 0 && h < len(u16) && u16[h] < x {
i = h + 1
} else {
j = h
}
}
return i
}

View file

@ -0,0 +1,224 @@
package uint64set
import (
"fmt"
"math/rand"
"sort"
"testing"
"time"
)
func TestSetBasicOps(t *testing.T) {
for _, itemsCount := range []int{1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) {
testSetBasicOps(t, itemsCount)
})
}
}
func testSetBasicOps(t *testing.T, itemsCount int) {
var s Set
offset := uint64(time.Now().UnixNano())
// Verify forward Add
for i := 0; i < itemsCount/2; i++ {
s.Add(uint64(i) + offset)
}
if n := s.Len(); n != itemsCount/2 {
t.Fatalf("unexpected s.Len() after forward Add; got %d; want %d", n, itemsCount/2)
}
// Verify backward Add
for i := 0; i < itemsCount/2; i++ {
s.Add(uint64(itemsCount-i-1) + offset)
}
if n := s.Len(); n != itemsCount {
t.Fatalf("unexpected s.Len() after backward Add; got %d; want %d", n, itemsCount)
}
// Verify repeated Add
for i := 0; i < itemsCount/2; i++ {
s.Add(uint64(i) + offset)
}
if n := s.Len(); n != itemsCount {
t.Fatalf("unexpected s.Len() after repeated Add; got %d; want %d", n, itemsCount)
}
// Verify Has on existing bits
for i := 0; i < itemsCount; i++ {
if !s.Has(uint64(i) + offset) {
t.Fatalf("missing bit %d", i)
}
}
// Verify Has on missing bits
for i := itemsCount; i < 2*itemsCount; i++ {
if s.Has(uint64(i) + offset) {
t.Fatalf("unexpected bit found: %d", i)
}
}
// Verify Clone
sCopy := s.Clone()
if n := sCopy.Len(); n != itemsCount {
t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
}
for i := 0; i < itemsCount; i++ {
if !sCopy.Has(uint64(i) + offset) {
t.Fatalf("missing bit %d on sCopy", i)
}
}
// Verify AppendTo
a := s.AppendTo(nil)
if len(a) != itemsCount {
t.Fatalf("unexpected len of exported array; got %d; want %d; array:\n%d", len(a), itemsCount, a)
}
if !sort.SliceIsSorted(a, func(i, j int) bool { return a[i] < a[j] }) {
t.Fatalf("unsorted result returned from AppendTo: %d", a)
}
m := make(map[uint64]bool)
for _, x := range a {
m[x] = true
}
for i := 0; i < itemsCount; i++ {
if !m[uint64(i)+offset] {
t.Fatalf("missing bit %d in the exported bits; array:\n%d", i, a)
}
}
// Verify Del
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
s.Del(uint64(i) + offset)
}
if n := s.Len(); n != itemsCount-itemsCount/4 {
t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsCount/4)
}
a = s.AppendTo(a[:0])
if len(a) != itemsCount-itemsCount/4 {
t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsCount/4)
}
m = make(map[uint64]bool)
for _, x := range a {
m[x] = true
}
for i := 0; i < itemsCount; i++ {
if i >= itemsCount/2 && i < itemsCount-itemsCount/4 {
if m[uint64(i)+offset] {
t.Fatalf("unexpected bit found after deleting: %d", i)
}
} else {
if !m[uint64(i)+offset] {
t.Fatalf("missing bit %d in the exported bits after deleting", i)
}
}
}
// Try Del for non-existing items
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
s.Del(uint64(i) + offset)
s.Del(uint64(i) + offset)
s.Del(uint64(i) + offset + uint64(itemsCount))
}
if n := s.Len(); n != itemsCount-itemsCount/4 {
t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsCount/4)
}
// Verify sCopy has the original data
if n := sCopy.Len(); n != itemsCount {
t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
}
for i := 0; i < itemsCount; i++ {
if !sCopy.Has(uint64(i) + offset) {
t.Fatalf("missing bit %d on sCopy", i)
}
}
}
func TestSetSparseItems(t *testing.T) {
for _, itemsCount := range []int{1e2, 1e3, 1e4} {
t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) {
testSetSparseItems(t, itemsCount)
})
}
}
func testSetSparseItems(t *testing.T, itemsCount int) {
var s Set
m := make(map[uint64]bool)
for i := 0; i < itemsCount; i++ {
x := rand.Uint64()
s.Add(x)
m[x] = true
}
if n := s.Len(); n != len(m) {
t.Fatalf("unexpected Len(); got %d; want %d", n, len(m))
}
// Check Has
for x := range m {
if !s.Has(x) {
t.Fatalf("missing item %d", x)
}
}
for i := 0; i < itemsCount; i++ {
x := uint64(i)
if m[x] {
continue
}
if s.Has(x) {
t.Fatalf("unexpected item found %d", x)
}
}
// Check Clone
sCopy := s.Clone()
if n := sCopy.Len(); n != len(m) {
t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, len(m))
}
for x := range m {
if !sCopy.Has(x) {
t.Fatalf("missing item %d on sCopy", x)
}
}
// Check AppendTo
a := s.AppendTo(nil)
if len(a) != len(m) {
t.Fatalf("unexpected len for AppendTo result; got %d; want %d", len(a), len(m))
}
if !sort.SliceIsSorted(a, func(i, j int) bool { return a[i] < a[j] }) {
t.Fatalf("unsorted result returned from AppendTo: %d", a)
}
for _, x := range a {
if !m[x] {
t.Fatalf("unexpected item found in AppendTo result: %d", x)
}
}
// Check Del
for x := range m {
s.Del(x)
s.Del(x)
s.Del(x + 1)
s.Del(x - 1)
}
if n := s.Len(); n != 0 {
t.Fatalf("unexpected number of items left after Del; got %d; want 0", n)
}
a = s.AppendTo(a[:0])
if len(a) != 0 {
t.Fatalf("unexpected number of items returned from AppendTo after Del; got %d; want 0; items\n%d", len(a), a)
}
// Check items in sCopy
if n := sCopy.Len(); n != len(m) {
t.Fatalf("unexpected sCopy.Len() after Del; got %d; want %d", n, len(m))
}
for x := range m {
if !sCopy.Has(x) {
t.Fatalf("missing item %d on sCopy after Del", x)
}
}
}

View file

@ -0,0 +1,321 @@
package uint64set
import (
"fmt"
"testing"
"time"
"github.com/valyala/fastrand"
)
func BenchmarkSetAddRandomLastBits(b *testing.B) {
const itemsCount = 1e5
for _, lastBits := range []uint64{20, 24, 28, 32} {
mask := (uint64(1) << lastBits) - 1
b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
start := uint64(time.Now().UnixNano())
var s Set
var rng fastrand.RNG
for i := 0; i < itemsCount; i++ {
n := start | (uint64(rng.Uint32()) & mask)
s.Add(n)
}
}
})
})
}
}
func BenchmarkMapAddRandomLastBits(b *testing.B) {
const itemsCount = 1e5
for _, lastBits := range []uint64{20, 24, 28, 32} {
mask := (uint64(1) << lastBits) - 1
b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
start := uint64(time.Now().UnixNano())
m := make(map[uint64]struct{})
var rng fastrand.RNG
for i := 0; i < itemsCount; i++ {
n := start | (uint64(rng.Uint32()) & mask)
m[n] = struct{}{}
}
}
})
})
}
}
func BenchmarkSetAddWithAllocs(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
var s Set
n := start
for n < end {
s.Add(n)
n++
}
}
})
})
}
}
func BenchmarkMapAddWithAllocs(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
m := make(map[uint64]struct{})
n := start
for n < end {
m[n] = struct{}{}
n++
}
}
})
})
}
}
func BenchmarkMapAddNoAllocs(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
m := make(map[uint64]struct{}, itemsCount)
n := start
for n < end {
m[n] = struct{}{}
n++
}
}
})
})
}
}
func BenchmarkMapAddReuse(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
m := make(map[uint64]struct{}, itemsCount)
for pb.Next() {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
for k := range m {
delete(m, k)
}
n := start
for n < end {
m[n] = struct{}{}
n++
}
}
})
})
}
}
func BenchmarkSetHasHitRandomLastBits(b *testing.B) {
const itemsCount = 1e5
for _, lastBits := range []uint64{20, 24, 28, 32} {
mask := (uint64(1) << lastBits) - 1
b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) {
start := uint64(time.Now().UnixNano())
var s Set
var rng fastrand.RNG
for i := 0; i < itemsCount; i++ {
n := start | (uint64(rng.Uint32()) & mask)
s.Add(n)
}
a := s.AppendTo(nil)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(a)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for _, n := range a {
if !s.Has(n) {
panic("unexpected miss")
}
}
}
})
})
}
}
func BenchmarkMapHasHitRandomLastBits(b *testing.B) {
const itemsCount = 1e5
for _, lastBits := range []uint64{20, 24, 28, 32} {
mask := (uint64(1) << lastBits) - 1
b.Run(fmt.Sprintf("lastBits_%d", lastBits), func(b *testing.B) {
start := uint64(time.Now().UnixNano())
m := make(map[uint64]struct{})
var rng fastrand.RNG
for i := 0; i < itemsCount; i++ {
n := start | (uint64(rng.Uint32()) & mask)
m[n] = struct{}{}
}
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(m)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for n := range m {
if _, ok := m[n]; !ok {
panic("unexpected miss")
}
}
}
})
})
}
}
func BenchmarkSetHasHit(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
var s Set
n := start
for n < end {
s.Add(n)
n++
}
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n := start
for n < end {
if !s.Has(n) {
panic("unexpected miss")
}
n++
}
}
})
})
}
}
func BenchmarkMapHasHit(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
m := make(map[uint64]struct{}, itemsCount)
n := start
for n < end {
m[n] = struct{}{}
n++
}
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n := start
for n < end {
if _, ok := m[n]; !ok {
panic("unexpected miss")
}
n++
}
}
})
})
}
}
func BenchmarkSetHasMiss(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
var s Set
n := start
for n < end {
s.Add(n)
n++
}
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n := end
nEnd := end + itemsCount
for n < nEnd {
if s.Has(n) {
panic("unexpected hit")
}
n++
}
}
})
})
}
}
func BenchmarkMapHasMiss(b *testing.B) {
for _, itemsCount := range []uint64{1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
start := uint64(time.Now().UnixNano())
end := start + itemsCount
m := make(map[uint64]struct{}, itemsCount)
n := start
for n < end {
m[n] = struct{}{}
n++
}
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(itemsCount))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n := end
nEnd := end + itemsCount
for n < nEnd {
if _, ok := m[n]; ok {
panic("unexpected hit")
}
n++
}
}
})
})
}
}