label values api

Haley Wang 2024-01-11 17:12:08 +08:00
parent 91ccea236f
commit cc6df8a477
3 changed files with 48 additions and 12 deletions

View file

@@ -176,7 +176,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
promql.ResetRollupResultCache()
return true
}
if strings.HasPrefix(path, "/api/v1/label/") {
s := path[len("/api/v1/label/"):]
if strings.HasSuffix(s, "/values") {
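
The hunk above is the request router context for this change: paths of the form /api/v1/label/<name>/values are recognized by the prefix/suffix checks and dispatched to the label-values handler. As a rough illustration only (the helper and package name below are hypothetical, not part of this commit), the label name can be recovered from such a path with the same checks:

package routing // hypothetical package, for illustration only

import "strings"

// parseLabelValuesPath mirrors the prefix/suffix checks shown in the hunk above and
// returns the <name> segment of a /api/v1/label/<name>/values path.
func parseLabelValuesPath(path string) (string, bool) {
	if !strings.HasPrefix(path, "/api/v1/label/") {
		return "", false
	}
	s := strings.TrimPrefix(path, "/api/v1/label/")
	if !strings.HasSuffix(s, "/values") {
		return "", false
	}
	return strings.TrimSuffix(s, "/values"), true
}
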

View file

@@ -3,7 +3,6 @@ package prometheus
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/metricsql"
"math"
"net/http"
"runtime"
@@ -13,6 +12,8 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metricsql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
@@ -47,10 +48,10 @@ var (
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
-maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage")
+maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 100, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage")
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
-maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
+maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 100, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
maxSeriesLimit = flag.Int("search.maxSeries", 30e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage")
maxPointsPerTimeseries = flag.Int("search.maxPointsPerTimeseries", 30e3, "The maximum points per a single timeseries returned from /api/v1/query_range. "+
"This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points "+

View file

@@ -10,6 +10,7 @@ import (
"reflect"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -765,7 +766,8 @@ func (is *indexSearch) getLabelNamesForMetricIDs(qt *querytracer.Tracer, metricI
// SearchLabelValuesWithFiltersOnTimeRange returns label values for the given labelName, tfss and tr.
func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName string, tfss []*TagFilters, tr TimeRange,
-maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
+maxLabelValues, maxMetrics int, deadline uint64,
+) ([]string, error) {
qt = qt.NewChild("search for label values: labelName=%q, filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", labelName, tfss, &tr, maxLabelValues, maxMetrics)
defer qt.Done()
lvs := make(map[string]struct{})
@@ -804,7 +806,8 @@ func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Trace
}
func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters,
-tr TimeRange, maxLabelValues, maxMetrics int) error {
+tr TimeRange, maxLabelValues, maxMetrics int,
+) error {
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
@@ -853,9 +856,14 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.T
}
func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters,
-date uint64, maxLabelValues, maxMetrics int) error {
+date uint64, maxLabelValues, maxMetrics int,
+) error {
+// use metricIDs if the values limit is high
+if maxLabelValues > 1e3 {
+maxMetrics = 2e9
+}
filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
-if err != nil {
+if err != nil && !strings.Contains(err.Error(), "the number of matching timeseries exceeds") {
return err
}
if filter != nil && filter.Len() < 100e3 {
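
Two decisions are visible in the hunk above: when many label values are requested, the per-date metricID cap is effectively lifted so the prefilter can still be built, and a "too many matching timeseries" error from the prefilter search is tolerated rather than failing the whole request. The sketch below restates both decisions with hypothetical names outside the repository's types; it is illustrative only, not code from this commit.

package sketch // illustrative only

import "strings"

// metricIDCap returns the metricID limit to use: effectively unlimited when a
// large number of label values was requested, the caller's limit otherwise.
func metricIDCap(maxLabelValues, maxMetrics int) int {
	if maxLabelValues > 1e3 {
		return 2e9
	}
	return maxMetrics
}

// isTooManySeriesErr reports whether err is the "too many matching timeseries"
// limit error, which the code above tolerates by skipping the prefilter instead
// of returning the error to the caller.
func isTooManySeriesErr(err error) bool {
	return err != nil && strings.Contains(err.Error(), "the number of matching timeseries exceeds")
}
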
@@ -900,7 +908,7 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer
if err := mp.Init(item, nsPrefixExpected); err != nil {
return err
}
-if mp.GetMatchingSeriesCount(filter, dmis) == 0 {
+if mp.GetMatchingSeriesCount(nil, dmis) == 0 {
continue
}
labelValue := mp.Tag.Value
@@ -915,6 +923,30 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer
ts.Seek(kb.B)
continue
}
+if len(tfss) != 0 {
+tmptfs := NewTagFilters()
+tmptfs.Add(labelNameBytes, labelValue, false, false)
+tfssCopy := make([]*TagFilters, len(tfss))
+for i, v := range tfss {
+tfssCopy[i] = &TagFilters{
+tfs: make([]tagFilter, len(v.tfs)),
+}
+copy(tfssCopy[i].tfs, v.tfs)
+tfssCopy[i].commonPrefix = v.commonPrefix
+}
+tfssCopy[0].tfs = append(tfssCopy[0].tfs, tmptfs.tfs...)
+isLocal := is.db.getIndexSearch(is.deadline)
+filter, err := isLocal.searchMetricIDsWithFiltersOnDate(qt, tfssCopy, date, 10)
+if err != nil && !strings.Contains(err.Error(), "the number of matching timeseries exceeds") {
+return err
+}
+if err == nil && (filter == nil || filter.Len() == 0) {
+continue
+}
+}
lvs[string(labelValue)] = struct{}{}
prevLabelValue = append(prevLabelValue[:0], labelValue...)
}
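
The block added in the hunk above is the core of the change: when the caller supplied tag filters, each candidate label value is verified by appending an exact labelName=labelValue filter to a copy of those filters and asking the index whether any metric ID still matches on the given date; values with no match are skipped. The following is a minimal sketch of that per-value check with simplified stand-in types and a hypothetical callback, not the repository's API.

package sketch // illustrative only

// labelFilter is a simplified stand-in for a single tag filter.
type labelFilter struct{ name, value string }

// keepMatchingValues drops every candidate label value that has no matching
// metric ID on the given date once an exact labelName=labelValue filter is
// added to a copy of the caller's filters, mirroring the tfssCopy logic above.
func keepMatchingValues(candidates []string, labelName string, base []labelFilter, date uint64,
	hasMatch func(filters []labelFilter, date uint64) (bool, error),
) ([]string, error) {
	if len(base) == 0 {
		// No filters were supplied, so every candidate value is kept.
		return candidates, nil
	}
	kept := make([]string, 0, len(candidates))
	for _, v := range candidates {
		// Copy the base filters so the caller's slice is not mutated.
		filters := append(append([]labelFilter(nil), base...), labelFilter{labelName, v})
		ok, err := hasMatch(filters, date)
		if err != nil {
			return nil, err
		}
		if ok {
			kept = append(kept, v)
		}
	}
	return kept, nil
}
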
@@ -2808,7 +2840,8 @@ func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64) bool {
}
func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,
-maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
+maxMetrics int, maxLoopsCount int64,
+) (*uint64set.Set, int64, error) {
if qt.Enabled() {
qt = qt.NewChild("get metric ids for filter and date: filter={%s}, date=%s, maxMetrics=%d, maxLoopsCount=%d", tf, dateToString(date), maxMetrics, maxLoopsCount)
defer qt.Done()
@@ -3226,8 +3259,10 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi
return dstData, dstItems
}
-var indexBlocksWithMetricIDsIncorrectOrder uint64
-var indexBlocksWithMetricIDsProcessed uint64
+var (
+indexBlocksWithMetricIDsIncorrectOrder uint64
+indexBlocksWithMetricIDsProcessed uint64
+)
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
if len(items) == 0 {
@@ -3255,6 +3290,7 @@ func (s uint64Sorter) Len() int { return len(s) }
func (s uint64Sorter) Less(i, j int) bool {
return s[i] < s[j]
}
func (s uint64Sorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
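
The uint64Sorter methods shown in the last hunk (Len, Less, Swap) satisfy sort.Interface. A minimal, standalone usage sketch follows; the main package and the sample IDs are illustrative only, not part of this commit.

package main

import (
	"fmt"
	"sort"
)

// uint64Sorter mirrors the type whose methods appear in the hunk above.
type uint64Sorter []uint64

func (s uint64Sorter) Len() int           { return len(s) }
func (s uint64Sorter) Less(i, j int) bool { return s[i] < s[j] }
func (s uint64Sorter) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

func main() {
	ids := uint64Sorter{42, 7, 19}
	sort.Sort(ids)
	fmt.Println(ids) // prints [7 19 42]
}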