Add test for invalid caching of tsids (#232)

* Add test for invalid caching of tsids
* Clean up error handling
parent ce8cc76a42
commit 955a592106

1 changed file with 169 additions and 0 deletions
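
This commit adds one import and a single new test, TestSearchTSIDWithTimeRange, placed right after TestMatchTagFilters (see the hunk context below). The test seeds five days' worth of per-day index entries plus the in-memory hour metricID caches, then checks that db.searchTSIDs returns the expected number of TSIDs for an hour-wide, a day-wide, and a multi-day time range.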
@@ -13,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
 )
@@ -1397,6 +1398,174 @@ func TestMatchTagFilters(t *testing.T) {
 	}
 }
 
+func TestSearchTSIDWithTimeRange(t *testing.T) {
+	metricIDCache := workingsetcache.New(1234, time.Hour)
+	metricNameCache := workingsetcache.New(1234, time.Hour)
+	defer metricIDCache.Stop()
+	defer metricNameCache.Stop()
+
+	currMetricIDs := &hourMetricIDs{
+		isFull: true,
+		m:      &uint64set.Set{},
+	}
+
+	var hmCurr atomic.Value
+	hmCurr.Store(currMetricIDs)
+
+	prevMetricIDs := &hourMetricIDs{
+		isFull: true,
+		m:      &uint64set.Set{},
+	}
+	var hmPrev atomic.Value
+	hmPrev.Store(prevMetricIDs)
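+	// Both hour caches are marked isFull so that searchTSIDs may answer
+	// queries confined to the last two hours straight from these in-memory
+	// sets; the assertions below pin down exactly when that happens.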
+
+	dbName := "test-index-db-ts-range"
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
+	if err != nil {
+		t.Fatalf("cannot open indexDB: %s", err)
+	}
+	defer func() {
+		db.MustClose()
+		if err := os.RemoveAll(dbName); err != nil {
+			t.Fatalf("cannot remove indexDB: %s", err)
+		}
+	}()
+
+	is := db.getIndexSearch()
+	defer db.putIndexSearch(is)
+
+	// Create a bunch of per-day time series
+	const accountID = 12345
+	const projectID = 85453
+	const days = 5
+	const metricsPerDay = 1000
+	theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
+	now := uint64(timestampFromTime(theDay))
+	currMetricIDs.hour = now / msecPerHour
+	prevMetricIDs.hour = (now - msecPerHour) / msecPerHour
+	baseDate := now / msecPerDay
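+	// Unit conventions: hourMetricIDs.hour counts hours since the Unix epoch,
+	// while the per-day index keys dates as days since the Unix epoch, hence
+	// the divisions by msecPerHour and msecPerDay above.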
+	var metricNameBuf []byte
+	for day := 0; day < days; day++ {
+		var tsids []TSID
+		for metric := 0; metric < metricsPerDay; metric++ {
+			var mn MetricName
+			mn.AccountID = accountID
+			mn.ProjectID = projectID
+			mn.MetricGroup = []byte("testMetric")
+			mn.AddTag(
+				"constant",
+				"const",
+			)
+			mn.AddTag(
+				"day",
+				fmt.Sprintf("%v", day),
+			)
+			mn.AddTag(
+				"uniqueid",
+				fmt.Sprintf("%v", metric),
+			)
+			mn.sortTags()
+
+			metricNameBuf = mn.Marshal(metricNameBuf[:0])
+			var tsid TSID
+			if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil {
+				t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
+			}
+			if tsid.AccountID != accountID {
+				t.Fatalf("unexpected accountID; got %d; want %d", tsid.AccountID, accountID)
+			}
+			if tsid.ProjectID != projectID {
+				t.Fatalf("unexpected projectID; got %d; want %d", tsid.ProjectID, projectID)
+			}
+			tsids = append(tsids, tsid)
+		}
+
+		// Add the metrics to the per-day stores.
+		// baseDate is in days since the Unix epoch, so step back one day
+		// per iteration rather than one day's worth of milliseconds.
+		date := baseDate - uint64(day)
+		for i := range tsids {
+			tsid := &tsids[i]
+			if err := db.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
+				t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
+			}
+		}
+
+		// Add the first 256 metricIDs of day 0 to the hour metric caches
+		if day == 0 {
+			for i := 0; i < 256; i++ {
+				prevMetricIDs.m.Add(tsids[i].MetricID)
+				currMetricIDs.m.Add(tsids[i].MetricID)
+			}
+			k := accountProjectKey{
+				AccountID: accountID,
+				ProjectID: projectID,
+			}
+			prevMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: prevMetricIDs.m}
+			prevMetricIDs.iidx = newInmemoryInvertedIndex()
+			prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.byTenant)
+			if len(prevMetricIDs.iidx.pendingEntries) > 0 {
+				t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the previous hour", len(prevMetricIDs.iidx.pendingEntries))
+			}
+			currMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: currMetricIDs.m}
+			currMetricIDs.iidx = newInmemoryInvertedIndex()
+			currMetricIDs.iidx.MustUpdate(db, currMetricIDs.byTenant)
+			if len(currMetricIDs.iidx.pendingEntries) > 0 {
+				t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the current hour", len(currMetricIDs.iidx.pendingEntries))
+			}
+		}
+	}
+
+	// Flush index to disk, so it becomes visible for search
+	db.tb.DebugFlush()
+
+	// Create a filter that will match series that occur across multiple days
+	tfs := NewTagFilters(accountID, projectID)
+	if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {
+		t.Fatalf("cannot add filter: %s", err)
+	}
+
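+	// Only the first 256 day-0 metricIDs were put into the hour caches, so a
+	// query confined to the current hour should match exactly 256 series,
+	// while wider ranges must fall through to the on-disk per-day index.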
+	// Perform a search that can be fulfilled out of the hour metrics cache.
+	// This should return the metrics in the hourly cache
+	tr := TimeRange{
+		MinTimestamp: int64(now - msecPerHour + 1),
+		MaxTimestamp: int64(now),
+	}
+	matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000)
+	if err != nil {
+		t.Fatalf("error searching tsids: %v", err)
+	}
+	if len(matchedTSIDs) != 256 {
+		t.Fatalf("Expected %d time series for current hour, got %d", 256, len(matchedTSIDs))
+	}
+
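+	// The range below extends beyond the two hours covered by hmCurr and
+	// hmPrev, so the search must consult the per-day index and find all of
+	// the current day's series.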
+	// Perform a search within a day that falls out of the hour metrics cache.
+	// This should return the metrics for the day
+	tr = TimeRange{
+		MinTimestamp: int64(now - 2*msecPerHour - 1),
+		MaxTimestamp: int64(now),
+	}
+	matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000)
+	if err != nil {
+		t.Fatalf("error searching tsids: %v", err)
+	}
+	if len(matchedTSIDs) != metricsPerDay {
+		t.Fatalf("Expected %d time series for current day, got %d", metricsPerDay, len(matchedTSIDs))
+	}
+
+	// Perform a search across all the days, should match all metrics
+	tr = TimeRange{
+		MinTimestamp: int64(now - msecPerDay*days),
+		MaxTimestamp: int64(now),
+	}
+
+	matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000)
+	if err != nil {
+		t.Fatalf("error searching tsids: %v", err)
+	}
+	if len(matchedTSIDs) != metricsPerDay*days {
+		t.Fatalf("Expected %d time series for all days, got %d", metricsPerDay*days, len(matchedTSIDs))
+	}
+}
+
 func toTFPointers(tfs []tagFilter) []*tagFilter {
 	tfps := make([]*tagFilter, len(tfs))
 	for i := range tfs {
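
To run only the new test (assuming the changed file lives under lib/storage, where TestMatchTagFilters is defined):

go test ./lib/storage -run TestSearchTSIDWithTimeRange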