From 1b03d7e6de6ba3e3ccaf6499412fbedde9d859bb Mon Sep 17 00:00:00 2001
From: rtm0 <149964189+rtm0@users.noreply.github.com>
Date: Wed, 17 Jul 2024 12:07:14 +0200
Subject: [PATCH] Fix inconsistent error handling in Storage.AddRows() (#6583)

`Storage.AddRows()` returns an error only in one case: when
`Storage.updatePerDateData()` fails to unmarshal a `metricNameRaw`. But
the same error is treated as a warning when it happens inside
`Storage.add()` or returned by `Storage.prefillNextIndexDB()`.

This commit fixes this inconsistency by treating the error returned by
`Storage.updatePerDateData()` as a warning as well. As a result,
`Storage.add()` no longer needs a return value, and neither does
`Storage.AddRows()`.

Additionally, this commit adds a unit test that checks all cases that
result in a row not being added to the storage.

---------

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
---
 app/vmstorage/servers/vminsert.go  |   3 +-
 docs/CHANGELOG.md                  |   1 +
 lib/storage/index_db_test.go       |   8 +-
 lib/storage/search_test.go         |   8 +-
 lib/storage/storage.go             |  37 ++----
 lib/storage/storage_test.go        | 203 +++++++++++++++++++++++++----
 lib/storage/storage_timing_test.go |   4 +-
 7 files changed, 199 insertions(+), 65 deletions(-)

diff --git a/app/vmstorage/servers/vminsert.go b/app/vmstorage/servers/vminsert.go
index b5e9dfbf96..be1d4c0b3e 100644
--- a/app/vmstorage/servers/vminsert.go
+++ b/app/vmstorage/servers/vminsert.go
@@ -123,7 +123,8 @@ func (s *VMInsertServer) run() {
 			logger.Infof("processing vminsert conn from %s", c.RemoteAddr())
 			err = stream.Parse(bc, func(rows []storage.MetricRow) error {
 				vminsertMetricsRead.Add(len(rows))
-				return s.storage.AddRows(rows, uint8(*precisionBits))
+				s.storage.AddRows(rows, uint8(*precisionBits))
+				return nil
 			}, s.storage.IsReadOnly)
 			if err != nil {
 				if s.isStopping() {
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index b939571fb4..6039b275de 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -67,6 +67,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).
 * BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix `vm_backup_last_run_failed` metric not being properly initialized during startup. Previously, it could imply an error even if the backup have been completed successfully. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6550) for the details.
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) over Prometheus buckets with inconsistent values. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580#issuecomment-2186659102) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6547). Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819).
+* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix inconsistent error handling in storage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6583) for details.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): fix panic when using multiple topics with the same name when [ingesting metrics from Kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6636) for the details.
 
 ## [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2)
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go
index 0be7bde8b3..6cf621e2f5 100644
--- a/lib/storage/index_db_test.go
+++ b/lib/storage/index_db_test.go
@@ -1573,9 +1573,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
 	timeMin := currentDayTimestamp - 24*3600*1000
 	timeMax := currentDayTimestamp + 24*3600*1000
 	mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
-	if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-		t.Fatalf("unexpected error when adding mrs: %s", err)
-	}
+	s.AddRows(mrs, defaultPrecisionBits)
 	s.DebugFlush()
 
 	// verify the storage contains rows.
@@ -1627,9 +1625,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
 	}
 
 	// Re-insert rows again and verify that all the entries belong to new generation
-	if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-		t.Fatalf("unexpected error when adding mrs: %s", err)
-	}
+	s.AddRows(mrs, defaultPrecisionBits)
 	s.DebugFlush()
 
 	for _, mr := range mrs {
diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go
index b55ab6261f..7dfa707401 100644
--- a/lib/storage/search_test.go
+++ b/lib/storage/search_test.go
@@ -112,15 +112,11 @@ func TestSearch(t *testing.T) {
 
 		blockRowsCount++
 		if blockRowsCount == rowsPerBlock {
-			if err := st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits); err != nil {
-				t.Fatalf("cannot add rows %d-%d: %s", i-blockRowsCount+1, i+1, err)
-			}
+			st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits)
 			blockRowsCount = 0
 		}
 	}
-	if err := st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits); err != nil {
-		t.Fatalf("cannot add rows %v-%v: %s", rowsCount-blockRowsCount, rowsCount, err)
-	}
+	st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits)
 	endTimestamp := mrs[len(mrs)-1].Timestamp
 
 	// Re-open the storage in order to flush all the pending cached data.
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index b353e2f430..b103769853 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -1720,13 +1720,12 @@ var rowsAddedTotal atomic.Uint64
 //
 // The caller should limit the number of concurrent AddRows calls to the number
 // of available CPU cores in order to limit memory usage.
-func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
+func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
 	if len(mrs) == 0 {
-		return nil
+		return
 	}
 
 	// Add rows to the storage in blocks with limited size in order to reduce memory usage.
-	var firstErr error
 	ic := getMetricRowsInsertCtx()
 	maxBlockLen := len(ic.rrs)
 	for len(mrs) > 0 {
@@ -1737,17 +1736,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 		} else {
 			mrs = nil
 		}
-		if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
-			if firstErr == nil {
-				firstErr = err
-			}
-			continue
-		}
+		s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
 		rowsAddedTotal.Add(uint64(len(mrsBlock)))
 	}
 	putMetricRowsInsertCtx(ic)
-
-	return firstErr
 }
 
 type metricRowsInsertCtx struct {
@@ -1886,7 +1878,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
 	}
 }
 
-func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
+func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) {
 	idb := s.idb()
 	generation := idb.generation
 	is := idb.getIndexSearch(0, 0, noDeadline)
@@ -1910,7 +1902,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 
 	var genTSID generationTSID
 
-	// Return only the first error, since it has no sense in returning all errors.
+	// Log only the first error, since it makes no sense to log all errors.
 	var firstWarn error
 
 	j := 0
@@ -2076,23 +2068,20 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 
 	if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
 		if firstWarn == nil {
-			firstWarn = err
+			firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
 		}
 	}
+	if err := s.updatePerDateData(rows, dstMrs); err != nil {
+		if firstWarn == nil {
+			firstWarn = fmt.Errorf("cannot update per-day index: %w", err)
+		}
+	}
+
 	if firstWarn != nil {
 		storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
 	}
 
-	err := s.updatePerDateData(rows, dstMrs)
-	if err != nil {
-		err = fmt.Errorf("cannot update per-date data: %w", err)
-	} else {
-		s.tb.MustAddRows(rows)
-	}
-	if err != nil {
-		return fmt.Errorf("error occurred during rows addition: %w", err)
-	}
-	return nil
+	s.tb.MustAddRows(rows)
 }
 
 var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 0e0441784c..8728707ae1 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -2,12 +2,12 @@ package storage
 
 import (
 	"fmt"
+	"math"
 	"math/rand"
 	"os"
 	"path/filepath"
 	"reflect"
 	"sort"
-	"strings"
 	"sync"
 	"testing"
 	"testing/quick"
@@ -15,6 +15,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
@@ -655,12 +656,7 @@ func testStorageRandTimestamps(s *Storage) error {
 			}
 			mrs = append(mrs, mr)
 		}
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			errStr := err.Error()
-			if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
-				return fmt.Errorf("unexpected error when adding mrs: %w", err)
-			}
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 
 	// Verify the storage contains rows.
@@ -780,9 +776,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
 			}
 			mrs = append(mrs, mr)
 		}
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			return fmt.Errorf("unexpected error when adding mrs: %w", err)
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 	s.DebugFlush()
 
@@ -1200,9 +1194,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
 	minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
 	for i := 0; i < addsCount; i++ {
 		mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			return fmt.Errorf("unexpected error when adding mrs: %w", err)
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 
 	// Verify the storage contains rows.
@@ -1343,9 +1335,7 @@ func testStorageAddMetrics(s *Storage, workerNum int) error {
 			Timestamp:     timestamp,
 			Value:         value,
 		}
-		if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
-			return fmt.Errorf("unexpected error when adding mrs: %w", err)
-		}
+		s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
 	}
 
 	// Verify the storage contains rows.
@@ -1369,9 +1359,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
 	minTimestamp := maxTimestamp - s.retentionMsecs
 	for i := 0; i < addsCount; i++ {
 		mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			t.Fatalf("unexpected error when adding mrs: %s", err)
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 	// Try creating a snapshot from the storage.
 	snapshotName, err := s.CreateSnapshot()
@@ -1441,9 +1429,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
 	rng := rand.New(rand.NewSource(1))
 	mrs := testGenerateMetricRowsForTenant(accountID, projectID, rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
 	// populate storage with some rows
-	if err := s.AddRows(mrs[:10], defaultPrecisionBits); err != nil {
-		t.Fatal("error when adding mrs: %w", err)
-	}
+	s.AddRows(mrs[:10], defaultPrecisionBits)
 	s.DebugFlush()
 
 	// verify ingested rows are searchable
@@ -1462,9 +1448,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
 	for i := 0; i < len(mrs); i = i + 2 {
 		mrs[i].Value = decimal.StaleNaN
 	}
-	if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-		t.Fatal("error when adding mrs: %w", err)
-	}
+	s.AddRows(mrs, defaultPrecisionBits)
 	s.DebugFlush()
 
 	// verify that rows marked as stale aren't searchable
@@ -1475,3 +1459,172 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
 		t.Fatalf("cannot remove %q: %s", path, err)
 	}
 }
+
+// testRemoveAll removes all storage data produced by a test if the test hasn't
+// failed. For this to work, the storage must use t.Name() as the base dir in
+// its data path.
+//
+// In case of failure, the data is kept for further debugging.
+func testRemoveAll(t *testing.T) {
+	defer func() {
+		if !t.Failed() {
+			fs.MustRemoveAll(t.Name())
+		}
+	}()
+}
+
+func TestStorageRowsNotAdded(t *testing.T) {
+	const accountID = 123
+	const projectID = 456
+
+	defer testRemoveAll(t)
+
+	type options struct {
+		name      string
+		retention time.Duration
+		mrs       []MetricRow
+		tr        TimeRange
+	}
+	f := func(opts *options) {
+		t.Helper()
+
+		var gotMetrics Metrics
+		path := fmt.Sprintf("%s/%s", t.Name(), opts.name)
+		s := MustOpenStorage(path, opts.retention, 0, 0)
+		defer s.MustClose()
+		s.AddRows(opts.mrs, defaultPrecisionBits)
+		s.DebugFlush()
+		s.UpdateMetrics(&gotMetrics)
+
+		got := testCountAllMetricNames(s, accountID, projectID, opts.tr)
+		if got != 0 {
+			t.Fatalf("unexpected metric name count: got %d, want 0", got)
+		}
+	}
+
+	const numRows = 1000
+	var (
+		rng          = rand.New(rand.NewSource(1))
+		retention    time.Duration
+		minTimestamp int64
+		maxTimestamp int64
+		mrs          []MetricRow
+	)
+
+	minTimestamp = -1000
+	maxTimestamp = -1
+	f(&options{
+		name:      "NegativeTimestamps",
+		retention: retentionMax,
+		mrs:       testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
+		tr:        TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	retention = 48 * time.Hour
+	minTimestamp = time.Now().Add(-retention - time.Hour).UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	f(&options{
+		name:      "TooSmallTimestamps",
+		retention: retention,
+		mrs:       testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
+		tr:        TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	retention = 48 * time.Hour
+	minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	f(&options{
+		name:      "TooBigTimestamps",
+		retention: retention,
+		mrs:       testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
+		tr:        TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	minTimestamp = time.Now().UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
+	for i := range numRows {
+		mrs[i].Value = math.NaN()
+	}
+	f(&options{
+		name: "NaN",
+		mrs:  mrs,
+		tr:   TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	minTimestamp = time.Now().UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
+	for i := range numRows {
+		mrs[i].Value = decimal.StaleNaN
+	}
+	f(&options{
+		name: "StaleNaN",
+		mrs:  mrs,
+		tr:   TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	minTimestamp = time.Now().UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
+	for i := range numRows {
+		mrs[i].MetricNameRaw = []byte("garbage")
+	}
+	f(&options{
+		name: "InvalidMetricNameRaw",
+		mrs:  mrs,
+		tr:   TimeRange{minTimestamp, maxTimestamp},
+	})
+}
+
+func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) {
+	const accountID = 123
+	const projectID = 456
+
+	defer testRemoveAll(t)
+
+	f := func(name string, maxHourlySeries int, maxDailySeries int) {
+		t.Helper()
+
+		rng := rand.New(rand.NewSource(1))
+		numRows := uint64(1000)
+		minTimestamp := time.Now().UnixMilli()
+		maxTimestamp := minTimestamp + 1000
+		mrs := testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
+
+		var gotMetrics Metrics
+		path := fmt.Sprintf("%s/%s", t.Name(), name)
+		s := MustOpenStorage(path, 0, maxHourlySeries, maxDailySeries)
+		defer s.MustClose()
+		s.AddRows(mrs, defaultPrecisionBits)
+		s.DebugFlush()
+		s.UpdateMetrics(&gotMetrics)
+
+		want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped)
+		if got := testCountAllMetricNames(s, accountID, projectID, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want {
+			t.Fatalf("unexpected metric name count: %d, want %d", got, want)
+		}
+	}
+
+	maxHourlySeries := 1
+	maxDailySeries := 0 // No limit
+	f("HourlyLimitExceeded", maxHourlySeries, maxDailySeries)
+
+	maxHourlySeries = 0 // No limit
+	maxDailySeries = 1
+	f("DailyLimitExceeded", maxHourlySeries, maxDailySeries)
+}
+
+// testCountAllMetricNames is a test helper function that counts the names of
+// all time series within the given time range.
+func testCountAllMetricNames(s *Storage, accountID, projectID uint32, tr TimeRange) int {
+	tfsAll := NewTagFilters(accountID, projectID)
+	if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
+		panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
+	}
+	names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
+	if err != nil {
+		panic(fmt.Sprintf("SearchMetricNames() failed unexpectedly: %v", err))
+	}
+	return len(names)
+}
diff --git a/lib/storage/storage_timing_test.go b/lib/storage/storage_timing_test.go
index 2c080a5ea9..7587d601a7 100644
--- a/lib/storage/storage_timing_test.go
+++ b/lib/storage/storage_timing_test.go
@@ -48,9 +48,7 @@ func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) {
 				mr.Timestamp = int64(offset + i)
 				mr.Value = float64(offset + i)
 			}
-			if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-				panic(fmt.Errorf("cannot add rows to storage: %w", err))
-			}
+			s.AddRows(mrs, defaultPrecisionBits)
 		}
 	})
 	b.StopTimer()