lib/storage: adds metrics that count records that failed to insert

### Describe Your Changes

Add storage metrics that count records that failed to insert:

- `RowsReceivedTotal`: the number of records that have been received by
the storage from the clients
- `RowsAddedTotal`: the number of records that have actually been
persisted. This value must be equal to `RowsReceivedTotal` if all the
records have been valid ones. But it will be smaller otherwise. The
values of the metrics below should provide the insight of why some
records hasn't been added
-   `NaNValueRows`: the number of records whose value was `NaN`
- `StaleNaNValueRows`: the number of records whose value was `Stale NaN`
- `InvalidRawMetricNames`: the number of records whose raw metric name
has failed to unmarshal.

The following metrics existed before this PR and are listed here for
completeness:

- `TooSmallTimestampRows`: the number of records whose timestamp is
negative or is older than retention period
- `TooBigTimestampRows`: the number of records whose timestamp is too
far in the future.
- `HourlySeriesLimitRowsDropped`: the number of records that have not
been added because the hourly series limit has been exceeded.
- `DailySeriesLimitRowsDropped`: the number of records that have not
been added because the daily series limit has been exceeded.

---
Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
This commit is contained in:
Artem Fetishev 2024-09-06 18:57:21 +03:00 committed by GitHub
parent 9f7ee4c0bb
commit a5424e95b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 88 additions and 8 deletions

View file

@ -521,12 +521,21 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/inmemory"}`, idbm.InmemorySizeBytes)
metrics.WriteGaugeUint64(w, `vm_data_size_bytes{type="indexdb/file"}`, idbm.FileSizeBytes)
metrics.WriteCounterUint64(w, `vm_rows_received_by_storage_total`, m.RowsReceivedTotal)
metrics.WriteCounterUint64(w, `vm_rows_added_to_storage_total`, m.RowsAddedTotal)
metrics.WriteCounterUint64(w, `vm_deduplicated_samples_total{type="merge"}`, m.DedupsDuringMerge)
metrics.WriteGaugeUint64(w, `vm_snapshots`, m.SnapshotsCount)
metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="nan_value"}`, m.NaNValueRows)
metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="big_timestamp"}`, m.TooBigTimestampRows)
metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="small_timestamp"}`, m.TooSmallTimestampRows)
metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="invalid_raw_metric_name"}`, m.InvalidRawMetricNames)
if *maxHourlySeries > 0 {
metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="hourly_limit_exceeded"}`, m.HourlySeriesLimitRowsDropped)
}
if *maxDailySeries > 0 {
metrics.WriteCounterUint64(w, `vm_rows_ignored_total{reason="daily_limit_exceeded"}`, m.DailySeriesLimitRowsDropped)
}
metrics.WriteCounterUint64(w, `vm_timeseries_repopulated_total`, m.TimeseriesRepopulated)
metrics.WriteCounterUint64(w, `vm_timeseries_precreated_total`, m.TimeseriesPreCreated)

View file

@ -40,10 +40,13 @@ const (
// Storage represents TSDB storage.
type Storage struct {
rowsAddedTotal atomic.Uint64
rowsReceivedTotal atomic.Uint64
rowsAddedTotal atomic.Uint64
naNValueRows atomic.Uint64
tooSmallTimestampRows atomic.Uint64
tooBigTimestampRows atomic.Uint64
invalidRawMetricNames atomic.Uint64
timeseriesRepopulated atomic.Uint64
timeseriesPreCreated atomic.Uint64
@ -481,12 +484,15 @@ func (s *Storage) idb() *indexDB {
// Metrics contains essential metrics for the Storage.
type Metrics struct {
RowsReceivedTotal uint64
RowsAddedTotal uint64
DedupsDuringMerge uint64
SnapshotsCount uint64
NaNValueRows uint64
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
InvalidRawMetricNames uint64
TimeseriesRepopulated uint64
TimeseriesPreCreated uint64
@ -554,12 +560,15 @@ func (m *Metrics) Reset() {
// UpdateMetrics updates m with metrics from s.
func (s *Storage) UpdateMetrics(m *Metrics) {
m.RowsReceivedTotal += s.rowsReceivedTotal.Load()
m.RowsAddedTotal += s.rowsAddedTotal.Load()
m.DedupsDuringMerge = dedupsDuringMerge.Load()
m.SnapshotsCount += uint64(s.mustGetSnapshotsCount())
m.NaNValueRows += s.naNValueRows.Load()
m.TooSmallTimestampRows += s.tooSmallTimestampRows.Load()
m.TooBigTimestampRows += s.tooBigTimestampRows.Load()
m.InvalidRawMetricNames += s.invalidRawMetricNames.Load()
m.TimeseriesRepopulated += s.timeseriesRepopulated.Load()
m.TimeseriesPreCreated += s.timeseriesPreCreated.Load()
@ -1628,8 +1637,13 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
} else {
mrs = nil
}
s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
s.rowsAddedTotal.Add(uint64(len(mrsBlock)))
rowsAdded := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
// If the number of received rows is greater than the number of added
// rows, then some rows have failed to add. Check logs for the first
// error.
s.rowsAddedTotal.Add(uint64(rowsAdded))
s.rowsReceivedTotal.Add(uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
}
@ -1701,6 +1715,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()
@ -1735,6 +1750,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()
@ -1785,7 +1801,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
}
}
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) {
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) int {
idb := s.idb()
generation := idb.generation
is := idb.getIndexSearch(noDeadline)
@ -1819,6 +1835,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
if !decimal.IsStaleNaN(mr.Value) {
// Skip NaNs other than Prometheus staleness marker, since the underlying encoding
// doesn't know how to work with them.
s.naNValueRows.Add(1)
continue
}
}
@ -1881,6 +1898,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()
@ -1905,6 +1923,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()
@ -1980,6 +1999,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
s.tb.MustAddRows(rows)
return len(rows)
}
var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
@ -2100,6 +2121,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
if firstError == nil {
firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err)
}
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()
@ -2245,6 +2267,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
if firstError == nil {
firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", dmid.mr.MetricNameRaw, err)
}
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()

View file

@ -1248,10 +1248,11 @@ func TestStorageRowsNotAdded(t *testing.T) {
defer testRemoveAll(t)
type options struct {
name string
retention time.Duration
mrs []MetricRow
tr TimeRange
name string
retention time.Duration
mrs []MetricRow
tr TimeRange
wantMetrics *Metrics
}
f := func(opts *options) {
t.Helper()
@ -1268,6 +1269,19 @@ func TestStorageRowsNotAdded(t *testing.T) {
if got != 0 {
t.Fatalf("unexpected metric name count: got %d, want 0", got)
}
if got, want := gotMetrics.RowsReceivedTotal, opts.wantMetrics.RowsReceivedTotal; got != want {
t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want)
}
if got, want := gotMetrics.RowsAddedTotal, opts.wantMetrics.RowsAddedTotal; got != want {
t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
}
if got, want := gotMetrics.NaNValueRows, opts.wantMetrics.NaNValueRows; got != want {
t.Fatalf("unexpected Metrics.NaNValueRows: got %d, want %d", got, want)
}
if got, want := gotMetrics.InvalidRawMetricNames, opts.wantMetrics.InvalidRawMetricNames; got != want {
t.Fatalf("unexpected Metrics.InvalidRawMetricNames: got %d, want %d", got, want)
}
}
const numRows = 1000
@ -1286,6 +1300,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
retention: retentionMax,
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
TooSmallTimestampRows: numRows,
},
})
retention = 48 * time.Hour
@ -1296,6 +1314,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
retention: retention,
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
TooSmallTimestampRows: numRows,
},
})
retention = 48 * time.Hour
@ -1306,6 +1328,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
retention: retention,
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
TooBigTimestampRows: numRows,
},
})
minTimestamp = time.Now().UnixMilli()
@ -1318,6 +1344,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
name: "NaN",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
NaNValueRows: numRows,
},
})
minTimestamp = time.Now().UnixMilli()
@ -1330,6 +1360,10 @@ func TestStorageRowsNotAdded(t *testing.T) {
name: "InvalidMetricNameRaw",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
InvalidRawMetricNames: numRows,
},
})
}
@ -1353,10 +1387,24 @@ func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) {
s.DebugFlush()
s.UpdateMetrics(&gotMetrics)
if got, want := gotMetrics.RowsReceivedTotal, numRows; got != want {
t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want)
}
if got := gotMetrics.HourlySeriesLimitRowsDropped; maxHourlySeries > 0 && got <= 0 {
t.Fatalf("unexpected Metrics.HourlySeriesLimitRowsDropped: got %d, want > 0", got)
}
if got := gotMetrics.DailySeriesLimitRowsDropped; maxDailySeries > 0 && got <= 0 {
t.Fatalf("unexpected Metrics.DailySeriesLimitRowsDropped: got %d, want > 0", got)
}
want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped)
if got := testCountAllMetricNames(s, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want {
t.Fatalf("unexpected metric name count: %d, want %d", got, want)
}
if got := gotMetrics.RowsAddedTotal; got != want {
t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
}
}
maxHourlySeries := 1