diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index d496310f9..a4d7c4f97 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -521,12 +521,21 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) { metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/inmemory"}`, idbm.InmemorySizeBytes) metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/file"}`, idbm.FileSizeBytes) + metrics.WriteCounterUint64(w, `vm_rows_received_by_storage_total`, m.RowsReceivedTotal) metrics.WriteCounterUint64(w, `vm_rows_added_to_storage_total`, m.RowsAddedTotal) metrics.WriteCounterUint64(w, `vm_deduplicated_samples_total{type="merge"}`, m.DedupsDuringMerge) metrics.WriteGaugeUint64(w, `vm_snapshots`, m.SnapshotsCount) + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="nan_value"}`, m.NaNValueRows) metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="big_timestamp"}`, m.TooBigTimestampRows) metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="small_timestamp"}`, m.TooSmallTimestampRows) + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="invalid_raw_metric_name"}`, m.InvalidRawMetricNames) + if *maxHourlySeries > 0 { + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="hourly_limit_exceeded"}`, m.HourlySeriesLimitRowsDropped) + } + if *maxDailySeries > 0 { + metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="daily_limit_exceeded"}`, m.DailySeriesLimitRowsDropped) + } metrics.WriteCounterUint64(w, `vm_timeseries_repopulated_total`, m.TimeseriesRepopulated) metrics.WriteCounterUint64(w, `vm_timeseries_precreated_total`, m.TimeseriesPreCreated) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ea6cdbc52..2125ca247 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -40,10 +40,13 @@ const ( // Storage represents TSDB storage. type Storage struct { - rowsAddedTotal atomic.Uint64 + rowsReceivedTotal atomic.Uint64 + rowsAddedTotal atomic.Uint64 + naNValueRows atomic.Uint64 tooSmallTimestampRows atomic.Uint64 tooBigTimestampRows atomic.Uint64 + invalidRawMetricNames atomic.Uint64 timeseriesRepopulated atomic.Uint64 timeseriesPreCreated atomic.Uint64 @@ -481,12 +484,15 @@ func (s *Storage) idb() *indexDB { // Metrics contains essential metrics for the Storage. type Metrics struct { + RowsReceivedTotal uint64 RowsAddedTotal uint64 DedupsDuringMerge uint64 SnapshotsCount uint64 + NaNValueRows uint64 TooSmallTimestampRows uint64 TooBigTimestampRows uint64 + InvalidRawMetricNames uint64 TimeseriesRepopulated uint64 TimeseriesPreCreated uint64 @@ -554,12 +560,15 @@ func (m *Metrics) Reset() { // UpdateMetrics updates m with metrics from s. func (s *Storage) UpdateMetrics(m *Metrics) { + m.RowsReceivedTotal += s.rowsReceivedTotal.Load() m.RowsAddedTotal += s.rowsAddedTotal.Load() m.DedupsDuringMerge = dedupsDuringMerge.Load() m.SnapshotsCount += uint64(s.mustGetSnapshotsCount()) + m.NaNValueRows += s.naNValueRows.Load() m.TooSmallTimestampRows += s.tooSmallTimestampRows.Load() m.TooBigTimestampRows += s.tooBigTimestampRows.Load() + m.InvalidRawMetricNames += s.invalidRawMetricNames.Load() m.TimeseriesRepopulated += s.timeseriesRepopulated.Load() m.TimeseriesPreCreated += s.timeseriesPreCreated.Load() @@ -1628,8 +1637,13 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) { } else { mrs = nil } - s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits) - s.rowsAddedTotal.Add(uint64(len(mrsBlock))) + rowsAdded := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits) + + // If the number of received rows is greater than the number of added + // rows, then some rows have failed to add. Check logs for the first + // error. + s.rowsAddedTotal.Add(uint64(rowsAdded)) + s.rowsReceivedTotal.Add(uint64(len(mrsBlock))) } putMetricRowsInsertCtx(ic) } @@ -1701,6 +1715,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { if firstWarn == nil { firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1735,6 +1750,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { if firstWarn == nil { firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1785,7 +1801,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { } } -func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) { +func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) int { idb := s.idb() generation := idb.generation is := idb.getIndexSearch(noDeadline) @@ -1819,6 +1835,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci if !decimal.IsStaleNaN(mr.Value) { // Skip NaNs other than Prometheus staleness marker, since the underlying encoding // doesn't know how to work with them. + s.naNValueRows.Add(1) continue } } @@ -1881,6 +1898,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } j-- + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1905,6 +1923,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } j-- + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -1980,6 +1999,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci } s.tb.MustAddRows(rows) + + return len(rows) } var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second) @@ -2100,6 +2121,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error { if firstError == nil { firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() @@ -2245,6 +2267,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { if firstError == nil { firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", dmid.mr.MetricNameRaw, err) } + s.invalidRawMetricNames.Add(1) continue } mn.sortTags() diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 7b8525227..8ccccc4ee 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -1248,10 +1248,11 @@ func TestStorageRowsNotAdded(t *testing.T) { defer testRemoveAll(t) type options struct { - name string - retention time.Duration - mrs []MetricRow - tr TimeRange + name string + retention time.Duration + mrs []MetricRow + tr TimeRange + wantMetrics *Metrics } f := func(opts *options) { t.Helper() @@ -1268,6 +1269,19 @@ func TestStorageRowsNotAdded(t *testing.T) { if got != 0 { t.Fatalf("unexpected metric name count: got %d, want 0", got) } + + if got, want := gotMetrics.RowsReceivedTotal, opts.wantMetrics.RowsReceivedTotal; got != want { + t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want) + } + if got, want := gotMetrics.RowsAddedTotal, opts.wantMetrics.RowsAddedTotal; got != want { + t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want) + } + if got, want := gotMetrics.NaNValueRows, opts.wantMetrics.NaNValueRows; got != want { + t.Fatalf("unexpected Metrics.NaNValueRows: got %d, want %d", got, want) + } + if got, want := gotMetrics.InvalidRawMetricNames, opts.wantMetrics.InvalidRawMetricNames; got != want { + t.Fatalf("unexpected Metrics.InvalidRawMetricNames: got %d, want %d", got, want) + } } const numRows = 1000 @@ -1286,6 +1300,10 @@ func TestStorageRowsNotAdded(t *testing.T) { retention: retentionMax, mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp), tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + TooSmallTimestampRows: numRows, + }, }) retention = 48 * time.Hour @@ -1296,6 +1314,10 @@ func TestStorageRowsNotAdded(t *testing.T) { retention: retention, mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp), tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + TooSmallTimestampRows: numRows, + }, }) retention = 48 * time.Hour @@ -1306,6 +1328,10 @@ func TestStorageRowsNotAdded(t *testing.T) { retention: retention, mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp), tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + TooBigTimestampRows: numRows, + }, }) minTimestamp = time.Now().UnixMilli() @@ -1318,6 +1344,10 @@ func TestStorageRowsNotAdded(t *testing.T) { name: "NaN", mrs: mrs, tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + NaNValueRows: numRows, + }, }) minTimestamp = time.Now().UnixMilli() @@ -1330,6 +1360,10 @@ func TestStorageRowsNotAdded(t *testing.T) { name: "InvalidMetricNameRaw", mrs: mrs, tr: TimeRange{minTimestamp, maxTimestamp}, + wantMetrics: &Metrics{ + RowsReceivedTotal: numRows, + InvalidRawMetricNames: numRows, + }, }) } @@ -1353,10 +1387,24 @@ func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) { s.DebugFlush() s.UpdateMetrics(&gotMetrics) + if got, want := gotMetrics.RowsReceivedTotal, numRows; got != want { + t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want) + } + if got := gotMetrics.HourlySeriesLimitRowsDropped; maxHourlySeries > 0 && got <= 0 { + t.Fatalf("unexpected Metrics.HourlySeriesLimitRowsDropped: got %d, want > 0", got) + } + if got := gotMetrics.DailySeriesLimitRowsDropped; maxDailySeries > 0 && got <= 0 { + t.Fatalf("unexpected Metrics.DailySeriesLimitRowsDropped: got %d, want > 0", got) + } + want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped) if got := testCountAllMetricNames(s, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want { t.Fatalf("unexpected metric name count: %d, want %d", got, want) } + + if got := gotMetrics.RowsAddedTotal; got != want { + t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want) + } } maxHourlySeries := 1