lib/storage: fix active timeseries collection when per-day index is disabled (#8485)
Fix the metric that shows the number of active time series when the per-day index is disabled. Previously, once the per-day index was disabled, the active time series metric would stop being populated and the `Active time series` chart would show 0. See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8411.
This commit is contained in:
parent 5e231fe07b
commit ee66d601b4

4 changed files with 209 additions and 13 deletions
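For a quick sanity check of the fix, the panel value can be read straight from the `/metrics` endpoint of a running instance. Below is a minimal sketch (not part of this commit), assuming a single-node VictoriaMetrics listening locally on the default `127.0.0.1:8428`; it uses only the Go standard library:

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"
)

// activeTimeseries scrapes a VictoriaMetrics /metrics endpoint and returns the
// value of vm_cache_entries{type="storage/hour_metric_ids"}, which is what the
// "Active time series" panel is built on.
func activeTimeseries(metricsURL string) (int, error) {
	resp, err := http.Get(metricsURL)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	const name = `vm_cache_entries{type="storage/hour_metric_ids"}`
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, name) {
			continue
		}
		// Prometheus exposition format: the sample value is the last field.
		fields := strings.Fields(line)
		v, err := strconv.ParseFloat(fields[len(fields)-1], 64)
		if err != nil {
			return 0, err
		}
		return int(v), nil
	}
	if err := sc.Err(); err != nil {
		return 0, err
	}
	return 0, fmt.Errorf("metric %s not found", name)
}

func main() {
	// Assumes a local single-node instance on the default port; adjust as needed.
	n, err := activeTimeseries("http://127.0.0.1:8428/metrics")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("active time series:", n)
}
```

Before the fix, with `-disablePerDayIndex` set, this value would stay at 0 even while samples were being ingested; after the fix it tracks the ingested series, as the tests below verify.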
@@ -236,3 +236,83 @@ func testSearchWithDisabledPerDayIndex(tc *at.TestCase, start startSUTFunc) {
		},
	})
}

func TestSingleActiveTimeseriesMetric_enabledPerDayIndex(t *testing.T) {
	testSingleActiveTimeseriesMetric(t, false)
}

func TestSingleActiveTimeseriesMetric_disabledPerDayIndex(t *testing.T) {
	testSingleActiveTimeseriesMetric(t, true)
}

func testSingleActiveTimeseriesMetric(t *testing.T, disablePerDayIndex bool) {
	tc := at.NewTestCase(t)
	defer tc.Stop()

	vmsingle := tc.MustStartVmsingle("vmsingle", []string{
		fmt.Sprintf("-storageDataPath=%s/vmsingle-%t", tc.Dir(), disablePerDayIndex),
		fmt.Sprintf("-disablePerDayIndex=%t", disablePerDayIndex),
	})

	testActiveTimeseriesMetric(tc, vmsingle, func() int {
		return vmsingle.GetIntMetric(t, `vm_cache_entries{type="storage/hour_metric_ids"}`)
	})
}

func TestClusterActiveTimeseriesMetric_enabledPerDayIndex(t *testing.T) {
	t.Skip("TODO(@rtm0): Enable once the fix is ported to cluster")

	testClusterActiveTimeseriesMetric(t, false)
}

func TestClusterActiveTimeseriesMetric_disabledPerDayIndex(t *testing.T) {
	t.Skip("TODO(@rtm0): Enable once the fix is ported to cluster")

	testClusterActiveTimeseriesMetric(t, true)
}
func testClusterActiveTimeseriesMetric(t *testing.T, disablePerDayIndex bool) {
	tc := at.NewTestCase(t)
	defer tc.Stop()

	vmstorage1 := tc.MustStartVmstorage("vmstorage1", []string{
		fmt.Sprintf("-storageDataPath=%s/vmstorage1-%t", tc.Dir(), disablePerDayIndex),
		fmt.Sprintf("-disablePerDayIndex=%t", disablePerDayIndex),
	})
	vmstorage2 := tc.MustStartVmstorage("vmstorage2", []string{
		fmt.Sprintf("-storageDataPath=%s/vmstorage2-%t", tc.Dir(), disablePerDayIndex),
		fmt.Sprintf("-disablePerDayIndex=%t", disablePerDayIndex),
	})
	vminsert := tc.MustStartVminsert("vminsert", []string{
		"-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(),
	})

	vmcluster := &at.Vmcluster{
		Vmstorages: []*at.Vmstorage{vmstorage1, vmstorage2},
		Vminsert:   vminsert,
	}

	testActiveTimeseriesMetric(tc, vmcluster, func() int {
		cnt1 := vmstorage1.GetIntMetric(t, `vm_cache_entries{type="storage/hour_metric_ids"}`)
		cnt2 := vmstorage2.GetIntMetric(t, `vm_cache_entries{type="storage/hour_metric_ids"}`)
		return cnt1 + cnt2
	})
}
func testActiveTimeseriesMetric(tc *at.TestCase, sut at.PrometheusWriteQuerier, getActiveTimeseries func() int) {
	t := tc.T()
	const numSamples = 1000
	samples := make([]string, numSamples)
	for i := range numSamples {
		samples[i] = fmt.Sprintf("metric_%03d %d", i, i)
	}
	sut.PrometheusAPIV1ImportPrometheus(t, samples, at.QueryOpts{})
	sut.ForceFlush(t)
	tc.Assert(&at.AssertOptions{
		Msg: `unexpected vm_cache_entries{type="storage/hour_metric_ids"} metric value`,
		Got: func() any {
			return getActiveTimeseries()
		},
		Want: numSamples,
	})
}
@@ -24,6 +24,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).

* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): expose `vmalert_alerts_send_duration_seconds` metric to measure the time taken to send alerts to the specified `-notifier.url`. Thanks to @eyazici90 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8468).

* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation): fix panic on `rate` output. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8469).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/victoriametrics/): fix the metric that shows the number of active time series when the per-day index is disabled. Previously, once the per-day index was disabled, the active time series metric would stop being populated and the `Active time series` chart would show 0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8411) for details.

## [v1.113.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.113.0)
@@ -379,6 +379,9 @@ func (s *Storage) DebugFlush() {
	idb.doExtDB(func(extDB *indexDB) {
		extDB.tb.DebugFlush()
	})

	hour := fasttime.UnixHour()
	s.updateCurrHourMetricIDs(hour)
}

// CreateSnapshot creates snapshot for s and returns the snapshot name.
@@ -1949,6 +1952,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
	is := idb.getIndexSearch(noDeadline)
	defer idb.putIndexSearch(is)

	hmPrev := s.prevHourMetricIDs.Load()
	hmCurr := s.currHourMetricIDs.Load()
	var pendingHourEntries []uint64

	mn := GetMetricName()
	defer PutMetricName(mn)
@@ -2008,6 +2015,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
		r.Value = mr.Value
		r.PrecisionBits = precisionBits
		date := s.date(r.Timestamp)
		hour := uint64(r.Timestamp) / msecPerHour

		// Search for TSID for the given mr.MetricNameRaw and store it at r.TSID.
		if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
@@ -2016,6 +2024,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
			r.TSID = prevTSID
			continue
		}

		if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
			// Fast path - the TSID for the given mr.MetricNameRaw has been found in cache and isn't deleted.
			// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
@@ -2049,6 +2058,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
				seriesRepopulated++
				slowInsertsCount++
			}
			if hour == hmCurr.hour && !hmCurr.m.Has(genTSID.TSID.MetricID) {
				pendingHourEntries = append(pendingHourEntries, genTSID.TSID.MetricID)
			}
			continue
		}

@@ -2093,6 +2105,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
			r.TSID = genTSID.TSID
			prevTSID = genTSID.TSID
			prevMetricNameRaw = mr.MetricNameRaw

			if hour == hmCurr.hour && !hmCurr.m.Has(genTSID.TSID.MetricID) {
				pendingHourEntries = append(pendingHourEntries, genTSID.TSID.MetricID)
			}

			continue
		}

@@ -2114,6 +2131,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
		prevTSID = r.TSID
		prevMetricNameRaw = mr.MetricNameRaw

		if hour == hmCurr.hour && !hmCurr.m.Has(genTSID.TSID.MetricID) {
			pendingHourEntries = append(pendingHourEntries, genTSID.TSID.MetricID)
		}

		if logNewSeries {
			logger.Infof("new series created: %s", mn.String())
		}
@@ -2126,13 +2147,19 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
	dstMrs = dstMrs[:j]
	rows = rows[:j]

	if len(pendingHourEntries) > 0 {
		s.pendingHourEntriesLock.Lock()
		s.pendingHourEntries.AddMulti(pendingHourEntries)
		s.pendingHourEntriesLock.Unlock()
	}

	if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
		if firstWarn == nil {
			firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
		}
	}

	if err := s.updatePerDateData(rows, dstMrs); err != nil {
	if err := s.updatePerDateData(rows, dstMrs, hmPrev, hmCurr); err != nil {
		if firstWarn == nil {
			firstWarn = fmt.Errorf("cannot update per-day index: %w", err)
		}
@@ -2281,7 +2308,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
	return firstError
}

func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmCurr *hourMetricIDs) error {
	if s.disablePerDayIndex {
		return nil
	}
@@ -2299,8 +2326,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
	idb := s.idb()
	generation := idb.generation

	hm := s.currHourMetricIDs.Load()
	hmPrev := s.prevHourMetricIDs.Load()
	hmPrevDate := hmPrev.hour / 24
	nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
	ts := fasttime.UnixTimestamp()
@@ -2314,7 +2339,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
	}
	var pendingDateMetricIDs []pendingDateMetricID
	var pendingNextDayMetricIDs []uint64
	var pendingHourEntries []uint64
	for i := range rows {
		r := &rows[i]
		if r.Timestamp != prevTimestamp {
@@ -2329,9 +2353,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
		}
		prevDate = date
		prevMetricID = metricID
		if hour == hm.hour {
		if hour == hmCurr.hour {
			// The row belongs to the current hour. Check for the current hour cache.
			if hm.m.Has(metricID) {
			if hmCurr.m.Has(metricID) {
				// Fast path: the metricID is in the current hour cache.
				// This means the metricID has been already added to per-day inverted index.

@@ -2352,7 +2376,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
				}
				continue
			}
			pendingHourEntries = append(pendingHourEntries, metricID)
			if date == hmPrevDate && hmPrev.m.Has(metricID) {
				// The metricID is already registered for the current day on the previous hour.
				continue
@@ -2375,11 +2398,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
		s.pendingNextDayMetricIDs.AddMulti(pendingNextDayMetricIDs)
		s.pendingNextDayMetricIDsLock.Unlock()
	}
	if len(pendingHourEntries) > 0 {
		s.pendingHourEntriesLock.Lock()
		s.pendingHourEntries.AddMulti(pendingHourEntries)
		s.pendingHourEntriesLock.Unlock()
	}
	if len(pendingDateMetricIDs) == 0 {
		// Fast path - there are no new (date, metricID) entries.
		return nil
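The storage.go hunks above move the collection of current-hour metricIDs out of `updatePerDateData` (which returns early when the per-day index is disabled) and into `Storage.add`, so `s.pendingHourEntries` keeps being filled either way and `updateCurrHourMetricIDs` can still populate the `storage/hour_metric_ids` cache behind the active time series metric. Here is a deliberately simplified sketch of that bookkeeping cycle, with hypothetical names and a plain map standing in for the uint64 set used by the real code:

```go
// Simplified illustration only: the real code uses uint64set.Set, atomic loads and locks.
package main

import "fmt"

type hourSet struct {
	hour uint64
	m    map[uint64]struct{}
}

type storageSketch struct {
	currHour       hourSet
	pendingEntries []uint64 // filled on every ingestion path, per-day index enabled or not
}

// add mirrors the idea of the fixed Storage.add: any metricID seen for the
// current hour is queued, independently of the per-day index.
func (s *storageSketch) add(hour, metricID uint64) {
	if hour == s.currHour.hour {
		if _, ok := s.currHour.m[metricID]; !ok {
			s.pendingEntries = append(s.pendingEntries, metricID)
		}
	}
}

// updateCurrHourMetricIDs mirrors the periodic merge that feeds
// vm_cache_entries{type="storage/hour_metric_ids"}.
func (s *storageSketch) updateCurrHourMetricIDs(hour uint64) {
	if hour != s.currHour.hour {
		s.currHour = hourSet{hour: hour, m: map[uint64]struct{}{}}
	}
	for _, id := range s.pendingEntries {
		s.currHour.m[id] = struct{}{}
	}
	s.pendingEntries = s.pendingEntries[:0]
}

func main() {
	s := &storageSketch{currHour: hourSet{hour: 100, m: map[uint64]struct{}{}}}
	s.add(100, 1)
	s.add(100, 2)
	s.add(99, 3) // belongs to a different hour, not counted as active now
	s.updateCurrHourMetricIDs(100)
	fmt.Println("active time series:", len(s.currHour.m)) // 2
}
```

The point of the sketch is only the ordering: entries are queued on every ingestion path and merged into the current-hour set on the next update, which is what `DebugFlush` (and the background updater) triggers in the real storage.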
@@ -2456,6 +2456,103 @@ func TestStorageAddRows_SamplesWithZeroDate(t *testing.T) {
	})
}

func TestStorageAddRows_currHourMetricIDs(t *testing.T) {
	defer testRemoveAll(t)

	f := func(t *testing.T, disablePerDayIndex bool) {
		t.Helper()

		s := MustOpenStorage(t.Name(), OpenOptions{
			DisablePerDayIndex: disablePerDayIndex,
		})
		defer s.MustClose()

		now := time.Now().UTC()
		currHourTR := TimeRange{
			MinTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC).UnixMilli(),
			MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 59, 59, 999_999_999, time.UTC).UnixMilli(),
		}
		currHour := uint64(currHourTR.MinTimestamp / 1000 / 3600)
		prevHourTR := TimeRange{
			MinTimestamp: currHourTR.MinTimestamp - 3600*1000,
			MaxTimestamp: currHourTR.MaxTimestamp - 3600*1000,
		}
		rng := rand.New(rand.NewSource(1))

		// Test current hour metricIDs population when data ingestion takes the
		// slow path. The database is empty, therefore the index and the
		// tsidCache contain no metricIDs, so the data ingestion will take the
		// slow path.

		mrs := testGenerateMetricRowsWithPrefix(rng, 1000, "slow_path", currHourTR)
		s.AddRows(mrs, defaultPrecisionBits)
		s.DebugFlush()
		s.updateCurrHourMetricIDs(currHour)
		if got, want := s.currHourMetricIDs.Load().m.Len(), 1000; got != want {
			t.Errorf("[slow path] unexpected current hour metric ID count: got %d, want %d", got, want)
		}

		// Test current hour metricIDs population when data ingestion takes the
		// fast path (when the metricIDs are found in the tsidCache).

		// First insert samples to populate the tsidCache. The samples belong to
		// the previous hour, therefore the metricIDs won't be added to
		// currHourMetricIDs.
		mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "fast_path", prevHourTR)
		s.AddRows(mrs, defaultPrecisionBits)
		s.DebugFlush()
		s.updateCurrHourMetricIDs(currHour)
		if got, want := s.currHourMetricIDs.Load().m.Len(), 1000; got != want {
			t.Errorf("[fast path] unexpected current hour metric ID count after ingesting samples for previous hour: got %d, want %d", got, want)
		}

		// Now ingest the same metrics. This time the metricIDs will be found in
		// the tsidCache, so the ingestion will take the fast path.
		mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "fast_path", currHourTR)
		s.AddRows(mrs, defaultPrecisionBits)
		s.DebugFlush()
		s.updateCurrHourMetricIDs(currHour)
		if got, want := s.currHourMetricIDs.Load().m.Len(), 2000; got != want {
			t.Errorf("[fast path] unexpected current hour metric ID count: got %d, want %d", got, want)
		}

		// Test current hour metricIDs population when data ingestion takes the
		// slower path (when the metricIDs are not found in the tsidCache but
		// are found in the index).

		// First insert samples to populate the index. The samples belong to
		// the previous hour, therefore the metricIDs won't be added to
		// currHourMetricIDs.
		mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "slower_path", prevHourTR)
		s.AddRows(mrs, defaultPrecisionBits)
		s.DebugFlush()
		s.updateCurrHourMetricIDs(currHour)
		if got, want := s.currHourMetricIDs.Load().m.Len(), 2000; got != want {
			t.Errorf("[slower path] unexpected current hour metric ID count after ingesting samples for previous hour: got %d, want %d", got, want)
		}
		// Inserted samples were also added to the tsidCache. Drop it to
		// enforce the fallback to index search.
		s.resetAndSaveTSIDCache()

		// Now ingest the same metrics. This time the metricIDs will be searched
		// for and found in the index, so the ingestion will take the slower path.
		mrs = testGenerateMetricRowsWithPrefix(rng, 1000, "slower_path", currHourTR)
		s.AddRows(mrs, defaultPrecisionBits)
		s.DebugFlush()
		s.updateCurrHourMetricIDs(currHour)
		if got, want := s.currHourMetricIDs.Load().m.Len(), 3000; got != want {
			t.Errorf("[slower path] unexpected current hour metric ID count: got %d, want %d", got, want)
		}
	}

	t.Run("disablePerDayIndex=false", func(t *testing.T) {
		f(t, false)
	})
	t.Run("disablePerDayIndex=true", func(t *testing.T) {
		f(t, true)
	})
}

func TestStorageRegisterMetricNamesForVariousDataPatternsConcurrently(t *testing.T) {
	testStorageVariousDataPatternsConcurrently(t, true, func(s *Storage, mrs []MetricRow) {
		s.RegisterMetricNames(nil, mrs)