From 85bf768013670d0086a42b59e2bd411d5aeb40ca Mon Sep 17 00:00:00 2001
From: Artem Fetishev <149964189+rtm0@users.noreply.github.com>
Date: Fri, 6 Sep 2024 18:57:21 +0300
Subject: [PATCH] lib/storage: add metrics that count records that failed to
 insert

### Describe Your Changes

Add storage metrics that count records that failed to insert:

- `RowsReceivedTotal`: the number of records that have been received by
the storage from the clients
- `RowsAddedTotal`: the number of records that have actually been
persisted. This value must be equal to `RowsReceivedTotal` if all the
records are valid, but will be smaller otherwise. The values of the
metrics below should provide insight into why some records haven't
been added
- `NaNValueRows`: the number of records whose value was `NaN`
- `StaleNaNValueRows`: the number of records whose value was `Stale NaN`
- `InvalidRawMetricNames`: the number of records whose raw metric name
failed to unmarshal.

The following metrics existed before this PR and are listed here for
completeness:

- `TooSmallTimestampRows`: the number of records whose timestamp is
negative or is older than the retention period
- `TooBigTimestampRows`: the number of records whose timestamp is too
far in the future.
- `HourlySeriesLimitRowsDropped`: the number of records that have not
been added because the hourly series limit has been exceeded.
- `DailySeriesLimitRowsDropped`: the number of records that have not
been added because the daily series limit has been exceeded.

---
Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
---
 app/vmstorage/main.go       |  9 ++++++
 lib/storage/storage.go      | 31 +++++++++++++++++---
 lib/storage/storage_test.go | 56 ++++++++++++++++++++++++++++++++++---
 3 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index d762a9d928..ec3d6a9c5f 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -446,12 +446,21 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
 	metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/inmemory"}`, idbm.InmemorySizeBytes)
 	metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/file"}`, idbm.FileSizeBytes)
 
+	metrics.WriteCounterUint64(w, `vm_rows_received_by_storage_total`, m.RowsReceivedTotal)
 	metrics.WriteCounterUint64(w, `vm_rows_added_to_storage_total`, m.RowsAddedTotal)
 	metrics.WriteCounterUint64(w, `vm_deduplicated_samples_total{type="merge"}`, m.DedupsDuringMerge)
 	metrics.WriteGaugeUint64(w, `vm_snapshots`, m.SnapshotsCount)
 
+	metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="nan_value"}`, m.NaNValueRows)
 	metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="big_timestamp"}`, m.TooBigTimestampRows)
 	metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="small_timestamp"}`, m.TooSmallTimestampRows)
+	metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="invalid_raw_metric_name"}`, m.InvalidRawMetricNames)
+	if *maxHourlySeries > 0 {
+		metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="hourly_limit_exceeded"}`, m.HourlySeriesLimitRowsDropped)
+	}
+	if *maxDailySeries > 0 {
+		metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="daily_limit_exceeded"}`, m.DailySeriesLimitRowsDropped)
+	}
 
 	metrics.WriteCounterUint64(w, `vm_timeseries_repopulated_total`, m.TimeseriesRepopulated)
 	metrics.WriteCounterUint64(w, `vm_timeseries_precreated_total`, m.TimeseriesPreCreated)
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 638747cc5d..d7bfb01aa0 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -40,10 +40,13 @@ const (
 
 // Storage represents TSDB storage.
 type Storage struct {
-	rowsAddedTotal atomic.Uint64
+	rowsReceivedTotal atomic.Uint64
+	rowsAddedTotal    atomic.Uint64
+	naNValueRows      atomic.Uint64
 
 	tooSmallTimestampRows atomic.Uint64
 	tooBigTimestampRows   atomic.Uint64
+	invalidRawMetricNames atomic.Uint64
 
 	timeseriesRepopulated  atomic.Uint64
 	timeseriesPreCreated   atomic.Uint64
@@ -496,12 +499,15 @@ func (s *Storage) idb() *indexDB {
 
 // Metrics contains essential metrics for the Storage.
 type Metrics struct {
+	RowsReceivedTotal uint64
 	RowsAddedTotal    uint64
 	DedupsDuringMerge uint64
 	SnapshotsCount    uint64
 
+	NaNValueRows          uint64
 	TooSmallTimestampRows uint64
 	TooBigTimestampRows   uint64
+	InvalidRawMetricNames uint64
 
 	TimeseriesRepopulated  uint64
 	TimeseriesPreCreated   uint64
@@ -569,12 +575,15 @@ func (m *Metrics) Reset() {
 
 // UpdateMetrics updates m with metrics from s.
 func (s *Storage) UpdateMetrics(m *Metrics) {
+	m.RowsReceivedTotal += s.rowsReceivedTotal.Load()
 	m.RowsAddedTotal += s.rowsAddedTotal.Load()
 	m.DedupsDuringMerge = dedupsDuringMerge.Load()
 	m.SnapshotsCount += uint64(s.mustGetSnapshotsCount())
 
+	m.NaNValueRows += s.naNValueRows.Load()
 	m.TooSmallTimestampRows += s.tooSmallTimestampRows.Load()
 	m.TooBigTimestampRows += s.tooBigTimestampRows.Load()
+	m.InvalidRawMetricNames += s.invalidRawMetricNames.Load()
 
 	m.TimeseriesRepopulated += s.timeseriesRepopulated.Load()
 	m.TimeseriesPreCreated += s.timeseriesPreCreated.Load()
@@ -1736,8 +1745,13 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
 		} else {
 			mrs = nil
 		}
-		s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
-		s.rowsAddedTotal.Add(uint64(len(mrsBlock)))
+		rowsAdded := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
+
+		// If the number of received rows is greater than the number of added
+		// rows, then some rows have failed to add. Check logs for the first
+		// error.
+		s.rowsAddedTotal.Add(uint64(rowsAdded))
+		s.rowsReceivedTotal.Add(uint64(len(mrsBlock)))
 	}
 	putMetricRowsInsertCtx(ic)
 }
@@ -1809,6 +1823,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
 					if firstWarn == nil {
 						firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
 					}
+					s.invalidRawMetricNames.Add(1)
 					continue
 				}
 				mn.sortTags()
@@ -1843,6 +1858,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
 			if firstWarn == nil {
 				firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
 			}
+			s.invalidRawMetricNames.Add(1)
 			continue
 		}
 		mn.sortTags()
@@ -1893,7 +1909,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
 	}
 }
 
-func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) {
+func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) int {
 	idb := s.idb()
 	generation := idb.generation
 	is := idb.getIndexSearch(0, 0, noDeadline)
@@ -1927,6 +1943,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 			if !decimal.IsStaleNaN(mr.Value) {
 				// Skip NaNs other than Prometheus staleness marker, since the underlying encoding
 				// doesn't know how to work with them.
+				s.naNValueRows.Add(1)
 				continue
 			}
 		}
@@ -1989,6 +2006,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 						firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
 					}
 					j--
+					s.invalidRawMetricNames.Add(1)
 					continue
 				}
 				mn.sortTags()
@@ -2013,6 +2031,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 				firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
 			}
 			j--
+			s.invalidRawMetricNames.Add(1)
 			continue
 		}
 		mn.sortTags()
@@ -2088,6 +2107,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 	}
 
 	s.tb.MustAddRows(rows)
+
+	return len(rows)
 }
 
 var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
@@ -2208,6 +2229,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
 			if firstError == nil {
 				firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err)
 			}
+			s.invalidRawMetricNames.Add(1)
 			continue
 		}
 		mn.sortTags()
@@ -2364,6 +2386,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
 				if firstError == nil {
 					firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", dmid.mr.MetricNameRaw, err)
 				}
+				s.invalidRawMetricNames.Add(1)
 				continue
 			}
 			mn.sortTags()
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 02799bbcc7..624dda9901 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -1447,10 +1447,11 @@ func TestStorageRowsNotAdded(t *testing.T) {
 	defer testRemoveAll(t)
 
 	type options struct {
-		name      string
-		retention time.Duration
-		mrs       []MetricRow
-		tr        TimeRange
+		name        string
+		retention   time.Duration
+		mrs         []MetricRow
+		tr          TimeRange
+		wantMetrics *Metrics
 	}
 	f := func(opts *options) {
 		t.Helper()
@@ -1467,6 +1468,19 @@ func TestStorageRowsNotAdded(t *testing.T) {
 		if got != 0 {
 			t.Fatalf("unexpected metric name count: got %d, want 0", got)
 		}
+
+		if got, want := gotMetrics.RowsReceivedTotal, opts.wantMetrics.RowsReceivedTotal; got != want {
+			t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want)
+		}
+		if got, want := gotMetrics.RowsAddedTotal, opts.wantMetrics.RowsAddedTotal; got != want {
+			t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
+		}
+		if got, want := gotMetrics.NaNValueRows, opts.wantMetrics.NaNValueRows; got != want {
+			t.Fatalf("unexpected Metrics.NaNValueRows: got %d, want %d", got, want)
+		}
+		if got, want := gotMetrics.InvalidRawMetricNames, opts.wantMetrics.InvalidRawMetricNames; got != want {
+			t.Fatalf("unexpected Metrics.InvalidRawMetricNames: got %d, want %d", got, want)
+		}
 	}
 
 	const numRows = 1000
@@ -1485,6 +1499,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
 		retention: retentionMax,
 		mrs:       testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
 		tr:        TimeRange{minTimestamp, maxTimestamp},
+		wantMetrics: &Metrics{
+			RowsReceivedTotal:     numRows,
+			TooSmallTimestampRows: numRows,
+		},
 	})
 
 	retention = 48 * time.Hour
@@ -1495,6 +1513,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
 		retention: retention,
 		mrs:       testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
 		tr:        TimeRange{minTimestamp, maxTimestamp},
+		wantMetrics: &Metrics{
+			RowsReceivedTotal:     numRows,
+			TooSmallTimestampRows: numRows,
+		},
 	})
 
 	retention = 48 * time.Hour
@@ -1505,6 +1527,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
 		retention: retention,
 		mrs:       testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
 		tr:        TimeRange{minTimestamp, maxTimestamp},
+		wantMetrics: &Metrics{
+			RowsReceivedTotal:   numRows,
+			TooBigTimestampRows: numRows,
+		},
 	})
 
 	minTimestamp = time.Now().UnixMilli()
@@ -1517,6 +1543,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
 		name: "NaN",
 		mrs:  mrs,
 		tr:   TimeRange{minTimestamp, maxTimestamp},
+		wantMetrics: &Metrics{
+			RowsReceivedTotal: numRows,
+			NaNValueRows:      numRows,
+		},
 	})
 
 	minTimestamp = time.Now().UnixMilli()
@@ -1529,6 +1559,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
 		name: "InvalidMetricNameRaw",
 		mrs:  mrs,
 		tr:   TimeRange{minTimestamp, maxTimestamp},
+		wantMetrics: &Metrics{
+			RowsReceivedTotal:     numRows,
+			InvalidRawMetricNames: numRows,
+		},
 	})
 }
 
@@ -1555,10 +1589,24 @@ func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) {
 		s.DebugFlush()
 		s.UpdateMetrics(&gotMetrics)
 
+		if got, want := gotMetrics.RowsReceivedTotal, numRows; got != want {
+			t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want)
+		}
+		if got := gotMetrics.HourlySeriesLimitRowsDropped; maxHourlySeries > 0 && got <= 0 {
+			t.Fatalf("unexpected Metrics.HourlySeriesLimitRowsDropped: got %d, want > 0", got)
+		}
+		if got := gotMetrics.DailySeriesLimitRowsDropped; maxDailySeries > 0 && got <= 0 {
+			t.Fatalf("unexpected Metrics.DailySeriesLimitRowsDropped: got %d, want > 0", got)
+		}
+
 		want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped)
 		if got := testCountAllMetricNames(s, accountID, projectID, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want {
 			t.Fatalf("unexpected metric name count: %d, want %d", got, want)
 		}
+
+		if got := gotMetrics.RowsAddedTotal; got != want {
+			t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
+		}
 	}
 
 	maxHourlySeries := 1