diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 714fac726..fa9e588fc 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -46,6 +46,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/), [vmctl](https://docs.victoriametrics.com/vmctl/) and snapshot API: verify correctness of URLs provided via cmd-line flags before executing HTTP requests. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6740) issue for details.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): reduce memory usage when parsing responses with big number of metrics in response. The memory usage was increased in [v1.102.0-rc1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc1) after attempt to reduce CPU usage for heavy loaded vmalerts.
 * BUGFIX: all VictoriaMetrics components: forcefully set owner/group for release tars to 1000:1000. This helps to avoid unpacking [issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6788) on systems with limitations around UID:GID configuration. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6846).
+* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly register metric names in the per-day index for new dates of existing time series when making calls to `/tags/tagSeries` and `/tags/tagMultiSeries` handlers of [Graphite API](https://docs.victoriametrics.com/#graphite-api-usage). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6872/) for details.

 ## [v1.102.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.1)

diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 3da53cdf4..b3553d33d 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -1674,6 +1674,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
     mn := GetMetricName()
     defer PutMetricName(mn)

+    var newSeriesCount uint64
     var seriesRepopulated uint64

     idb := s.idb()
@@ -1708,6 +1709,18 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
                 genTSID.generation = generation
                 s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
                 seriesRepopulated++
+            } else if !s.dateMetricIDCache.Has(generation, date, genTSID.TSID.MetricID) {
+                if !is.hasDateMetricIDNoExtDB(date, genTSID.TSID.MetricID) {
+                    if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
+                        if firstWarn == nil {
+                            firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
+                        }
+                        continue
+                    }
+                    mn.sortTags()
+                    is.createPerDayIndexes(date, &genTSID.TSID, mn)
+                }
+                s.dateMetricIDCache.Set(generation, date, genTSID.TSID.MetricID)
             }
             continue
         }
@@ -1758,8 +1771,10 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
         createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
         genTSID.generation = generation
         s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
+        newSeriesCount++
     }

+    s.newTimeseriesCreated.Add(newSeriesCount)
     s.timeseriesRepopulated.Add(seriesRepopulated)

     // There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called.
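Reviewer note: the gist of the `storage.go` change is easier to see outside the diff context. Previously, `RegisterMetricNames()` took the fast path as soon as a TSID was found in `tsidCache`, so a series that was only ever registered via the Graphite `/tags/tagSeries` / `/tags/tagMultiSeries` handlers never got per-day index entries for later dates. The new `else if` branch first checks `dateMetricIDCache`, falls back to an index lookup, creates the per-day entries when missing, and then caches the (generation, date, metricID) triple; the patch also makes `RegisterMetricNames()` bump `newTimeseriesCreated`, which the new tests assert against. The standalone sketch below models that check-then-create flow with plain maps; the `store`, `seen`, `perDayIndex` and `registerSeen` names are hypothetical stand-ins for illustration, not the actual VictoriaMetrics types.

```go
package main

import "fmt"

// dayKey models a (generation, date, metricID) entry of a per-day cache.
type dayKey struct {
	generation uint64
	date       uint64
	metricID   uint64
}

// store is a toy stand-in for the storage internals touched by the patch:
// `seen` plays the role of dateMetricIDCache, `perDayIndex` the role of the
// on-disk per-day index. Both names are hypothetical.
type store struct {
	seen        map[dayKey]bool
	perDayIndex map[[2]uint64]bool
}

// registerSeen mimics the new "else if" branch in RegisterMetricNames: the
// series is already known (its TSID is cached), but the per-day index may
// still lack an entry for this date.
func (s *store) registerSeen(generation, date, metricID uint64) {
	k := dayKey{generation, date, metricID}
	if s.seen[k] {
		return // fast path: (date, metricID) already registered for this generation
	}
	idx := [2]uint64{date, metricID}
	if !s.perDayIndex[idx] {
		// Slow path: create the per-day index entry, analogous to createPerDayIndexes().
		s.perDayIndex[idx] = true
	}
	s.seen[k] = true
}

func main() {
	s := &store{seen: map[dayKey]bool{}, perDayIndex: map[[2]uint64]bool{}}
	// The same series (metricID=42) registered on two different days: before
	// the fix, the second day produced no per-day index entry for
	// register-only calls such as /tags/tagSeries.
	s.registerSeen(1, 19000, 42)
	s.registerSeen(1, 19001, 42)
	s.registerSeen(1, 19001, 42) // repeated call hits the cache and is a no-op
	fmt.Println(len(s.perDayIndex)) // 2
}
```

The important property is that the index lookup only happens on a cache miss, so the hot path for already-registered (date, metricID) pairs stays a single cache check.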
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index a789a2db1..26a36b5c3 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -1449,3 +1449,415 @@ func testCountAllMetricNames(s *Storage, tr TimeRange) int {
     }
     return len(names)
 }
+
+// testCountAllMetricIDs is a test helper function that counts the IDs of
+// all time series within the given time range.
+func testCountAllMetricIDs(s *Storage, tr TimeRange) int {
+    tfsAll := NewTagFilters()
+    if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
+        panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
+    }
+    ids, err := s.idb().searchMetricIDs(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
+    if err != nil {
+        panic(fmt.Sprintf("searchMetricIDs() failed unexpectedly: %s", err))
+    }
+    return len(ids)
+}
+
+func TestStorageRegisterMetricNamesForVariousDataPatternsConcurrently(t *testing.T) {
+    testStorageVariousDataPatternsConcurrently(t, true, func(s *Storage, mrs []MetricRow) {
+        s.RegisterMetricNames(nil, mrs)
+    })
+}
+
+func TestStorageAddRowsForVariousDataPatternsConcurrently(t *testing.T) {
+    testStorageVariousDataPatternsConcurrently(t, false, func(s *Storage, mrs []MetricRow) {
+        s.AddRows(mrs, defaultPrecisionBits)
+    })
+}
+
+// testStorageVariousDataPatternsConcurrently tests different concurrency use
+// cases when ingesting data of different patterns.
+//
+// The function is intended to be used by other tests that define which
+// operation (AddRows or RegisterMetricNames) is tested.
+func testStorageVariousDataPatternsConcurrently(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow)) {
+    defer testRemoveAll(t)
+
+    const concurrency = 4
+
+    t.Run("serial", func(t *testing.T) {
+        testStorageVariousDataPatterns(t, registerOnly, op, 1, false)
+    })
+    t.Run("concurrentRows", func(t *testing.T) {
+        testStorageVariousDataPatterns(t, registerOnly, op, concurrency, true)
+    })
+    t.Run("concurrentBatches", func(t *testing.T) {
+        testStorageVariousDataPatterns(t, registerOnly, op, concurrency, false)
+    })
+}
+
+// testStorageVariousDataPatterns tests the ingestion of different combinations
+// of metric names and dates.
+//
+// The function is intended to be used by other tests that define the
+// concurrency and the operation (AddRows or RegisterMetricNames) under test.
+func testStorageVariousDataPatterns(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool) {
+    f := func(t *testing.T, sameBatchMetricNames, sameRowMetricNames, sameBatchDates, sameRowDates bool) {
+        batches, wantCounts := testGenerateMetricRowBatches(&batchOptions{
+            numBatches:           4,
+            numRowsPerBatch:      100,
+            registerOnly:         registerOnly,
+            sameBatchMetricNames: sameBatchMetricNames,
+            sameRowMetricNames:   sameRowMetricNames,
+            sameBatchDates:       sameBatchDates,
+            sameRowDates:         sameRowDates,
+        })
+        strict := concurrency == 1
+
+        s := MustOpenStorage(t.Name(), 0, 0, 0)
+
+        testDoConcurrently(s, op, concurrency, splitBatches, batches)
+        s.DebugFlush()
+        assertCounts(t, s, wantCounts, strict)
+
+        // Rotate indexDB to test the case when TSIDs from tsidCache have the
+        // generation that is older than the generation of the current indexDB.
+        s.mustRotateIndexDB(time.Now())
+        testDoConcurrently(s, op, concurrency, splitBatches, batches)
+        s.DebugFlush()
+        assertCounts(t, s, wantCounts, strict)
+
+        // Empty the tsidCache to test the case when the TSID is retrieved from
+        // the index that belongs to the current generation indexDB.
+        s.resetAndSaveTSIDCache()
+        testDoConcurrently(s, op, concurrency, splitBatches, batches)
+        s.DebugFlush()
+        assertCounts(t, s, wantCounts, strict)
+
+        // Empty the tsidCache and rotate indexDB to test the case when the
+        // TSID is retrieved from the index that belongs to the previous
+        // generation indexDB.
+        s.resetAndSaveTSIDCache()
+        s.mustRotateIndexDB(time.Now())
+        testDoConcurrently(s, op, concurrency, splitBatches, batches)
+        s.DebugFlush()
+        assertCounts(t, s, wantCounts, strict)
+
+        s.MustClose()
+    }
+
+    t.Run("sameBatchMetrics/sameRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric 1971-01-01, metric 1971-01-01
+        // Batch2: metric 1971-01-01, metric 1971-01-01
+        t.Parallel()
+        f(t, true, true, true, true)
+    })
+
+    t.Run("sameBatchMetrics/sameRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric 1971-01-01, metric 1971-01-02
+        // Batch2: metric 1971-01-01, metric 1971-01-02
+        t.Parallel()
+        f(t, true, true, true, false)
+    })
+
+    t.Run("sameBatchMetrics/sameRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric 1971-01-01, metric 1971-01-01
+        // Batch2: metric 1971-01-02, metric 1971-01-02
+        t.Parallel()
+        f(t, true, true, false, true)
+    })
+
+    t.Run("sameBatchMetrics/sameRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric 1971-01-01, metric 1971-01-02
+        // Batch2: metric 1971-01-03, metric 1971-01-04
+        t.Parallel()
+        f(t, true, true, false, false)
+    })
+
+    t.Run("sameBatchMetrics/diffRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric_row0 1971-01-01, metric_row1 1971-01-01
+        // Batch2: metric_row0 1971-01-01, metric_row1 1971-01-01
+        t.Parallel()
+        f(t, true, false, true, true)
+    })
+
+    t.Run("sameBatchMetrics/diffRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric_row0 1971-01-01, metric_row1 1971-01-02
+        // Batch2: metric_row0 1971-01-01, metric_row1 1971-01-02
+        t.Parallel()
+        f(t, true, false, true, false)
+    })
+
+    t.Run("sameBatchMetrics/diffRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric_row0 1971-01-01, metric_row1 1971-01-01
+        // Batch2: metric_row0 1971-01-02, metric_row1 1971-01-02
+        t.Parallel()
+        f(t, true, false, false, true)
+    })
+
+    t.Run("sameBatchMetrics/diffRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric_row0 1971-01-01, metric_row1 1971-01-02
+        // Batch2: metric_row0 1971-01-03, metric_row1 1971-01-04
+        t.Parallel()
+        f(t, true, false, false, false)
+    })
+
+    t.Run("diffBatchMetrics/sameRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-01
+        // Batch2: metric_batch1 1971-01-01, metric_batch1 1971-01-01
+        t.Parallel()
+        f(t, false, true, true, true)
+    })
+
+    t.Run("diffBatchMetrics/sameRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-02
+        // Batch2: metric_batch1 1971-01-01, metric_batch1 1971-01-02
+        t.Parallel()
+        f(t, false, true, true, false)
+    })
+
+    t.Run("diffBatchMetrics/sameRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-01
+        // Batch2: metric_batch1 1971-01-02, metric_batch1 1971-01-02
+        t.Parallel()
+        f(t, false, true, false, true)
+    })
+
+    t.Run("diffBatchMetrics/sameRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-02
+        // Batch2: metric_batch1 1971-01-03, metric_batch1 1971-01-04
+        t.Parallel()
+        f(t, false, true, false, false)
+    })
+
+    t.Run("diffBatchMetrics/diffRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-01
+        // Batch2: metric_batch1_row0 1971-01-01, metric_batch1_row1 1971-01-01
+        t.Parallel()
+        f(t, false, false, true, true)
+    })
+
+    t.Run("diffBatchMetrics/diffRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-02
+        // Batch2: metric_batch1_row0 1971-01-01, metric_batch1_row1 1971-01-02
+        t.Parallel()
+        f(t, false, false, true, false)
+    })
+
+    t.Run("diffBatchMetrics/diffRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-01
+        // Batch2: metric_batch1_row0 1971-01-02, metric_batch1_row1 1971-01-02
+        t.Parallel()
+        f(t, false, false, false, true)
+    })
+
+    t.Run("diffBatchMetrics/diffRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
+        // Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-02
+        // Batch2: metric_batch1_row0 1971-01-03, metric_batch1_row1 1971-01-04
+        t.Parallel()
+        f(t, false, false, false, false)
+    })
+}
+
+// testDoConcurrently performs a storage operation on metric rows concurrently.
+//
+// The function accepts metric rows organized in batches. The number of
+// goroutines is specified with the concurrency arg. If splitBatches is false,
+// then each batch is processed in a separate goroutine. Otherwise, rows from a
+// single batch are spread across multiple goroutines, and the next batch won't
+// be processed until all rows of the current batch are processed.
+func testDoConcurrently(s *Storage, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool, mrsBatches [][]MetricRow) {
+    if concurrency < 1 {
+        panic(fmt.Sprintf("Unexpected concurrency: got %d, want >= 1", concurrency))
+    }
+
+    var wg sync.WaitGroup
+    mrsCh := make(chan []MetricRow)
+    for range concurrency {
+        wg.Add(1)
+        go func() {
+            for mrs := range mrsCh {
+                op(s, mrs)
+            }
+            wg.Done()
+        }()
+    }
+
+    n := 1
+    if splitBatches {
+        n = concurrency
+    }
+    for _, batch := range mrsBatches {
+        step := len(batch) / n
+        if step == 0 {
+            step = 1
+        }
+        for begin := 0; begin < len(batch); begin += step {
+            limit := begin + step
+            if limit > len(batch) {
+                limit = len(batch)
+            }
+            mrsCh <- batch[begin:limit]
+        }
+    }
+    close(mrsCh)
+    wg.Wait()
+}
+
+type counts struct {
+    metrics          *Metrics
+    timeRangeCounts  map[TimeRange]int
+    dateTSDBStatuses map[uint64]*TSDBStatus
+}
+
+// assertCounts retrieves various counts from the storage and compares them
+// with the wanted ones.
+//
+// Some counts can be greater than the wanted values because duplicate metric
+// IDs can be created when rows are inserted concurrently. In this case the
+// `strict` arg can be set to false in order to replace the strict equality
+// comparison with `greater or equal`.
+func assertCounts(t *testing.T, s *Storage, want *counts, strict bool) {
+    t.Helper()
+
+    var gotMetrics Metrics
+    s.UpdateMetrics(&gotMetrics)
+    gotCnt, wantCnt := gotMetrics.NewTimeseriesCreated, want.metrics.NewTimeseriesCreated
+    if strict {
+        if gotCnt != wantCnt {
+            t.Errorf("unexpected Metrics.NewTimeseriesCreated: got %d, want %d", gotCnt, wantCnt)
+        }
+    } else {
+        if gotCnt < wantCnt {
+            t.Errorf("unexpected Metrics.NewTimeseriesCreated: got %d, want >= %d", gotCnt, wantCnt)
+        }
+    }
+
+    for tr, want := range want.timeRangeCounts {
+        if got := testCountAllMetricNames(s, tr); got != want {
+            t.Errorf("%v: unexpected metric name count: got %d, want %d", &tr, got, want)
+        }
+        got := testCountAllMetricIDs(s, tr)
+        if strict {
+            if got != want {
+                t.Errorf("%v: unexpected metric ID count: got %d, want %d", &tr, got, want)
+            }
+        } else {
+            if got < want {
+                t.Errorf("%v: unexpected metric ID count: got %d, want >= %d", &tr, got, want)
+            }
+        }
+    }
+
+    for date, wantStatus := range want.dateTSDBStatuses {
+        dt := time.UnixMilli(int64(date) * msecPerDay).UTC()
+        gotStatus, err := s.GetTSDBStatus(nil, nil, date, "", 10, 1e6, noDeadline)
+        if err != nil {
+            t.Fatalf("GetTSDBStatus(%v) failed unexpectedly: %v", dt, err)
+        }
+        got, want := gotStatus.TotalSeries, wantStatus.TotalSeries
+        if strict {
+            if got != want {
+                t.Errorf("%v: unexpected TSDBStatus.TotalSeries: got %d, want %d", dt, got, want)
+            }
+        } else {
+            if got < want {
+                t.Errorf("%v: unexpected TSDBStatus.TotalSeries: got %d, want >= %d", dt, got, want)
+            }
+        }
+    }
+}
+
+type batchOptions struct {
+    numBatches           int
+    numRowsPerBatch      int
+    registerOnly         bool
+    sameBatchMetricNames bool
+    sameRowMetricNames   bool
+    sameBatchDates       bool
+    sameRowDates         bool
+}
+
+// testGenerateMetricRowBatches generates metric row batches of various
+// combinations of metric names and dates. The function also returns the counts
+// that the storage is expected to report once the generated batches are
+// ingested into the storage.
+func testGenerateMetricRowBatches(opts *batchOptions) ([][]MetricRow, *counts) {
+    if opts.numBatches <= 0 {
+        panic(fmt.Sprintf("unexpected number of batches: got %d, want > 0", opts.numBatches))
+    }
+    if opts.numRowsPerBatch <= 0 {
+        panic(fmt.Sprintf("unexpected number of rows per batch: got %d, want > 0", opts.numRowsPerBatch))
+    }
+
+    rng := rand.New(rand.NewSource(1))
+
+    batches := make([][]MetricRow, opts.numBatches)
+    metricName := "metric"
+    startTime := time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC)
+    endTime := time.Date(1971, 1, 1, 23, 59, 59, 999, time.UTC)
+    days := time.Duration(0)
+    trNames := make(map[TimeRange]map[string]bool)
+    names := make(map[string]bool)
+    for batch := range opts.numBatches {
+        batchMetricName := metricName
+        if !opts.sameBatchMetricNames {
+            batchMetricName += fmt.Sprintf("_batch%d", batch)
+        }
+        var rows []MetricRow
+        for row := range opts.numRowsPerBatch {
+            rowMetricName := batchMetricName
+            if !opts.sameRowMetricNames {
+                rowMetricName += fmt.Sprintf("_row%d", row)
+            }
+            mn := MetricName{
+                MetricGroup: []byte(rowMetricName),
+            }
+            tr := TimeRange{
+                MinTimestamp: startTime.Add(days * 24 * time.Hour).UnixMilli(),
+                MaxTimestamp: endTime.Add(days * 24 * time.Hour).UnixMilli(),
+            }
+            rows = append(rows, MetricRow{
+                MetricNameRaw: mn.marshalRaw(nil),
+                Timestamp:     rng.Int63n(tr.MaxTimestamp-tr.MinTimestamp) + tr.MinTimestamp,
+                Value:         rng.NormFloat64() * 1e6,
+            })
+            if !opts.sameRowDates {
+                days++
+            }
+
+            if trNames[tr] == nil {
+                trNames[tr] = make(map[string]bool)
+            }
+            names[rowMetricName] = true
+            trNames[tr][rowMetricName] = true
+        }
+        batches[batch] = rows
+        if opts.sameBatchDates {
+            days = 0
+        } else if opts.sameRowDates {
+            days++
+        }
+    }
+
+    allTimeseries := len(names)
+    want := counts{
+        metrics: &Metrics{
+            NewTimeseriesCreated: uint64(allTimeseries),
+        },
+        timeRangeCounts:  make(map[TimeRange]int),
+        dateTSDBStatuses: make(map[uint64]*TSDBStatus),
+    }
+    for tr, names := range trNames {
+        count := len(names)
+        date := uint64(tr.MinTimestamp / msecPerDay)
+        want.timeRangeCounts[tr] = count
+        want.dateTSDBStatuses[date] = &TSDBStatus{
+            TotalSeries: uint64(count),
+        }
+    }
+    return batches, &want
+}
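Reviewer note: the expected `Metrics.NewTimeseriesCreated` value in these tests follows directly from the naming scheme in `testGenerateMetricRowBatches` ("metric", optionally suffixed with "_batch<B>" and "_row<R>"), and the per-date `TotalSeries` expectations are the same count restricted to the names seen in each `TimeRange`. The snippet below is only a back-of-the-envelope check of that name arithmetic; the `expectedSeries` helper is a hypothetical illustration and is not part of the patch, and it does not model the `days` stepping or the `msecPerDay` date bucketing.

```go
package main

import "fmt"

// expectedSeries computes how many distinct metric names
// testGenerateMetricRowBatches produces for the given options, following the
// "metric[_batch<B>][_row<R>]" naming scheme used in the test.
func expectedSeries(numBatches, numRowsPerBatch int, sameBatchMetricNames, sameRowMetricNames bool) int {
	batchVariants := numBatches
	if sameBatchMetricNames {
		batchVariants = 1
	}
	rowVariants := numRowsPerBatch
	if sameRowMetricNames {
		rowVariants = 1
	}
	return batchVariants * rowVariants
}

func main() {
	// 4 batches of 100 rows, as used by testStorageVariousDataPatterns:
	fmt.Println(expectedSeries(4, 100, true, true))   // 1   -> a single "metric" series
	fmt.Println(expectedSeries(4, 100, true, false))  // 100 -> "metric_row0" .. "metric_row99"
	fmt.Println(expectedSeries(4, 100, false, true))  // 4   -> one series per batch
	fmt.Println(expectedSeries(4, 100, false, false)) // 400 -> unique series per batch and row
}
```

In the serial subtests these counts must match exactly; in the concurrent subtests `assertCounts` is called with `strict=false`, since concurrent inserts may create duplicate metric IDs and push the observed counts above the computed minimum.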