mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
Fix inconsistent error handling in Storage.AddRows() (#6583)
### Describe Your Changes `Storage.AddRows()` returns an error only in one case: when `Storage.updatePerDateData()` fails to unmarshal a `metricNameRaw`. But the same error is treated as a warning when it happens inside `Storage.add()` or returned by `Storage.prefillNextIndexDB()`. This commit fixes this inconsistency by treating the error returned by `Storage.updatePerDateData()` as a warning as well. As a result `Storage.add()` does not need a return value anymore and so doesn't `Storage.AddRows()`. Additionally, this commit adds a unit test that checks all cases that result in a row not being added to the storage. --------- Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
parent
6f9f861f57
commit
bdc0e688e8
7 changed files with 193 additions and 66 deletions
|
@ -153,9 +153,9 @@ func AddRows(mrs []storage.MetricRow) error {
|
||||||
}
|
}
|
||||||
resetResponseCacheIfNeeded(mrs)
|
resetResponseCacheIfNeeded(mrs)
|
||||||
WG.Add(1)
|
WG.Add(1)
|
||||||
err := Storage.AddRows(mrs, uint8(*precisionBits))
|
Storage.AddRows(mrs, uint8(*precisionBits))
|
||||||
WG.Done()
|
WG.Done()
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
|
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
|
||||||
|
|
|
@ -67,6 +67,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).
|
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).
|
||||||
* BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix `vm_backup_last_run_failed` metric not being properly initialized during startup. Previously, it could imply an error even if the backup have been completed successfully. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6550) for the details.
|
* BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix `vm_backup_last_run_failed` metric not being properly initialized during startup. Previously, it could imply an error even if the backup have been completed successfully. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6550) for the details.
|
||||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) over Prometheus buckets with inconsistent values. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580#issuecomment-2186659102) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6547). Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819).
|
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) over Prometheus buckets with inconsistent values. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580#issuecomment-2186659102) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6547). Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819).
|
||||||
|
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix inconsistent error handling in storage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6583) for details.
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): fix panic when using multiple topics with the same name when [ingesting metrics from Kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6636) for the details.
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): fix panic when using multiple topics with the same name when [ingesting metrics from Kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6636) for the details.
|
||||||
|
|
||||||
## [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2)
|
## [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2)
|
||||||
|
|
|
@ -1467,9 +1467,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
||||||
timeMin := currentDayTimestamp - 24*3600*1000
|
timeMin := currentDayTimestamp - 24*3600*1000
|
||||||
timeMax := currentDayTimestamp + 24*3600*1000
|
timeMax := currentDayTimestamp + 24*3600*1000
|
||||||
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
|
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
t.Fatalf("unexpected error when adding mrs: %s", err)
|
|
||||||
}
|
|
||||||
s.DebugFlush()
|
s.DebugFlush()
|
||||||
|
|
||||||
// verify the storage contains rows.
|
// verify the storage contains rows.
|
||||||
|
@ -1521,9 +1519,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-insert rows again and verify that all the entries belong to new generation
|
// Re-insert rows again and verify that all the entries belong to new generation
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
t.Fatalf("unexpected error when adding mrs: %s", err)
|
|
||||||
}
|
|
||||||
s.DebugFlush()
|
s.DebugFlush()
|
||||||
|
|
||||||
for _, mr := range mrs {
|
for _, mr := range mrs {
|
||||||
|
|
|
@ -104,15 +104,11 @@ func TestSearch(t *testing.T) {
|
||||||
|
|
||||||
blockRowsCount++
|
blockRowsCount++
|
||||||
if blockRowsCount == rowsPerBlock {
|
if blockRowsCount == rowsPerBlock {
|
||||||
if err := st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits); err != nil {
|
st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits)
|
||||||
t.Fatalf("cannot add rows %d-%d: %s", i-blockRowsCount+1, i+1, err)
|
|
||||||
}
|
|
||||||
blockRowsCount = 0
|
blockRowsCount = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits); err != nil {
|
st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits)
|
||||||
t.Fatalf("cannot add rows %v-%v: %s", rowsCount-blockRowsCount, rowsCount, err)
|
|
||||||
}
|
|
||||||
endTimestamp := mrs[len(mrs)-1].Timestamp
|
endTimestamp := mrs[len(mrs)-1].Timestamp
|
||||||
|
|
||||||
// Re-open the storage in order to flush all the pending cached data.
|
// Re-open the storage in order to flush all the pending cached data.
|
||||||
|
|
|
@ -1612,13 +1612,12 @@ var rowsAddedTotal atomic.Uint64
|
||||||
//
|
//
|
||||||
// The caller should limit the number of concurrent AddRows calls to the number
|
// The caller should limit the number of concurrent AddRows calls to the number
|
||||||
// of available CPU cores in order to limit memory usage.
|
// of available CPU cores in order to limit memory usage.
|
||||||
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
|
||||||
if len(mrs) == 0 {
|
if len(mrs) == 0 {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
|
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
|
||||||
var firstErr error
|
|
||||||
ic := getMetricRowsInsertCtx()
|
ic := getMetricRowsInsertCtx()
|
||||||
maxBlockLen := len(ic.rrs)
|
maxBlockLen := len(ic.rrs)
|
||||||
for len(mrs) > 0 {
|
for len(mrs) > 0 {
|
||||||
|
@ -1629,17 +1628,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
||||||
} else {
|
} else {
|
||||||
mrs = nil
|
mrs = nil
|
||||||
}
|
}
|
||||||
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
|
s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
|
||||||
if firstErr == nil {
|
|
||||||
firstErr = err
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rowsAddedTotal.Add(uint64(len(mrsBlock)))
|
rowsAddedTotal.Add(uint64(len(mrsBlock)))
|
||||||
}
|
}
|
||||||
putMetricRowsInsertCtx(ic)
|
putMetricRowsInsertCtx(ic)
|
||||||
|
|
||||||
return firstErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricRowsInsertCtx struct {
|
type metricRowsInsertCtx struct {
|
||||||
|
@ -1778,7 +1770,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
|
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) {
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
generation := idb.generation
|
generation := idb.generation
|
||||||
is := idb.getIndexSearch(noDeadline)
|
is := idb.getIndexSearch(noDeadline)
|
||||||
|
@ -1802,7 +1794,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
|
|
||||||
var genTSID generationTSID
|
var genTSID generationTSID
|
||||||
|
|
||||||
// Return only the first error, since it has no sense in returning all errors.
|
// Log only the first error, since it has no sense in logging all errors.
|
||||||
var firstWarn error
|
var firstWarn error
|
||||||
|
|
||||||
j := 0
|
j := 0
|
||||||
|
@ -1968,23 +1960,20 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
|
|
||||||
if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
|
if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
|
||||||
if firstWarn == nil {
|
if firstWarn == nil {
|
||||||
firstWarn = err
|
firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := s.updatePerDateData(rows, dstMrs); err != nil {
|
||||||
|
if firstWarn == nil {
|
||||||
|
firstWarn = fmt.Errorf("cannot not update per-day index: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if firstWarn != nil {
|
if firstWarn != nil {
|
||||||
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
|
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.updatePerDateData(rows, dstMrs)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("cannot update per-date data: %w", err)
|
|
||||||
} else {
|
|
||||||
s.tb.MustAddRows(rows)
|
s.tb.MustAddRows(rows)
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error occurred during rows addition: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
|
var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
|
||||||
|
|
|
@ -2,12 +2,12 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"testing/quick"
|
"testing/quick"
|
||||||
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -570,12 +571,7 @@ func testStorageRandTimestamps(s *Storage) error {
|
||||||
}
|
}
|
||||||
mrs = append(mrs, mr)
|
mrs = append(mrs, mr)
|
||||||
}
|
}
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
errStr := err.Error()
|
|
||||||
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
|
|
||||||
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the storage contains rows.
|
// Verify the storage contains rows.
|
||||||
|
@ -691,9 +687,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
|
||||||
}
|
}
|
||||||
mrs = append(mrs, mr)
|
mrs = append(mrs, mr)
|
||||||
}
|
}
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
s.DebugFlush()
|
s.DebugFlush()
|
||||||
|
|
||||||
|
@ -1031,9 +1025,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
||||||
minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
|
minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
|
||||||
for i := 0; i < addsCount; i++ {
|
for i := 0; i < addsCount; i++ {
|
||||||
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the storage contains rows.
|
// Verify the storage contains rows.
|
||||||
|
@ -1172,9 +1164,7 @@ func testStorageAddMetrics(s *Storage, workerNum int) error {
|
||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
Value: value,
|
Value: value,
|
||||||
}
|
}
|
||||||
if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
|
s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
|
||||||
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the storage contains rows.
|
// Verify the storage contains rows.
|
||||||
|
@ -1198,9 +1188,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
||||||
minTimestamp := maxTimestamp - s.retentionMsecs
|
minTimestamp := maxTimestamp - s.retentionMsecs
|
||||||
for i := 0; i < addsCount; i++ {
|
for i := 0; i < addsCount; i++ {
|
||||||
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
t.Fatalf("unexpected error when adding mrs: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Try creating a snapshot from the storage.
|
// Try creating a snapshot from the storage.
|
||||||
snapshotName, err := s.CreateSnapshot()
|
snapshotName, err := s.CreateSnapshot()
|
||||||
|
@ -1268,9 +1256,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
|
||||||
rng := rand.New(rand.NewSource(1))
|
rng := rand.New(rand.NewSource(1))
|
||||||
mrs := testGenerateMetricRows(rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
|
mrs := testGenerateMetricRows(rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
|
||||||
// populate storage with some rows
|
// populate storage with some rows
|
||||||
if err := s.AddRows(mrs[:10], defaultPrecisionBits); err != nil {
|
s.AddRows(mrs[:10], defaultPrecisionBits)
|
||||||
t.Fatal("error when adding mrs: %w", err)
|
|
||||||
}
|
|
||||||
s.DebugFlush()
|
s.DebugFlush()
|
||||||
|
|
||||||
// verify ingested rows are searchable
|
// verify ingested rows are searchable
|
||||||
|
@ -1289,9 +1275,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
|
||||||
for i := 0; i < len(mrs); i = i + 2 {
|
for i := 0; i < len(mrs); i = i + 2 {
|
||||||
mrs[i].Value = decimal.StaleNaN
|
mrs[i].Value = decimal.StaleNaN
|
||||||
}
|
}
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
t.Fatal("error when adding mrs: %w", err)
|
|
||||||
}
|
|
||||||
s.DebugFlush()
|
s.DebugFlush()
|
||||||
|
|
||||||
// verify that rows marked as stale aren't searchable
|
// verify that rows marked as stale aren't searchable
|
||||||
|
@ -1302,3 +1286,166 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
|
||||||
t.Fatalf("cannot remove %q: %s", path, err)
|
t.Fatalf("cannot remove %q: %s", path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testRemoveAll removes all storage data produced by a test if the test hasn't
|
||||||
|
// failed. For this to work, the storage must use t.Name() as the base dir in
|
||||||
|
// its data path.
|
||||||
|
//
|
||||||
|
// In case of failure, the data is kept for further debugging.
|
||||||
|
func testRemoveAll(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if !t.Failed() {
|
||||||
|
fs.MustRemoveAll(t.Name())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStorageRowsNotAdded(t *testing.T) {
|
||||||
|
defer testRemoveAll(t)
|
||||||
|
|
||||||
|
type options struct {
|
||||||
|
name string
|
||||||
|
retention time.Duration
|
||||||
|
mrs []MetricRow
|
||||||
|
tr TimeRange
|
||||||
|
}
|
||||||
|
f := func(opts *options) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
var gotMetrics Metrics
|
||||||
|
path := fmt.Sprintf("%s/%s", t.Name(), opts.name)
|
||||||
|
s := MustOpenStorage(path, opts.retention, 0, 0)
|
||||||
|
defer s.MustClose()
|
||||||
|
s.AddRows(opts.mrs, defaultPrecisionBits)
|
||||||
|
s.DebugFlush()
|
||||||
|
s.UpdateMetrics(&gotMetrics)
|
||||||
|
|
||||||
|
got := testCountAllMetricNames(s, opts.tr)
|
||||||
|
if got != 0 {
|
||||||
|
t.Fatalf("unexpected metric name count: got %d, want 0", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const numRows = 1000
|
||||||
|
var (
|
||||||
|
rng = rand.New(rand.NewSource(1))
|
||||||
|
retention time.Duration
|
||||||
|
minTimestamp int64
|
||||||
|
maxTimestamp int64
|
||||||
|
mrs []MetricRow
|
||||||
|
)
|
||||||
|
|
||||||
|
minTimestamp = -1000
|
||||||
|
maxTimestamp = -1
|
||||||
|
f(&options{
|
||||||
|
name: "NegativeTimestamps",
|
||||||
|
retention: retentionMax,
|
||||||
|
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
|
||||||
|
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||||
|
})
|
||||||
|
|
||||||
|
retention = 48 * time.Hour
|
||||||
|
minTimestamp = time.Now().Add(-retention - time.Hour).UnixMilli()
|
||||||
|
maxTimestamp = minTimestamp + 1000
|
||||||
|
f(&options{
|
||||||
|
name: "TooSmallTimestamps",
|
||||||
|
retention: retention,
|
||||||
|
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
|
||||||
|
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||||
|
})
|
||||||
|
|
||||||
|
retention = 48 * time.Hour
|
||||||
|
minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli()
|
||||||
|
maxTimestamp = minTimestamp + 1000
|
||||||
|
f(&options{
|
||||||
|
name: "TooBigTimestamps",
|
||||||
|
retention: retention,
|
||||||
|
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
|
||||||
|
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||||
|
})
|
||||||
|
|
||||||
|
minTimestamp = time.Now().UnixMilli()
|
||||||
|
maxTimestamp = minTimestamp + 1000
|
||||||
|
mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
|
||||||
|
for i := range numRows {
|
||||||
|
mrs[i].Value = math.NaN()
|
||||||
|
}
|
||||||
|
f(&options{
|
||||||
|
name: "NaN",
|
||||||
|
mrs: mrs,
|
||||||
|
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||||
|
})
|
||||||
|
|
||||||
|
minTimestamp = time.Now().UnixMilli()
|
||||||
|
maxTimestamp = minTimestamp + 1000
|
||||||
|
mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
|
||||||
|
for i := range numRows {
|
||||||
|
mrs[i].Value = decimal.StaleNaN
|
||||||
|
}
|
||||||
|
f(&options{
|
||||||
|
name: "StaleNaN",
|
||||||
|
mrs: mrs,
|
||||||
|
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||||
|
})
|
||||||
|
|
||||||
|
minTimestamp = time.Now().UnixMilli()
|
||||||
|
maxTimestamp = minTimestamp + 1000
|
||||||
|
mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
|
||||||
|
for i := range numRows {
|
||||||
|
mrs[i].MetricNameRaw = []byte("garbage")
|
||||||
|
}
|
||||||
|
f(&options{
|
||||||
|
name: "InvalidMetricNameRaw",
|
||||||
|
mrs: mrs,
|
||||||
|
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) {
|
||||||
|
defer testRemoveAll(t)
|
||||||
|
|
||||||
|
f := func(name string, maxHourlySeries int, maxDailySeries int) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
rng := rand.New(rand.NewSource(1))
|
||||||
|
numRows := uint64(1000)
|
||||||
|
minTimestamp := time.Now().UnixMilli()
|
||||||
|
maxTimestamp := minTimestamp + 1000
|
||||||
|
mrs := testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
|
||||||
|
|
||||||
|
var gotMetrics Metrics
|
||||||
|
path := fmt.Sprintf("%s/%s", t.Name(), name)
|
||||||
|
s := MustOpenStorage(path, 0, maxHourlySeries, maxDailySeries)
|
||||||
|
defer s.MustClose()
|
||||||
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
|
s.DebugFlush()
|
||||||
|
s.UpdateMetrics(&gotMetrics)
|
||||||
|
|
||||||
|
want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped)
|
||||||
|
if got := testCountAllMetricNames(s, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want {
|
||||||
|
t.Fatalf("unexpected metric name count: %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
maxHourlySeries := 1
|
||||||
|
maxDailySeries := 0 // No limit
|
||||||
|
f("HourlyLimitExceeded", maxHourlySeries, maxDailySeries)
|
||||||
|
|
||||||
|
maxHourlySeries = 0 // No limit
|
||||||
|
maxDailySeries = 1
|
||||||
|
f("DailyLimitExceeded", maxHourlySeries, maxDailySeries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// testCountAllMetricNames is a test helper function that counts the names of
|
||||||
|
// all time series within the given time range.
|
||||||
|
func testCountAllMetricNames(s *Storage, tr TimeRange) int {
|
||||||
|
tfsAll := NewTagFilters()
|
||||||
|
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
|
||||||
|
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
|
||||||
|
}
|
||||||
|
names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("SeachMetricNames() failed unexpectedly: %v", err))
|
||||||
|
}
|
||||||
|
return len(names)
|
||||||
|
}
|
||||||
|
|
|
@ -46,9 +46,7 @@ func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) {
|
||||||
mr.Timestamp = int64(offset + i)
|
mr.Timestamp = int64(offset + i)
|
||||||
mr.Value = float64(offset + i)
|
mr.Value = float64(offset + i)
|
||||||
}
|
}
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
s.AddRows(mrs, defaultPrecisionBits)
|
||||||
panic(fmt.Errorf("cannot add rows to storage: %w", err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
|
|
Loading…
Reference in a new issue