Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-11-05 02:51:08 +02:00
commit 56303aee5b
17 changed files with 498 additions and 31 deletions

View file

@ -1,6 +1,6 @@
# tip
* FEATURE: optimize requests to `/api/v1/labels` and `/api/v1/label/<name>/values` when `start` and `end` args are set.
* FEATURE: reduce memory usage when query touches big number of time series.
* FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%)
are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such

View file

@ -38,6 +38,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [Brandwatch](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#brandwatch)
* [Adsterra](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adsterra)
* [ARNES](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#arnes)
* [Idealo.de](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#idealode)
## Prominent features

View file

@ -219,6 +219,9 @@ It accepts optional `show_original_labels=1` query arg, which shows the original
This information may be useful for debugging target relabeling.
* `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
* `http://vmagent-host:8429/ready`. This handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs.
It may be useful for performing `vmagent` rolling update without scrape loss.
### Troubleshooting

View file

@ -7,6 +7,7 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
@ -222,6 +223,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusOK)
return true
case "/ready":
if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 {
errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy)
http.Error(w, errMsg, http.StatusTooEarly)
} else {
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}
return true
}
return false
}

View file

@ -170,6 +170,16 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusNoContent)
return true
case "/ready":
if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 {
errMsg := fmt.Sprintf("waiting for scrape config to init targets, configs left: %d", rdy)
http.Error(w, errMsg, http.StatusTooEarly)
} else {
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}
return true
default:
// This is not our link
return false

View file

@ -495,6 +495,34 @@ func DeleteSeries(sq *storage.SearchQuery) (int, error) {
return vmstorage.DeleteMetrics(tfss)
}
// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
func GetLabelsOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
labels, err := vmstorage.SearchTagKeysOnTimeRange(tr, *maxTagKeysPerSearch, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during labels search on time range: %w", err)
}
// Substitute "" with "__name__"
for i := range labels {
if labels[i] == "" {
labels[i] = "__name__"
}
}
// Merge labels obtained from Prometheus storage.
promLabels, err := promdb.GetLabelNamesOnTimeRange(tr, deadline)
if err != nil {
return nil, fmt.Errorf("cannot obtain labels from Prometheus storage: %w", err)
}
labels = mergeStrings(labels, promLabels)
// Sort labels like Prometheus does
sort.Strings(labels)
return labels, nil
}
// GetLabels returns labels until the given deadline.
func GetLabels(deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() {
@ -504,7 +532,6 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("error during labels search: %w", err)
}
// Substitute "" with "__name__"
for i := range labels {
if labels[i] == "" {
@ -521,7 +548,6 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) {
// Sort labels like Prometheus does
sort.Strings(labels)
return labels, nil
}
@ -546,6 +572,33 @@ func mergeStrings(a, b []string) []string {
return result
}
// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
// until the given deadline.
func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
if labelName == "__name__" {
labelName = ""
}
// Search for tag values
labelValues, err := vmstorage.SearchTagValuesOnTimeRange([]byte(labelName), tr, *maxTagValuesPerSearch, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err)
}
// Merge label values obtained from Prometheus storage.
promLabelValues, err := promdb.GetLabelValuesOnTimeRange(labelName, tr, deadline)
if err != nil {
return nil, fmt.Errorf("cannot obtain label values on time range for %q from Prometheus storage: %w", labelName, err)
}
labelValues = mergeStrings(labelValues, promLabelValues)
// Sort labelValues like Prometheus does
sort.Strings(labelValues)
return labelValues, nil
}
// GetLabelValues returns label values for the given labelName
// until the given deadline.
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) {
@ -555,7 +608,6 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string,
if labelName == "__name__" {
labelName = ""
}
// Search for tag values
labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.Deadline())
if err != nil {
@ -571,7 +623,6 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string,
// Sort labelValues like Prometheus does
sort.Strings(labelValues)
return labelValues, nil
}

View file

@ -511,12 +511,32 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr
return fmt.Errorf("cannot parse form values: %w", err)
}
var labelValues []string
if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
if len(r.Form["match[]"]) == 0 {
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
var err error
labelValues, err = netstorage.GetLabelValues(labelName, deadline)
if err != nil {
return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err)
}
} else {
ct := startTime.UnixNano() / 1e6
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
tr := storage.TimeRange{
MinTimestamp: start,
MaxTimestamp: end,
}
labelValues, err = netstorage.GetLabelValuesOnTimeRange(labelName, tr, deadline)
if err != nil {
return fmt.Errorf(`cannot obtain label values on time range for %q: %w`, labelName, err)
}
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/label/foo/values?match[]=foobar{baz="abc"}&start=...&end=...
@ -692,12 +712,32 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
return fmt.Errorf("cannot parse form values: %w", err)
}
var labels []string
if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
if len(r.Form["match[]"]) == 0 {
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
var err error
labels, err = netstorage.GetLabels(deadline)
if err != nil {
return fmt.Errorf("cannot obtain labels: %w", err)
}
} else {
ct := startTime.UnixNano() / 1e6
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
start, err := searchutils.GetTime(r, "start", end-defaultStep)
if err != nil {
return err
}
tr := storage.TimeRange{
MinTimestamp: start,
MaxTimestamp: end,
}
labels, err = netstorage.GetLabelsOnTimeRange(tr, deadline)
if err != nil {
return fmt.Errorf("cannot obtain labels on time range: %w", err)
}
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/labels?match[]=foobar{baz="abc"}&start=...&end=...

View file

@ -125,6 +125,14 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) {
return n, err
}
// SearchTagKeysOnTimeRange searches for tag keys on tr.
func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
WG.Add(1)
keys, err := Storage.SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
WG.Done()
return keys, err
}
// SearchTagKeys searches for tag keys
func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
WG.Add(1)
@ -133,6 +141,14 @@ func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
return keys, err
}
// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
func SearchTagValuesOnTimeRange(tagKey []byte, tr storage.TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
WG.Add(1)
values, err := Storage.SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline)
WG.Done()
return values, err
}
// SearchTagValues searches for tag values for the given tagKey
func SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
WG.Add(1)

View file

@ -60,12 +60,12 @@ func MustClose() {
var promDB *tsdb.DB
// GetLabelNames returns label names.
func GetLabelNames(deadline searchutils.Deadline) ([]string, error) {
// GetLabelNamesOnTimeRange returns label names.
func GetLabelNamesOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
d := time.Unix(int64(deadline.Deadline()), 0)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
q, err := promDB.Querier(ctx, 0, d.UnixNano()/1e6)
q, err := promDB.Querier(ctx, tr.MinTimestamp, tr.MaxTimestamp)
if err != nil {
return nil, err
}
@ -77,12 +77,21 @@ func GetLabelNames(deadline searchutils.Deadline) ([]string, error) {
return names, err
}
// GetLabelValues returns values for the given labelName.
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) {
// GetLabelNames returns label names.
func GetLabelNames(deadline searchutils.Deadline) ([]string, error) {
tr := storage.TimeRange{
MinTimestamp: 0,
MaxTimestamp: time.Now().UnixNano() / 1e6,
}
return GetLabelNamesOnTimeRange(tr, deadline)
}
// GetLabelValuesOnTimeRange returns values for the given labelName on the given tr.
func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
d := time.Unix(int64(deadline.Deadline()), 0)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
q, err := promDB.Querier(ctx, 0, d.UnixNano()/1e6)
q, err := promDB.Querier(ctx, tr.MinTimestamp, tr.MaxTimestamp)
if err != nil {
return nil, err
}
@ -94,6 +103,15 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string,
return values, err
}
// GetLabelValues returns values for the given labelName.
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) {
tr := storage.TimeRange{
MinTimestamp: 0,
MaxTimestamp: time.Now().UnixNano() / 1e6,
}
return GetLabelValuesOnTimeRange(labelName, tr, deadline)
}
func copyStringsWithMemory(a []string) []string {
result := make([]string, len(a))
for i, s := range a {

View file

@ -6,6 +6,20 @@ and feel free asking for references, reviews and additional case studies from re
See also [articles about VictoriaMetrics from our users](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles#third-party-articles-and-slides).
* [Adidas](#adidas)
* [CERN](#cern)
* [COLOPL](#colopl)
* [Zerodha](#zerodha)
* [Wix.com](#wixcom)
* [Wedos.com](#wedoscom)
* [Synthesio](#synthesio)
* [Dreamteam](#dreamteam)
* [Brandwatch](#brandwatch)
* [Adsterra](#adsterra)
* [ARNES](#arnes)
* [Idealo.de](#idealode)
## Adidas
See [slides](https://promcon.io/2019-munich/slides/remote-write-storage-wars.pdf) and [video](https://youtu.be/OsH6gPdxR4s)
@ -304,3 +318,21 @@ Grafana has a LB infront, so if one DC has problems, we can still view all metri
We are still in the process of migration, but we are really happy with the whole stack. It has proven as an essential piece
for insight into our services during COVID-19 and has enabled us to provide better service and spot problems faster.
## Idealo.de
[idealo.de](https://www.idealo.de/) is the leading price comparison website in Germany. We use Prometheus for metrics on our container platform.
When we introduced Prometheus at idealo we started with m3db as a longterm storage. In our setup m3db was quite unstable and consumed a lot of resources.
VictoriaMetrics runs very stable for us and uses only a fraction of the resources. Although we also increased our retention time from 1 month to 13 months.
Numbers:
- The number of active time series per VictoriaMetrics instance is 21M.
- Total ingestion rate 120k metrics per second.
- The total number of datapoints 3.1 trillion.
- The average time series churn rate is ~9M per day.
- The average query rate is ~20 per second. Response time for 99th quantile is 120ms.
- Retention: 13 months.
- Size of all datapoints: 3.5 TB

View file

@ -38,6 +38,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [Brandwatch](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#brandwatch)
* [Adsterra](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adsterra)
* [ARNES](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#arnes)
* [Idealo.de](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#idealode)
## Prominent features

View file

@ -219,6 +219,9 @@ It accepts optional `show_original_labels=1` query arg, which shows the original
This information may be useful for debugging target relabeling.
* `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
* `http://vmagent-host:8429/ready`. This handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs.
It may be useful for performing `vmagent` rolling update without scrape loss.
### Troubleshooting

View file

@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -75,6 +76,9 @@ func Stop() {
var (
globalStopCh chan struct{}
scraperWG sync.WaitGroup
// PendingScrapeConfigs - zero value means, that
// all scrapeConfigs are inited and ready for work.
PendingScrapeConfigs int32
)
func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
@ -166,6 +170,7 @@ func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConf
}
func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork) {
atomic.AddInt32(&PendingScrapeConfigs, 1)
scfg := &scrapeConfig{
name: name,
pushData: scs.pushData,
@ -216,10 +221,15 @@ func (scfg *scrapeConfig) run() {
cfg := <-scfg.cfgCh
var swsPrev []ScrapeWork
for {
updateScrapeWork := func(cfg *Config) {
sws := scfg.getScrapeWork(cfg, swsPrev)
sg.update(sws)
swsPrev = sws
}
updateScrapeWork(cfg)
atomic.AddInt32(&PendingScrapeConfigs, -1)
for {
select {
case <-scfg.stopCh:
@ -227,6 +237,7 @@ func (scfg *scrapeConfig) run() {
case cfg = <-scfg.cfgCh:
case <-tickerCh:
}
updateScrapeWork(cfg)
}
}

View file

@ -701,10 +701,118 @@ func putIndexItems(ii *indexItems) {
var indexItemsPool sync.Pool
// SearchTagKeysOnTimeRange returns all the tag keys on the given tr.
func (db *indexDB) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
tks := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
extDB.putIndexSearch(is)
})
if ok && err != nil {
return nil, err
}
keys := make([]string, 0, len(tks))
for key := range tks {
// Do not skip empty keys, since they are converted to __name__
keys = append(keys, key)
}
// Do not sort keys, since they must be sorted by vmselect.
return keys, nil
}
func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error {
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
go func(date uint64) {
defer wg.Done()
tksLocal := make(map[string]struct{})
isLocal := is.db.getIndexSearch(is.deadline)
err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
return
}
if err != nil {
errGlobal = err
return
}
if len(tks) >= maxTagKeys {
return
}
for k := range tksLocal {
tks[k] = struct{}{}
}
}(date)
}
wg.Wait()
return errGlobal
}
func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B
ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return err
}
if mp.IsDeletedTag(dmis) {
continue
}
// Store tag key.
tks[string(mp.Tag.Key)] = struct{}{}
// Search for the next tag key.
// The last char in kb.B must be tagSeparatorChar.
// Just increment it in order to jump to the next tag key.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error during search for prefix %q: %w", prefix, err)
}
return nil
}
// SearchTagKeys returns all the tag keys.
func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
// TODO: cache results?
tks := make(map[string]struct{})
is := db.getIndexSearch(deadline)
@ -728,7 +836,6 @@ func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, err
// Do not skip empty keys, since they are converted to __name__
keys = append(keys, key)
}
// Do not sort keys, since they must be sorted by vmselect.
return keys, nil
}
@ -778,10 +885,129 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
return nil
}
// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr.
func (db *indexDB) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
tvs := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
extDB.putIndexSearch(is)
})
if ok && err != nil {
return nil, err
}
tagValues := make([]string, 0, len(tvs))
for tv := range tvs {
if len(tv) == 0 {
// Skip empty values, since they have no any meaning.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
continue
}
tagValues = append(tagValues, tv)
}
// Do not sort tagValues, since they must be sorted by vmselect.
return tagValues, nil
}
func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error {
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
go func(date uint64) {
defer wg.Done()
tvsLocal := make(map[string]struct{})
isLocal := is.db.getIndexSearch(is.deadline)
err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
return
}
if err != nil {
errGlobal = err
return
}
if len(tvs) >= maxTagValues {
return
}
for v := range tvsLocal {
tvs[v] = struct{}{}
}
}(date)
}
wg.Wait()
return errGlobal
}
func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, tagKey)
prefix := kb.B
ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return err
}
if mp.IsDeletedTag(dmis) {
continue
}
// Store tag value
tvs[string(mp.Tag.Value)] = struct{}{}
if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
// There is no need in searching for the next tag value,
// since it is likely it is located in the next row,
// because the current row contains incomplete metricIDs set.
continue
}
// Search for the next tag value.
// The last char in kb.B must be tagSeparatorChar.
// Just increment it in order to jump to the next tag value.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
kb.B = marshalTagValue(kb.B, mp.Tag.Value)
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err)
}
return nil
}
// SearchTagValues returns all the tag values for the given tagKey
func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
// TODO: cache results?
tvs := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagValues(tvs, tagKey, maxTagValues)
@ -807,7 +1033,6 @@ func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uin
}
tagValues = append(tagValues, tv)
}
// Do not sort tagValues, since they must be sorted by vmselect.
return tagValues, nil
}

View file

@ -8,6 +8,7 @@ import (
"os"
"reflect"
"regexp"
"sort"
"testing"
"time"
@ -1487,6 +1488,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
var metricNameBuf []byte
perDayMetricIDs := make(map[uint64]*uint64set.Set)
var allMetricIDs uint64set.Set
tagKeys := []string{
"", "constant", "day", "uniqueid",
}
tagValues := []string{
"testMetric",
}
sort.Strings(tagKeys)
for day := 0; day < days; day++ {
var tsids []TSID
for metric := 0; metric < metricsPerDay; metric++ {
@ -1554,6 +1562,32 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
}
// Check SearchTagKeysOnTimeRange.
tks, err := db.SearchTagKeysOnTimeRange(TimeRange{
MinTimestamp: int64(now) - msecPerDay,
MaxTimestamp: int64(now),
}, 10000, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err)
}
sort.Strings(tks)
if !reflect.DeepEqual(tks, tagKeys) {
t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys)
}
// Check SearchTagValuesOnTimeRange.
tvs, err := db.SearchTagValuesOnTimeRange([]byte(""), TimeRange{
MinTimestamp: int64(now) - msecPerDay,
MaxTimestamp: int64(now),
}, 10000, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err)
}
sort.Strings(tvs)
if !reflect.DeepEqual(tvs, tagValues) {
t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues)
}
// Create a filter that will match series that occur across multiple days
tfs := NewTagFilters()
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {

View file

@ -3,6 +3,7 @@ package storage
import (
"fmt"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -53,12 +54,12 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
}
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount))
continue
}
if bsm.Block.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount))
continue
}
if pendingBlockIsEmpty {
@ -181,7 +182,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted
for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline {
nextIdx++
}
*rowsDeleted += uint64(nextIdx - b.nextIdx)
atomic.AddUint64(rowsDeleted, uint64(nextIdx-b.nextIdx))
b.nextIdx = nextIdx
}

View file

@ -921,11 +921,21 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error)
return s.idb().searchMetricName(dst, metricID)
}
// SearchTagKeysOnTimeRange searches for tag keys on tr.
func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
}
// SearchTagKeys searches for tag keys
func (s *Storage) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
return s.idb().SearchTagKeys(maxTagKeys, deadline)
}
// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
func (s *Storage) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
return s.idb().SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline)
}
// SearchTagValues searches for tag values for the given tagKey
func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
return s.idb().SearchTagValues(tagKey, maxTagValues, deadline)