diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a100d7c8..7cce09af9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # tip - +* FEATURE: optimize requests to `/api/v1/labels` and `/api/v1/label/<label_name>/values` when `start` and `end` args are set. * FEATURE: reduce memory usage when query touches big number of time series. * FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%) are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such diff --git a/README.md b/README.md index 40b212b6d..c110a6d90 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [Brandwatch](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#brandwatch) * [Adsterra](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adsterra) * [ARNES](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#arnes) +* [Idealo.de](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#idealode) ## Prominent features diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 6d132ad5a..612a62108 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -219,6 +219,9 @@ It accepts optional `show_original_labels=1` query arg, which shows the original This information may be useful for debugging target relabeling. * `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets). +* `http://vmagent-host:8429/ready`. This handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs. 
+It may be useful for performing `vmagent` rolling update without scrape loss. + ### Troubleshooting diff --git a/app/vmagent/main.go b/app/vmagent/main.go index d3fa1d6b9..4d2b7acb5 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" @@ -222,6 +223,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { procutil.SelfSIGHUP() w.WriteHeader(http.StatusOK) return true + case "/ready": + if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 { + errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy) + http.Error(w, errMsg, http.StatusTooEarly) + } else { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } + return true } return false } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 504922c94..1ab37655d 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -170,6 +170,16 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { procutil.SelfSIGHUP() w.WriteHeader(http.StatusNoContent) return true + case "/ready": + if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 { + errMsg := fmt.Sprintf("waiting for scrape config to init targets, configs left: %d", rdy) + http.Error(w, errMsg, http.StatusTooEarly) + } else { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } + return true default: // This is not our link return false diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index e47467e99..ccffc49e7 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -495,6 +495,34 @@ func DeleteSeries(sq *storage.SearchQuery) (int, error) { return vmstorage.DeleteMetrics(tfss) } +// GetLabelsOnTimeRange returns labels for the given tr until the given 
deadline. +func GetLabelsOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + labels, err := vmstorage.SearchTagKeysOnTimeRange(tr, *maxTagKeysPerSearch, deadline.Deadline()) + if err != nil { + return nil, fmt.Errorf("error during labels search on time range: %w", err) + } + // Substitute "" with "__name__" + for i := range labels { + if labels[i] == "" { + labels[i] = "__name__" + } + } + + // Merge labels obtained from Prometheus storage. + promLabels, err := promdb.GetLabelNamesOnTimeRange(tr, deadline) + if err != nil { + return nil, fmt.Errorf("cannot obtain labels from Prometheus storage: %w", err) + } + labels = mergeStrings(labels, promLabels) + + // Sort labels like Prometheus does + sort.Strings(labels) + return labels, nil +} + // GetLabels returns labels until the given deadline. func GetLabels(deadline searchutils.Deadline) ([]string, error) { if deadline.Exceeded() { @@ -504,7 +532,6 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) { if err != nil { return nil, fmt.Errorf("error during labels search: %w", err) } - // Substitute "" with "__name__" for i := range labels { if labels[i] == "" { @@ -521,7 +548,6 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) { // Sort labels like Prometheus does sort.Strings(labels) - return labels, nil } @@ -546,6 +572,33 @@ func mergeStrings(a, b []string) []string { return result } +// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr +// until the given deadline. 
+func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + if labelName == "__name__" { + labelName = "" + } + // Search for tag values + labelValues, err := vmstorage.SearchTagValuesOnTimeRange([]byte(labelName), tr, *maxTagValuesPerSearch, deadline.Deadline()) + if err != nil { + return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err) + } + + // Merge label values obtained from Prometheus storage. + promLabelValues, err := promdb.GetLabelValuesOnTimeRange(labelName, tr, deadline) + if err != nil { + return nil, fmt.Errorf("cannot obtain label values on time range for %q from Prometheus storage: %w", labelName, err) + } + labelValues = mergeStrings(labelValues, promLabelValues) + + // Sort labelValues like Prometheus does + sort.Strings(labelValues) + return labelValues, nil +} + // GetLabelValues returns label values for the given labelName // until the given deadline. 
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) { @@ -555,7 +608,6 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, if labelName == "__name__" { labelName = "" } - // Search for tag values labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.Deadline()) if err != nil { @@ -571,7 +623,6 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, // Sort labelValues like Prometheus does sort.Strings(labelValues) - return labelValues, nil } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index f5eebc035..cdee46c67 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -511,11 +511,31 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr return fmt.Errorf("cannot parse form values: %w", err) } var labelValues []string - if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { - var err error - labelValues, err = netstorage.GetLabelValues(labelName, deadline) - if err != nil { - return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err) + if len(r.Form["match[]"]) == 0 { + if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + var err error + labelValues, err = netstorage.GetLabelValues(labelName, deadline) + if err != nil { + return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err) + } + } else { + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + start, err := searchutils.GetTime(r, "start", end-defaultStep) + if err != nil { + return err + } + tr := storage.TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + labelValues, err = netstorage.GetLabelValuesOnTimeRange(labelName, tr, deadline) + if err != nil { + return fmt.Errorf(`cannot obtain label values on time range for 
%q: %w`, labelName, err) + } } } else { // Extended functionality that allows filtering by label filters and time range @@ -692,11 +712,31 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) return fmt.Errorf("cannot parse form values: %w", err) } var labels []string - if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { - var err error - labels, err = netstorage.GetLabels(deadline) - if err != nil { - return fmt.Errorf("cannot obtain labels: %w", err) + if len(r.Form["match[]"]) == 0 { + if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + var err error + labels, err = netstorage.GetLabels(deadline) + if err != nil { + return fmt.Errorf("cannot obtain labels: %w", err) + } + } else { + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + start, err := searchutils.GetTime(r, "start", end-defaultStep) + if err != nil { + return err + } + tr := storage.TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + labels, err = netstorage.GetLabelsOnTimeRange(tr, deadline) + if err != nil { + return fmt.Errorf("cannot obtain labels on time range: %w", err) + } } } else { // Extended functionality that allows filtering by label filters and time range diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 073889791..82b58aacd 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -125,6 +125,14 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) { return n, err } +// SearchTagKeysOnTimeRange searches for tag keys on tr. 
+func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + WG.Add(1) + keys, err := Storage.SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline) + WG.Done() + return keys, err +} + // SearchTagKeys searches for tag keys func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { WG.Add(1) @@ -133,6 +141,14 @@ func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { return keys, err } +// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr. +func SearchTagValuesOnTimeRange(tagKey []byte, tr storage.TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + WG.Add(1) + values, err := Storage.SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline) + WG.Done() + return values, err +} + // SearchTagValues searches for tag values for the given tagKey func SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { WG.Add(1) diff --git a/app/vmstorage/promdb/promdb.go b/app/vmstorage/promdb/promdb.go index 11824bc27..5ced884be 100644 --- a/app/vmstorage/promdb/promdb.go +++ b/app/vmstorage/promdb/promdb.go @@ -60,12 +60,12 @@ func MustClose() { var promDB *tsdb.DB -// GetLabelNames returns label names. -func GetLabelNames(deadline searchutils.Deadline) ([]string, error) { +// GetLabelNamesOnTimeRange returns label names. +func GetLabelNamesOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { d := time.Unix(int64(deadline.Deadline()), 0) ctx, cancel := context.WithDeadline(context.Background(), d) defer cancel() - q, err := promDB.Querier(ctx, 0, d.UnixNano()/1e6) + q, err := promDB.Querier(ctx, tr.MinTimestamp, tr.MaxTimestamp) if err != nil { return nil, err } @@ -77,12 +77,21 @@ func GetLabelNames(deadline searchutils.Deadline) ([]string, error) { return names, err } -// GetLabelValues returns values for the given labelName. 
-func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) { +// GetLabelNames returns label names. +func GetLabelNames(deadline searchutils.Deadline) ([]string, error) { + tr := storage.TimeRange{ + MinTimestamp: 0, + MaxTimestamp: time.Now().UnixNano() / 1e6, + } + return GetLabelNamesOnTimeRange(tr, deadline) +} + +// GetLabelValuesOnTimeRange returns values for the given labelName on the given tr. +func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { d := time.Unix(int64(deadline.Deadline()), 0) ctx, cancel := context.WithDeadline(context.Background(), d) defer cancel() - q, err := promDB.Querier(ctx, 0, d.UnixNano()/1e6) + q, err := promDB.Querier(ctx, tr.MinTimestamp, tr.MaxTimestamp) if err != nil { return nil, err } @@ -94,6 +103,15 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, return values, err } +// GetLabelValues returns values for the given labelName. +func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) { + tr := storage.TimeRange{ + MinTimestamp: 0, + MaxTimestamp: time.Now().UnixNano() / 1e6, + } + return GetLabelValuesOnTimeRange(labelName, tr, deadline) +} + func copyStringsWithMemory(a []string) []string { result := make([]string, len(a)) for i, s := range a { diff --git a/docs/CaseStudies.md b/docs/CaseStudies.md index 989706672..bdb0e0a87 100644 --- a/docs/CaseStudies.md +++ b/docs/CaseStudies.md @@ -6,6 +6,20 @@ and feel free asking for references, reviews and additional case studies from re See also [articles about VictoriaMetrics from our users](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles#third-party-articles-and-slides). 
+* [Adidas](#adidas) +* [CERN](#cern) +* [COLOPL](#colopl) +* [Zerodha](#zerodha) +* [Wix.com](#wixcom) +* [Wedos.com](#wedoscom) +* [Synthesio](#synthesio) +* [Dreamteam](#dreamteam) +* [Brandwatch](#brandwatch) +* [Adsterra](#adsterra) +* [ARNES](#arnes) +* [Idealo.de](#idealode) + + ## Adidas See [slides](https://promcon.io/2019-munich/slides/remote-write-storage-wars.pdf) and [video](https://youtu.be/OsH6gPdxR4s) @@ -304,3 +318,21 @@ Grafana has a LB infront, so if one DC has problems, we can still view all metri We are still in the process of migration, but we are really happy with the whole stack. It has proven as an essential piece for insight into our services during COVID-19 and has enabled us to provide better service and spot problems faster. + + +## Idealo.de + +[idealo.de](https://www.idealo.de/) is the leading price comparison website in Germany. We use Prometheus for metrics on our container platform. +When we introduced Prometheus at idealo we started with m3db as a longterm storage. In our setup m3db was quite unstable and consumed a lot of resources. + +VictoriaMetrics runs very stable for us and uses only a fraction of the resources. Although we also increased our retention time from 1 month to 13 months. + +Numbers: + +- The number of active time series per VictoriaMetrics instance is 21M. +- Total ingestion rate 120k metrics per second. +- The total number of datapoints 3.1 trillion. +- The average time series churn rate is ~9M per day. +- The average query rate is ~20 per second. Response time for 99th quantile is 120ms. +- Retention: 13 months. 
+- Size of all datapoints: 3.5 TB diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 40b212b6d..c110a6d90 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -38,6 +38,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [Brandwatch](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#brandwatch) * [Adsterra](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adsterra) * [ARNES](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#arnes) +* [Idealo.de](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#idealode) ## Prominent features diff --git a/docs/vmagent.md b/docs/vmagent.md index 6d132ad5a..612a62108 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -219,6 +219,9 @@ It accepts optional `show_original_labels=1` query arg, which shows the original This information may be useful for debugging target relabeling. * `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets). +* `http://vmagent-host:8429/ready`. This handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs. +It may be useful for performing `vmagent` rolling update without scrape loss. + ### Troubleshooting diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 341e66462..52f99a97e 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -75,6 +76,9 @@ func Stop() { var ( globalStopCh chan struct{} scraperWG sync.WaitGroup + // PendingScrapeConfigs - zero value means, that + // all scrapeConfigs are inited and ready for work. 
+ PendingScrapeConfigs int32 ) func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) { @@ -166,6 +170,7 @@ func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConf } func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork) { + atomic.AddInt32(&PendingScrapeConfigs, 1) scfg := &scrapeConfig{ name: name, pushData: scs.pushData, @@ -216,10 +221,15 @@ func (scfg *scrapeConfig) run() { cfg := <-scfg.cfgCh var swsPrev []ScrapeWork - for { + updateScrapeWork := func(cfg *Config) { sws := scfg.getScrapeWork(cfg, swsPrev) sg.update(sws) swsPrev = sws + } + updateScrapeWork(cfg) + atomic.AddInt32(&PendingScrapeConfigs, -1) + + for { select { case <-scfg.stopCh: @@ -227,6 +237,7 @@ func (scfg *scrapeConfig) run() { case cfg = <-scfg.cfgCh: case <-tickerCh: } + updateScrapeWork(cfg) } } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 63b52eff1..aecd4ea44 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -701,10 +701,118 @@ func putIndexItems(ii *indexItems) { var indexItemsPool sync.Pool +// SearchTagKeysOnTimeRange returns all the tag keys on the given tr. 
+func (db *indexDB) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + tks := make(map[string]struct{}) + is := db.getIndexSearch(deadline) + err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys) + db.putIndexSearch(is) + if err != nil { + return nil, err + } + + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(deadline) + err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, err + } + + keys := make([]string, 0, len(tks)) + for key := range tks { + // Do not skip empty keys, since they are converted to __name__ + keys = append(keys, key) + } + // Do not sort keys, since they must be sorted by vmselect. + return keys, nil +} + +func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error { + minDate := uint64(tr.MinTimestamp) / msecPerDay + maxDate := uint64(tr.MaxTimestamp) / msecPerDay + var mu sync.Mutex + var wg sync.WaitGroup + var errGlobal error + for date := minDate; date <= maxDate; date++ { + wg.Add(1) + go func(date uint64) { + defer wg.Done() + tksLocal := make(map[string]struct{}) + isLocal := is.db.getIndexSearch(is.deadline) + err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys) + is.db.putIndexSearch(isLocal) + mu.Lock() + defer mu.Unlock() + if errGlobal != nil { + return + } + if err != nil { + errGlobal = err + return + } + if len(tks) >= maxTagKeys { + return + } + for k := range tksLocal { + tks[k] = struct{}{} + } + }(date) + } + wg.Wait() + return errGlobal +} + +func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error { + ts := &is.ts + kb := &is.kb + mp := &is.mp + mp.Reset() + dmis := is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + prefix := kb.B + ts.Seek(prefix) + for len(tks) < 
maxTagKeys && ts.NextItem() { + if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } + } + loopsPaceLimiter++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil { + return err + } + if mp.IsDeletedTag(dmis) { + continue + } + + // Store tag key. + tks[string(mp.Tag.Key)] = struct{}{} + + // Search for the next tag key. + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag key. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B[len(kb.B)-1]++ + ts.Seek(kb.B) + } + if err := ts.Error(); err != nil { + return fmt.Errorf("error during search for prefix %q: %w", prefix, err) + } + return nil +} + // SearchTagKeys returns all the tag keys. func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { - // TODO: cache results? - tks := make(map[string]struct{}) is := db.getIndexSearch(deadline) @@ -728,7 +836,6 @@ func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, err // Do not skip empty keys, since they are converted to __name__ keys = append(keys, key) } - // Do not sort keys, since they must be sorted by vmselect. return keys, nil } @@ -778,10 +885,129 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er return nil } +// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr. 
+func (db *indexDB) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + tvs := make(map[string]struct{}) + is := db.getIndexSearch(deadline) + err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues) + db.putIndexSearch(is) + if err != nil { + return nil, err + } + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(deadline) + err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, err + } + + tagValues := make([]string, 0, len(tvs)) + for tv := range tvs { + if len(tv) == 0 { + // Skip empty values, since they have no any meaning. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600 + continue + } + tagValues = append(tagValues, tv) + } + // Do not sort tagValues, since they must be sorted by vmselect. + return tagValues, nil +} + +func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error { + minDate := uint64(tr.MinTimestamp) / msecPerDay + maxDate := uint64(tr.MaxTimestamp) / msecPerDay + var mu sync.Mutex + var wg sync.WaitGroup + var errGlobal error + for date := minDate; date <= maxDate; date++ { + wg.Add(1) + go func(date uint64) { + defer wg.Done() + tvsLocal := make(map[string]struct{}) + isLocal := is.db.getIndexSearch(is.deadline) + err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues) + is.db.putIndexSearch(isLocal) + mu.Lock() + defer mu.Unlock() + if errGlobal != nil { + return + } + if err != nil { + errGlobal = err + return + } + if len(tvs) >= maxTagValues { + return + } + for v := range tvsLocal { + tvs[v] = struct{}{} + } + }(date) + } + wg.Wait() + return errGlobal +} + +func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error { + ts := &is.ts + kb := &is.kb + mp := &is.mp + mp.Reset() + dmis := 
is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, tagKey) + prefix := kb.B + ts.Seek(prefix) + for len(tvs) < maxTagValues && ts.NextItem() { + if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } + } + loopsPaceLimiter++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil { + return err + } + if mp.IsDeletedTag(dmis) { + continue + } + + // Store tag value + tvs[string(mp.Tag.Value)] = struct{}{} + + if mp.MetricIDsLen() < maxMetricIDsPerRow/2 { + // There is no need in searching for the next tag value, + // since it is likely it is located in the next row, + // because the current row contains incomplete metricIDs set. + continue + } + // Search for the next tag value. + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag value. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B = marshalTagValue(kb.B, mp.Tag.Value) + kb.B[len(kb.B)-1]++ + ts.Seek(kb.B) + } + if err := ts.Error(); err != nil { + return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err) + } + return nil +} + // SearchTagValues returns all the tag values for the given tagKey func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { - // TODO: cache results? 
- tvs := make(map[string]struct{}) is := db.getIndexSearch(deadline) err := is.searchTagValues(tvs, tagKey, maxTagValues) @@ -807,7 +1033,6 @@ func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uin } tagValues = append(tagValues, tv) } - // Do not sort tagValues, since they must be sorted by vmselect. return tagValues, nil } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 3da1236fb..eecf1b64f 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "regexp" + "sort" "testing" "time" @@ -1487,6 +1488,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { var metricNameBuf []byte perDayMetricIDs := make(map[uint64]*uint64set.Set) var allMetricIDs uint64set.Set + tagKeys := []string{ + "", "constant", "day", "uniqueid", + } + tagValues := []string{ + "testMetric", + } + sort.Strings(tagKeys) for day := 0; day < days; day++ { var tsids []TSID for metric := 0; metric < metricsPerDay; metric++ { @@ -1554,6 +1562,32 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil)) } + // Check SearchTagKeysOnTimeRange. + tks, err := db.SearchTagKeysOnTimeRange(TimeRange{ + MinTimestamp: int64(now) - msecPerDay, + MaxTimestamp: int64(now), + }, 10000, noDeadline) + if err != nil { + t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err) + } + sort.Strings(tks) + if !reflect.DeepEqual(tks, tagKeys) { + t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys) + } + + // Check SearchTagValuesOnTimeRange. 
+ tvs, err := db.SearchTagValuesOnTimeRange([]byte(""), TimeRange{ + MinTimestamp: int64(now) - msecPerDay, + MaxTimestamp: int64(now), + }, 10000, noDeadline) + if err != nil { + t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err) + } + sort.Strings(tvs) + if !reflect.DeepEqual(tvs, tagValues) { + t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues) + } + // Create a filter that will match series that occur across multiple days tfs := NewTagFilters() if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil { diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 0fc16d434..073361064 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -53,12 +54,12 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc } if dmis.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. - *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) continue } if bsm.Block.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. 
- *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) continue } if pendingBlockIsEmpty { @@ -181,7 +182,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { nextIdx++ } - *rowsDeleted += uint64(nextIdx - b.nextIdx) + atomic.AddUint64(rowsDeleted, uint64(nextIdx-b.nextIdx)) b.nextIdx = nextIdx } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ca9459a8c..d79b38a55 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -921,11 +921,21 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error) return s.idb().searchMetricName(dst, metricID) } +// SearchTagKeysOnTimeRange searches for tag keys on tr. +func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline) +} + // SearchTagKeys searches for tag keys func (s *Storage) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { return s.idb().SearchTagKeys(maxTagKeys, deadline) } +// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr. +func (s *Storage) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + return s.idb().SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline) +} + // SearchTagValues searches for tag values for the given tagKey func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { return s.idb().SearchTagValues(tagKey, maxTagValues, deadline)