From c4e8c34d0ee76043d12d20becc60fc1a9db4ebb1 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 4 Nov 2020 20:18:33 +0200
Subject: [PATCH 1/6] docs/CaseStudies.md: add case study for Idealo.de

---
 README.md           |  1 +
 docs/CaseStudies.md | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/README.md b/README.md
index 40b212b6d..c110a6d90 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
 * [Brandwatch](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#brandwatch)
 * [Adsterra](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adsterra)
 * [ARNES](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#arnes)
+* [Idealo.de](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#idealode)
 
 ## Prominent features
 
diff --git a/docs/CaseStudies.md b/docs/CaseStudies.md
index 989706672..bdb0e0a87 100644
--- a/docs/CaseStudies.md
+++ b/docs/CaseStudies.md
@@ -6,6 +6,20 @@ and feel free asking for references, reviews and additional case studies from re
 
 See also [articles about VictoriaMetrics from our users](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles#third-party-articles-and-slides).
 
+* [Adidas](#adidas)
+* [CERN](#cern)
+* [COLOPL](#colopl)
+* [Zerodha](#zerodha)
+* [Wix.com](#wixcom)
+* [Wedos.com](#wedoscom)
+* [Synthesio](#synthesio)
+* [Dreamteam](#dreamteam)
+* [Brandwatch](#brandwatch)
+* [Adsterra](#adsterra)
+* [ARNES](#arnes)
+* [Idealo.de](#idealode)
+
+
 ## Adidas
 
 See [slides](https://promcon.io/2019-munich/slides/remote-write-storage-wars.pdf) and [video](https://youtu.be/OsH6gPdxR4s)
@@ -304,3 +318,21 @@ Grafana has a LB infront, so if one DC has problems, we can still view all metri
 
 We are still in the process of migration, but we are really happy with the whole stack. It has proven as an essential piece
 for insight into our services during COVID-19 and has enabled us to provide better service and spot problems faster.
+
+
+## Idealo.de
+
+[idealo.de](https://www.idealo.de/) is the leading price comparison website in Germany. We use Prometheus for metrics on our container platform.
+When we introduced Prometheus at idealo, we started with M3DB as the long-term storage. In our setup, M3DB was quite unstable and consumed a lot of resources.
+
+VictoriaMetrics runs very stably for us and uses only a fraction of the resources, even though we also increased our retention time from 1 month to 13 months.
+
+Numbers:
+
+- The number of active time series per VictoriaMetrics instance is 21M.
+- The total ingestion rate is 120k metrics per second.
+- The total number of datapoints is 3.1 trillion.
+- The average time series churn rate is ~9M per day.
+- The average query rate is ~20 per second. The response time for the 99th percentile is 120ms.
+- Retention: 13 months.
+- Size of all datapoints: 3.5 TB.

From 4c808d58bfb737a528a8e9ee0ed022d2ea9a412b Mon Sep 17 00:00:00 2001
From: Nikolay
Date: Wed, 4 Nov 2020 21:29:18 +0300
Subject: [PATCH 2/6] Adds ready probe (#874)

* adds leading forward slash check for scrapeURL path
  https://github.com/VictoriaMetrics/VictoriaMetrics/issues/835

* adds ready probe for scrape config initialization; it should prevent
  metrics loss during `vmagent` rolling updates. The /ready API returns
  HTTP code 425 while some scrape configs are still waiting for
  initialization.
* updates docs * Update app/vmagent/README.md * renames var * Update app/vmagent/README.md Co-authored-by: Aliaksandr Valialkin --- app/vmagent/README.md | 2 ++ app/vmagent/main.go | 11 +++++++++++ app/vminsert/main.go | 10 ++++++++++ docs/vmagent.md | 2 ++ lib/promscrape/scraper.go | 13 ++++++++++++- 5 files changed, 37 insertions(+), 1 deletion(-) diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 6d132ad5a..099947c5b 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -219,6 +219,8 @@ It accepts optional `show_original_labels=1` query arg, which shows the original This information may be useful for debugging target relabeling. * `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets). +* `http://vmagent-host:8429/ready` - this handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs. + It may be useful, when you have many entries at `-promscrape.config` and want to perform `vmagent` rolling update without scrape loss. ### Troubleshooting diff --git a/app/vmagent/main.go b/app/vmagent/main.go index d3fa1d6b9..4d2b7acb5 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" @@ -222,6 +223,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { procutil.SelfSIGHUP() w.WriteHeader(http.StatusOK) return true + case "/ready": + if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 { + errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy) + http.Error(w, errMsg, http.StatusTooEarly) + } else { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } + return true } return false } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 504922c94..1ab37655d 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -170,6 +170,16 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { procutil.SelfSIGHUP() w.WriteHeader(http.StatusNoContent) return true + case "/ready": + if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 { + errMsg := fmt.Sprintf("waiting for scrape config to init targets, configs left: %d", rdy) + http.Error(w, errMsg, http.StatusTooEarly) + } else { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } + return true default: // This is not our link return false diff --git a/docs/vmagent.md b/docs/vmagent.md index 6d132ad5a..87559d888 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -219,6 +219,8 @@ It accepts optional `show_original_labels=1` query arg, which shows the original This information may be useful for debugging target relabeling. * `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets). +* `http://vmagent-host:8429/ready` - this handler returns http 200 status code, when `vmagent` finished initialization for all service_discovery configs. + It may be useful, when you have a lof of entries at promscrape.config and want to perform `vmagent` rolling update without metrics loss. 
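Both `vmagent` and `vminsert` gate `/ready` on the same condition: the handler returns 425 until every scrape config has finished its first sync, and 200 afterwards. Deployment tooling that cannot consume the status code in-cluster can simply poll the endpoint before moving on to the next instance. A minimal sketch of such a gate follows (not part of this patch; the host, port, and timeout are assumptions):

```go
// readywait polls vmagent's /ready endpoint until it returns 200 OK,
// so a rolling update only proceeds once all scrape configs are initialized.
package main

import (
	"log"
	"net/http"
	"time"
)

func main() {
	deadline := time.Now().Add(2 * time.Minute) // assumed rollout budget
	for {
		resp, err := http.Get("http://vmagent-host:8429/ready")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				log.Println("vmagent is ready; safe to continue the rolling update")
				return
			}
			// http.StatusTooEarly (425) means some scrape configs are still initializing.
		}
		if time.Now().After(deadline) {
			log.Fatal("vmagent did not become ready in time")
		}
		time.Sleep(time.Second)
	}
}
```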
 ### Troubleshooting
 
diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index 341e66462..52f99a97e 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -75,6 +76,9 @@ func Stop() {
 var (
 	globalStopCh chan struct{}
 	scraperWG    sync.WaitGroup
+	// PendingScrapeConfigs - a zero value means that
+	// all scrapeConfigs are initialized and ready for work.
+	PendingScrapeConfigs int32
 )
 
 func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
@@ -166,6 +170,7 @@ func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConf
 }
 
 func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork) {
+	atomic.AddInt32(&PendingScrapeConfigs, 1)
 	scfg := &scrapeConfig{
 		name:          name,
 		pushData:      scs.pushData,
@@ -216,10 +221,15 @@ func (scfg *scrapeConfig) run() {
 	cfg := <-scfg.cfgCh
 	var swsPrev []ScrapeWork
-	for {
+	updateScrapeWork := func(cfg *Config) {
 		sws := scfg.getScrapeWork(cfg, swsPrev)
 		sg.update(sws)
 		swsPrev = sws
+	}
+	updateScrapeWork(cfg)
+	atomic.AddInt32(&PendingScrapeConfigs, -1)
+
+	for {
 
 		select {
 		case <-scfg.stopCh:
@@ -227,6 +237,7 @@
 		case cfg = <-scfg.cfgCh:
 		case <-tickerCh:
 		}
+		updateScrapeWork(cfg)
 	}
 }

From 381ad564a22339be3dc4da4e62d332fa1fa098d0 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 4 Nov 2020 20:31:43 +0200
Subject: [PATCH 3/6] docs/vmagent.md: update after 4c808d58bfb737a528a8e9ee0ed022d2ea9a412b

---
 app/vmagent/README.md                 | 5 +++--
 docs/Single-server-VictoriaMetrics.md | 1 +
 docs/vmagent.md                       | 5 +++--
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index 099947c5b..612a62108 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -219,8 +219,9 @@ It accepts optional `show_original_labels=1` query arg, which shows the original
 This information may be useful for debugging target relabeling.
 * `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
-* `http://vmagent-host:8429/ready` - this handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs.
-  It may be useful, when you have many entries at `-promscrape.config` and want to perform `vmagent` rolling update without scrape loss.
+* `http://vmagent-host:8429/ready`. This handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs.
+It may be useful for performing `vmagent` rolling update without scrape loss.
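The machinery behind `/ready` (in `lib/promscrape/scraper.go` above) reduces to a single shared atomic counter: each registered scrape config increments it, each config decrements it after its first successful sync, and the probe reports ready only at zero. A distilled sketch of the same pattern, with hypothetical names rather than the actual vmagent code:

```go
package readiness

import "sync/atomic"

// pending counts components that are registered but not yet initialized,
// mirroring the role of promscrape.PendingScrapeConfigs in the patch.
var pending int32

// register marks one component as awaiting its first sync.
func register() { atomic.AddInt32(&pending, 1) }

// markInitialized is called exactly once when a component finishes its first sync.
func markInitialized() { atomic.AddInt32(&pending, -1) }

// ready reports whether every registered component has initialized.
// An HTTP handler can map false to http.StatusTooEarly (425).
func ready() bool { return atomic.LoadInt32(&pending) == 0 }
```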
+
 
 ### Troubleshooting
 
diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md
index 40b212b6d..c110a6d90 100644
--- a/docs/Single-server-VictoriaMetrics.md
+++ b/docs/Single-server-VictoriaMetrics.md
@@ -38,6 +38,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
 * [Brandwatch](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#brandwatch)
 * [Adsterra](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#adsterra)
 * [ARNES](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#arnes)
+* [Idealo.de](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies#idealode)
 
 ## Prominent features
 
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 87559d888..612a62108 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -219,8 +219,9 @@ It accepts optional `show_original_labels=1` query arg, which shows the original
 This information may be useful for debugging target relabeling.
 * `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
-* `http://vmagent-host:8429/ready` - this handler returns http 200 status code, when `vmagent` finished initialization for all service_discovery configs.
-  It may be useful, when you have a lof of entries at promscrape.config and want to perform `vmagent` rolling update without metrics loss.
+* `http://vmagent-host:8429/ready`. This handler returns http 200 status code when `vmagent` finishes initialization for all service_discovery configs.
+It may be useful for performing `vmagent` rolling update without scrape loss.
+
 
 ### Troubleshooting
 
From b378cd6ed892807a6ae3721996407fb002677db7 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 5 Nov 2020 00:15:43 +0200
Subject: [PATCH 4/6] app/vmselect: optimize querying for `/api/v1/labels` and
 `/api/v1/label/<labelName>/values` when `start` and `end` args are set

---
 CHANGELOG.md                          |   2 +-
 app/vmselect/netstorage/netstorage.go |  45 ++++-
 app/vmselect/prometheus/prometheus.go |  42 ++++-
 app/vmstorage/main.go                 |  16 ++
 lib/storage/index_db.go               | 237 +++++++++++++++++++++++++-
 lib/storage/index_db_test.go          |  34 ++++
 lib/storage/storage.go                |  10 ++
 7 files changed, 369 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3a100d7c8..7cce09af9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,6 @@
 # tip
 
-
+* FEATURE: optimize requests to `/api/v1/labels` and `/api/v1/label/<labelName>/values` when `start` and `end` args are set.
 * FEATURE: reduce memory usage when query touches big number of time series.
 * FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands)
 and the majority of these targets (99%) are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index dd452770e..e572be55e 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -453,6 +453,26 @@ func DeleteSeries(sq *storage.SearchQuery) (int, error) {
 	return vmstorage.DeleteMetrics(tfss)
 }
 
+// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
+func GetLabelsOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + labels, err := vmstorage.SearchTagKeysOnTimeRange(tr, *maxTagKeysPerSearch, deadline.Deadline()) + if err != nil { + return nil, fmt.Errorf("error during labels search on time range: %w", err) + } + // Substitute "" with "__name__" + for i := range labels { + if labels[i] == "" { + labels[i] = "__name__" + } + } + // Sort labels like Prometheus does + sort.Strings(labels) + return labels, nil +} + // GetLabels returns labels until the given deadline. func GetLabels(deadline searchutils.Deadline) ([]string, error) { if deadline.Exceeded() { @@ -462,20 +482,36 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) { if err != nil { return nil, fmt.Errorf("error during labels search: %w", err) } - // Substitute "" with "__name__" for i := range labels { if labels[i] == "" { labels[i] = "__name__" } } - // Sort labels like Prometheus does sort.Strings(labels) - return labels, nil } +// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr +// until the given deadline. +func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + if labelName == "__name__" { + labelName = "" + } + // Search for tag values + labelValues, err := vmstorage.SearchTagValuesOnTimeRange([]byte(labelName), tr, *maxTagValuesPerSearch, deadline.Deadline()) + if err != nil { + return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err) + } + // Sort labelValues like Prometheus does + sort.Strings(labelValues) + return labelValues, nil +} + // GetLabelValues returns label values for the given labelName // until the given deadline. 
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) { @@ -485,16 +521,13 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, if labelName == "__name__" { labelName = "" } - // Search for tag values labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err) } - // Sort labelValues like Prometheus does sort.Strings(labelValues) - return labelValues, nil } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index f5eebc035..f43517e61 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -511,9 +511,26 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr return fmt.Errorf("cannot parse form values: %w", err) } var labelValues []string - if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + if len(r.Form["match[]"]) == 0 { var err error - labelValues, err = netstorage.GetLabelValues(labelName, deadline) + if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + labelValues, err = netstorage.GetLabelValues(labelName, deadline) + } else { + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + start, err := searchutils.GetTime(r, "start", end-defaultStep) + if err != nil { + return err + } + tr := storage.TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + labelValues, err = netstorage.GetLabelValuesOnTimeRange(labelName, tr, deadline) + } if err != nil { return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err) } @@ -692,9 +709,26 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) return fmt.Errorf("cannot parse form values: %w", err) } var labels []string - if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + if len(r.Form["match[]"]) == 0 { var err error - labels, err = netstorage.GetLabels(deadline) + if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + labels, err = netstorage.GetLabels(deadline) + } else { + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + start, err := searchutils.GetTime(r, "start", end-defaultStep) + if err != nil { + return err + } + tr := storage.TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + labels, err = netstorage.GetLabelsOnTimeRange(tr, deadline) + } if err != nil { return fmt.Errorf("cannot obtain labels: %w", err) } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index cd30b2315..5b50dd0a7 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -122,6 +122,14 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) { return n, err } +// SearchTagKeysOnTimeRange searches for tag keys on tr. 
+func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + WG.Add(1) + keys, err := Storage.SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline) + WG.Done() + return keys, err +} + // SearchTagKeys searches for tag keys func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { WG.Add(1) @@ -130,6 +138,14 @@ func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { return keys, err } +// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr. +func SearchTagValuesOnTimeRange(tagKey []byte, tr storage.TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + WG.Add(1) + values, err := Storage.SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline) + WG.Done() + return values, err +} + // SearchTagValues searches for tag values for the given tagKey func SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { WG.Add(1) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 63b52eff1..aecd4ea44 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -701,10 +701,118 @@ func putIndexItems(ii *indexItems) { var indexItemsPool sync.Pool +// SearchTagKeysOnTimeRange returns all the tag keys on the given tr. +func (db *indexDB) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + tks := make(map[string]struct{}) + is := db.getIndexSearch(deadline) + err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys) + db.putIndexSearch(is) + if err != nil { + return nil, err + } + + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(deadline) + err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, err + } + + keys := make([]string, 0, len(tks)) + for key := range tks { + // Do not skip empty keys, since they are converted to __name__ + keys = append(keys, key) + } + // Do not sort keys, since they must be sorted by vmselect. 
+ return keys, nil +} + +func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error { + minDate := uint64(tr.MinTimestamp) / msecPerDay + maxDate := uint64(tr.MaxTimestamp) / msecPerDay + var mu sync.Mutex + var wg sync.WaitGroup + var errGlobal error + for date := minDate; date <= maxDate; date++ { + wg.Add(1) + go func(date uint64) { + defer wg.Done() + tksLocal := make(map[string]struct{}) + isLocal := is.db.getIndexSearch(is.deadline) + err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys) + is.db.putIndexSearch(isLocal) + mu.Lock() + defer mu.Unlock() + if errGlobal != nil { + return + } + if err != nil { + errGlobal = err + return + } + if len(tks) >= maxTagKeys { + return + } + for k := range tksLocal { + tks[k] = struct{}{} + } + }(date) + } + wg.Wait() + return errGlobal +} + +func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error { + ts := &is.ts + kb := &is.kb + mp := &is.mp + mp.Reset() + dmis := is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + prefix := kb.B + ts.Seek(prefix) + for len(tks) < maxTagKeys && ts.NextItem() { + if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } + } + loopsPaceLimiter++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil { + return err + } + if mp.IsDeletedTag(dmis) { + continue + } + + // Store tag key. + tks[string(mp.Tag.Key)] = struct{}{} + + // Search for the next tag key. + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag key. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B[len(kb.B)-1]++ + ts.Seek(kb.B) + } + if err := ts.Error(); err != nil { + return fmt.Errorf("error during search for prefix %q: %w", prefix, err) + } + return nil +} + // SearchTagKeys returns all the tag keys. func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { - // TODO: cache results? - tks := make(map[string]struct{}) is := db.getIndexSearch(deadline) @@ -728,7 +836,6 @@ func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, err // Do not skip empty keys, since they are converted to __name__ keys = append(keys, key) } - // Do not sort keys, since they must be sorted by vmselect. return keys, nil } @@ -778,10 +885,129 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er return nil } +// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr. 
+func (db *indexDB) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
+	tvs := make(map[string]struct{})
+	is := db.getIndexSearch(deadline)
+	err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
+	db.putIndexSearch(is)
+	if err != nil {
+		return nil, err
+	}
+	ok := db.doExtDB(func(extDB *indexDB) {
+		is := extDB.getIndexSearch(deadline)
+		err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
+		extDB.putIndexSearch(is)
+	})
+	if ok && err != nil {
+		return nil, err
+	}
+
+	tagValues := make([]string, 0, len(tvs))
+	for tv := range tvs {
+		if len(tv) == 0 {
+			// Skip empty values, since they have no meaning.
+			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
+			continue
+		}
+		tagValues = append(tagValues, tv)
+	}
+	// Do not sort tagValues, since they must be sorted by vmselect.
+	return tagValues, nil
+}
+
+func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error {
+	minDate := uint64(tr.MinTimestamp) / msecPerDay
+	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+	var errGlobal error
+	for date := minDate; date <= maxDate; date++ {
+		wg.Add(1)
+		go func(date uint64) {
+			defer wg.Done()
+			tvsLocal := make(map[string]struct{})
+			isLocal := is.db.getIndexSearch(is.deadline)
+			err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues)
+			is.db.putIndexSearch(isLocal)
+			mu.Lock()
+			defer mu.Unlock()
+			if errGlobal != nil {
+				return
+			}
+			if err != nil {
+				errGlobal = err
+				return
+			}
+			if len(tvs) >= maxTagValues {
+				return
+			}
+			for v := range tvsLocal {
+				tvs[v] = struct{}{}
+			}
+		}(date)
+	}
+	wg.Wait()
+	return errGlobal
+}
+
+func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error {
+	ts := &is.ts
+	kb := &is.kb
+	mp := &is.mp
+	mp.Reset()
+	dmis := is.db.getDeletedMetricIDs()
+	loopsPaceLimiter := 0
+	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
+	kb.B = encoding.MarshalUint64(kb.B, date)
+	kb.B = marshalTagValue(kb.B, tagKey)
+	prefix := kb.B
+	ts.Seek(prefix)
+	for len(tvs) < maxTagValues && ts.NextItem() {
+		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
+			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
+				return err
+			}
+		}
+		loopsPaceLimiter++
+		item := ts.Item
+		if !bytes.HasPrefix(item, prefix) {
+			break
+		}
+		if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
+			return err
+		}
+		if mp.IsDeletedTag(dmis) {
+			continue
+		}
+
+		// Store tag value
+		tvs[string(mp.Tag.Value)] = struct{}{}
+
+		if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
+			// There is no need to search for the next tag value,
+			// since it is likely located in the next row,
+			// because the current row contains an incomplete set of metricIDs.
+			continue
+		}
+		// Search for the next tag value.
+		// The last char in kb.B must be tagSeparatorChar.
+		// Just increment it in order to jump to the next tag value.
+ kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B = marshalTagValue(kb.B, mp.Tag.Value) + kb.B[len(kb.B)-1]++ + ts.Seek(kb.B) + } + if err := ts.Error(); err != nil { + return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err) + } + return nil +} + // SearchTagValues returns all the tag values for the given tagKey func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { - // TODO: cache results? - tvs := make(map[string]struct{}) is := db.getIndexSearch(deadline) err := is.searchTagValues(tvs, tagKey, maxTagValues) @@ -807,7 +1033,6 @@ func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uin } tagValues = append(tagValues, tv) } - // Do not sort tagValues, since they must be sorted by vmselect. return tagValues, nil } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 3da1236fb..3d12eb7da 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "regexp" + "sort" "testing" "time" @@ -1487,6 +1488,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { var metricNameBuf []byte perDayMetricIDs := make(map[uint64]*uint64set.Set) var allMetricIDs uint64set.Set + tagKeys := []string{ + "", "constant", "day", "uniqueid", + } + tagValues := []string{ + "testMetric", + } + sort.Strings(tagKeys) for day := 0; day < days; day++ { var tsids []TSID for metric := 0; metric < metricsPerDay; metric++ { @@ -1554,6 +1562,32 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil)) } + // Check SearchTagKeysOnTimeRange. + tks, err := db.SearchTagKeysOnTimeRange(TimeRange{ + MinTimestamp: int64(now) - msecPerDay, + MaxTimestamp: int64(now), + }, 10000, noDeadline) + if err != nil { + t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err) + } + sort.Strings(tks) + if !reflect.DeepEqual(tks, tagKeys) { + t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys) + } + + // Check SearchTagValuesOnTimeRange. + tvs, err := db.SearchTagValuesOnTimeRange([]byte(""), TimeRange{ + MinTimestamp: int64(now)-msecPerDay, + MaxTimestamp: int64(now), + }, 10000, noDeadline) + if err != nil { + t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err) + } + sort.Strings(tvs) + if !reflect.DeepEqual(tvs, tagValues) { + t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues) + } + // Create a filter that will match series that occur across multiple days tfs := NewTagFilters() if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ca9459a8c..d79b38a55 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -921,11 +921,21 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error) return s.idb().searchMetricName(dst, metricID) } +// SearchTagKeysOnTimeRange searches for tag keys on tr. 
+func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline) +} + // SearchTagKeys searches for tag keys func (s *Storage) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) { return s.idb().SearchTagKeys(maxTagKeys, deadline) } +// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr. +func (s *Storage) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + return s.idb().SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline) +} + // SearchTagValues searches for tag values for the given tagKey func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { return s.idb().SearchTagValues(tagKey, maxTagValues, deadline) From 1cb78ba1a0bf2874a68d6999b1bdec93bda7b216 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 5 Nov 2020 01:12:21 +0200 Subject: [PATCH 5/6] lib/storage: remove data race when updating rowsDeleted --- lib/storage/merge.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 0fc16d434..433b2fc4e 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -53,12 +54,12 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc } if dmis.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. - *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) continue } if bsm.Block.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. 
- *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) continue } if pendingBlockIsEmpty { @@ -181,7 +182,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { nextIdx++ } - *rowsDeleted += uint64(nextIdx - b.nextIdx) + atomic.AddUint64(rowsDeleted, uint64(nextIdx - b.nextIdx)) b.nextIdx = nextIdx } From 368b69b4c49bacba33c48a7ecea45506a3917eb7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 5 Nov 2020 01:36:13 +0200 Subject: [PATCH 6/6] app/vmselect: properly handle errors in GetLabelsOnTimeRange and GetLabelValuesOnTimeRange --- app/vmselect/prometheus/prometheus.go | 22 ++++++++++++++-------- lib/storage/index_db_test.go | 2 +- lib/storage/merge.go | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index f43517e61..cdee46c67 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -512,9 +512,12 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr } var labelValues []string if len(r.Form["match[]"]) == 0 { - var err error if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + var err error labelValues, err = netstorage.GetLabelValues(labelName, deadline) + if err != nil { + return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err) + } } else { ct := startTime.UnixNano() / 1e6 end, err := searchutils.GetTime(r, "end", ct) @@ -530,9 +533,9 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr MaxTimestamp: end, } labelValues, err = netstorage.GetLabelValuesOnTimeRange(labelName, tr, deadline) - } - if err != nil { - return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err) + if err != nil { + return fmt.Errorf(`cannot obtain label values on time range for %q: %w`, labelName, err) + } } } else { // Extended functionality that allows filtering by label filters and time range @@ -710,9 +713,12 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) } var labels []string if len(r.Form["match[]"]) == 0 { - var err error if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + var err error labels, err = netstorage.GetLabels(deadline) + if err != nil { + return fmt.Errorf("cannot obtain labels: %w", err) + } } else { ct := startTime.UnixNano() / 1e6 end, err := searchutils.GetTime(r, "end", ct) @@ -728,9 +734,9 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) MaxTimestamp: end, } labels, err = netstorage.GetLabelsOnTimeRange(tr, deadline) - } - if err != nil { - return fmt.Errorf("cannot obtain labels: %w", err) + if err != nil { + return fmt.Errorf("cannot obtain labels on time range: %w", err) + } } } else { // Extended functionality that allows filtering by label filters and time range diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 3d12eb7da..eecf1b64f 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1577,7 +1577,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { // Check SearchTagValuesOnTimeRange. 
tvs, err := db.SearchTagValuesOnTimeRange([]byte(""), TimeRange{ - MinTimestamp: int64(now)-msecPerDay, + MinTimestamp: int64(now) - msecPerDay, MaxTimestamp: int64(now), }, 10000, noDeadline) if err != nil { diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 433b2fc4e..073361064 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -182,7 +182,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { nextIdx++ } - atomic.AddUint64(rowsDeleted, uint64(nextIdx - b.nextIdx)) + atomic.AddUint64(rowsDeleted, uint64(nextIdx-b.nextIdx)) b.nextIdx = nextIdx }
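The last hunk above is only `gofmt` spacing; the substantive change landed in patch 5, where plain `*rowsDeleted += n` increments became `atomic.AddUint64` calls because multiple merge workers may update the same counter concurrently. A standalone sketch of why the plain increment is unsafe (a hypothetical counter, not the vmstorage code; the race in the commented-out variant is reported by Go's race detector):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var rowsDeleted uint64
	var wg sync.WaitGroup
	// Simulate several merge workers sharing one deletion counter.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				// rowsDeleted++ // data race: non-atomic read-modify-write
				atomic.AddUint64(&rowsDeleted, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(rowsDeleted) // always prints 8000 with atomic increments
}
```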
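With patches 4 and 6 applied, `/api/v1/labels` and `/api/v1/label/<labelName>/values` honor the `start` and `end` query args end to end, so a scoped request only touches the per-day index entries inside the range instead of the whole inverted index. A hedged usage sketch against a single-node instance (the host, port, and one-hour window are assumptions):

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"time"
)

func main() {
	end := time.Now()
	start := end.Add(-time.Hour)
	// Unix-seconds timestamps are accepted for start/end.
	params := url.Values{
		"start": {fmt.Sprintf("%d", start.Unix())},
		"end":   {fmt.Sprintf("%d", end.Unix())},
	}
	resp, err := http.Get("http://localhost:8428/api/v1/labels?" + params.Encode())
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body)) // e.g. {"status":"success","data":["__name__", ...]}
}
```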