lib/storage: merge getTSDBStatusForDate with getTSDBStatusWithFiltersForDate

These functions are non-trivial, while their code has minimal differences.
It is better from maintainability PoV to merge these functions into a single function.
This commit is contained in:
Aliaksandr Valialkin 2021-05-12 17:56:50 +03:00
parent beddc0c0d5
commit a22a17dc66

View file

@ -1312,7 +1312,7 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
// GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
func (db *indexDB) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch(deadline)
status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline)
status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN)
db.putIndexSearch(is)
if err != nil {
return nil, err
@ -1322,7 +1322,7 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline)
status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN)
extDB.putIndexSearch(is)
})
if ok && err != nil {
@ -1332,21 +1332,24 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint
}
// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
tr := TimeRange{
MinTimestamp: int64(date) * msecPerDay,
MaxTimestamp: int64(date+1) * msecPerDay,
}
metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9)
if err != nil {
return nil, err
}
if metricIDs.Len() == 0 {
// Nothing found.
return &TSDBStatus{}, nil
func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int) (*TSDBStatus, error) {
var filter *uint64set.Set
if len(tfss) > 0 {
tr := TimeRange{
MinTimestamp: int64(date) * msecPerDay,
MaxTimestamp: int64(date+1) * msecPerDay,
}
metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9)
if err != nil {
return nil, err
}
if metricIDs.Len() == 0 {
// Nothing found.
return &TSDBStatus{}, nil
}
filter = metricIDs
}
// The code below must be in sync with getTSDBStatusForDate
ts := &is.ts
kb := &is.kb
mp := &is.mp
@ -1373,19 +1376,21 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return nil, err
}
mp.ParseMetricIDs()
matchingSeriesCount := 0
for _, metricID := range mp.MetricIDs {
if metricIDs.Has(metricID) {
matchingSeriesCount++
if filter != nil {
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return nil, err
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if filter.Has(metricID) {
matchingSeriesCount++
}
}
if matchingSeriesCount == 0 {
// Skip rows without matching metricIDs.
continue
}
}
if matchingSeriesCount == 0 {
// Skip rows without matching metricIDs.
continue
}
tail := item[len(prefix):]
var err error
@ -1419,8 +1424,11 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date
labelValueCountByLabelName++
labelNameValue = append(labelNameValue[:0], tmp...)
}
if err := mp.InitOnlyTail(item, tail); err != nil {
return nil, err
if filter == nil {
if err := mp.InitOnlyTail(item, tail); err != nil {
return nil, err
}
matchingSeriesCount = mp.MetricIDsLen()
}
// Take into account deleted timeseries too.
// It is OK if series can be counted multiple times in rare cases -
@ -1446,7 +1454,7 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date.
func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch(deadline)
status, err := is.getTSDBStatusForDate(date, topN)
status, err := is.getTSDBStatusWithFiltersForDate(nil, date, topN)
db.putIndexSearch(is)
if err != nil {
return nil, err
@ -1459,7 +1467,7 @@ func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int, deadline uint64)
// The entries weren't found in the db. Try searching them in extDB.
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
status, err = is.getTSDBStatusForDate(date, topN)
status, err = is.getTSDBStatusWithFiltersForDate(nil, date, topN)
extDB.putIndexSearch(is)
})
if ok && err != nil {
@ -1468,90 +1476,6 @@ func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int, deadline uint64)
return status, nil
}
func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) {
// The code below must be in sync with getTSDBStatusWithFiltersForDate
ts := &is.ts
kb := &is.kb
mp := &is.mp
thLabelValueCountByLabelName := newTopHeap(topN)
thSeriesCountByLabelValuePair := newTopHeap(topN)
thSeriesCountByMetricName := newTopHeap(topN)
var tmp, labelName, labelNameValue []byte
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
nameEqualBytes := []byte("__name__=")
loopsPaceLimiter := 0
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
tail := item[len(prefix):]
var err error
tail, tmp, err = unmarshalTagValue(tmp[:0], tail)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err)
}
if isArtificialTagKey(tmp) {
// Skip artificially created tag keys.
continue
}
if len(tmp) == 0 {
tmp = append(tmp, "__name__"...)
}
if !bytes.Equal(tmp, labelName) {
thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
labelValueCountByLabelName = 0
labelName = append(labelName[:0], tmp...)
}
tmp = append(tmp, '=')
tail, tmp, err = unmarshalTagValue(tmp, tail)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tag value from line %q: %w", item, err)
}
if !bytes.Equal(tmp, labelNameValue) {
thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
}
seriesCountByLabelValuePair = 0
labelValueCountByLabelName++
labelNameValue = append(labelNameValue[:0], tmp...)
}
if err := mp.InitOnlyTail(item, tail); err != nil {
return nil, err
}
// Take into account deleted timeseries too.
// It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
seriesCountByLabelValuePair += uint64(mp.MetricIDsLen())
}
if err := ts.Error(); err != nil {
return nil, fmt.Errorf("error when counting time series by metric names: %w", err)
}
thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
}
status := &TSDBStatus{
SeriesCountByMetricName: thSeriesCountByMetricName.getSortedResult(),
LabelValueCountByLabelName: thLabelValueCountByLabelName.getSortedResult(),
SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
}
return status, nil
}
// TSDBStatus contains TSDB status data for /api/v1/status/tsdb.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats