app/vmselect: optimize querying for /api/v1/labels and /api/v1/label/<name>/values when start and end args are set

This commit is contained in:
Aliaksandr Valialkin 2020-11-05 00:15:43 +02:00
parent 381ad564a2
commit b378cd6ed8
7 changed files with 369 additions and 17 deletions

View file

@ -1,6 +1,6 @@
# tip
* FEATURE: optimize requests to `/api/v1/labels` and `/api/v1/label/<name>/values` when `start` and `end` args are set.
* FEATURE: reduce memory usage when query touches big number of time series.
* FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%)
are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such

View file

@ -453,6 +453,26 @@ func DeleteSeries(sq *storage.SearchQuery) (int, error) {
return vmstorage.DeleteMetrics(tfss)
}
// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
func GetLabelsOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
labels, err := vmstorage.SearchTagKeysOnTimeRange(tr, *maxTagKeysPerSearch, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during labels search on time range: %w", err)
}
// Substitute "" with "__name__"
for i := range labels {
if labels[i] == "" {
labels[i] = "__name__"
}
}
// Sort labels like Prometheus does
sort.Strings(labels)
return labels, nil
}
// GetLabels returns labels until the given deadline.
func GetLabels(deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() {
@ -462,20 +482,36 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("error during labels search: %w", err)
}
// Substitute "" with "__name__"
for i := range labels {
if labels[i] == "" {
labels[i] = "__name__"
}
}
// Sort labels like Prometheus does
sort.Strings(labels)
return labels, nil
}
// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
// until the given deadline.
func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
if labelName == "__name__" {
labelName = ""
}
// Search for tag values
labelValues, err := vmstorage.SearchTagValuesOnTimeRange([]byte(labelName), tr, *maxTagValuesPerSearch, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err)
}
// Sort labelValues like Prometheus does
sort.Strings(labelValues)
return labelValues, nil
}
// GetLabelValues returns label values for the given labelName
// until the given deadline.
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) {
@ -485,16 +521,13 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string,
if labelName == "__name__" {
labelName = ""
}
// Search for tag values
labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err)
}
// Sort labelValues like Prometheus does
sort.Strings(labelValues)
return labelValues, nil
}

View file

@ -511,9 +511,26 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr
return fmt.Errorf("cannot parse form values: %w", err)
}
var labelValues []string
if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
if len(r.Form["match[]"]) == 0 {
var err error
labelValues, err = netstorage.GetLabelValues(labelName, deadline)
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
labelValues, err = netstorage.GetLabelValues(labelName, deadline)
} else {
ct := startTime.UnixNano() / 1e6
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
tr := storage.TimeRange{
MinTimestamp: start,
MaxTimestamp: end,
}
labelValues, err = netstorage.GetLabelValuesOnTimeRange(labelName, tr, deadline)
}
if err != nil {
return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err)
}
@ -692,9 +709,26 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
return fmt.Errorf("cannot parse form values: %w", err)
}
var labels []string
if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
if len(r.Form["match[]"]) == 0 {
var err error
labels, err = netstorage.GetLabels(deadline)
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
labels, err = netstorage.GetLabels(deadline)
} else {
ct := startTime.UnixNano() / 1e6
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
tr := storage.TimeRange{
MinTimestamp: start,
MaxTimestamp: end,
}
labels, err = netstorage.GetLabelsOnTimeRange(tr, deadline)
}
if err != nil {
return fmt.Errorf("cannot obtain labels: %w", err)
}

View file

@ -122,6 +122,14 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) {
return n, err
}
// SearchTagKeysOnTimeRange searches for tag keys on tr.
func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
WG.Add(1)
keys, err := Storage.SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
WG.Done()
return keys, err
}
// SearchTagKeys searches for tag keys
func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
WG.Add(1)
@ -130,6 +138,14 @@ func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
return keys, err
}
// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
func SearchTagValuesOnTimeRange(tagKey []byte, tr storage.TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
WG.Add(1)
values, err := Storage.SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline)
WG.Done()
return values, err
}
// SearchTagValues searches for tag values for the given tagKey
func SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
WG.Add(1)

View file

@ -701,10 +701,118 @@ func putIndexItems(ii *indexItems) {
var indexItemsPool sync.Pool
// SearchTagKeysOnTimeRange returns all the tag keys on the given tr.
func (db *indexDB) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
tks := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
extDB.putIndexSearch(is)
})
if ok && err != nil {
return nil, err
}
keys := make([]string, 0, len(tks))
for key := range tks {
// Do not skip empty keys, since they are converted to __name__
keys = append(keys, key)
}
// Do not sort keys, since they must be sorted by vmselect.
return keys, nil
}
func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error {
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
go func(date uint64) {
defer wg.Done()
tksLocal := make(map[string]struct{})
isLocal := is.db.getIndexSearch(is.deadline)
err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
return
}
if err != nil {
errGlobal = err
return
}
if len(tks) >= maxTagKeys {
return
}
for k := range tksLocal {
tks[k] = struct{}{}
}
}(date)
}
wg.Wait()
return errGlobal
}
func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B
ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return err
}
if mp.IsDeletedTag(dmis) {
continue
}
// Store tag key.
tks[string(mp.Tag.Key)] = struct{}{}
// Search for the next tag key.
// The last char in kb.B must be tagSeparatorChar.
// Just increment it in order to jump to the next tag key.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error during search for prefix %q: %w", prefix, err)
}
return nil
}
// SearchTagKeys returns all the tag keys.
func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
// TODO: cache results?
tks := make(map[string]struct{})
is := db.getIndexSearch(deadline)
@ -728,7 +836,6 @@ func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, err
// Do not skip empty keys, since they are converted to __name__
keys = append(keys, key)
}
// Do not sort keys, since they must be sorted by vmselect.
return keys, nil
}
@ -778,10 +885,129 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
return nil
}
// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr.
func (db *indexDB) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
tvs := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
extDB.putIndexSearch(is)
})
if ok && err != nil {
return nil, err
}
tagValues := make([]string, 0, len(tvs))
for tv := range tvs {
if len(tv) == 0 {
// Skip empty values, since they have no any meaning.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
continue
}
tagValues = append(tagValues, tv)
}
// Do not sort tagValues, since they must be sorted by vmselect.
return tagValues, nil
}
func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error {
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
go func(date uint64) {
defer wg.Done()
tvsLocal := make(map[string]struct{})
isLocal := is.db.getIndexSearch(is.deadline)
err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
return
}
if err != nil {
errGlobal = err
return
}
if len(tvs) >= maxTagValues {
return
}
for v := range tvsLocal {
tvs[v] = struct{}{}
}
}(date)
}
wg.Wait()
return errGlobal
}
func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, tagKey)
prefix := kb.B
ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return err
}
if mp.IsDeletedTag(dmis) {
continue
}
// Store tag value
tvs[string(mp.Tag.Value)] = struct{}{}
if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
// There is no need in searching for the next tag value,
// since it is likely it is located in the next row,
// because the current row contains incomplete metricIDs set.
continue
}
// Search for the next tag value.
// The last char in kb.B must be tagSeparatorChar.
// Just increment it in order to jump to the next tag value.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
kb.B = marshalTagValue(kb.B, mp.Tag.Value)
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err)
}
return nil
}
// SearchTagValues returns all the tag values for the given tagKey
func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
// TODO: cache results?
tvs := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagValues(tvs, tagKey, maxTagValues)
@ -807,7 +1033,6 @@ func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uin
}
tagValues = append(tagValues, tv)
}
// Do not sort tagValues, since they must be sorted by vmselect.
return tagValues, nil
}

View file

@ -8,6 +8,7 @@ import (
"os"
"reflect"
"regexp"
"sort"
"testing"
"time"
@ -1487,6 +1488,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
var metricNameBuf []byte
perDayMetricIDs := make(map[uint64]*uint64set.Set)
var allMetricIDs uint64set.Set
tagKeys := []string{
"", "constant", "day", "uniqueid",
}
tagValues := []string{
"testMetric",
}
sort.Strings(tagKeys)
for day := 0; day < days; day++ {
var tsids []TSID
for metric := 0; metric < metricsPerDay; metric++ {
@ -1554,6 +1562,32 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
}
// Check SearchTagKeysOnTimeRange.
tks, err := db.SearchTagKeysOnTimeRange(TimeRange{
MinTimestamp: int64(now) - msecPerDay,
MaxTimestamp: int64(now),
}, 10000, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err)
}
sort.Strings(tks)
if !reflect.DeepEqual(tks, tagKeys) {
t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys)
}
// Check SearchTagValuesOnTimeRange.
tvs, err := db.SearchTagValuesOnTimeRange([]byte(""), TimeRange{
MinTimestamp: int64(now)-msecPerDay,
MaxTimestamp: int64(now),
}, 10000, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err)
}
sort.Strings(tvs)
if !reflect.DeepEqual(tvs, tagValues) {
t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues)
}
// Create a filter that will match series that occur across multiple days
tfs := NewTagFilters()
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {

View file

@ -921,11 +921,21 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error)
return s.idb().searchMetricName(dst, metricID)
}
// SearchTagKeysOnTimeRange searches for tag keys on tr.
func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
}
// SearchTagKeys searches for tag keys
func (s *Storage) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
return s.idb().SearchTagKeys(maxTagKeys, deadline)
}
// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
func (s *Storage) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
return s.idb().SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline)
}
// SearchTagValues searches for tag values for the given tagKey
func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
return s.idb().SearchTagValues(tagKey, maxTagValues, deadline)