diff --git a/app/vmselect/main.go b/app/vmselect/main.go index c239852a4..5ab0bd4d5 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -176,7 +176,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { promql.ResetRollupResultCache() return true } - if strings.HasPrefix(path, "/api/v1/label/") { s := path[len("/api/v1/label/"):] if strings.HasSuffix(s, "/values") { diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 67f4c94fd..8a978b661 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,7 +3,6 @@ package prometheus import ( "flag" "fmt" - "github.com/VictoriaMetrics/metricsql" "math" "net/http" "runtime" @@ -13,6 +12,8 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/metricsql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" @@ -47,10 +48,10 @@ var ( maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+ "points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data") - maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage") + maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 100, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage") maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage") maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage") - maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage") + maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 100, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage") maxSeriesLimit = flag.Int("search.maxSeries", 30e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage") maxPointsPerTimeseries = flag.Int("search.maxPointsPerTimeseries", 30e3, "The maximum points per a single timeseries returned from /api/v1/query_range. "+ "This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points "+ diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index c3e2785b8..9b62adac8 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -10,6 +10,7 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -765,7 +766,8 @@ func (is *indexSearch) getLabelNamesForMetricIDs(qt *querytracer.Tracer, metricI // SearchLabelValuesWithFiltersOnTimeRange returns label values for the given labelName, tfss and tr. func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName string, tfss []*TagFilters, tr TimeRange, - maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) { + maxLabelValues, maxMetrics int, deadline uint64, +) ([]string, error) { qt = qt.NewChild("search for label values: labelName=%q, filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", labelName, tfss, &tr, maxLabelValues, maxMetrics) defer qt.Done() lvs := make(map[string]struct{}) @@ -804,7 +806,8 @@ func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Trace } func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters, - tr TimeRange, maxLabelValues, maxMetrics int) error { + tr TimeRange, maxLabelValues, maxMetrics int, +) error { minDate := uint64(tr.MinTimestamp) / msecPerDay maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch { @@ -853,9 +856,14 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.T } func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters, - date uint64, maxLabelValues, maxMetrics int) error { + date uint64, maxLabelValues, maxMetrics int, +) error { + // use metricsID if values limit is high + if maxLabelValues > 1e3 { + maxMetrics = 2e9 + } filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "the number of matching timeseries exceeds") { return err } if filter != nil && filter.Len() < 100e3 { @@ -900,7 +908,7 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer if err := mp.Init(item, nsPrefixExpected); err != nil { return err } - if mp.GetMatchingSeriesCount(filter, dmis) == 0 { + if mp.GetMatchingSeriesCount(nil, dmis) == 0 { continue } labelValue := mp.Tag.Value @@ -915,6 +923,30 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer ts.Seek(kb.B) continue } + + if len(tfss) != 0 { + tmptfs := NewTagFilters() + tmptfs.Add(labelNameBytes, labelValue, false, false) + tfssCopy := make([]*TagFilters, len(tfss)) + for i, v := range tfss { + tfssCopy[i] = &TagFilters{ + tfs: make([]tagFilter, len(v.tfs)), + } + copy(tfssCopy[i].tfs, v.tfs) + tfssCopy[i].commonPrefix = v.commonPrefix + } + + tfssCopy[0].tfs = append(tfssCopy[0].tfs, tmptfs.tfs...) + isLocal := is.db.getIndexSearch(is.deadline) + filter, err := isLocal.searchMetricIDsWithFiltersOnDate(qt, tfssCopy, date, 10) + if err != nil && !strings.Contains(err.Error(), "the number of matching timeseries exceeds") { + return err + } + if err == nil && (filter == nil || filter.Len() == 0) { + continue + } + } + lvs[string(labelValue)] = struct{}{} prevLabelValue = append(prevLabelValue[:0], labelValue...) } @@ -2808,7 +2840,8 @@ func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64) bool { } func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte, - maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) { + maxMetrics int, maxLoopsCount int64, +) (*uint64set.Set, int64, error) { if qt.Enabled() { qt = qt.NewChild("get metric ids for filter and date: filter={%s}, date=%s, maxMetrics=%d, maxLoopsCount=%d", tf, dateToString(date), maxMetrics, maxLoopsCount) defer qt.Done() @@ -3226,8 +3259,10 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi return dstData, dstItems } -var indexBlocksWithMetricIDsIncorrectOrder uint64 -var indexBlocksWithMetricIDsProcessed uint64 +var ( + indexBlocksWithMetricIDsIncorrectOrder uint64 + indexBlocksWithMetricIDsProcessed uint64 +) func checkItemsSorted(data []byte, items []mergeset.Item) bool { if len(items) == 0 { @@ -3255,6 +3290,7 @@ func (s uint64Sorter) Len() int { return len(s) } func (s uint64Sorter) Less(i, j int) bool { return s[i] < s[j] } + func (s uint64Sorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }