Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2025-02-09 15:27:11 +00:00
lib/storage: merge getTSDBStatusForDate with getTSDBStatusWithFiltersForDate
These functions are non-trivial, while their code has minimal differences. It is better from a maintainability PoV to merge them into a single function.
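The shape of the merge, as a minimal self-contained Go sketch (the names seriesFilter and statusForDate are illustrative only, not the actual VictoriaMetrics identifiers): the filtered variant keeps its scan loop and accepts an optional filter, while the former unfiltered variant reduces to a call with a nil filter.

package main

import "fmt"

// seriesFilter is a stand-in for the optional metricID filter
// (*uint64set.Set in the real code); a nil filter means "no filtering".
type seriesFilter map[uint64]bool

// statusForDate sketches the merged helper: a single scan loop that
// serves both the filtered and the unfiltered case.
func statusForDate(series map[uint64]string, filter seriesFilter) []string {
	var names []string
	for id, name := range series {
		// With a non-nil filter, skip series that do not match it;
		// with a nil filter, every series is counted.
		if filter != nil && !filter[id] {
			continue
		}
		names = append(names, name)
	}
	return names
}

func main() {
	series := map[uint64]string{1: "http_requests_total", 2: "process_cpu_seconds_total"}
	// The former unfiltered entry point becomes a nil-filter call...
	fmt.Println(statusForDate(series, nil))
	// ...while the filtered entry point passes the metricIDs it found.
	fmt.Println(statusForDate(series, seriesFilter{1: true}))
}

Passing nil instead of keeping a second copy of the loop is what removes the need to keep two near-identical functions "in sync" by hand.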
parent ad04c29a35
commit 008ae25b3a

1 changed file with 38 additions and 114 deletions
@@ -1335,7 +1335,7 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
 // GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss, date, accountID and projectID.
 func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
 	is := db.getIndexSearch(accountID, projectID, deadline)
-	status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline)
+	status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN)
 	db.putIndexSearch(is)
 	if err != nil {
 		return nil, err
@@ -1345,7 +1345,7 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32,
 	}
 	ok := db.doExtDB(func(extDB *indexDB) {
 		is := extDB.getIndexSearch(accountID, projectID, deadline)
-		status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline)
+		status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN)
 		extDB.putIndexSearch(is)
 	})
 	if ok && err != nil {
@@ -1355,21 +1355,24 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32,
 	}
 
 // getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
-func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
-	tr := TimeRange{
-		MinTimestamp: int64(date) * msecPerDay,
-		MaxTimestamp: int64(date+1) * msecPerDay,
-	}
-	metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9)
-	if err != nil {
-		return nil, err
-	}
-	if metricIDs.Len() == 0 {
-		// Nothing found.
-		return &TSDBStatus{}, nil
+func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int) (*TSDBStatus, error) {
+	var filter *uint64set.Set
+	if len(tfss) > 0 {
+		tr := TimeRange{
+			MinTimestamp: int64(date) * msecPerDay,
+			MaxTimestamp: int64(date+1) * msecPerDay,
+		}
+		metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9)
+		if err != nil {
+			return nil, err
+		}
+		if metricIDs.Len() == 0 {
+			// Nothing found.
+			return &TSDBStatus{}, nil
+		}
+		filter = metricIDs
 	}
 
-	// The code below must be in sync with getTSDBStatusForDate
 	ts := &is.ts
 	kb := &is.kb
 	mp := &is.mp
@@ -1396,19 +1399,21 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date
 		if !bytes.HasPrefix(item, prefix) {
 			break
 		}
-		if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
-			return nil, err
-		}
-		mp.ParseMetricIDs()
 		matchingSeriesCount := 0
-		for _, metricID := range mp.MetricIDs {
-			if metricIDs.Has(metricID) {
-				matchingSeriesCount++
-			}
-		}
-		if matchingSeriesCount == 0 {
-			// Skip rows without matching metricIDs.
-			continue
-		}
+		if filter != nil {
+			if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
+				return nil, err
+			}
+			mp.ParseMetricIDs()
+			for _, metricID := range mp.MetricIDs {
+				if filter.Has(metricID) {
+					matchingSeriesCount++
+				}
+			}
+			if matchingSeriesCount == 0 {
+				// Skip rows without matching metricIDs.
+				continue
+			}
+		}
 		tail := item[len(prefix):]
 		var err error
@@ -1442,8 +1447,11 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date
 			labelValueCountByLabelName++
 			labelNameValue = append(labelNameValue[:0], tmp...)
 		}
-		if err := mp.InitOnlyTail(item, tail); err != nil {
-			return nil, err
-		}
+		if filter == nil {
+			if err := mp.InitOnlyTail(item, tail); err != nil {
+				return nil, err
+			}
+			matchingSeriesCount = mp.MetricIDsLen()
+		}
 		// Take into account deleted timeseries too.
 		// It is OK if series can be counted multiple times in rare cases -
@@ -1469,7 +1477,7 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date
 // GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID.
 func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
 	is := db.getIndexSearch(accountID, projectID, deadline)
-	status, err := is.getTSDBStatusForDate(date, topN)
+	status, err := is.getTSDBStatusWithFiltersForDate(nil, date, topN)
 	db.putIndexSearch(is)
 	if err != nil {
 		return nil, err
@@ -1482,7 +1490,7 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64
 	// The entries weren't found in the db. Try searching them in extDB.
 	ok := db.doExtDB(func(extDB *indexDB) {
 		is := extDB.getIndexSearch(accountID, projectID, deadline)
-		status, err = is.getTSDBStatusForDate(date, topN)
+		status, err = is.getTSDBStatusWithFiltersForDate(nil, date, topN)
 		extDB.putIndexSearch(is)
 	})
 	if ok && err != nil {
@@ -1491,90 +1499,6 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64
 	return status, nil
 }
 
-func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) {
-	// The code below must be in sync with getTSDBStatusWithFiltersForDate
-	ts := &is.ts
-	kb := &is.kb
-	mp := &is.mp
-	thLabelValueCountByLabelName := newTopHeap(topN)
-	thSeriesCountByLabelValuePair := newTopHeap(topN)
-	thSeriesCountByMetricName := newTopHeap(topN)
-	var tmp, labelName, labelNameValue []byte
-	var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
-	nameEqualBytes := []byte("__name__=")
-
-	loopsPaceLimiter := 0
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-	kb.B = encoding.MarshalUint64(kb.B, date)
-	prefix := kb.B
-	ts.Seek(prefix)
-	for ts.NextItem() {
-		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
-			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
-				return nil, err
-			}
-		}
-		loopsPaceLimiter++
-		item := ts.Item
-		if !bytes.HasPrefix(item, prefix) {
-			break
-		}
-		tail := item[len(prefix):]
-		var err error
-		tail, tmp, err = unmarshalTagValue(tmp[:0], tail)
-		if err != nil {
-			return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err)
-		}
-		if isArtificialTagKey(tmp) {
-			// Skip artificially created tag keys.
-			continue
-		}
-		if len(tmp) == 0 {
-			tmp = append(tmp, "__name__"...)
-		}
-		if !bytes.Equal(tmp, labelName) {
-			thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
-			labelValueCountByLabelName = 0
-			labelName = append(labelName[:0], tmp...)
-		}
-		tmp = append(tmp, '=')
-		tail, tmp, err = unmarshalTagValue(tmp, tail)
-		if err != nil {
-			return nil, fmt.Errorf("cannot unmarshal tag value from line %q: %w", item, err)
-		}
-		if !bytes.Equal(tmp, labelNameValue) {
-			thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
-			if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
-				thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
-			}
-			seriesCountByLabelValuePair = 0
-			labelValueCountByLabelName++
-			labelNameValue = append(labelNameValue[:0], tmp...)
-		}
-		if err := mp.InitOnlyTail(item, tail); err != nil {
-			return nil, err
-		}
-		// Take into account deleted timeseries too.
-		// It is OK if series can be counted multiple times in rare cases -
-		// the returned number is an estimation.
-		seriesCountByLabelValuePair += uint64(mp.MetricIDsLen())
-	}
-	if err := ts.Error(); err != nil {
-		return nil, fmt.Errorf("error when counting time series by metric names: %w", err)
-	}
-	thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
-	thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
-	if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
-		thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
-	}
-	status := &TSDBStatus{
-		SeriesCountByMetricName:     thSeriesCountByMetricName.getSortedResult(),
-		LabelValueCountByLabelName:  thLabelValueCountByLabelName.getSortedResult(),
-		SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
-	}
-	return status, nil
-}
-
 // TSDBStatus contains TSDB status data for /api/v1/status/tsdb.
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats