From a5424e95b393022090a7996aa92cefe4d29f7311 Mon Sep 17 00:00:00 2001 From: Artem Fetishev <149964189+rtm0@users.noreply.github.com> Date: Fri, 6 Sep 2024 18:57:21 +0300 Subject: [PATCH] lib/storage: adds metrics that count records that failed to insert ### Describe Your Changes Add storage metrics that count records that failed to insert: - `RowsReceivedTotal`: the number of records that have been received by the storage from the clients - `RowsAddedTotal`: the number of records that have actually been persisted. This value must be equal to `RowsReceivedTotal` if all the records have been valid ones. But it will be smaller otherwise. The values of the metrics below should provide insight into why some records haven't been added - `NaNValueRows`: the number of records whose value was `NaN` - `StaleNaNValueRows`: the number of records whose value was `Stale NaN` - `InvalidRawMetricNames`: the number of records whose raw metric name has failed to unmarshal. The following metrics existed before this PR and are listed here for completeness: - `TooSmallTimestampRows`: the number of records whose timestamp is negative or is older than the retention period - `TooBigTimestampRows`: the number of records whose timestamp is too far in the future. - `HourlySeriesLimitRowsDropped`: the number of records that have not been added because the hourly series limit has been exceeded. - `DailySeriesLimitRowsDropped`: the number of records that have not been added because the daily series limit has been exceeded. 
--- Signed-off-by: Artem Fetishev --- app/vmstorage/main.go | 9 ++++++ lib/storage/storage.go | 31 +++++++++++++++++--- lib/storage/storage_test.go | 56 ++++++++++++++++++++++++++++++++++--- 3 files changed, 88 insertions(+), 8 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index d496310f90..a4d7c4f97c 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -521,12 +521,21 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) { metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/inmemory"}`, idbm.InmemorySizeBytes) metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/file"}`, idbm.FileSizeBytes) + metrics.WriteCounterUint64(w, `vm_rows_received_by_storage_total`, m.RowsReceivedTotal) metrics.WriteCounterUint64(w, `vm_rows_added_to_storage_total`, m.RowsAddedTotal) metrics.WriteCounterUint64(w, `vm_deduplicated_samples_total{type="merge"}`, m.DedupsDuringMerge) metrics.WriteGaugeUint64(w, `vm_snapshots`, m.SnapshotsCount) + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="nan_value"}`, m.NaNValueRows) metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="big_timestamp"}`, m.TooBigTimestampRows) metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="small_timestamp"}`, m.TooSmallTimestampRows) + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="invalid_raw_metric_name"}`, m.InvalidRawMetricNames) + if *maxHourlySeries > 0 { + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="hourly_limit_exceeded"}`, m.HourlySeriesLimitRowsDropped) + } + if *maxDailySeries > 0 { + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="daily_limit_exceeded"}`, m.DailySeriesLimitRowsDropped) + } metrics.WriteCounterUint64(w, `vm_timeseries_repopulated_total`, m.TimeseriesRepopulated) metrics.WriteCounterUint64(w, `vm_timeseries_precreated_total`, m.TimeseriesPreCreated) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ea6cdbc52e..2125ca247c 100644 --- 
a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -40,10 +40,13 @@ const ( // Storage represents TSDB storage. type Storage struct { - rowsAddedTotal atomic.Uint64 + rowsReceivedTotal atomic.Uint64 + rowsAddedTotal atomic.Uint64 + naNValueRows atomic.Uint64 tooSmallTimestampRows atomic.Uint64 tooBigTimestampRows atomic.Uint64 + invalidRawMetricNames atomic.Uint64 timeseriesRepopulated atomic.Uint64 timeseriesPreCreated atomic.Uint64 @@ -481,12 +484,15 @@ func (s *Storage) idb() *indexDB { // Metrics contains essential metrics for the Storage. type Metrics struct { + RowsReceivedTotal uint64 RowsAddedTotal uint64 DedupsDuringMerge uint64 SnapshotsCount uint64 + NaNValueRows uint64 TooSmallTimestampRows uint64 TooBigTimestampRows uint64 + InvalidRawMetricNames uint64 TimeseriesRepopulated uint64 TimeseriesPreCreated uint64 @@ -554,12 +560,15 @@ func (m *Metrics) Reset() { // UpdateMetrics updates m with metrics from s. func (s *Storage) UpdateMetrics(m *Metrics) { + m.RowsReceivedTotal += s.rowsReceivedTotal.Load() m.RowsAddedTotal += s.rowsAddedTotal.Load() m.DedupsDuringMerge = dedupsDuringMerge.Load() m.SnapshotsCount += uint64(s.mustGetSnapshotsCount()) + m.NaNValueRows += s.naNValueRows.Load() m.TooSmallTimestampRows += s.tooSmallTimestampRows.Load() m.TooBigTimestampRows += s.tooBigTimestampRows.Load() + m.InvalidRawMetricNames += s.invalidRawMetricNames.Load() m.TimeseriesRepopulated += s.timeseriesRepopulated.Load() m.TimeseriesPreCreated += s.timeseriesPreCreated.Load() @@ -1628,8 +1637,13 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) { } else { mrs = nil } - s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits) - s.rowsAddedTotal.Add(uint64(len(mrsBlock))) + rowsAdded := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits) + + // If the number of received rows is greater than the number of added + // rows, then some rows have failed to add. Check logs for the first + // error. 
+ s.rowsAddedTotal.Add(uint64(rowsAdded)) + s.rowsReceivedTotal.Add(uint64(len(mrsBlock))) } putMetricRowsInsertCtx(ic) } @@ -1701,6 +1715,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { if firstWarn == nil { firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1735,6 +1750,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { if firstWarn == nil { firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1785,7 +1801,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { } } -func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) { +func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) int { idb := s.idb() generation := idb.generation is := idb.getIndexSearch(noDeadline) @@ -1819,6 +1835,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci if !decimal.IsStaleNaN(mr.Value) { // Skip NaNs other than Prometheus staleness marker, since the underlying encoding // doesn't know how to work with them. 
+ s.naNValueRows.Add(1) continue } } @@ -1881,6 +1898,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } j-- + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1905,6 +1923,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } j-- + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1980,6 +1999,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci } s.tb.MustAddRows(rows) + + return len(rows) } var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second) @@ -2100,6 +2121,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error { if firstError == nil { firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -2245,6 +2267,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { if firstError == nil { firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", dmid.mr.MetricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 7b8525227d..8ccccc4ee2 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -1248,10 +1248,11 @@ func TestStorageRowsNotAdded(t *testing.T) { defer testRemoveAll(t) type options struct { - name string - retention time.Duration - mrs []MetricRow - tr TimeRange + name string + retention time.Duration + mrs []MetricRow + tr TimeRange + wantMetrics *Metrics } f := func(opts *options) { t.Helper() @@ -1268,6 +1269,19 @@ func TestStorageRowsNotAdded(t *testing.T) { if got != 0 { t.Fatalf("unexpected metric name count: got %d, want 0", got) } + + if got, want := 
gotMetrics.RowsReceivedTotal, opts.wantMetrics.RowsReceivedTotal; got != want { + t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want) + } + if got, want := gotMetrics.RowsAddedTotal, opts.wantMetrics.RowsAddedTotal; got != want { + t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want) + } + if got, want := gotMetrics.NaNValueRows, opts.wantMetrics.NaNValueRows; got != want { + t.Fatalf("unexpected Metrics.NaNValueRows: got %d, want %d", got, want) + } + if got, want := gotMetrics.InvalidRawMetricNames, opts.wantMetrics.InvalidRawMetricNames; got != want { + t.Fatalf("unexpected Metrics.InvalidRawMetricNames: got %d, want %d", got, want) + } } const numRows = 1000 @@ -1286,6 +1300,10 @@ func TestStorageRowsNotAdded(t *testing.T) { retention: retentionMax, mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp), tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + TooSmallTimestampRows: numRows, + }, }) retention = 48 * time.Hour @@ -1296,6 +1314,10 @@ func TestStorageRowsNotAdded(t *testing.T) { retention: retention, mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp), tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + TooSmallTimestampRows: numRows, + }, }) retention = 48 * time.Hour @@ -1306,6 +1328,10 @@ func TestStorageRowsNotAdded(t *testing.T) { retention: retention, mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp), tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + TooBigTimestampRows: numRows, + }, }) minTimestamp = time.Now().UnixMilli() @@ -1318,6 +1344,10 @@ func TestStorageRowsNotAdded(t *testing.T) { name: "NaN", mrs: mrs, tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + NaNValueRows: numRows, + }, }) minTimestamp = time.Now().UnixMilli() @@ -1330,6 +1360,10 @@ 
func TestStorageRowsNotAdded(t *testing.T) { name: "InvalidMetricNameRaw", mrs: mrs, tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + InvalidRawMetricNames: numRows, + }, }) } @@ -1353,10 +1387,24 @@ func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) { s.DebugFlush() s.UpdateMetrics(&gotMetrics) + if got, want := gotMetrics.RowsReceivedTotal, numRows; got != want { + t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want) + } + if got := gotMetrics.HourlySeriesLimitRowsDropped; maxHourlySeries > 0 && got <= 0 { + t.Fatalf("unexpected Metrics.HourlySeriesLimitRowsDropped: got %d, want > 0", got) + } + if got := gotMetrics.DailySeriesLimitRowsDropped; maxDailySeries > 0 && got <= 0 { + t.Fatalf("unexpected Metrics.DailySeriesLimitRowsDropped: got %d, want > 0", got) + } + want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped) if got := testCountAllMetricNames(s, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want { t.Fatalf("unexpected metric name count: %d, want %d", got, want) } + + if got := gotMetrics.RowsAddedTotal; got != want { + t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want) + } } maxHourlySeries := 1