lib/storage: fix active timeseries collection when per-day index is disabled ()

Fix the metric that shows the number of active time series when the per-day index is disabled. Previously, once the per-day index was disabled, the active time series metric would stop being populated and the `Active time series` chart would show 0.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8411.
Artem Fetishev, 2025-03-12 17:12:09 +01:00, committed by GitHub
parent 5e231fe07b
commit ee66d601b4
4 changed files with 209 additions and 13 deletions
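
In short, the fix moves current-hour metricID tracking out of the per-day index code path so that it runs on every ingestion path. Below is a minimal, self-contained Go sketch of the idea. All names in it (hourSet, tracker, unixHour) are hypothetical simplifications rather than the actual Storage types: as the diff below shows, the real code keeps the hour sets in uint64set.Set values behind atomic pointers and guards the pending list with a mutex.

package main

import (
    "fmt"
    "time"
)

// hourSet stands in for the storage's per-hour metricID set.
type hourSet struct {
    hour uint64          // hours since the Unix epoch
    m    map[uint64]bool // metricIDs seen during that hour
}

// tracker sketches the fixed ingestion flow: current-hour tracking is
// unconditional, while per-day index updates remain optional.
type tracker struct {
    disablePerDayIndex bool
    curr               *hourSet
    pending            []uint64 // entries not yet merged into curr
}

func unixHour(tsMillis int64) uint64 {
    return uint64(tsMillis) / (3600 * 1000)
}

// add mimics the essence of Storage.add after the fix: the hour check no
// longer lives behind the per-day index guard.
func (t *tracker) add(metricID uint64, tsMillis int64) {
    if h := unixHour(tsMillis); h == t.curr.hour && !t.curr.m[metricID] {
        t.pending = append(t.pending, metricID)
    }
    if t.disablePerDayIndex {
        return // per-day bookkeeping is skipped, but hour tracking already ran
    }
    // ... per-day index updates would go here ...
}

// flush mimics updateCurrHourMetricIDs merging pending entries into the set.
func (t *tracker) flush() {
    for _, id := range t.pending {
        t.curr.m[id] = true
    }
    t.pending = t.pending[:0]
}

func main() {
    now := time.Now().UnixMilli()
    tr := &tracker{
        disablePerDayIndex: true, // the mode that previously reported 0 active series
        curr:               &hourSet{hour: unixHour(now), m: map[uint64]bool{}},
    }
    tr.add(1, now)
    tr.add(2, now)
    tr.flush()
    fmt.Println("active series this hour:", len(tr.curr.m)) // prints 2
}

With the old layout, the equivalent of add would have returned before the hour check whenever disablePerDayIndex was set, so flush never had anything to merge.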


@@ -236,3 +236,83 @@ func testSearchWithDisabledPerDayIndex(tc *at.TestCase, start startSUTFunc) {
         },
     })
 }
+
+func TestSingleActiveTimeseriesMetric_enabledPerDayIndex(t *testing.T) {
+    testSingleActiveTimeseriesMetric(t, false)
+}
+
+func TestSingleActiveTimeseriesMetric_disabledPerDayIndex(t *testing.T) {
+    testSingleActiveTimeseriesMetric(t, true)
+}
+
+func testSingleActiveTimeseriesMetric(t *testing.T, disablePerDayIndex bool) {
+    tc := at.NewTestCase(t)
+    defer tc.Stop()
+
+    vmsingle := tc.MustStartVmsingle("vmsingle", []string{
+        fmt.Sprintf("-storageDataPath=%s/vmsingle-%t", tc.Dir(), disablePerDayIndex),
+        fmt.Sprintf("-disablePerDayIndex=%t", disablePerDayIndex),
+    })
+
+    testActiveTimeseriesMetric(tc, vmsingle, func() int {
+        return vmsingle.GetIntMetric(t, `vm_cache_entries{type="storage/hour_metric_ids"}`)
+    })
+}
+
+func TestClusterActiveTimeseriesMetric_enabledPerDayIndex(t *testing.T) {
+    t.Skip("TODO(@rtm0): Enable once the fix is ported to cluster")
+    testClusterActiveTimeseriesMetric(t, false)
+}
+
+func TestClusterActiveTimeseriesMetric_disabledPerDayIndex(t *testing.T) {
+    t.Skip("TODO(@rtm0): Enable once the fix is ported to cluster")
+    testClusterActiveTimeseriesMetric(t, true)
+}
+
+func testClusterActiveTimeseriesMetric(t *testing.T, disablePerDayIndex bool) {
+    tc := at.NewTestCase(t)
+    defer tc.Stop()
+
+    vmstorage1 := tc.MustStartVmstorage("vmstorage1", []string{
+        fmt.Sprintf("-storageDataPath=%s/vmstorage1-%t", tc.Dir(), disablePerDayIndex),
+        fmt.Sprintf("-disablePerDayIndex=%t", disablePerDayIndex),
+    })
+    vmstorage2 := tc.MustStartVmstorage("vmstorage2", []string{
+        fmt.Sprintf("-storageDataPath=%s/vmstorage2-%t", tc.Dir(), disablePerDayIndex),
+        fmt.Sprintf("-disablePerDayIndex=%t", disablePerDayIndex),
+    })
+    vminsert := tc.MustStartVminsert("vminsert", []string{
+        "-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(),
+    })
+    vmcluster := &at.Vmcluster{
+        Vmstorages: []*at.Vmstorage{vmstorage1, vmstorage2},
+        Vminsert:   vminsert,
+    }
+
+    testActiveTimeseriesMetric(tc, vmcluster, func() int {
+        cnt1 := vmstorage1.GetIntMetric(t, `vm_cache_entries{type="storage/hour_metric_ids"}`)
+        cnt2 := vmstorage2.GetIntMetric(t, `vm_cache_entries{type="storage/hour_metric_ids"}`)
+        return cnt1 + cnt2
+    })
+}
+
+func testActiveTimeseriesMetric(tc *at.TestCase, sut at.PrometheusWriteQuerier, getActiveTimeseries func() int) {
+    t := tc.T()
+
+    const numSamples = 1000
+    samples := make([]string, numSamples)
+    for i := range numSamples {
+        samples[i] = fmt.Sprintf("metric_%03d %d", i, i)
+    }
+    sut.PrometheusAPIV1ImportPrometheus(t, samples, at.QueryOpts{})
+    sut.ForceFlush(t)
+
+    tc.Assert(&at.AssertOptions{
+        Msg: `unexpected vm_cache_entries{type="storage/hour_metric_ids"} metric value`,
+        Got: func() any {
+            return getActiveTimeseries()
+        },
+        Want: numSamples,
+    })
+}
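
Both variants exercise the shared testActiveTimeseriesMetric helper through the at.PrometheusWriteQuerier interface. The single-node variant reads vm_cache_entries{type="storage/hour_metric_ids"} directly from vmsingle, while the cluster variant sums the metric across both vmstorage nodes, since each node maintains its own hourly metricID set. The cluster tests remain skipped until the fix is ported to the cluster branch.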


@@ -24,6 +24,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): expose `vmalert_alerts_send_duration_seconds` metric to measure the time taken to send alerts to the specified `-notifier.url`. Thanks to @eyazici90 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8468).
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation): fix panic on `rate` output. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8469).
+* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/victoriametrics/): fix the metric that shows the number of active time series when the per-day index is disabled. Previously, once the per-day index was disabled, the active time series metric would stop being populated and the `Active time series` chart would show 0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8411) for details.
 
 ## [v1.113.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.113.0)


@@ -379,6 +379,9 @@ func (s *Storage) DebugFlush() {
     idb.doExtDB(func(extDB *indexDB) {
         extDB.tb.DebugFlush()
     })
+
+    hour := fasttime.UnixHour()
+    s.updateCurrHourMetricIDs(hour)
 }
 
 // CreateSnapshot creates snapshot for s and returns the snapshot name.
@@ -1949,6 +1952,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
     is := idb.getIndexSearch(noDeadline)
     defer idb.putIndexSearch(is)
 
+    hmPrev := s.prevHourMetricIDs.Load()
+    hmCurr := s.currHourMetricIDs.Load()
+    var pendingHourEntries []uint64
+
     mn := GetMetricName()
     defer PutMetricName(mn)
@@ -2008,6 +2015,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
         r.Value = mr.Value
         r.PrecisionBits = precisionBits
         date := s.date(r.Timestamp)
+        hour := uint64(r.Timestamp) / msecPerHour
 
         // Search for TSID for the given mr.MetricNameRaw and store it at r.TSID.
         if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
@@ -2016,6 +2024,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
             r.TSID = prevTSID
             continue
         }
+
         if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
             // Fast path - the TSID for the given mr.MetricNameRaw has been found in cache and isn't deleted.
             // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
@@ -2049,6 +2058,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
                 seriesRepopulated++
                 slowInsertsCount++
             }
+            if hour == hmCurr.hour && !hmCurr.m.Has(genTSID.TSID.MetricID) {
+                pendingHourEntries = append(pendingHourEntries, genTSID.TSID.MetricID)
+            }
             continue
         }
@@ -2093,6 +2105,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
             r.TSID = genTSID.TSID
             prevTSID = genTSID.TSID
             prevMetricNameRaw = mr.MetricNameRaw
+
+            if hour == hmCurr.hour && !hmCurr.m.Has(genTSID.TSID.MetricID) {
+                pendingHourEntries = append(pendingHourEntries, genTSID.TSID.MetricID)
+            }
+
             continue
         }
@@ -2114,6 +2131,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
         prevTSID = r.TSID
         prevMetricNameRaw = mr.MetricNameRaw
 
+        if hour == hmCurr.hour && !hmCurr.m.Has(genTSID.TSID.MetricID) {
+            pendingHourEntries = append(pendingHourEntries, genTSID.TSID.MetricID)
+        }
+
         if logNewSeries {
             logger.Infof("new series created: %s", mn.String())
         }
@@ -2126,13 +2147,19 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
     dstMrs = dstMrs[:j]
     rows = rows[:j]
 
+    if len(pendingHourEntries) > 0 {
+        s.pendingHourEntriesLock.Lock()
+        s.pendingHourEntries.AddMulti(pendingHourEntries)
+        s.pendingHourEntriesLock.Unlock()
+    }
+
     if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
         if firstWarn == nil {
             firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
         }
     }
-    if err := s.updatePerDateData(rows, dstMrs); err != nil {
+    if err := s.updatePerDateData(rows, dstMrs, hmPrev, hmCurr); err != nil {
         if firstWarn == nil {
             firstWarn = fmt.Errorf("cannot update per-day index: %w", err)
         }
@@ -2281,7 +2308,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
     return firstError
 }
 
-func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
+func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmCurr *hourMetricIDs) error {
     if s.disablePerDayIndex {
         return nil
     }
@@ -2299,8 +2326,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
     idb := s.idb()
     generation := idb.generation
 
-    hm := s.currHourMetricIDs.Load()
-    hmPrev := s.prevHourMetricIDs.Load()
     hmPrevDate := hmPrev.hour / 24
     nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
     ts := fasttime.UnixTimestamp()
@@ -2314,7 +2339,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
     }
 
     var pendingDateMetricIDs []pendingDateMetricID
     var pendingNextDayMetricIDs []uint64
-    var pendingHourEntries []uint64
     for i := range rows {
         r := &rows[i]
         if r.Timestamp != prevTimestamp {
@@ -2329,9 +2353,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
         }
         prevDate = date
         prevMetricID = metricID
-        if hour == hm.hour {
+        if hour == hmCurr.hour {
             // The row belongs to the current hour. Check for the current hour cache.
-            if hm.m.Has(metricID) {
+            if hmCurr.m.Has(metricID) {
                 // Fast path: the metricID is in the current hour cache.
                 // This means the metricID has been already added to per-day inverted index.
@@ -2352,7 +2376,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
                 }
                 continue
             }
-            pendingHourEntries = append(pendingHourEntries, metricID)
             if date == hmPrevDate && hmPrev.m.Has(metricID) {
                 // The metricID is already registered for the current day on the previous hour.
                 continue
@@ -2375,11 +2398,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
         s.pendingNextDayMetricIDs.AddMulti(pendingNextDayMetricIDs)
         s.pendingNextDayMetricIDsLock.Unlock()
     }
-    if len(pendingHourEntries) > 0 {
-        s.pendingHourEntriesLock.Lock()
-        s.pendingHourEntries.AddMulti(pendingHourEntries)
-        s.pendingHourEntriesLock.Unlock()
-    }
     if len(pendingDateMetricIDs) == 0 {
         // Fast path - there are no new (date, metricID) entries.
         return nil
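
Taken together, these storage.go changes move the collection of current-hour metricIDs out of updatePerDateData, which returns early when the per-day index is disabled, and into Storage.add, where the hour == hmCurr.hour check now runs on all three ingestion paths: the tsidCache hit, the indexdb lookup, and new-series creation. The prev- and current-hour snapshots are loaded once in add and passed down to updatePerDateData, and DebugFlush applies pending hour entries immediately so that tests can observe them without waiting for the background hourly update.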


@@ -2456,6 +2456,103 @@ func TestStorageAddRows_SamplesWithZeroDate(t *testing.T) {
     })
 }
 
+func TestStorageAddRows_currHourMetricIDs(t *testing.T) {
+    defer testRemoveAll(t)
+
+    f := func(t *testing.T, disablePerDayIndex bool) {
+        t.Helper()
+
+        s := MustOpenStorage(t.Name(), OpenOptions{
+            DisablePerDayIndex: disablePerDayIndex,
+        })
+        defer s.MustClose()
+
+        now := time.Now().UTC()
+        currHourTR := TimeRange{
+            MinTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC).UnixMilli(),
+            MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 59, 59, 999_999_999, time.UTC).UnixMilli(),
+        }
+        currHour := uint64(currHourTR.MinTimestamp / 1000 / 3600)
+        prevHourTR := TimeRange{
+            MinTimestamp: currHourTR.MinTimestamp - 3600*1000,
+            MaxTimestamp: currHourTR.MaxTimestamp - 3600*1000,
+        }
+        rng := rand.New(rand.NewSource(1))
+
+        // Test current hour metricIDs population when data ingestion takes the
+        // slow path. The database is empty, therefore the index and the
+        // tsidCache contain no metricIDs, therefore the data ingestion will
+        // take the slow path.
+        mrs := testGenerateMetricRowsWithPrefix(rng, 1000, "slow_path", currHourTR)
+        s.AddRows(mrs, defaultPrecisionBits)
+        s.DebugFlush()
+        s.updateCurrHourMetricIDs(currHour)
+        if got, want := s.currHourMetricIDs.Load().m.Len(), 1000; got != want {
+            t.Errorf("[slow path] unexpected current hour metric ID count: got %d, want %d", got, want)
+        }
+
+        // Test current hour metricIDs population when data ingestion takes the
+        // fast path (when the metricIDs are found in the tsidCache).
+        //
+        // First insert samples to populate the tsidCache. The samples belong to
+        // the previous hour, therefore the metricIDs won't be added to
+        // currHourMetricIDs.
+        mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "fast_path", prevHourTR)
+        s.AddRows(mrs, defaultPrecisionBits)
+        s.DebugFlush()
+        s.updateCurrHourMetricIDs(currHour)
+        if got, want := s.currHourMetricIDs.Load().m.Len(), 1000; got != want {
+            t.Errorf("[fast path] unexpected current hour metric ID count after ingesting samples for previous hour: got %d, want %d", got, want)
+        }
+
+        // Now ingest the same metrics. This time the metricIDs will be found in
+        // the tsidCache, so the ingestion will take the fast path.
+        mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "fast_path", currHourTR)
+        s.AddRows(mrs, defaultPrecisionBits)
+        s.DebugFlush()
+        s.updateCurrHourMetricIDs(currHour)
+        if got, want := s.currHourMetricIDs.Load().m.Len(), 2000; got != want {
+            t.Errorf("[fast path] unexpected current hour metric ID count: got %d, want %d", got, want)
+        }
+
+        // Test current hour metricIDs population when data ingestion takes the
+        // slower path (when the metricIDs are not found in the tsidCache but
+        // are found in the index).
+        //
+        // First insert samples to populate the index. The samples belong to
+        // the previous hour, therefore the metricIDs won't be added to
+        // currHourMetricIDs.
+        mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "slower_path", prevHourTR)
+        s.AddRows(mrs, defaultPrecisionBits)
+        s.DebugFlush()
+        s.updateCurrHourMetricIDs(currHour)
+        if got, want := s.currHourMetricIDs.Load().m.Len(), 2000; got != want {
+            t.Errorf("[slower path] unexpected current hour metric ID count after ingesting samples for previous hour: got %d, want %d", got, want)
+        }
+
+        // Inserted samples were also added to the tsidCache. Drop it to
+        // enforce the fallback to index search.
+        s.resetAndSaveTSIDCache()
+
+        // Now ingest the same metrics. This time the metricIDs will be searched
+        // for and found in the index, so the ingestion will take the slower path.
+        mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "slower_path", currHourTR)
+        s.AddRows(mrs, defaultPrecisionBits)
+        s.DebugFlush()
+        s.updateCurrHourMetricIDs(currHour)
+        if got, want := s.currHourMetricIDs.Load().m.Len(), 3000; got != want {
+            t.Errorf("[slower path] unexpected current hour metric ID count: got %d, want %d", got, want)
+        }
+    }
+
+    t.Run("disablePerDayIndex=false", func(t *testing.T) {
+        f(t, false)
+    })
+    t.Run("disablePerDayIndex=true", func(t *testing.T) {
+        f(t, true)
+    })
+}
+
 func TestStorageRegisterMetricNamesForVariousDataPatternsConcurrently(t *testing.T) {
     testStorageVariousDataPatternsConcurrently(t, true, func(s *Storage, mrs []MetricRow) {
         s.RegisterMetricNames(nil, mrs)