From a537c4f602190f5c8576b41b4ff9bbe9ed88e779 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 18 Feb 2021 18:32:33 +0200
Subject: [PATCH] lib/storage: properly handle queries containing a filter on
 metric name plus any number of negative filters and zero non-negative filters

Example: `node_cpu_seconds_total{mode!="idle"}`
---
 .../graphite/name-plus-negative-filter.json   | 16 ++++
 docs/CHANGELOG.md                             |  2 +
 lib/storage/tag_filters.go                    | 10 ++-
 lib/storage/tag_filters_test.go               | 84 ++++++++++++++++---
 4 files changed, 100 insertions(+), 12 deletions(-)
 create mode 100644 app/victoria-metrics/testdata/graphite/name-plus-negative-filter.json

diff --git a/app/victoria-metrics/testdata/graphite/name-plus-negative-filter.json b/app/victoria-metrics/testdata/graphite/name-plus-negative-filter.json
new file mode 100644
index 0000000000..32cddbee62
--- /dev/null
+++ b/app/victoria-metrics/testdata/graphite/name-plus-negative-filter.json
@@ -0,0 +1,16 @@
+{
+  "name": "name-plus-negative-filter",
+  "issue": "",
+  "data": [
+    "name-plus-negative-filter;foo=123 1 {TIME_S-1m}",
+    "name-plus-negative-filter;bar=123 2 {TIME_S-1m}",
+    "name-plus-negative-filter;foo=qwe 3 {TIME_S-1m}"
+  ],
+  "query": ["/api/v1/query?query={__name__='name-plus-negative-filter',foo!='123'}&time={TIME_S-1m}"],
+  "result_query": {
+    "status":"success",
+    "data":{"resultType":"vector","result":[
+	    {"metric":{"__name__":"name-plus-negative-filter","foo":"qwe"},"value":["{TIME_S-1m}","3"]}
+    ]}
+  }
+}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 3688633f18..b545b86602 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 # tip
 
+* BUGFIX: properly handle queries containing a filter on metric name plus any number of negative filters and zero non-negative filters. For example, `node_cpu_seconds_total{mode!="idle"}`. The bug was introduced in [v1.54.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.54.0).
+
 
 # [v1.54.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.54.0)
 
diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go
index f4e3a11c06..eb3f640011 100644
--- a/lib/storage/tag_filters.go
+++ b/lib/storage/tag_filters.go
@@ -35,13 +35,16 @@ func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
 		}
 	}
 	if len(name) == 0 {
-		// There is no metric name filter, so composite filters cannot be created.
+		// Composite filters cannot be created in the following cases:
+		// - if there is no filter on metric name
+		// - if there is no at least a single positive filter.
 		atomic.AddUint64(&compositeFilterMissingConversions, 1)
 		return tfs
 	}
 	tfsNew := make([]tagFilter, 0, len(tfs.tfs))
 	var compositeKey []byte
 	compositeFilters := 0
+	hasPositiveFilter := false
 	for _, tf := range tfs.tfs {
 		if len(tf.key) == 0 {
 			if tf.isNegative || tf.isRegexp || string(tf.value) != string(name) {
@@ -58,10 +61,13 @@ func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
 		if err := tfNew.Init(tfs.commonPrefix, compositeKey, tf.value, tf.isNegative, tf.isRegexp); err != nil {
 			logger.Panicf("BUG: unexpected error when creating composite tag filter for name=%q and key=%q: %s", name, tf.key, err)
 		}
+		if !tfNew.isNegative {
+			hasPositiveFilter = true
+		}
 		tfsNew = append(tfsNew, tfNew)
 		compositeFilters++
 	}
-	if compositeFilters == 0 {
+	if compositeFilters == 0 || !hasPositiveFilter {
 		atomic.AddUint64(&compositeFilterMissingConversions, 1)
 		return tfs
 	}
diff --git a/lib/storage/tag_filters_test.go b/lib/storage/tag_filters_test.go
index 80cc7fee9e..84077296ce 100644
--- a/lib/storage/tag_filters_test.go
+++ b/lib/storage/tag_filters_test.go
@@ -148,6 +148,70 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		},
 	})
 
+	// A name filter with a single negative filter
+	f([]TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar"),
+			IsNegative: false,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("foo"),
+			Value:      []byte("abc"),
+			IsNegative: true,
+			IsRegexp:   false,
+		},
+	}, []TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar"),
+			IsNegative: false,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("foo"),
+			Value:      []byte("abc"),
+			IsNegative: true,
+			IsRegexp:   false,
+		},
+	})
+
+	// A name filter with a negative and a positive filter
+	f([]TagFilter{
+		{
+			Key:        nil,
+			Value:      []byte("bar"),
+			IsNegative: false,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("foo"),
+			Value:      []byte("abc"),
+			IsNegative: true,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("a"),
+			Value:      []byte("b.+"),
+			IsNegative: false,
+			IsRegexp:   true,
+		},
+	}, []TagFilter{
+		{
+			Key:        []byte("\xfe\x03barfoo"),
+			Value:      []byte("abc"),
+			IsNegative: true,
+			IsRegexp:   false,
+		},
+		{
+			Key:        []byte("\xfe\x03bara"),
+			Value:      []byte("b.+"),
+			IsNegative: false,
+			IsRegexp:   true,
+		},
+	})
+
 	// Two name filters with non-name filter.
 	f([]TagFilter{
 		{
@@ -183,7 +247,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		},
 	})
 
-	// A name filter with negative regexp non-name filter, which can be converted to non-regexp.
+	// A name filter with regexp non-name filter, which can be converted to non-regexp.
 	f([]TagFilter{
 		{
 			Key:        nil,
@@ -194,19 +258,19 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		{
 			Key:        []byte("foo"),
 			Value:      []byte("abc"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   true,
 		},
 	}, []TagFilter{
 		{
 			Key:        []byte("\xfe\x03barfoo"),
 			Value:      []byte("abc"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   false,
 		},
 	})
 
-	// A name filter with negative regexp non-name filter.
+	// A name filter with regexp non-name filter.
 	f([]TagFilter{
 		{
 			Key:        nil,
@@ -217,14 +281,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		{
 			Key:        []byte("foo"),
 			Value:      []byte("abc.+"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   true,
 		},
 	}, []TagFilter{
 		{
 			Key:        []byte("\xfe\x03barfoo"),
 			Value:      []byte("abc.+"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   true,
 		},
 	})
@@ -269,7 +333,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		{
 			Key:        []byte("foo"),
 			Value:      []byte("abc"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   true,
 		},
 		{
@@ -282,7 +346,7 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		{
 			Key:        []byte("\xfe\x03barfoo"),
 			Value:      []byte("abc"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   false,
 		},
 		{
@@ -304,14 +368,14 @@ func TestConvertToCompositeTagFilters(t *testing.T) {
 		{
 			Key:        []byte("foo"),
 			Value:      []byte("abc"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   false,
 		},
 	}, []TagFilter{
 		{
 			Key:        []byte("\xfe\x03barfoo"),
 			Value:      []byte("abc"),
-			IsNegative: true,
+			IsNegative: false,
 			IsRegexp:   false,
 		},
 	})