Adds tsdb match filters (#1282)

* init work on filters

* initial proposal for status filters

* fixes tsdb status; adds a test

* fix bug

* removes checks from test
This commit is contained in:
Nikolay 2021-05-12 15:18:45 +03:00 committed by GitHub
parent cfd6aa28e1
commit 8a0678678b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 196 additions and 3 deletions

View file

@ -695,6 +695,26 @@ func GetTSDBStatusForDate(deadline searchutils.Deadline, date uint64, topN int)
return status, nil
}
// GetTSDBStatusWithFilters returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
//
// The status is limited to series matching sq and to topN entries per stats category.
func GetTSDBStatusWithFilters(deadline searchutils.Deadline, sq *storage.SearchQuery, topN int) (*storage.TSDBStatus, error) {
	// Give up before doing any work if the deadline has already passed.
	if deadline.Exceeded() {
		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
	}
	tr := storage.TimeRange{
		MinTimestamp: sq.MinTimestamp,
		MaxTimestamp: sq.MaxTimestamp,
	}
	// Convert the raw tag filters from the search query into storage-level filters.
	tagFilters, err := setupTfss(tr, sq.TagFilterss, deadline)
	if err != nil {
		return nil, err
	}
	status, err := vmstorage.GetTSDBStatusWithFilters(tr, tagFilters, topN, *maxMetricsPerSearch, deadline.Deadline())
	if err != nil {
		return nil, fmt.Errorf("error during tsdb status request: %w", err)
	}
	return status, nil
}
// GetSeriesCount returns the number of unique series.
func GetSeriesCount(deadline searchutils.Deadline) (uint64, error) {
if deadline.Exceeded() {

View file

@ -638,6 +638,12 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
etf, err := searchutils.GetEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
matches := getMatchesFromRequest(r)
date := fasttime.UnixDate()
dateStr := r.FormValue("date")
if len(dateStr) > 0 {
@ -662,9 +668,17 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
}
topN = n
}
status, err := netstorage.GetTSDBStatusForDate(deadline, date, topN)
if err != nil {
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
var status *storage.TSDBStatus
if len(matches) == 0 && len(etf) == 0 {
status, err = netstorage.GetTSDBStatusForDate(deadline, date, topN)
if err != nil {
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
}
} else {
status, err = tsdbStatusWithMatches(matches, etf, date, topN, deadline)
if err != nil {
return fmt.Errorf("cannot tsdb status with matches for date=%d, topN=%d: %w", date, topN, err)
}
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
bw := bufferedwriter.Get(w)
@ -677,6 +691,26 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
return nil
}
// tsdbStatusWithMatches builds a search query from the given match[] expressions
// plus the enforced tag filters and returns the filtered tsdb status for the given date.
func tsdbStatusWithMatches(matches []string, etf []storage.TagFilter, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
	tagFilterss, err := getTagFilterssFromMatches(matches)
	if err != nil {
		return nil, err
	}
	tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf)
	if len(tagFilterss) == 0 {
		logger.Panicf("BUG: tagFilterss must be non-empty")
	}
	// Cover the whole day in millisecond timestamps: [startMs ... startMs+day).
	startMs := int64(date*secsPerDay) * 1000
	endMs := int64(date*secsPerDay+secsPerDay) * 1000
	sq := storage.NewSearchQuery(startMs, endMs, tagFilterss)
	return netstorage.GetTSDBStatusWithFilters(deadline, sq, topN)
}
// tsdbStatusDuration tracks request durations for the /api/v1/status/tsdb endpoint.
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
// LabelsHandler processes /api/v1/labels request.

View file

@ -213,6 +213,14 @@ func GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (*storage.TSDB
return status, err
}
// GetTSDBStatusWithFilters returns TSDB status for given filters.
//
// The call is tracked in WG so that storage shutdown waits for it to finish.
func GetTSDBStatusWithFilters(tr storage.TimeRange, tfss []*storage.TagFilters, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
	WG.Add(1)
	defer WG.Done()
	return Storage.GetTSDBStatusForDateWithFilters(tfss, tr, maxMetrics, deadline, topN)
}
// GetSeriesCount returns the number of time series in the storage.
func GetSeriesCount(deadline uint64) (uint64, error) {
WG.Add(1)

View file

@ -1309,6 +1309,105 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
return metricIDsLen, nil
}
// GetTSDBStatusWithFiltersOnTimeRange returns topN entries for tsdb status for
// series matching the given tag filters on the given time range.
//
// The current indexDB generation is searched first; the previous (ext) indexDB
// is consulted only when the current one yields no entries.
func (db *indexDB) GetTSDBStatusWithFiltersOnTimeRange(tfss []*TagFilters, tr TimeRange, maxMetrics, topN int, deadline uint64) (*TSDBStatus, error) {
	is := db.getIndexSearch(deadline)
	status, err := is.GetTSDBStatusWithFiltersOnTimeRange(tfss, tr, maxMetrics, topN, deadline)
	db.putIndexSearch(is)
	if err != nil {
		return nil, err
	}
	if status.hasEntries() {
		// The current indexDB already has matching entries - no need to look at extDB.
		return status, nil
	}
	// Fall back to the previous indexDB generation. The closure assigns the
	// outer status and err, so they reflect the extDB search when ok is true.
	ok := db.doExtDB(func(extDB *indexDB) {
		is := extDB.getIndexSearch(deadline)
		status, err = is.GetTSDBStatusWithFiltersOnTimeRange(tfss, tr, maxMetrics, topN, deadline)
		extDB.putIndexSearch(is)
	})
	if ok && err != nil {
		return nil, fmt.Errorf("error when obtaining TSDB status from extDB: %w", err)
	}
	return status, nil
}
// GetTSDBStatusWithFiltersOnTimeRange returns topN entries for tsdb status for
// series matching the given tag filters on the given time range.
//
// NOTE(review): per-metric-name counting below assumes that metricIDs with the
// same metric name are adjacent in the searchMetricIDs result order - confirm
// this holds for the metricID ordering used here.
func (is *indexSearch) GetTSDBStatusWithFiltersOnTimeRange(tfss []*TagFilters, tr TimeRange, maxMetrics, topN int, deadline uint64) (*TSDBStatus, error) {
	metricIDs, err := is.searchMetricIDs(tfss, tr, maxMetrics)
	if err != nil {
		return nil, err
	}
	thLabelValueCountByLabelName := newTopHeap(topN)
	thSeriesCountByLabelValuePair := newTopHeap(topN)
	thSeriesCountByMetricName := newTopHeap(topN)
	var metricName, tmpMetricName, labelPairs []byte
	var mn MetricName
	var metricNameCnt uint64
	metricNameLabel := "__name__"
	tmpPairs := []byte(metricNameLabel)
	// cntByUniqLabelValues holds the number of unique values per label name.
	cntByUniqLabelValues := make(map[string]uint64, len(metricIDs))
	// cntLabelPairs holds the number of series per unique `label=value` pair.
	cntLabelPairs := make(map[string]uint64, len(metricIDs))
	for i := range metricIDs {
		if i&paceLimiterSlowIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(deadline); err != nil {
				return nil, err
			}
		}
		mID := metricIDs[i]
		metricName, err = is.searchMetricName(metricName[:0], mID)
		if err == io.EOF {
			// The metricID->metricName entry is missing; skip this series.
			continue
		}
		if err != nil {
			return nil, err
		}
		if err = mn.Unmarshal(metricName); err != nil {
			return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
		}
		if !bytes.Equal(tmpMetricName, mn.MetricGroup) {
			// Flush the series count accumulated for the PREVIOUS metric name
			// before switching to the new one. The original code pushed after
			// reassigning tmpMetricName, attributing the previous metric's
			// count to the new metric's name.
			thSeriesCountByMetricName.pushIfNonEmpty(tmpMetricName, metricNameCnt)
			metricNameCnt = 0
			// Copy mn.MetricGroup - it points into the metricName buffer,
			// which is overwritten on the next loop iteration; retaining the
			// alias would corrupt the bytes.Equal comparison above.
			tmpMetricName = append(tmpMetricName[:0], mn.MetricGroup...)
			cntByUniqLabelValues[metricNameLabel]++
			tmpPairs = append(tmpPairs[:len(metricNameLabel)], '=')
			tmpPairs = append(tmpPairs, mn.MetricGroup...)
		}
		cntLabelPairs[string(tmpPairs)]++
		metricNameCnt++
		for j := range mn.Tags {
			tag := mn.Tags[j]
			labelPairs = append(labelPairs[:0], tag.Key...)
			labelPairs = append(labelPairs, '=')
			labelPairs = append(labelPairs, tag.Value...)
			// First occurrence of a `label=value` pair means a new unique
			// value for this label name.
			if _, ok := cntLabelPairs[string(labelPairs)]; !ok {
				cntByUniqLabelValues[string(tag.Key)]++
			}
			cntLabelPairs[string(labelPairs)]++
		}
	}
	// Flush the series count for the last seen metric name.
	thSeriesCountByMetricName.pushIfNonEmpty(tmpMetricName, metricNameCnt)
	for k, v := range cntLabelPairs {
		thSeriesCountByLabelValuePair.pushIfNonEmpty([]byte(k), v)
	}
	for k, v := range cntByUniqLabelValues {
		thLabelValueCountByLabelName.pushIfNonEmpty([]byte(k), v)
	}
	status := TSDBStatus{
		SeriesCountByMetricName:     thSeriesCountByMetricName.getSortedResult(),
		LabelValueCountByLabelName:  thLabelValueCountByLabelName.getSortedResult(),
		SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
	}
	return &status, nil
}
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date.
func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch(deadline)

View file

@ -1691,6 +1691,33 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if !reflect.DeepEqual(status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair) {
t.Fatalf("unexpected SeriesCountByLabelValuePair;\ngot\n%v\nwant\n%v", status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair)
}
// Perform a search across all the days, should match all metrics
tr = TimeRange{
MinTimestamp: int64(now),
MaxTimestamp: int64(now - msecPerDay*days),
}
tfs = NewTagFilters()
if err := tfs.Add([]byte("day"), []byte("3"), false, false); err != nil {
t.Fatalf("cannot add filter: %s", err)
}
status, err = db.GetTSDBStatusWithFiltersOnTimeRange([]*TagFilters{tfs}, tr, 10000, 5, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatusForDate: %s", err)
}
if !status.hasEntries() {
t.Fatalf("expecting non-empty TSDB status")
}
expectedSeriesCountByMetricName = []TopHeapEntry{
{
Name: "testMetric",
Count: 1000,
},
}
if !reflect.DeepEqual(status.SeriesCountByMetricName, expectedSeriesCountByMetricName) {
t.Fatalf("unexpected SeriesCountByMetricName;\ngot\n%v\nwant\n%v", status.SeriesCountByMetricName, expectedSeriesCountByMetricName)
}
}
func toTFPointers(tfs []tagFilter) []*tagFilter {

View file

@ -1248,6 +1248,11 @@ func (s *Storage) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (
return s.idb().GetTSDBStatusForDate(date, topN, deadline)
}
// GetTSDBStatusForDateWithFilters special function for /api/v1/status/tsdb with match[] filters.
//
// It delegates to the current indexDB. Note the parameter order differs from
// the indexDB method: here deadline precedes topN, there topN precedes deadline.
func (s *Storage) GetTSDBStatusForDateWithFilters(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64, topN int) (*TSDBStatus, error) {
	return s.idb().GetTSDBStatusWithFiltersOnTimeRange(tfss, tr, maxMetrics, topN, deadline)
}
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded