From 7e0c6d4ca6b8881ab887c7704121ae590b2015c2 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Fri, 20 Sep 2019 11:53:42 +0300
Subject: [PATCH] lib/storage: optimize selecting all the metricIDs by scanning
 MetricID->TSID entries instead of tag->MetricID entries

The number of MetricID->TSID entries is smaller than the number of tag->MetricID entries
and MetricID->TSID entries are usually shorter than tag->MetricID entries.
This should improve performance when selecting all the metricIDs.
---
 lib/storage/index_db.go | 37 ++++++++++++++++++++-----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index dbfe0a22bc..bacf2c4dbf 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -1439,7 +1439,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
 	}
 
 	// There is no positive filter with small number of matching metrics.
-	// Create it, so it matches all the MetricIDs for tfs.commonPrefix.
+	// Create it, so it matches all the MetricIDs.
 	kb.B = append(kb.B[:0], uselessNegativeTagFilterKeyPrefix)
 	kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
 	kb.B = tfs.marshal(kb.B)
@@ -1447,7 +1447,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
 		return nil, nil, errTooManyMetrics
 	}
 	metricIDs := make(map[uint64]struct{})
-	if err := is.updateMetricIDsForCommonPrefix(metricIDs, tfs.commonPrefix, maxMetrics); err != nil {
+	if err := is.updateMetricIDsAll(metricIDs, tfs.accountID, tfs.projectID, maxMetrics); err != nil {
 		return nil, nil, err
 	}
 	if len(metricIDs) >= maxMetrics {
@@ -1525,7 +1525,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
 	for _, tfs := range tfss {
 		if len(tfs.tfs) == 0 {
 			// Return all the metric ids
-			if err := is.updateMetricIDsForCommonPrefix(metricIDs, tfs.commonPrefix, maxMetrics+1); err != nil {
+			if err := is.updateMetricIDsAll(metricIDs, tfs.accountID, tfs.projectID, maxMetrics+1); err != nil {
 				return nil, err
 			}
 			if len(metricIDs) > maxMetrics {
@@ -2041,26 +2041,29 @@ func (is *indexSearch) containsTimeRange(tr TimeRange, accountID, projectID uint
 	return true, nil
 }
 
-func (is *indexSearch) updateMetricIDsForCommonPrefix(metricIDs map[uint64]struct{}, commonPrefix []byte, maxMetrics int) error {
+func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, accountID, projectID uint32, maxMetrics int) error {
 	ts := &is.ts
-	ts.Seek(commonPrefix)
-	for len(metricIDs) < maxMetrics && ts.NextItem() {
-		k := ts.Item
-		if !bytes.HasPrefix(k, commonPrefix) {
-			break
+	kb := &is.kb
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID)
+	prefix := kb.B
+	ts.Seek(prefix)
+	for ts.NextItem() {
+		item := ts.Item
+		if !bytes.HasPrefix(item, prefix) {
+			return nil
 		}
-
-		// Extract MetricID from k (the last 8 bytes).
-		k = k[len(commonPrefix):]
-		if len(k) < 8 {
-			return fmt.Errorf("cannot extract metricID from k; want at least %d bytes; got %d bytes", 8, len(k))
+		tail := item[len(prefix):]
+		if len(tail) < 8 {
+			return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail)
 		}
-		v := k[len(k)-8:]
-		metricID := encoding.UnmarshalUint64(v)
+		metricID := encoding.UnmarshalUint64(tail)
 		metricIDs[metricID] = struct{}{}
+		if len(metricIDs) >= maxMetrics {
+			return nil
+		}
 	}
 	if err := ts.Error(); err != nil {
-		return fmt.Errorf("error when searching for metricIDs by commonPrefix %q: %s", commonPrefix, err)
+		return fmt.Errorf("error when searching for all metricIDs by prefix %q: %s", prefix, err)
 	}
 	return nil
 }