mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069
This commit is contained in:
parent
eb2a7ced9c
commit
f61aa472bb
2 changed files with 70 additions and 0 deletions
|
@ -1835,6 +1835,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
var firstWarn error
|
||||
var isStaleNan bool
|
||||
j := 0
|
||||
for i := range mrs {
|
||||
mr := &mrs[i]
|
||||
|
@ -1844,6 +1845,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
// doesn't know how to work with them.
|
||||
continue
|
||||
}
|
||||
isStaleNan = true
|
||||
}
|
||||
if mr.Timestamp < minTimestamp {
|
||||
// Skip rows with too small timestamps outside the retention.
|
||||
|
@ -1957,6 +1959,13 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
continue
|
||||
}
|
||||
|
||||
// If TSID was not found in cache and in indexdb, it's deleted. If metric contains stale
|
||||
// marker, do not create a new TSID.
|
||||
if isStaleNan {
|
||||
j--
|
||||
continue
|
||||
}
|
||||
|
||||
// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
|
||||
generateTSID(&genTSID.TSID, mn)
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"testing/quick"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
@ -1359,3 +1360,63 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
|||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
path := "TestStorageSeriesAreNotCreatedOnStaleMarkers"
|
||||
s := MustOpenStorage(path, -1, 1e5, 1e6)
|
||||
|
||||
// Verify no label names exist
|
||||
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
|
||||
}
|
||||
if len(lns) != 0 {
|
||||
t.Fatalf("found non-empty tag keys at the start: %q", lns)
|
||||
}
|
||||
|
||||
// Add rows with stale markers to the storage
|
||||
mrs := testGenerateStaleMetricRows(rng, 100, 0, 2e10)
|
||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
||||
t.Fatal("error when adding mrs: %w", err)
|
||||
}
|
||||
|
||||
// Verify that tag keys were not created
|
||||
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
|
||||
}
|
||||
if len(lns) != 0 {
|
||||
t.Fatalf("found non-empty tag keys at the start: %q", lns)
|
||||
}
|
||||
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func testGenerateStaleMetricRows(rng *rand.Rand, rows uint64, timestampMin, timestampMax int64) []MetricRow {
|
||||
var mrs []MetricRow
|
||||
var mn MetricName
|
||||
mn.Tags = []Tag{
|
||||
{[]byte("job"), []byte("test_service")},
|
||||
{[]byte("instance"), []byte("1.2.3.4")},
|
||||
}
|
||||
|
||||
for i := 0; i < int(rows); i++ {
|
||||
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
|
||||
metricNameRaw := mn.marshalRaw(nil)
|
||||
timestamp := rng.Int63n(timestampMax - timestampMin)
|
||||
value := decimal.StaleNaN
|
||||
|
||||
mr := MetricRow{
|
||||
MetricNameRaw: metricNameRaw,
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
}
|
||||
mrs = append(mrs, mr)
|
||||
}
|
||||
|
||||
return mrs
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue