diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index e1a72bc5c..b7fe762e2 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -153,9 +153,9 @@ func AddRows(mrs []storage.MetricRow) error {
 	}
 	resetResponseCacheIfNeeded(mrs)
 	WG.Add(1)
-	err := Storage.AddRows(mrs, uint8(*precisionBits))
+	Storage.AddRows(mrs, uint8(*precisionBits))
 	WG.Done()
-	return err
+	return nil
 }
 
 var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index b939571fb..6039b275d 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -67,6 +67,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).
 * BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix `vm_backup_last_run_failed` metric not being properly initialized during startup. Previously, it could imply an error even if the backup have been completed successfully. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6550) for the details.
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) over Prometheus buckets with inconsistent values. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580#issuecomment-2186659102) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6547). Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819).
+* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): fix inconsistent error handling in storage: log errors which occur when adding rows instead of returning them to the caller. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6583) for the details.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): fix panic when using multiple topics with the same name when [ingesting metrics from Kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6636) for the details.
 
 ## [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2)
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go
index 127365c3a..7d20ca6b2 100644
--- a/lib/storage/index_db_test.go
+++ b/lib/storage/index_db_test.go
@@ -1467,9 +1467,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
 	timeMin := currentDayTimestamp - 24*3600*1000
 	timeMax := currentDayTimestamp + 24*3600*1000
 	mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
-	if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-		t.Fatalf("unexpected error when adding mrs: %s", err)
-	}
+	s.AddRows(mrs, defaultPrecisionBits)
 	s.DebugFlush()
 
 	// verify the storage contains rows.
@@ -1521,9 +1519,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
 	}
 
 	// Re-insert rows again and verify that all the entries belong to new generation
-	if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-		t.Fatalf("unexpected error when adding mrs: %s", err)
-	}
+	s.AddRows(mrs, defaultPrecisionBits)
 	s.DebugFlush()
 
 	for _, mr := range mrs {
diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go
index 69bedac05..b3573821b 100644
--- a/lib/storage/search_test.go
+++ b/lib/storage/search_test.go
@@ -104,15 +104,11 @@ func TestSearch(t *testing.T) {
 		blockRowsCount++
 		if blockRowsCount == rowsPerBlock {
-			if err := st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits); err != nil {
-				t.Fatalf("cannot add rows %d-%d: %s", i-blockRowsCount+1, i+1, err)
-			}
+			st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits)
 			blockRowsCount = 0
 		}
 	}
 
-	if err := st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits); err != nil {
-		t.Fatalf("cannot add rows %v-%v: %s", rowsCount-blockRowsCount, rowsCount, err)
-	}
+	st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits)
 	endTimestamp := mrs[len(mrs)-1].Timestamp
 
 	// Re-open the storage in order to flush all the pending cached data.
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 3e22bb26d..2995326b1 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -1612,13 +1612,12 @@ var rowsAddedTotal atomic.Uint64
 //
 // The caller should limit the number of concurrent AddRows calls to the number
 // of available CPU cores in order to limit memory usage.
-func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
+func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
 	if len(mrs) == 0 {
-		return nil
+		return
 	}
 
 	// Add rows to the storage in blocks with limited size in order to reduce memory usage.
-	var firstErr error
 	ic := getMetricRowsInsertCtx()
 	maxBlockLen := len(ic.rrs)
 	for len(mrs) > 0 {
@@ -1629,17 +1628,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
 		} else {
 			mrs = nil
 		}
-		if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
-			if firstErr == nil {
-				firstErr = err
-			}
-			continue
-		}
+		s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
 		rowsAddedTotal.Add(uint64(len(mrsBlock)))
 	}
 	putMetricRowsInsertCtx(ic)
-
-	return firstErr
 }
 
 type metricRowsInsertCtx struct {
@@ -1778,7 +1770,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
 	}
 }
 
-func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
+func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) {
 	idb := s.idb()
 	generation := idb.generation
 	is := idb.getIndexSearch(noDeadline)
@@ -1802,7 +1794,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 
 	var genTSID generationTSID
 
-	// Return only the first error, since it has no sense in returning all errors.
+	// Log only the first error, since there is no sense in logging all the errors.
 	var firstWarn error
 
 	j := 0
@@ -1968,23 +1960,20 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 
 	if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
 		if firstWarn == nil {
-			firstWarn = err
+			firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
 		}
 	}
+	if err := s.updatePerDateData(rows, dstMrs); err != nil {
+		if firstWarn == nil {
+			firstWarn = fmt.Errorf("cannot update per-day index: %w", err)
+		}
+	}
+
 	if firstWarn != nil {
 		storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
 	}
 
-	err := s.updatePerDateData(rows, dstMrs)
-	if err != nil {
-		err = fmt.Errorf("cannot update per-date data: %w", err)
-	} else {
-		s.tb.MustAddRows(rows)
-	}
-	if err != nil {
-		return fmt.Errorf("error occurred during rows addition: %w", err)
-	}
-	return nil
+	s.tb.MustAddRows(rows)
 }
 
 var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 7ce8fa30d..a789a2db1 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -2,12 +2,12 @@ package storage
 
 import (
 	"fmt"
+	"math"
 	"math/rand"
 	"os"
 	"path/filepath"
 	"reflect"
 	"sort"
-	"strings"
 	"sync"
 	"testing"
 	"testing/quick"
@@ -15,6 +15,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
@@ -570,12 +571,7 @@ func testStorageRandTimestamps(s *Storage) error {
 			}
 			mrs = append(mrs, mr)
 		}
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			errStr := err.Error()
-			if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
-				return fmt.Errorf("unexpected error when adding mrs: %w", err)
-			}
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 
 	// Verify the storage contains rows.
@@ -691,9 +687,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
 			}
 			mrs = append(mrs, mr)
 		}
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			return fmt.Errorf("unexpected error when adding mrs: %w", err)
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 	s.DebugFlush()
 
@@ -1031,9 +1025,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
 	minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
 	for i := 0; i < addsCount; i++ {
 		mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			return fmt.Errorf("unexpected error when adding mrs: %w", err)
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 
 	// Verify the storage contains rows.
@@ -1172,9 +1164,7 @@ func testStorageAddMetrics(s *Storage, workerNum int) error {
 			Timestamp: timestamp,
 			Value:     value,
 		}
-		if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
-			return fmt.Errorf("unexpected error when adding mrs: %w", err)
-		}
+		s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
 	}
 
 	// Verify the storage contains rows.
@@ -1198,9 +1188,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
 	minTimestamp := maxTimestamp - s.retentionMsecs
 	for i := 0; i < addsCount; i++ {
 		mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
-		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-			t.Fatalf("unexpected error when adding mrs: %s", err)
-		}
+		s.AddRows(mrs, defaultPrecisionBits)
 	}
 
 	// Try creating a snapshot from the storage.
 	snapshotName, err := s.CreateSnapshot()
@@ -1268,9 +1256,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
 	rng := rand.New(rand.NewSource(1))
 	mrs := testGenerateMetricRows(rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
 	// populate storage with some rows
-	if err := s.AddRows(mrs[:10], defaultPrecisionBits); err != nil {
-		t.Fatal("error when adding mrs: %w", err)
-	}
+	s.AddRows(mrs[:10], defaultPrecisionBits)
 	s.DebugFlush()
 
 	// verify ingested rows are searchable
@@ -1289,9 +1275,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
 	for i := 0; i < len(mrs); i = i + 2 {
 		mrs[i].Value = decimal.StaleNaN
 	}
-	if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
-		t.Fatal("error when adding mrs: %w", err)
-	}
+	s.AddRows(mrs, defaultPrecisionBits)
 	s.DebugFlush()
 
 	// verify that rows marked as stale aren't searchable
@@ -1302,3 +1286,166 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
 		t.Fatalf("cannot remove %q: %s", path, err)
 	}
 }
+
+// testRemoveAll removes all storage data produced by a test if the test hasn't
+// failed. For this to work, the storage must use t.Name() as the base dir in
+// its data path.
+//
+// In case of failure, the data is kept for further debugging.
+func testRemoveAll(t *testing.T) {
+	defer func() {
+		if !t.Failed() {
+			fs.MustRemoveAll(t.Name())
+		}
+	}()
+}
+
+func TestStorageRowsNotAdded(t *testing.T) {
+	defer testRemoveAll(t)
+
+	type options struct {
+		name      string
+		retention time.Duration
+		mrs       []MetricRow
+		tr        TimeRange
+	}
+	f := func(opts *options) {
+		t.Helper()
+
+		var gotMetrics Metrics
+		path := fmt.Sprintf("%s/%s", t.Name(), opts.name)
+		s := MustOpenStorage(path, opts.retention, 0, 0)
+		defer s.MustClose()
+		s.AddRows(opts.mrs, defaultPrecisionBits)
+		s.DebugFlush()
+		s.UpdateMetrics(&gotMetrics)
+
+		got := testCountAllMetricNames(s, opts.tr)
+		if got != 0 {
+			t.Fatalf("unexpected metric name count: got %d, want 0", got)
+		}
+	}
+
+	const numRows = 1000
+	var (
+		rng          = rand.New(rand.NewSource(1))
+		retention    time.Duration
+		minTimestamp int64
+		maxTimestamp int64
+		mrs          []MetricRow
+	)
+
+	minTimestamp = -1000
+	maxTimestamp = -1
+	f(&options{
+		name:      "NegativeTimestamps",
+		retention: retentionMax,
+		mrs:       testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
+		tr:        TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	retention = 48 * time.Hour
+	minTimestamp = time.Now().Add(-retention - time.Hour).UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	f(&options{
+		name:      "TooSmallTimestamps",
+		retention: retention,
+		mrs:       testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
+		tr:        TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	retention = 48 * time.Hour
+	minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	f(&options{
+		name:      "TooBigTimestamps",
+		retention: retention,
+		mrs:       testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
+		tr:        TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	minTimestamp = time.Now().UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
+	for i := range numRows {
+		mrs[i].Value = math.NaN()
+	}
+	f(&options{
+		name: "NaN",
+		mrs:  mrs,
+		tr:   TimeRange{minTimestamp, maxTimestamp},
+	})
+
+	minTimestamp = time.Now().UnixMilli()
+	maxTimestamp = minTimestamp + 1000
+	mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
+	for i := range numRows {
+		mrs[i].Value = decimal.StaleNaN
+	}
+	f(&options{
"StaleNaN", + mrs: mrs, + tr: TimeRange{minTimestamp, maxTimestamp}, + }) + + minTimestamp = time.Now().UnixMilli() + maxTimestamp = minTimestamp + 1000 + mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp) + for i := range numRows { + mrs[i].MetricNameRaw = []byte("garbage") + } + f(&options{ + name: "InvalidMetricNameRaw", + mrs: mrs, + tr: TimeRange{minTimestamp, maxTimestamp}, + }) +} + +func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) { + defer testRemoveAll(t) + + f := func(name string, maxHourlySeries int, maxDailySeries int) { + t.Helper() + + rng := rand.New(rand.NewSource(1)) + numRows := uint64(1000) + minTimestamp := time.Now().UnixMilli() + maxTimestamp := minTimestamp + 1000 + mrs := testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp) + + var gotMetrics Metrics + path := fmt.Sprintf("%s/%s", t.Name(), name) + s := MustOpenStorage(path, 0, maxHourlySeries, maxDailySeries) + defer s.MustClose() + s.AddRows(mrs, defaultPrecisionBits) + s.DebugFlush() + s.UpdateMetrics(&gotMetrics) + + want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped) + if got := testCountAllMetricNames(s, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want { + t.Fatalf("unexpected metric name count: %d, want %d", got, want) + } + } + + maxHourlySeries := 1 + maxDailySeries := 0 // No limit + f("HourlyLimitExceeded", maxHourlySeries, maxDailySeries) + + maxHourlySeries = 0 // No limit + maxDailySeries = 1 + f("DailyLimitExceeded", maxHourlySeries, maxDailySeries) +} + +// testCountAllMetricNames is a test helper function that counts the names of +// all time series within the given time range. +func testCountAllMetricNames(s *Storage, tr TimeRange) int { + tfsAll := NewTagFilters() + if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { + panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err)) + } + names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline) + if err != nil { + panic(fmt.Sprintf("SeachMetricNames() failed unexpectedly: %v", err)) + } + return len(names) +} diff --git a/lib/storage/storage_timing_test.go b/lib/storage/storage_timing_test.go index 80666fc6d..521f29e1f 100644 --- a/lib/storage/storage_timing_test.go +++ b/lib/storage/storage_timing_test.go @@ -46,9 +46,7 @@ func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) { mr.Timestamp = int64(offset + i) mr.Value = float64(offset + i) } - if err := s.AddRows(mrs, defaultPrecisionBits); err != nil { - panic(fmt.Errorf("cannot add rows to storage: %w", err)) - } + s.AddRows(mrs, defaultPrecisionBits) } }) b.StopTimer()