lib/storage: properly register index records with RegisterMetricNames

Once the timeseries is in tsidCache, new entries won't be created in
per-day index because the RegisterMetricNames() code does consider
different dates for the same timeseries. So this case has been added.

The same bug exists for AddRows() but it is not manifested because the
index entries are finally created in updatePerDateData().

RegisterMetricNames also updated to increase the newTimeseriesCreated
counter because it actually creates new time series in index.

A unit tests has been added that check all possible data patterns
(different metric names and dates) and code branches in both
RegisterMetricNames and AddRows. The total number of new unit tests is
around 100 which increaded the running time of storage tests by 50%. 

---------

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
This commit is contained in:
rtm0 2024-08-27 22:33:53 +03:00 committed by GitHub
parent 30f98916f9
commit eef6943084
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 428 additions and 0 deletions

View file

@ -46,6 +46,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/), [vmctl](https://docs.victoriametrics.com/vmctl/) and snapshot API: verify correctness of URLs provided via cmd-line flags before executing HTTP requests. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6740) issue for details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): reduce memory usage when parsing responses with big number of metrics in response. The memory usage was increased in [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1) after attempt to reduce CPU usage for heavy loaded vmalerts.
* BUGFIX: all VictoriaMetrics components: forcefully set owner/group for release tars to 1000:1000. This helps to avoid unpacking [issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6788) on systems with limitations around UID:GID configuration. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6846).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): fix metric names registering in the per-day index for new dates for existing time series when making calls to `/tags/tagSeries` and `/tags/tagMultiSeries` handlers of [Grpahite API](https://docs.victoriametrics.com/#graphite-api-usage). See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6872/) for details.
## [v1.102.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.1)

View file

@ -1674,6 +1674,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
mn := GetMetricName()
defer PutMetricName(mn)
var newSeriesCount uint64
var seriesRepopulated uint64
idb := s.idb()
@ -1708,6 +1709,18 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
genTSID.generation = generation
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
} else if !s.dateMetricIDCache.Has(generation, date, genTSID.TSID.MetricID) {
if !is.hasDateMetricIDNoExtDB(date, genTSID.TSID.MetricID) {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
continue
}
mn.sortTags()
is.createPerDayIndexes(date, &genTSID.TSID, mn)
}
s.dateMetricIDCache.Set(generation, date, genTSID.TSID.MetricID)
}
continue
}
@ -1758,8 +1771,10 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = generation
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
newSeriesCount++
}
s.newTimeseriesCreated.Add(newSeriesCount)
s.timeseriesRepopulated.Add(seriesRepopulated)
// There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called.

View file

@ -1449,3 +1449,415 @@ func testCountAllMetricNames(s *Storage, tr TimeRange) int {
}
return len(names)
}
// testCountAllMetricIDs is a test helper function that counts the IDs of
// all time series within the given time range.
func testCountAllMetricIDs(s *Storage, tr TimeRange) int {
tfsAll := NewTagFilters()
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
}
ids, err := s.idb().searchMetricIDs(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
if err != nil {
panic(fmt.Sprintf("seachMetricIDs() failed unexpectedly: %s", err))
}
return len(ids)
}
func TestStorageRegisterMetricNamesForVariousDataPatternsConcurrently(t *testing.T) {
testStorageVariousDataPatternsConcurrently(t, true, func(s *Storage, mrs []MetricRow) {
s.RegisterMetricNames(nil, mrs)
})
}
func TestStorageAddRowsForVariousDataPatternsConcurrently(t *testing.T) {
testStorageVariousDataPatternsConcurrently(t, false, func(s *Storage, mrs []MetricRow) {
s.AddRows(mrs, defaultPrecisionBits)
})
}
// testStorageVariousDataPatternsConcurrently tests different concurrency use
// cases when ingesting data of different patterns.
//
// The function is intended to be used by other tests that define which
// operation (AddRows or RegisterMetricNames) is tested.
func testStorageVariousDataPatternsConcurrently(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow)) {
defer testRemoveAll(t)
const concurrency = 4
t.Run("serial", func(t *testing.T) {
testStorageVariousDataPatterns(t, registerOnly, op, 1, false)
})
t.Run("concurrentRows", func(t *testing.T) {
testStorageVariousDataPatterns(t, registerOnly, op, concurrency, true)
})
t.Run("concurrentBatches", func(t *testing.T) {
testStorageVariousDataPatterns(t, registerOnly, op, concurrency, false)
})
}
// testStorageVariousDataPatterns tests the ingestion of different combinations
// of metric names and dates.
//
// The function is intended to be used by other tests that define the
// concurrency and the operation (AddRows or RegisterMetricNames) under test.
func testStorageVariousDataPatterns(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool) {
f := func(t *testing.T, sameBatchMetricNames, sameRowMetricNames, sameBatchDates, sameRowDates bool) {
batches, wantCounts := testGenerateMetricRowBatches(&batchOptions{
numBatches: 4,
numRowsPerBatch: 100,
registerOnly: registerOnly,
sameBatchMetricNames: sameBatchMetricNames,
sameRowMetricNames: sameRowMetricNames,
sameBatchDates: sameBatchDates,
sameRowDates: sameRowDates,
})
strict := concurrency == 1
s := MustOpenStorage(t.Name(), 0, 0, 0)
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
assertCounts(t, s, wantCounts, strict)
// Rotate indexDB to test the case when TSIDs from tsidCache have the
// generation that is older than the generation of the current indexDB.
s.mustRotateIndexDB(time.Now())
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
assertCounts(t, s, wantCounts, strict)
// Empty the tsidCache to test the case when tsid is retrived from the
// index that belongs to the current generation indexDB.
s.resetAndSaveTSIDCache()
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
assertCounts(t, s, wantCounts, strict)
// Empty the tsidCache and rotate indexDB to test the case when tsid is
// retrived from the index that belongs to the previous generation
// indexDB.
s.resetAndSaveTSIDCache()
s.mustRotateIndexDB(time.Now())
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
assertCounts(t, s, wantCounts, strict)
s.MustClose()
}
t.Run("sameBatchMetrics/sameRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-01
// Batch2: metric 1971-01-01, metric 1971-01-01
t.Parallel()
f(t, true, true, true, true)
})
t.Run("sameBatchMetrics/sameRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-02
// Batch2: metric 1971-01-01, metric 1971-01-02
t.Parallel()
f(t, true, true, true, false)
})
t.Run("sameBatchMetrics/sameRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-01
// Batch2: metric 1971-01-02, metric 1971-01-02
t.Parallel()
f(t, true, true, false, true)
})
t.Run("sameBatchMetrics/sameRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-02
// Batch2: metric 1971-01-03, metric 1971-01-04
t.Parallel()
f(t, true, true, false, false)
})
t.Run("sameBatchMetrics/diffRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-01
// Batch2: metric_row0 1971-01-01, metric_row1 1971-01-01
t.Parallel()
f(t, true, false, true, true)
})
t.Run("sameBatchMetrics/diffRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-02
// Batch2: metric_row0 1971-01-01, metric_row1 1971-01-02
t.Parallel()
f(t, true, false, true, false)
})
t.Run("sameBatchMetrics/diffRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-01
// Batch2: metric_row0 1971-01-02, metric_row1 1971-01-02
t.Parallel()
f(t, true, false, false, true)
})
t.Run("sameBatchMetrics/diffRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-02
// Batch2: metric_row0 1971-01-03, metric_row1 1971-01-04
t.Parallel()
f(t, true, false, false, false)
})
t.Run("diffBatchMetrics/sameRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-01
// Batch2: metric_batch1 1971-01-01, metric_batch1 1971-01-01
t.Parallel()
f(t, false, true, true, true)
})
t.Run("diffBatchMetrics/sameRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-02
// Batch2: metric_batch1 1971-01-01, metric_batch1 1971-01-02
t.Parallel()
f(t, false, true, true, false)
})
t.Run("diffBatchMetrics/sameRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-01
// Batch2: metric_batch1 1971-01-02, metric_batch1 1971-01-02
t.Parallel()
f(t, false, true, false, true)
})
t.Run("diffBatchMetrics/sameRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-02
// Batch2: metric_batch1 1971-01-03, metric_batch1 1971-01-04
t.Parallel()
f(t, false, true, false, false)
})
t.Run("diffBatchMetrics/diffRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-01
// Batch2: metric_batch1_row0 1971-01-01, metric_batch1_row1 1971-01-01
t.Parallel()
f(t, false, false, true, true)
})
t.Run("diffBatchMetrics/diffRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-02
// Batch2: metric_batch1_row0 1971-01-01, metric_batch1_row1 1971-01-02
t.Parallel()
f(t, false, false, true, false)
})
t.Run("diffBatchMetrics/diffRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-01
// Batch2: metric_batch1_row0 1971-01-02, metric_batch1_row1 1971-01-02
t.Parallel()
f(t, false, false, false, true)
})
t.Run("diffBatchMetrics/diffRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-02
// Batch2: metric_batch1_row0 1971-01-03, metric_batch1_row1 1971-01-04
t.Parallel()
f(t, false, false, false, false)
})
}
// testDoConcurrently performs some storage operation on metric rows
// concurrently.
//
// The function accepts metric rows organized in batches. The number of
// goroutines is specified with concurrency arg. If splitBatches is false, then
// each batch is processed in a separate goroutine. Otherwise, rows from a
// single batch are spread across multiple goroutines and next batch won't be
// processed until all records of the current batch are processed.
func testDoConcurrently(s *Storage, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool, mrsBatches [][]MetricRow) {
if concurrency < 1 {
panic(fmt.Sprintf("Unexpected concurrency: got %d, want >= 1", concurrency))
}
var wg sync.WaitGroup
mrsCh := make(chan []MetricRow)
for range concurrency {
wg.Add(1)
go func() {
for mrs := range mrsCh {
op(s, mrs)
}
wg.Done()
}()
}
n := 1
if splitBatches {
n = concurrency
}
for _, batch := range mrsBatches {
step := len(batch) / n
if step == 0 {
step = 1
}
for begin := 0; begin < len(batch); begin += step {
limit := begin + step
if limit > len(batch) {
limit = len(batch)
}
mrsCh <- batch[begin:limit]
}
}
close(mrsCh)
wg.Wait()
}
type counts struct {
metrics *Metrics
timeRangeCounts map[TimeRange]int
dateTSDBStatuses map[uint64]*TSDBStatus
}
// assertCounts retrieves various counts from storage and compares them with
// the wanted ones.
//
// Some counts can be greater than wanted values because duplicate metric IDs
// can be created when rows are inserted concurrently. In this case `strict`
// arg can be set to false in order to replace strict equality comparison with
// `greater or equal`.
func assertCounts(t *testing.T, s *Storage, want *counts, strict bool) {
t.Helper()
var gotMetrics Metrics
s.UpdateMetrics(&gotMetrics)
gotCnt, wantCnt := gotMetrics.NewTimeseriesCreated, want.metrics.NewTimeseriesCreated
if strict {
if gotCnt != wantCnt {
t.Errorf("unexpected Metrics.NewTimeseriesCreated: got %d, want %d", gotCnt, wantCnt)
}
} else {
if gotCnt < wantCnt {
t.Errorf("unexpected Metrics.NewTimeseriesCreated: got %d, want >= %d", gotCnt, wantCnt)
}
}
for tr, want := range want.timeRangeCounts {
if got := testCountAllMetricNames(s, tr); got != want {
t.Errorf("%v: unexpected metric name count: got %d, want %d", &tr, got, want)
}
got := testCountAllMetricIDs(s, tr)
if strict {
if got != want {
t.Errorf("%v: unexpected metric ID count: got %d, want %d", &tr, got, want)
}
} else {
if got < want {
t.Errorf("%v: unexpected metric ID count: got %d, want >= %d", &tr, got, want)
}
}
}
for date, wantStatus := range want.dateTSDBStatuses {
dt := time.UnixMilli(int64(date) * msecPerDay).UTC()
gotStatus, err := s.GetTSDBStatus(nil, nil, date, "", 10, 1e6, noDeadline)
if err != nil {
t.Fatalf("GetTSDBStatus(%v) failed unexpectedly: %v", dt, err)
}
got, want := gotStatus.TotalSeries, wantStatus.TotalSeries
if strict {
if got != want {
t.Errorf("%v: unexpected TSDBStatus.TotalSeries: got %d, want %d", dt, got, want)
}
} else {
if got < want {
t.Errorf("%v: unexpected TSDBStatus.TotalSeries: got %d, want >= %d", dt, got, want)
}
}
}
}
type batchOptions struct {
numBatches int
numRowsPerBatch int
registerOnly bool
sameBatchMetricNames bool
sameRowMetricNames bool
sameBatchDates bool
sameRowDates bool
}
// testGenerateMetricRowBatches generates metric rows batches of various
// combinations of metric names and dates. The function also returns the counts
// that the storage is expected to report once the generated batch is ingested
// into the storage.
func testGenerateMetricRowBatches(opts *batchOptions) ([][]MetricRow, *counts) {
if opts.numBatches <= 0 {
panic(fmt.Sprintf("unexpected number of batches: got %d, want > 0", opts.numBatches))
}
if opts.numRowsPerBatch <= 0 {
panic(fmt.Sprintf("unexpected number of rows per batch: got %d, want > 0", opts.numRowsPerBatch))
}
rng := rand.New(rand.NewSource(1))
batches := make([][]MetricRow, opts.numBatches)
metricName := "metric"
startTime := time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(1971, 1, 1, 23, 59, 59, 999, time.UTC)
days := time.Duration(0)
trNames := make(map[TimeRange]map[string]bool)
names := make(map[string]bool)
for batch := range opts.numBatches {
batchMetricName := metricName
if !opts.sameBatchMetricNames {
batchMetricName += fmt.Sprintf("_batch%d", batch)
}
var rows []MetricRow
for row := range opts.numRowsPerBatch {
rowMetricName := batchMetricName
if !opts.sameRowMetricNames {
rowMetricName += fmt.Sprintf("_row%d", row)
}
mn := MetricName{
MetricGroup: []byte(rowMetricName),
}
tr := TimeRange{
MinTimestamp: startTime.Add(days * 24 * time.Hour).UnixMilli(),
MaxTimestamp: endTime.Add(days * 24 * time.Hour).UnixMilli(),
}
rows = append(rows, MetricRow{
MetricNameRaw: mn.marshalRaw(nil),
Timestamp: rng.Int63n(tr.MaxTimestamp-tr.MinTimestamp) + tr.MinTimestamp,
Value: rng.NormFloat64() * 1e6,
})
if !opts.sameRowDates {
days++
}
if trNames[tr] == nil {
trNames[tr] = make(map[string]bool)
}
names[rowMetricName] = true
trNames[tr][rowMetricName] = true
}
batches[batch] = rows
if opts.sameBatchDates {
days = 0
} else if opts.sameRowDates {
days++
}
}
allTimeseries := len(names)
want := counts{
metrics: &Metrics{
NewTimeseriesCreated: uint64(allTimeseries),
},
timeRangeCounts: make(map[TimeRange]int),
dateTSDBStatuses: make(map[uint64]*TSDBStatus),
}
for tr, names := range trNames {
count := len(names)
date := uint64(tr.MinTimestamp / msecPerDay)
want.timeRangeCounts[tr] = count
want.dateTSDBStatuses[date] = &TSDBStatus{
TotalSeries: uint64(count),
}
}
return batches, &want
}