diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99fd65efac..29cae90ff1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,15 +1,35 @@
 # tip
 
+* FEATURE: allow setting `-retentionPeriod` smaller than one month. E.g. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. are supported now.
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/173
 * FEATURE: optimize more cases according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization . Now the following cases are optimized too:
   * `rollup_func(foo{filters}[d]) op bar` -> `rollup_func(foo{filters}[d]) op bar{filters}`
   * `transform_func(foo{filters}) op bar` -> `transform_func(foo{filters}) op bar{filters}`
   * `num_or_scalar op foo{filters} op bar` -> `num_or_scalar op foo{filters} op bar{filters}`
 * FEATURE: improve time series search for queries with multiple label filters. I.e. `foo{label1="value", label2=~"regexp"}`.
   See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781
+* FEATURE: vmagent: add `stream parse` mode. This mode allows reducing memory usage when individual scrape targets expose tens of millions of metrics.
+  For example, when scraping Prometheus in [federation](https://prometheus.io/docs/prometheus/latest/federation/) mode.
+  See `-promscrape.streamParse` command-line option and `stream_parse: true` config option for `scrape_config` section in `-promscrape.config`.
+  See also [troubleshooting docs for vmagent](https://victoriametrics.github.io/vmagent.html#troubleshooting).
+* FEATURE: vmalert: add `-dryRun` command-line option for validating the provided config files without the need to start the `vmalert` service.
+* FEATURE: accept an optional third argument of string type at `topk_*` and `bottomk_*` functions. This is the label name for an additional time series to return with the sum of time series outside the top/bottom K. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for more details.
+* FEATURE: vmagent: expose `/api/v1/targets` page according to [the corresponding Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/643
 * BUGFIX: vmagent: properly handle OpenStack endpoint ending with `v3.0` such as `https://ostack.example.com:5000/v3.0`
   in the same way as Prometheus does. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/728#issuecomment-709914803
+* BUGFIX: drop trailing data points for time series with a single raw sample. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
+* BUGFIX: do not drop trailing data points for instant queries to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845
+* BUGFIX: vmbackup: fix panic when `-origin` isn't specified. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/856
+* BUGFIX: vmalert: skip automatically added labels on alerts restore. Label `alertgroup` was introduced in [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/611)
+  and automatically added to generated time series. By mistake, this new label wasn't correctly purged on the restore event, which affected alert ID uniqueness.
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/870
+* BUGFIX: vmagent: fix panic at scrape error body formatting. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/864
+* BUGFIX: vmagent: add missing leading slash to the metrics path like Prometheus does.
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/835
+* BUGFIX: vmagent: drop packet if remote storage returns 4xx status code. This makes the behaviour consistent with Prometheus.
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
 
 # [v1.44.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.44.0)

diff --git a/README.md b/README.md
index 81b10537f8..40b212b6dc 100644
--- a/README.md
+++ b/README.md
@@ -164,7 +164,7 @@ or [docker image](https://hub.docker.com/r/victoriametrics/victoria-metrics/) wi
 The following command-line flags are used the most:
 
 * `-storageDataPath` - path to data directory. VictoriaMetrics stores all the data in this directory. Default path is `victoria-metrics-data` in the current working directory.
-* `-retentionPeriod` - retention period in months for stored data. Older data is automatically deleted. Default period is 1 month.
+* `-retentionPeriod` - retention for stored data. Older data is automatically deleted. Default retention is 1 month. See [these docs](#retention) for more details.
 
 Other flags have good enough default values, so set them only if you really need this. Pass `-help` to see all the available flags with description and default values.
 
@@ -495,6 +495,7 @@ VictoriaMetrics supports the following handlers from [Prometheus querying API](h
 * [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names)
 * [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values)
 * [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats)
+* [/api/v1/targets](https://prometheus.io/docs/prometheus/latest/querying/api/#targets) - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter) for more details.
 
 These handlers can be queried from Prometheus-compatible clients such as Grafana or curl.
 
@@ -1048,6 +1049,7 @@ The de-duplication reduces disk space usage if multiple identically configured P
 write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical
 `external_labels` section in their configs, so they write data to the same time series.
 
+
 ### Retention
 
 Retention is configured with `-retentionPeriod` command-line flag. For instance, `-retentionPeriod=3` means
@@ -1059,6 +1061,10 @@ For example if `-retentionPeriod` is set to 1, data for January is deleted on Ma
 It is safe to extend `-retentionPeriod` on existing data. If `-retentionPeriod` is set to lower
 value than before then data outside the configured period will be eventually deleted.
 
+VictoriaMetrics supports retention smaller than 1 month. For example, `-retentionPeriod=5d` would set data retention for 5 days.
+Older data is eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
+
+
 ### Multiple retentions
 
 Just start multiple VictoriaMetrics instances with distinct values for the following flags:

diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index 428ea6e978..22d2c64030 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -211,9 +211,13 @@ either via `vmagent` itself or via Prometheus, so the exported metrics could be
 Use official [Grafana dashboard](https://grafana.com/grafana/dashboards/12683) for `vmagent` state overview.
 If you have suggestions, improvements or found a bug - feel free to open an issue on github or add review to the dashboard.
-`vmagent` also exports target statuses at `http://vmagent-host:8429/targets` page in plaintext format.
-`/targets` handler accepts optional `show_original_labels=1` query arg, which shows the original labels per each target
-before applying relabeling. This information may be useful for debugging target relabeling.
+`vmagent` also exports target statuses at the following handlers:
+
+* `http://vmagent-host:8429/targets`. This handler returns human-readable plaintext status for every active target.
+This page is convenient to query from the command line with `wget`, `curl` or similar tools.
+It accepts optional `show_original_labels=1` query arg, which shows the original labels per each target before applying relabeling.
+This information may be useful for debugging target relabeling.
+* `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
 
 ### Troubleshooting
 
@@ -224,7 +228,26 @@ before applying relabeling. This information may be useful for debugging target
   since `vmagent` establishes at least a single TCP connection per each target.
 
 * When `vmagent` scrapes many unreliable targets, it can flood error log with scrape errors. These errors can be suppressed
-  by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`.
+  by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
+  and `http://vmagent-host:8429/api/v1/targets`.
+
+* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)),
+  then it is recommended to enable `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets
+  by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example:
+
+  ```yml
+  scrape_configs:
+  - job_name: 'big-federate'
+    stream_parse: true
+    static_configs:
+    - targets:
+      - big-prometheus1
+      - big-prometheus2
+    honor_labels: true
+    metrics_path: /federate
+    params:
+      'match[]': ['{__name__!=""}']
+  ```
 
 * It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported
   at `http://vmagent-host:8429/metrics` page constantly grows.
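The new `/api/v1/targets` handler documented above returns JSON in the Prometheus targets API format, and the diff below passes an optional `state` query arg straight to `promscrape.WriteAPIV1Targets`, mirroring Prometheus. For readers wiring the endpoint into tooling, here is a minimal Go sketch of a client that fetches it and prints per-target scrape health; the struct models only a subset of the response fields, and `vmagent-host:8429` is the placeholder address used in the docs above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// targetsResponse models only the subset of the Prometheus-compatible
// /api/v1/targets response that is printed below.
type targetsResponse struct {
	Status string `json:"status"`
	Data   struct {
		ActiveTargets []struct {
			ScrapeURL string            `json:"scrapeUrl"`
			Health    string            `json:"health"`
			Labels    map[string]string `json:"labels"`
			LastError string            `json:"lastError"`
		} `json:"activeTargets"`
	} `json:"data"`
}

func main() {
	// state=active filters the response to active targets, as in Prometheus.
	resp, err := http.Get("http://vmagent-host:8429/api/v1/targets?state=active")
	if err != nil {
		log.Fatalf("cannot query targets API: %s", err)
	}
	defer resp.Body.Close()

	var tr targetsResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		log.Fatalf("cannot decode targets response: %s", err)
	}
	for _, t := range tr.Data.ActiveTargets {
		fmt.Printf("%s health=%s lastError=%q\n", t.ScrapeURL, t.Health, t.LastError)
	}
}
```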
diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 2a7432ca4d..d3fa1d6b9f 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -211,6 +211,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { showOriginalLabels, _ := strconv.ParseBool(r.FormValue("show_original_labels")) promscrape.WriteHumanReadableTargetsStatus(w, showOriginalLabels) return true + case "/api/v1/targets": + promscrapeAPIV1TargetsRequests.Inc() + w.Header().Set("Content-Type", "application/json") + state := r.FormValue("state") + promscrape.WriteAPIV1Targets(w, state) + return true case "/-/reload": promscrapeConfigReloadRequests.Inc() procutil.SelfSIGHUP() @@ -241,7 +247,8 @@ var ( influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/query", protocol="influx"}`) - promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) + promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`) + promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/targets"}`) promscrapeConfigReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`) ) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 6596bd75c9..1fb6e43722 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -53,6 +53,7 @@ type client struct { requestDuration *metrics.Histogram requestsOKCount *metrics.Counter errorsCount *metrics.Counter + packetsDropped *metrics.Counter retriesCount *metrics.Counter wg sync.WaitGroup @@ -114,6 +115,7 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.sanitizedURL)) c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.sanitizedURL)) c.errorsCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.sanitizedURL)) + c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL)) c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL)) for i := 0; i < concurrency; i++ { c.wg.Add(1) @@ -228,10 +230,20 @@ again: c.requestsOKCount.Inc() return } + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() + if statusCode/100 == 4 { + // Just drop block on 4xx status code like Prometheus does. 
+ // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873 + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block for 4XX status code like Prometheus does; "+ + "response body=%q", len(block), c.sanitizedURL, statusCode, body) + c.packetsDropped.Inc() + return + } // Unexpected status code returned retriesCount++ - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() retryDuration *= 2 if retryDuration > time.Minute { retryDuration = time.Minute diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index 7e11ad8501..51eee637fd 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -403,7 +403,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb labelsFilter += fmt.Sprintf(",%s=%q", k, v) } - // Get the last datapoint in range via MetricsQL `last_over_time`. + // Get the last data point in range via MetricsQL `last_over_time`. // We don't use plain PromQL since Prometheus doesn't support // remote write protocol which is used for state persistence in vmalert. expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])", @@ -417,11 +417,14 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb labels := m.Labels m.Labels = make([]datasource.Label, 0) // drop all extra labels, so hash key will - // be identical to timeseries received in Exec + // be identical to time series received in Exec for _, l := range labels { if l.Name == alertNameLabel { continue } + if l.Name == alertGroupNameLabel { + continue + } // drop all overridden labels if _, ok := ar.Labels[l.Name]; ok { continue @@ -436,7 +439,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb a.ID = hash(m) a.State = notifier.StatePending ar.alerts[a.ID] = a - logger.Infof("alert %q(%d) restored to state at %v", a.Name, a.ID, a.Start) + logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.Start) } return nil } diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index cace27e6dc..55aa2b48da 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -355,6 +355,7 @@ func TestAlertingRule_Restore(t *testing.T) { metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()), "__name__", alertForStateMetricName, alertNameLabel, "", + alertGroupNameLabel, "groupID", "foo", "bar", "namespace", "baz", ), diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go index 11f8f7eb85..1ea260e4e6 100644 --- a/app/vmalert/config/config.go +++ b/app/vmalert/config/config.go @@ -11,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metricsql" @@ -193,25 +194,32 @@ func Parse(pathPatterns []string, validateAnnotations, validateExpressions bool) } fp = append(fp, matches...) 
 	}
+	errGroup := new(utils.ErrGroup)
 	var groups []Group
 	for _, file := range fp {
 		uniqueGroups := map[string]struct{}{}
 		gr, err := parseFile(file)
 		if err != nil {
-			return nil, fmt.Errorf("failed to parse file %q: %w", file, err)
+			errGroup.Add(fmt.Errorf("failed to parse file %q: %w", file, err))
+			continue
 		}
 		for _, g := range gr {
 			if err := g.Validate(validateAnnotations, validateExpressions); err != nil {
-				return nil, fmt.Errorf("invalid group %q in file %q: %w", g.Name, file, err)
+				errGroup.Add(fmt.Errorf("invalid group %q in file %q: %w", g.Name, file, err))
+				continue
 			}
 			if _, ok := uniqueGroups[g.Name]; ok {
-				return nil, fmt.Errorf("group name %q duplicate in file %q", g.Name, file)
+				errGroup.Add(fmt.Errorf("group name %q duplicate in file %q", g.Name, file))
+				continue
 			}
 			uniqueGroups[g.Name] = struct{}{}
 			g.File = file
 			groups = append(groups, g)
 		}
 	}
+	if err := errGroup.Err(); err != nil {
+		return nil, err
+	}
 	if len(groups) < 1 {
 		logger.Warnf("no groups found in %s", strings.Join(pathPatterns, ";"))
 	}

diff --git a/app/vmalert/main.go b/app/vmalert/main.go
index a2252b5532..40d2478272 100644
--- a/app/vmalert/main.go
+++ b/app/vmalert/main.go
@@ -10,6 +10,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remoteread"
@@ -47,6 +48,8 @@ eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{
 	remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries."+
 		" For example, if lookback=1h then range from now() to now()-1h will be scanned.")
+
+	dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rule files are validated. The `-rule` flag must be specified.")
 )
 
 func main() {
@@ -58,6 +61,18 @@ func main() {
 	logger.Init()
 	cgroup.UpdateGOMAXPROCSToCPUQuota()
 
+	if *dryRun {
+		u, _ := url.Parse("https://victoriametrics.com/")
+		notifier.InitTemplateFunc(u)
+		groups, err := config.Parse(*rulePath, true, true)
+		if err != nil {
+			logger.Fatalf(err.Error())
+		}
+		if len(groups) == 0 {
+			logger.Fatalf("No rules for validation.
Please specify path to file(s) with alerting and/or recording rules using `-rule` flag") + } + return + } ctx, cancel := context.WithCancel(context.Background()) manager, err := newManager(ctx) if err != nil { diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 49bb728748..bbaa2a77a0 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/actions" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" @@ -146,9 +147,9 @@ func newDstFS() (common.RemoteFS, error) { return fs, nil } -func newOriginFS() (common.RemoteFS, error) { +func newOriginFS() (common.OriginFS, error) { if len(*origin) == 0 { - return nil, nil + return &fsnil.FS{}, nil } fs, err := actions.NewRemoteFS(*origin) if err != nil { diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 616ced4b7d..504922c946 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -159,6 +159,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { showOriginalLabels, _ := strconv.ParseBool(r.FormValue("show_original_labels")) promscrape.WriteHumanReadableTargetsStatus(w, showOriginalLabels) return true + case "/api/v1/targets": + promscrapeAPIV1TargetsRequests.Inc() + w.Header().Set("Content-Type", "application/json") + state := r.FormValue("state") + promscrape.WriteAPIV1Targets(w, state) + return true case "/-/reload": promscrapeConfigReloadRequests.Inc() procutil.SelfSIGHUP() @@ -191,7 +197,8 @@ var ( influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/query", protocol="influx"}`) - promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) + promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) + promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/targets"}`) promscrapeConfigReloadRequests = metrics.NewCounter(`vm_http_requests_total{path="/-/reload"}`) diff --git a/app/vmrestore/README.md b/app/vmrestore/README.md index 0c80a9ad4b..59a24dda8c 100644 --- a/app/vmrestore/README.md +++ b/app/vmrestore/README.md @@ -3,7 +3,7 @@ `vmrestore` restores data from backups created by [vmbackup](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmbackup/README.md). VictoriaMetrics `v1.29.0` and newer versions must be used for working with the restored data. -Restore process can be interrupted at any time. It is automatically resumed from the inerruption point +Restore process can be interrupted at any time. It is automatically resumed from the interruption point when restarting `vmrestore` with the same args. 
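The reworked `config.Parse` above switches from fail-fast to collecting every parse and validation failure through `utils.ErrGroup`, so a single `-dryRun` pass can report all broken rule files at once. The `ErrGroup` type itself is not part of this diff; a minimal error-collecting sketch that would satisfy the `new(utils.ErrGroup)` / `Add` / `Err` usage shown above could look like this (an illustration, not the upstream implementation):

```go
package utils

import (
	"fmt"
	"strings"
)

// ErrGroup accumulates errors so callers can keep going on failure
// and report everything at the end instead of stopping at the first error.
type ErrGroup struct {
	errs []error
}

// Add appends err to the group; nil errors are ignored.
func (eg *ErrGroup) Add(err error) {
	if err != nil {
		eg.errs = append(eg.errs, err)
	}
}

// Err returns nil if no errors were collected, or a single error
// joining all the accumulated messages otherwise.
func (eg *ErrGroup) Err() error {
	if eg == nil || len(eg.errs) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(eg.errs))
	for _, err := range eg.errs {
		msgs = append(msgs, err.Error())
	}
	return fmt.Errorf("errors(%d): %s", len(msgs), strings.Join(msgs, "; "))
}
```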
diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index b87b34a64a..050a8cbd48 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -69,7 +69,9 @@ func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc { if err != nil { return nil, err } - return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false) + return aggrFuncExt(func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { + return afe(tss) + }, tss, &afa.ae.Modifier, afa.ae.Limit, false) } } @@ -98,7 +100,8 @@ func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.Modifie } } -func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) { +func aggrFuncExt(afe func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries, argOrig []*timeseries, + modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) { arg := copyTimeseriesMetricNames(argOrig, keepOriginal) // Perform grouping. @@ -124,7 +127,7 @@ func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeserie dstTssCount := 0 rvs := make([]*timeseries, 0, len(m)) for _, tss := range m { - rv := afe(tss) + rv := afe(tss, modifier) rvs = append(rvs, rv...) srcTssCount += len(tss) dstTssCount += len(rv) @@ -141,7 +144,7 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { return tss[:1] } limit := afa.ae.Limit @@ -178,10 +181,11 @@ func aggrFuncSum(tss []*timeseries) []*timeseries { sum := float64(0) count := 0 for _, ts := range tss { - if math.IsNaN(ts.Values[i]) { + v := ts.Values[i] + if math.IsNaN(v) { continue } - sum += ts.Values[i] + sum += v count++ } if count == 0 { @@ -449,7 +453,7 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { for i := range tss[0].Values { // Calculate avg and stddev for tss points at position i. 
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation @@ -550,7 +554,7 @@ func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) { // Do nothing } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { m := make(map[float64]bool) for _, ts := range tss { for _, v := range ts.Values { @@ -602,7 +606,7 @@ func newAggrFuncTopK(isReverse bool) aggrFunc { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { for n := range tss[0].Values { sort.Slice(tss, func(i, j int) bool { a := tss[i].Values[n] @@ -623,21 +627,32 @@ func newAggrFuncTopK(isReverse bool) aggrFunc { func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc { return func(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args - if err := expectTransformArgsNum(args, 2); err != nil { - return nil, err + if len(args) < 2 { + return nil, fmt.Errorf(`unexpected number of args; got %d; want at least %d`, len(args), 2) + } + if len(args) > 3 { + return nil, fmt.Errorf(`unexpected number of args; got %d; want no more than %d`, len(args), 3) } ks, err := getScalar(args[0], 0) if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { - return getRangeTopKTimeseries(tss, ks, f, isReverse) + remainingSumTagName := "" + if len(args) == 3 { + remainingSumTagName, err = getString(args[2], 2) + if err != nil { + return nil, err + } + } + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + return getRangeTopKTimeseries(tss, modifier, ks, remainingSumTagName, f, isReverse) } return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } } -func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []float64) float64, isReverse bool) []*timeseries { +func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string, + f func(values []float64) float64, isReverse bool) []*timeseries { type tsWithValue struct { ts *timeseries value float64 @@ -661,28 +676,66 @@ func getRangeTopKTimeseries(tss []*timeseries, ks []float64, f func(values []flo for i := range maxs { tss[i] = maxs[i].ts } + remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName) for i, k := range ks { fillNaNsAtIdx(i, k, tss) } + if remainingSumTS != nil { + tss = append(tss, remainingSumTS) + } return removeNaNs(tss) } +func getRemainingSumTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, ks []float64, remainingSumTagName string) *timeseries { + if len(remainingSumTagName) == 0 || len(tss) == 0 { + return nil + } + var dst timeseries + dst.CopyFromShallowTimestamps(tss[0]) + removeGroupTags(&dst.MetricName, modifier) + dst.MetricName.RemoveTag(remainingSumTagName) + dst.MetricName.AddTag(remainingSumTagName, remainingSumTagName) + for i, k := range ks { + kn := getIntK(k, len(tss)) + var sum float64 + count := 0 + for _, ts := range tss[:len(tss)-kn] { + v := ts.Values[i] + if math.IsNaN(v) { + continue + } + sum += v + count++ + } + if count == 0 { + sum = nan + } + dst.Values[i] = sum + } + return &dst +} + func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) { - if math.IsNaN(k) { - k = 0 - } - kn := int(k) - if kn < 0 { - kn = 0 - } - if kn > len(tss) { - kn = len(tss) - } + kn := getIntK(k, len(tss)) for _, ts := range tss[:len(tss)-kn] { 
ts.Values[idx] = nan } } +func getIntK(k float64, kMax int) int { + if math.IsNaN(k) { + return 0 + } + kn := int(k) + if kn < 0 { + return 0 + } + if kn > kMax { + return kMax + } + return kn +} + func minValue(values []float64) float64 { if len(values) == 0 { return nan @@ -746,7 +799,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { // Calculate medians for each point across tss. medians := make([]float64, len(ks)) h := histogram.GetFast() @@ -771,7 +824,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { } return sum2 } - return getRangeTopKTimeseries(tss, ks, f, false) + return getRangeTopKTimeseries(tss, &afa.ae.Modifier, ks, "", f, false) } return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } @@ -792,7 +845,7 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { maxK = k } } - afe := func(tss []*timeseries) []*timeseries { + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { if len(tss) > maxK { tss = tss[:maxK] } @@ -833,8 +886,8 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) { return aggrFuncExt(afe, tss, &afa.ae.Modifier, afa.ae.Limit, false) } -func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries { - return func(tss []*timeseries) []*timeseries { +func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { dst := tss[0] h := histogram.GetFast() defer histogram.PutFast(h) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 3c5c5501dd..977ae0575f 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4193,7 +4193,7 @@ func TestExecSuccess(t *testing.T) { }) t.Run(`topk_max(1)`, func(t *testing.T) { t.Parallel() - q := `sort(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` + q := `topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))` r1 := netstorage.Result{ MetricName: metricNameExpected, Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, @@ -4206,6 +4206,84 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1} f(q, resultExpected) }) + t.Run(`topk_max(1, remaining_sum)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("remaining_sum"), + Value: []byte("remaining_sum"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`topk_max(2, remaining_sum)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(topk_max(2, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))` + r1 := netstorage.Result{ + MetricName: 
metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`topk_max(3, remaining_sum)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(topk_max(3, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"), "remaining_sum"))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`bottomk_max(1)`, func(t *testing.T) { t.Parallel() q := `sort(bottomk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 41ca13b958..e3cada6df0 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -519,12 +519,13 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu } rfa.values = values[i:j] rfa.timestamps = timestamps[i:j] - if j == len(timestamps) && j > 0 && (tEnd-timestamps[j-1] > stalenessInterval || i == j && len(timestamps) == 1) { + if j == len(timestamps) && j > 0 && (tEnd-timestamps[j-1] > stalenessInterval || i == j && len(timestamps) == 1) && rc.End-tEnd >= 2*rc.Step { // Drop trailing data points in the following cases: // - if the distance between the last raw sample and tEnd exceeds stalenessInterval // - if time series contains only a single raw sample // This should prevent from double counting when a label changes in time series (for instance, // during new deployment in K8S). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 + // Do not drop trailing data points for instant queries. 
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845 rfa.prevValue = nan rfa.values = nil rfa.timestamps = nil diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index bf0ceb2fe7..f4682acb28 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -20,7 +21,7 @@ import ( ) var ( - retentionPeriod = flag.Int("retentionPeriod", 1, "Retention period in months") + retentionPeriod = flagutil.NewDuration("retentionPeriod", 1, "Data with timestamps outside the retentionPeriod is automatically deleted") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") @@ -45,12 +46,12 @@ func CheckTimeRange(tr storage.TimeRange) error { if !*denyQueriesOutsideRetention { return nil } - minAllowedTimestamp := (int64(fasttime.UnixTimestamp()) - int64(*retentionPeriod)*3600*24*30) * 1000 + minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionPeriod.Msecs if tr.MinTimestamp > minAllowedTimestamp { return nil } return &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("the given time range %s is outside the allowed retention of %d months according to -denyQueriesOutsideRetention", &tr, *retentionPeriod), + Err: fmt.Errorf("the given time range %s is outside the allowed -retentionPeriod=%s according to -denyQueriesOutsideRetention", &tr, retentionPeriod), StatusCode: http.StatusServiceUnavailable, } } @@ -73,12 +74,12 @@ func InitWithoutMetrics() { storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) - logger.Infof("opening storage at %q with retention period %d months", *DataPath, *retentionPeriod) + logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod) startTime := time.Now() WG = syncwg.WaitGroup{} - strg, err := storage.OpenStorage(*DataPath, *retentionPeriod) + strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs) if err != nil { - logger.Fatalf("cannot open a storage at %s with retention period %d months: %s", *DataPath, *retentionPeriod, err) + logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err) } Storage = strg diff --git a/dashboards/victoriametrics.json b/dashboards/victoriametrics.json index 7cd4f18d5f..948c79f43b 100644 --- a/dashboards/victoriametrics.json +++ b/dashboards/victoriametrics.json @@ -56,7 +56,7 @@ "gnetId": 10229, "graphTooltip": 0, "id": null, - "iteration": 1599034965731, + "iteration": 1603307754894, "links": [ { "icon": "doc", @@ -925,7 +925,7 @@ "dashLength": 10, "dashes": false, "datasource": "$ds", - "description": "Shows how many ongoing insertions are taking place.\n* `max` - equal to number of CPU * 2\n* `current` - current number of goroutines busy with inserting rows into storage\n\nWhen `current` hits `max` constantly, it means storage is overloaded and require more CPU.", + "description": "Shows how many ongoing insertions (not API 
/write calls) on disk are taking place, where:\n* `max` - equal to number of CPUs;\n* `current` - current number of goroutines busy with inserting rows into underlying storage.\n\nEvery successful API /write call results into flush on disk. However, these two actions are separated and controlled via different concurrency limiters. The `max` on this panel can't be changed and always equal to number of CPUs. \n\nWhen `current` hits `max` constantly, it means storage is overloaded and requires more CPU.\n\n", "fieldConfig": { "defaults": { "custom": {}, @@ -979,6 +979,7 @@ { "expr": "sum(vm_concurrent_addrows_capacity{job=\"$job\", instance=\"$instance\"})", "format": "time_series", + "interval": "", "intervalFactor": 1, "legendFormat": "max", "refId": "A" @@ -995,7 +996,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Concurrent inserts ($instance)", + "title": "Concurrent flushes on disk ($instance)", "tooltip": { "shared": true, "sort": 2, @@ -1164,7 +1165,7 @@ "h": 8, "w": 12, "x": 0, - "y": 36 + "y": 3 }, "hiddenSeries": false, "id": 10, @@ -1250,7 +1251,7 @@ "dashLength": 10, "dashes": false, "datasource": "$ds", - "description": "How many datapoints are in RAM queue waiting to be written into storage. The number of pending data points should be in the range from 0 to `2*`, since VictoriaMetrics pushes pending data to persistent storage every second.", + "description": "Shows the time needed to reach the 100% of disk capacity based on the following params:\n* free disk space;\n* rows ingestion rate;\n* compression.\n\nUse this panel for capacity planning in order to estimate the time remaining for running out of the disk space.\n\n", "fieldConfig": { "defaults": { "custom": {}, @@ -1264,63 +1265,53 @@ "h": 8, "w": 12, "x": 12, - "y": 36 + "y": 3 }, "hiddenSeries": false, - "id": 34, + "id": 73, "legend": { - "avg": false, - "current": false, + "alignAsTable": true, + "avg": true, + "current": true, + "hideZero": true, "max": false, "min": false, "show": false, "total": false, - "values": false + "values": true }, "lines": true, "linewidth": 1, "links": [], - "nullPointMode": "null", + "nullPointMode": "null as zero", "percentage": false, "pluginVersion": "7.1.1", "pointradius": 2, "points": false, "renderer": "flot", - "seriesOverrides": [ - { - "alias": "pending index entries", - "yaxis": 2 - } - ], + "seriesOverrides": [], "spaceLength": 10, "stack": false, "steppedLine": false, "targets": [ { - "expr": "vm_pending_rows{job=\"$job\", instance=~\"$instance\", type=\"storage\"}", + "expr": "vm_free_disk_space_bytes{job=\"$job\", instance=\"$instance\"} / (sum(rate(vm_rows_added_to_storage_total{job=\"$job\", instance=\"$instance\"}[1d])) * (sum(vm_data_size_bytes{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"}) / sum(vm_rows{job=\"$job\", instance=\"$instance\", type!=\"indexdb\"})))", "format": "time_series", "hide": false, + "interval": "", "intervalFactor": 1, - "legendFormat": "pending datapoints", + "legendFormat": "", "refId": "A" - }, - { - "expr": "vm_pending_rows{job=\"$job\", instance=~\"$instance\", type=\"indexdb\"}", - "format": "time_series", - "hide": false, - "intervalFactor": 1, - "legendFormat": "pending index entries", - "refId": "B" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Pending datapoints ($instance)", + "title": "Storage full ETA ($instance)", "tooltip": { "shared": true, - "sort": 0, + "sort": 2, "value_type": "individual" }, "type": "graph", @@ -1333,7 +1324,8 @@ }, "yaxes": [ { 
- "format": "short", + "decimals": null, + "format": "s", "label": null, "logBase": 1, "max": null, @@ -1341,8 +1333,7 @@ "show": true }, { - "decimals": 3, - "format": "none", + "format": "short", "label": null, "logBase": 1, "max": null, @@ -1375,7 +1366,7 @@ "h": 8, "w": 12, "x": 0, - "y": 44 + "y": 11 }, "hiddenSeries": false, "id": 30, @@ -1472,7 +1463,7 @@ "dashLength": 10, "dashes": false, "datasource": "$ds", - "description": "Data parts of LSM tree.\nHigh number of parts could be an evidence of slow merge performance - check the resource utilization.\n* `indexdb` - inverted index\n* `storage/small` - recently added parts of data ingested into storage(hot data)\n* `storage/big` - small parts gradually merged into big parts (cold data)", + "description": "How many datapoints are in RAM queue waiting to be written into storage. The number of pending data points should be in the range from 0 to `2*`, since VictoriaMetrics pushes pending data to persistent storage every second.", "fieldConfig": { "defaults": { "custom": {}, @@ -1486,16 +1477,16 @@ "h": 8, "w": 12, "x": 12, - "y": 44 + "y": 11 }, "hiddenSeries": false, - "id": 36, + "id": 34, "legend": { "avg": false, "current": false, "max": false, "min": false, - "show": true, + "show": false, "total": false, "values": false }, @@ -1508,27 +1499,41 @@ "pointradius": 2, "points": false, "renderer": "flot", - "seriesOverrides": [], + "seriesOverrides": [ + { + "alias": "pending index entries", + "yaxis": 2 + } + ], "spaceLength": 10, "stack": false, "steppedLine": false, "targets": [ { - "expr": "sum(vm_parts{job=\"$job\", instance=\"$instance\"}) by (type)", + "expr": "vm_pending_rows{job=\"$job\", instance=~\"$instance\", type=\"storage\"}", "format": "time_series", + "hide": false, "intervalFactor": 1, - "legendFormat": "{{type}}", + "legendFormat": "pending datapoints", "refId": "A" + }, + { + "expr": "vm_pending_rows{job=\"$job\", instance=~\"$instance\", type=\"indexdb\"}", + "format": "time_series", + "hide": false, + "intervalFactor": 1, + "legendFormat": "pending index entries", + "refId": "B" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "LSM parts ($instance)", + "title": "Pending datapoints ($instance)", "tooltip": { "shared": true, - "sort": 2, + "sort": 0, "value_type": "individual" }, "type": "graph", @@ -1549,7 +1554,8 @@ "show": true }, { - "format": "short", + "decimals": 3, + "format": "none", "label": null, "logBase": 1, "max": null, @@ -1582,7 +1588,7 @@ "h": 8, "w": 12, "x": 0, - "y": 52 + "y": 19 }, "hiddenSeries": false, "id": 53, @@ -1669,6 +1675,196 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "description": "Data parts of LSM tree.\nHigh number of parts could be an evidence of slow merge performance - check the resource utilization.\n* `indexdb` - inverted index\n* `storage/small` - recently added parts of data ingested into storage(hot data)\n* `storage/big` - small parts gradually merged into big parts (cold data)", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 19 + }, + "hiddenSeries": false, + "id": 36, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + 
"pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(vm_parts{job=\"$job\", instance=\"$instance\"}) by (type)", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "LSM parts ($instance)", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "description": "The number of on-going merges in storage nodes. It is expected to have high numbers for `storage/small` metric.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 27 + }, + "hiddenSeries": false, + "id": 62, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(vm_active_merges{job=\"$job\", instance=\"$instance\"}) by(type)", + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Active merges ($instance)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -1689,7 +1885,7 @@ "h": 8, "w": 12, "x": 12, - "y": 52 + "y": 27 }, "hiddenSeries": false, "id": 55, @@ -1764,194 +1960,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "$ds", - "description": "The number of on-going merges in storage nodes. 
It is expected to have high numbers for `storage/small` metric.", - "fieldConfig": { - "defaults": { - "custom": {}, - "links": [] - }, - "overrides": [] - }, - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 60 - }, - "hiddenSeries": false, - "id": 62, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "percentage": false, - "pluginVersion": "7.1.1", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "sum(vm_active_merges{job=\"$job\", instance=\"$instance\"}) by(type)", - "legendFormat": "{{type}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Active merges ($instance)", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": 0, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "$ds", - "description": "The number of rows merged per second by storage nodes.", - "fieldConfig": { - "defaults": { - "custom": {}, - "links": [] - }, - "overrides": [] - }, - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 60 - }, - "hiddenSeries": false, - "id": 64, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "percentage": false, - "pluginVersion": "7.1.1", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "sum(rate(vm_rows_merged_total{job=\"$job\", instance=\"$instance\"}[5m])) by(type)", - "legendFormat": "{{type}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Merge speed ($instance)", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": 0, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -1972,7 +1980,7 @@ "h": 8, "w": 12, "x": 0, - "y": 68 + "y": 35 }, "hiddenSeries": false, "id": 58, @@ -2050,6 +2058,100 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "description": "The number of rows merged per second by storage nodes.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 
1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 35 + }, + "hiddenSeries": false, + "id": 64, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(vm_rows_merged_total{job=\"$job\", instance=\"$instance\"}[5m])) by(type)", + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Merge speed ($instance)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -2070,7 +2172,7 @@ "h": 8, "w": 12, "x": 12, - "y": 68 + "y": 43 }, "hiddenSeries": false, "id": 67, @@ -3293,4 +3395,4 @@ "title": "VictoriaMetrics", "uid": "wNf0q_kZk", "version": 1 -} +} \ No newline at end of file diff --git a/deployment/docker/docker-compose.yml b/deployment/docker/docker-compose.yml index 0f856b5151..8ccab8569f 100644 --- a/deployment/docker/docker-compose.yml +++ b/deployment/docker/docker-compose.yml @@ -21,7 +21,10 @@ services: image: victoriametrics/victoria-metrics ports: - 8428:8428 + - 8089:8089 + - 8089:8089/udp - 2003:2003 + - 2003:2003/udp - 4242:4242 volumes: - vmdata:/storage @@ -30,6 +33,7 @@ services: - '--graphiteListenAddr=:2003' - '--opentsdbListenAddr=:4242' - '--httpListenAddr=:8428' + - '--influxListenAddr=:8089' networks: - vm_net restart: always diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index c9c085019a..c1f3a9872c 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -113,6 +113,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `bottomk_max(k, q)` - returns bottom K time series with the min maximums on the given time range - `bottomk_avg(k, q)` - returns bottom K time series with the min averages on the given time range - `bottomk_median(k, q)` - returns bottom K time series with the min medians on the given time range + All the `topk_*` and `bottomk_*` functions accept optional third argument - label name for the sum of the remaining time series outside top K or bottom K time series. For example, `topk_max(3, process_resident_memory_bytes, "remaining_sum")` would return up to 3 time series with the maximum value for `process_resident_memory_bytes` plus fourth time series with the sum of the remaining time series if any. The fourth time series will contain `remaining_sum="remaining_sum"` additional label. - `share_le_over_time(m[d], le)` - returns share (in the range 0..1) of values in `m` over `d`, which are smaller or equal to `le`. Useful for calculating SLI and SLO. Example: `share_le_over_time(memory_usage_bytes[24h], 100*1024*1024)` returns the share of time series values for the last 24 hours when memory usage was below or equal to 100MB. 
- `share_gt_over_time(m[d], gt)` - returns share (in the range 0..1) of values in `m` over `d`, which are bigger than `gt`. Useful for calculating SLI and SLO. diff --git a/docs/Quick-Start.md b/docs/Quick-Start.md index 80f1218e82..05d52ebeae 100644 --- a/docs/Quick-Start.md +++ b/docs/Quick-Start.md @@ -8,7 +8,7 @@ and their default values. Default flag values should fit the majoirty of cases. The minimum required flags to configure are: * `-storageDataPath` - path to directory where VictoriaMetrics stores all the data. - * `-retentionPeriod` - data retention in months. + * `-retentionPeriod` - data retention. For instance: diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 81b10537f8..40b212b6dc 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -164,7 +164,7 @@ or [docker image](https://hub.docker.com/r/victoriametrics/victoria-metrics/) wi The following command-line flags are used the most: * `-storageDataPath` - path to data directory. VictoriaMetrics stores all the data in this directory. Default path is `victoria-metrics-data` in the current working directory. -* `-retentionPeriod` - retention period in months for stored data. Older data is automatically deleted. Default period is 1 month. +* `-retentionPeriod` - retention for stored data. Older data is automatically deleted. Default retention is 1 month. See [these docs](#retention) for more details. Other flags have good enough default values, so set them only if you really need this. Pass `-help` to see all the available flags with description and default values. @@ -495,6 +495,7 @@ VictoriaMetrics supports the following handlers from [Prometheus querying API](h * [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) * [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) * [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) +* [/api/v1/targets](https://prometheus.io/docs/prometheus/latest/querying/api/#targets) - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter) for more details. These handlers can be queried from Prometheus-compatible clients such as Grafana or curl. @@ -1048,6 +1049,7 @@ The de-duplication reduces disk space usage if multiple identically configured P write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical `external_labels` section in their configs, so they write data to the same time series. + ### Retention Retention is configured with `-retentionPeriod` command-line flag. For instance, `-retentionPeriod=3` means @@ -1059,6 +1061,10 @@ For example if `-retentionPeriod` is set to 1, data for January is deleted on Ma It is safe to extend `-retentionPeriod` on existing data. If `-retentionPeriod` is set to lower value than before then data outside the configured period will be eventually deleted. +VictoriaMetrics supports retention smaller than 1 month. For example, `-retentionPeriod=5d` would set data retention for 5 days. +Older data is eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). 
+
+
### Multiple retentions
Just start multiple VictoriaMetrics instances with distinct values for the following flags:
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 428ea6e978..22d2c64030 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -211,9 +211,13 @@ either via `vmagent` itself or via Prometheus, so the exported metrics could be
Use official [Grafana dashboard](https://grafana.com/grafana/dashboards/12683) for `vmagent` state overview.
If you have suggestions, improvements or found a bug - feel free to open an issue on github or add review to the dashboard.
-`vmagent` also exports target statuses at `http://vmagent-host:8429/targets` page in plaintext format.
-`/targets` handler accepts optional `show_original_labels=1` query arg, which shows the original labels per each target
-before applying relabeling. This information may be useful for debugging target relabeling.
+`vmagent` also exports target statuses at the following handlers:
+
+* `http://vmagent-host:8429/targets`. This handler returns human-readable plaintext status for every active target.
+This page is convenient to query from command line with `wget`, `curl` or similar tools.
+It accepts optional `show_original_labels=1` query arg, which shows the original labels per each target before applying relabeling.
+This information may be useful for debugging target relabeling.
+* `http://vmagent-host:8429/api/v1/targets`. This handler returns data compatible with [the corresponding page from Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).
### Troubleshooting
@@ -224,7 +228,26 @@ before applying relabeling. This information may be useful for debugging target
since `vmagent` establishes at least a single TCP connection per each target.
* When `vmagent` scrapes many unreliable targets, it can flood error log with scrape errors. These errors can be suppressed
- by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`.
+ by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
+ and `http://vmagent-host:8429/api/v1/targets`.
+
+* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)),
+ then it is recommended to enable `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets
+ by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example:
+
+ ```yml
+ scrape_configs:
+ - job_name: 'big-federate'
+   stream_parse: true
+   static_configs:
+   - targets:
+     - big-prometheus1
+     - big-prometheus2
+   honor_labels: true
+   metrics_path: /federate
+   params:
+     'match[]': ['{__name__!=""}']
+ ```
* It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported
at `http://vmagent-host:8429/metrics` page constantly grows.
diff --git a/docs/vmrestore.md b/docs/vmrestore.md
index 0c80a9ad4b..59a24dda8c 100644
--- a/docs/vmrestore.md
+++ b/docs/vmrestore.md
@@ -3,7 +3,7 @@
`vmrestore` restores data from backups created by [vmbackup](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmbackup/README.md).
VictoriaMetrics `v1.29.0` and newer versions must be used for working with the restored data. -Restore process can be interrupted at any time. It is automatically resumed from the inerruption point +Restore process can be interrupted at any time. It is automatically resumed from the interruption point when restarting `vmrestore` with the same args. diff --git a/lib/flagutil/duration.go b/lib/flagutil/duration.go new file mode 100644 index 0000000000..8dd920f67c --- /dev/null +++ b/lib/flagutil/duration.go @@ -0,0 +1,69 @@ +package flagutil + +import ( + "flag" + "fmt" + "strconv" + "strings" + + "github.com/VictoriaMetrics/metricsql" +) + +// NewDuration returns new `duration` flag with the given name, defaultValue and description. +// +// DefaultValue is in months. +func NewDuration(name string, defaultValue float64, description string) *Duration { + description += "\nThe following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months" + d := Duration{ + Msecs: int64(defaultValue * msecsPerMonth), + valueString: fmt.Sprintf("%g", defaultValue), + } + flag.Var(&d, name, description) + return &d +} + +// Duration is a flag for holding duration. +type Duration struct { + // Msecs contains parsed duration in milliseconds. + Msecs int64 + + valueString string +} + +// String implements flag.Value interface +func (d *Duration) String() string { + return d.valueString +} + +// Set implements flag.Value interface +func (d *Duration) Set(value string) error { + // An attempt to parse value in months. + months, err := strconv.ParseFloat(value, 64) + if err == nil { + if months > maxMonths { + return fmt.Errorf("duration months must be smaller than %d; got %g", maxMonths, months) + } + if months < 0 { + return fmt.Errorf("duration months cannot be negative; got %g", months) + } + d.Msecs = int64(months * msecsPerMonth) + d.valueString = value + return nil + } + // Parse duration. 
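+	// Values with a supported suffix (h, d, w, y) are handled below by
+	// metricsql.PositiveDurationValue, which returns the duration in milliseconds:
+	// e.g. "1h" yields 3600*1000 and "1.5d" yields 1.5*24*3600*1000 (see duration_test.go).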
+ value = strings.ToLower(value) + if strings.HasSuffix(value, "m") { + return fmt.Errorf("duration in months must be set without `m` suffix due to ambiguity with duration in minutes; got %s", value) + } + msecs, err := metricsql.PositiveDurationValue(value, 0) + if err != nil { + return err + } + d.Msecs = msecs + d.valueString = value + return nil +} + +const maxMonths = 12 * 100 + +const msecsPerMonth = 31 * 24 * 3600 * 1000 diff --git a/lib/flagutil/duration_test.go b/lib/flagutil/duration_test.go new file mode 100644 index 0000000000..cdb1b59455 --- /dev/null +++ b/lib/flagutil/duration_test.go @@ -0,0 +1,57 @@ +package flagutil + +import ( + "strings" + "testing" +) + +func TestDurationSetFailure(t *testing.T) { + f := func(value string) { + t.Helper() + var d Duration + if err := d.Set(value); err == nil { + t.Fatalf("expecting non-nil error in d.Set(%q)", value) + } + } + f("") + f("foobar") + f("5foobar") + f("ah") + f("134xd") + f("2.43sdfw") + + // Too big value in months + f("12345") + + // Negative duration + f("-1") + f("-34h") + + // Duration in minutes is confused with duration in months + f("1m") +} + +func TestDurationSetSuccess(t *testing.T) { + f := func(value string, expectedMsecs int64) { + t.Helper() + var d Duration + if err := d.Set(value); err != nil { + t.Fatalf("unexpected error in d.Set(%q): %s", value, err) + } + if d.Msecs != expectedMsecs { + t.Fatalf("unexpected result; got %d; want %d", d.Msecs, expectedMsecs) + } + valueString := d.String() + valueExpected := strings.ToLower(value) + if valueString != valueExpected { + t.Fatalf("unexpected valueString; got %q; want %q", valueString, valueExpected) + } + } + f("0", 0) + f("1", msecsPerMonth) + f("123.456", 123.456*msecsPerMonth) + f("1h", 3600*1000) + f("1.5d", 1.5*24*3600*1000) + f("2.3W", 2.3*7*24*3600*1000) + f("0.25y", 0.25*365*24*3600*1000) +} diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index efa7d6c99c..1cf52c77a4 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -306,6 +306,9 @@ func maybeGzipResponseWriter(w http.ResponseWriter, r *http.Request) http.Respon if *disableResponseCompression { return w } + if r.Header.Get("Connection") == "Upgrade" { + return w + } ae := r.Header.Get("Accept-Encoding") if ae == "" { return w diff --git a/lib/memory/memory.go b/lib/memory/memory.go index aeab21812c..fc225f596d 100644 --- a/lib/memory/memory.go +++ b/lib/memory/memory.go @@ -35,12 +35,12 @@ func initOnce() { mem := sysTotalMemory() if allowedBytes.N <= 0 { if *allowedPercent < 1 || *allowedPercent > 200 { - logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %f", *allowedPercent) + logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent) } percent := *allowedPercent / 100 allowedMemory = int(float64(mem) * percent) remainingMemory = mem - allowedMemory - logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%f", allowedMemory, remainingMemory, *allowedPercent) + logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent) } else { allowedMemory = allowedBytes.N remainingMemory = mem - allowedMemory diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index e7f4d0f5e6..9504f9f030 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -1,13 +1,18 @@ package promscrape import ( + "context" 
"crypto/tls" "flag" "fmt" + "io" + "io/ioutil" + "net/http" "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) @@ -22,11 +27,19 @@ var ( "This may be useful when targets has no support for HTTP keep-alive connection. "+ "It is possible to set `disable_keepalive: true` individually per each 'scrape_config` section in '-promscrape.config' for fine grained control. "+ "Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets") + streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+ + "for reducing memory usage when millions of metrics are exposed per each scrape target. "+ + "It is posible to set `stream_parse: true` individually per each `scrape_config` section in `-promscrape.config` for fine grained control") ) type client struct { + // hc is the default client optimized for common case of scraping targets with moderate number of metrics. hc *fasthttp.HostClient + // sc (aka `stream client`) is used instead of hc if ScrapeWork.ParseStream is set. + // It may be useful for scraping targets with millions of metrics per target. + sc *http.Client + scrapeURL string host string requestURI string @@ -64,8 +77,23 @@ func newClient(sw *ScrapeWork) *client { MaxResponseBodySize: maxScrapeSize.N, MaxIdempotentRequestAttempts: 1, } + var sc *http.Client + if *streamParse || sw.StreamParse { + sc = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 2 * sw.ScrapeInterval, + DisableCompression: *disableCompression || sw.DisableCompression, + DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive, + DialContext: statStdDial, + }, + Timeout: sw.ScrapeTimeout, + } + } return &client{ hc: hc, + sc: sc, scrapeURL: sw.ScrapeURL, host: host, @@ -76,6 +104,43 @@ func newClient(sw *ScrapeWork) *client { } } +func (c *client) GetStreamReader() (*streamReader, error) { + deadline := time.Now().Add(c.hc.ReadTimeout) + ctx, cancel := context.WithDeadline(context.Background(), deadline) + req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil) + if err != nil { + cancel() + return nil, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err) + } + // The following `Accept` header has been copied from Prometheus sources. + // See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 . + // This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. + // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. 
+ req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") + if c.authHeader != "" { + req.Header.Set("Authorization", c.authHeader) + } + resp, err := c.sc.Do(req) + if err != nil { + cancel() + return nil, fmt.Errorf("cannot scrape %q: %w", c.scrapeURL, err) + } + if resp.StatusCode != http.StatusOK { + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc() + respBody, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + cancel() + return nil, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q", + c.scrapeURL, resp.StatusCode, http.StatusOK, respBody) + } + scrapesOK.Inc() + return &streamReader{ + r: resp.Body, + cancel: cancel, + }, nil +} + func (c *client) ReadData(dst []byte) ([]byte, error) { deadline := time.Now().Add(c.hc.ReadTimeout) req := fasthttp.AcquireRequest() @@ -87,7 +152,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") - if !*disableCompression || c.disableCompression { + if !*disableCompression && !c.disableCompression { req.Header.Set("Accept-Encoding", "gzip") } if *disableKeepAlive || c.disableKeepAlive { @@ -131,7 +196,6 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { } return dst, fmt.Errorf("error when scraping %q: %w", c.scrapeURL, err) } - dstLen := len(dst) if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" { var err error var src []byte @@ -154,7 +218,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { if statusCode != fasthttp.StatusOK { metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, statusCode)).Inc() return dst, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q", - c.scrapeURL, statusCode, fasthttp.StatusOK, dst[dstLen:]) + c.scrapeURL, statusCode, fasthttp.StatusOK, dst) } scrapesOK.Inc() fasthttp.ReleaseResponse(resp) @@ -185,3 +249,22 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, } } } + +type streamReader struct { + r io.ReadCloser + cancel context.CancelFunc + bytesRead int64 +} + +func (sr *streamReader) Read(p []byte) (int, error) { + n, err := sr.r.Read(p) + sr.bytesRead += int64(n) + return n, err +} + +func (sr *streamReader) MustClose() { + sr.cancel() + if err := sr.r.Close(); err != nil { + logger.Errorf("cannot close reader: %s", err) + } +} diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 9595e78335..d8ca914699 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -84,6 +84,7 @@ type ScrapeConfig struct { // These options are supported only by lib/promscrape. 
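	// Prometheus rejects unknown fields in scrape configs, so configs relying on these options are vmagent-specific.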
DisableCompression bool `yaml:"disable_compression"` DisableKeepAlive bool `yaml:"disable_keepalive"` + StreamParse bool `yaml:"stream_parse"` // This is set in loadConfig swc *scrapeWorkConfig @@ -473,6 +474,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf sampleLimit: sc.SampleLimit, disableCompression: sc.DisableCompression, disableKeepAlive: sc.DisableKeepAlive, + streamParse: sc.StreamParse, } return swc, nil } @@ -493,6 +495,7 @@ type scrapeWorkConfig struct { sampleLimit int disableCompression bool disableKeepAlive bool + streamParse bool } func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { @@ -642,6 +645,7 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex labels = promrelabel.RemoveMetaLabels(labels[:0], labels) if len(labels) == 0 { // Drop target without labels. + droppedTargetsMap.Register(originalLabels) return dst, nil } // See https://www.robustperception.io/life-of-a-label @@ -652,10 +656,12 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex addressRelabeled := promrelabel.GetLabelValueByName(labels, "__address__") if len(addressRelabeled) == 0 { // Drop target without scrape address. + droppedTargetsMap.Register(originalLabels) return dst, nil } if strings.Contains(addressRelabeled, "/") { // Drop target with '/' + droppedTargetsMap.Register(originalLabels) return dst, nil } addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled) @@ -663,6 +669,9 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex if metricsPathRelabeled == "" { metricsPathRelabeled = "/metrics" } + if !strings.HasPrefix(metricsPathRelabeled, "/") { + metricsPathRelabeled = "/" + metricsPathRelabeled + } paramsRelabeled := getParamsFromLabels(labels, swc.params) optionalQuestion := "?" if len(paramsRelabeled) == 0 || strings.Contains(metricsPathRelabeled, "?") { @@ -696,6 +705,7 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex SampleLimit: swc.sampleLimit, DisableCompression: swc.disableCompression, DisableKeepAlive: swc.disableKeepAlive, + StreamParse: swc.streamParse, jobNameOriginal: swc.jobName, }) diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index e84567b404..380c481e7c 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -1276,6 +1276,7 @@ scrape_configs: sample_limit: 100 disable_keepalive: true disable_compression: true + stream_parse: true static_configs: - targets: - 192.168.1.2 # SNMP device. 
@@ -1328,9 +1329,49 @@ scrape_configs:
SampleLimit: 100,
DisableKeepAlive: true,
DisableCompression: true,
+ StreamParse: true,
jobNameOriginal: "snmp",
},
})
+ f(`
+scrape_configs:
+- job_name: path wo slash
+ static_configs:
+ - targets: ["foo.bar:1234"]
+ relabel_configs:
+ - replacement: metricspath
+ target_label: __metrics_path__
+`, []ScrapeWork{
+ {
+ ScrapeURL: "http://foo.bar:1234/metricspath",
+ ScrapeInterval: defaultScrapeInterval,
+ ScrapeTimeout: defaultScrapeTimeout,
+ Labels: []prompbmarshal.Label{
+ {
+ Name: "__address__",
+ Value: "foo.bar:1234",
+ },
+ {
+ Name: "__metrics_path__",
+ Value: "metricspath",
+ },
+ {
+ Name: "__scheme__",
+ Value: "http",
+ },
+ {
+ Name: "instance",
+ Value: "foo.bar:1234",
+ },
+ {
+ Name: "job",
+ Value: "path wo slash",
+ },
+ },
+ jobNameOriginal: "path wo slash",
+ AuthConfig: &promauth.Config{},
+ },
+ })
}
var defaultRegexForRelabelConfig = regexp.MustCompile("^(.*)$")
diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index bd611c13ef..341e66462a 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -284,6 +284,7 @@ func (sg *scraperGroup) update(sws []ScrapeWork) {
"original labels for target1: %s; original labels for target2: %s",
sw.ScrapeURL, sw.LabelsString(), promLabelsString(originalLabels), promLabelsString(sw.OriginalLabels))
}
+ droppedTargetsMap.Register(sw.OriginalLabels)
continue
}
swsMap[key] = sw.OriginalLabels
@@ -333,6 +334,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.Wr
sc.sw.Config = *sw
sc.sw.ScrapeGroup = group
sc.sw.ReadData = c.ReadData
+ sc.sw.GetStreamReader = c.GetStreamReader
sc.sw.PushData = pushData
return sc
}
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 939fff88da..81515c1571 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -82,19 +82,22 @@ type ScrapeWork struct {
// Whether to disable HTTP keep-alive when querying ScrapeURL.
DisableKeepAlive bool
+ // Whether to parse target responses in a streaming manner.
+ StreamParse bool
+
// The original 'job_name'
jobNameOriginal string
}
// key returns unique identifier for the given sw.
//
-// it can be used for comparing for equality two ScrapeWork objects.
+// it can be used for comparing two ScrapeWork objects for equality.
func (sw *ScrapeWork) key() string {
// Do not take into account OriginalLabels.
key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, Labels=%s, "+
- "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v",
+ "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v",
sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.LabelsString(),
- sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive)
+ sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse)
return key
}
@@ -132,6 +135,9 @@ type scrapeWork struct {
// ReadData is called for reading the data.
ReadData func(dst []byte) ([]byte, error)
+ // GetStreamReader is called if Config.StreamParse is set.
+ GetStreamReader func() (*streamReader, error)
+
// PushData is called for pushing collected data.
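	// In stream parsing mode PushData may be called multiple times per scrape,
	// once per accumulated chunk of parsed rows (see scrapeStream below).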
PushData func(wr *prompbmarshal.WriteRequest)
@@ -221,6 +227,15 @@ var (
)
func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
+ if *streamParse || sw.Config.StreamParse {
+ // Read data from scrape targets in a streaming manner.
+ // This case is optimized for targets exposing millions of metrics and more per target.
+ return sw.scrapeStream(scrapeTimestamp, realTimestamp)
+ }
+
+ // Common case: read all the data from the scrape target into memory (body) and then process it.
+ // This case should work better than the stream parsing code above for the common case when a scrape target exposes
+ // up to a few thousand metrics.
body := leveledbytebufferpool.Get(sw.prevBodyLen)
var err error
body.B, err = sw.ReadData(body.B[:0])
@@ -281,6 +296,66 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
return err
}
+func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
+ sr, err := sw.GetStreamReader()
+ if err != nil {
+ return fmt.Errorf("cannot read data: %s", err)
+ }
+ samplesScraped := 0
+ samplesPostRelabeling := 0
+ wc := writeRequestCtxPool.Get(sw.prevRowsLen)
+ var mu sync.Mutex
+ err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
+ mu.Lock()
+ defer mu.Unlock()
+ samplesScraped += len(rows)
+ for i := range rows {
+ sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
+ if len(wc.labels) > 40000 {
+ // Limit the maximum size of wc.writeRequest.
+ // This should reduce memory usage when scraping targets with millions of metrics and/or labels.
+ // For example, when scraping the /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
+ samplesPostRelabeling += len(wc.writeRequest.Timeseries)
+ sw.updateSeriesAdded(wc)
+ startTime := time.Now()
+ sw.PushData(&wc.writeRequest)
+ pushDataDuration.UpdateDuration(startTime)
+ wc.resetNoRows()
+ }
+ }
+ return nil
+ })
+ scrapedSamples.Update(float64(samplesScraped))
+ endTimestamp := time.Now().UnixNano() / 1e6
+ duration := float64(endTimestamp-realTimestamp) / 1e3
+ scrapeDuration.Update(duration)
+ scrapeResponseSize.Update(float64(sr.bytesRead))
+ sr.MustClose()
+ up := 1
+ if err != nil {
+ if samplesScraped == 0 {
+ up = 0
+ }
+ scrapesFailed.Inc()
+ }
+ samplesPostRelabeling += len(wc.writeRequest.Timeseries)
+ sw.updateSeriesAdded(wc)
+ seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling)
+ sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
+ sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
+ sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
+ sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
+ sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
+ startTime := time.Now()
+ sw.PushData(&wc.writeRequest)
+ pushDataDuration.UpdateDuration(startTime)
+ sw.prevRowsLen = len(wc.rows.Rows)
+ wc.reset()
+ writeRequestCtxPool.Put(wc)
+ tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
+ return nil
+}
+
// leveledWriteRequestCtxPool allows reducing memory usage when writeRequestCtx
// structs contain a mixed number of labels.
// diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 10d45f4aa0..420b8b624f 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -1,14 +1,48 @@ package promscrape import ( + "context" "net" + "sync" "sync/atomic" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) +func statStdDial(ctx context.Context, network, addr string) (net.Conn, error) { + d := getStdDialer() + conn, err := d.DialContext(ctx, network, addr) + dialsTotal.Inc() + if err != nil { + dialErrors.Inc() + return nil, err + } + conns.Inc() + sc := &statConn{ + Conn: conn, + } + return sc, nil +} + +func getStdDialer() *net.Dialer { + stdDialerOnce.Do(func() { + stdDialer = &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: netutil.TCP6Enabled(), + } + }) + return stdDialer +} + +var ( + stdDialer *net.Dialer + stdDialerOnce sync.Once +) + func statDial(addr string) (conn net.Conn, err error) { if netutil.TCP6Enabled() { conn, err = fasthttp.DialDualStack(addr) diff --git a/lib/promscrape/targetstatus.go b/lib/promscrape/targetstatus.go index 03777a2449..48240368c5 100644 --- a/lib/promscrape/targetstatus.go +++ b/lib/promscrape/targetstatus.go @@ -6,6 +6,10 @@ import ( "sort" "sync" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) var tsmGlobal = newTargetStatusMap() @@ -15,6 +19,26 @@ func WriteHumanReadableTargetsStatus(w io.Writer, showOriginalLabels bool) { tsmGlobal.WriteHumanReadable(w, showOriginalLabels) } +// WriteAPIV1Targets writes /api/v1/targets to w according to https://prometheus.io/docs/prometheus/latest/querying/api/#targets +func WriteAPIV1Targets(w io.Writer, state string) { + if state == "" { + state = "any" + } + fmt.Fprintf(w, `{"status":"success","data":{"activeTargets":`) + if state == "active" || state == "any" { + tsmGlobal.WriteActiveTargetsJSON(w) + } else { + fmt.Fprintf(w, `[]`) + } + fmt.Fprintf(w, `,"droppedTargets":`) + if state == "dropped" || state == "any" { + droppedTargetsMap.WriteDroppedTargetsJSON(w) + } else { + fmt.Fprintf(w, `[]`) + } + fmt.Fprintf(w, `}}`) +} + type targetStatusMap struct { mu sync.Mutex m map[uint64]targetStatus @@ -73,6 +97,66 @@ func (tsm *targetStatusMap) StatusByGroup(group string, up bool) int { return count } +// WriteActiveTargetsJSON writes `activeTargets` contents to w according to https://prometheus.io/docs/prometheus/latest/querying/api/#targets +func (tsm *targetStatusMap) WriteActiveTargetsJSON(w io.Writer) { + tsm.mu.Lock() + type keyStatus struct { + key string + st targetStatus + } + kss := make([]keyStatus, 0, len(tsm.m)) + for _, st := range tsm.m { + key := promLabelsString(st.sw.OriginalLabels) + kss = append(kss, keyStatus{ + key: key, + st: st, + }) + } + tsm.mu.Unlock() + + sort.Slice(kss, func(i, j int) bool { + return kss[i].key < kss[j].key + }) + fmt.Fprintf(w, `[`) + for i, ks := range kss { + st := ks.st + fmt.Fprintf(w, `{"discoveredLabels":`) + writeLabelsJSON(w, st.sw.OriginalLabels) + fmt.Fprintf(w, `,"labels":`) + labelsFinalized := promrelabel.FinalizeLabels(nil, st.sw.Labels) + writeLabelsJSON(w, labelsFinalized) + fmt.Fprintf(w, `,"scrapePool":%q`, st.sw.Job()) + fmt.Fprintf(w, `,"scrapeUrl":%q`, st.sw.ScrapeURL) + errMsg := "" + if st.err != nil { + errMsg = st.err.Error() + } + fmt.Fprintf(w, 
`,"lastError":%q`, errMsg) + fmt.Fprintf(w, `,"lastScrape":%q`, time.Unix(st.scrapeTime/1000, (st.scrapeTime%1000)*1e6).Format(time.RFC3339Nano)) + fmt.Fprintf(w, `,"lastScrapeDuration":%g`, (time.Millisecond * time.Duration(st.scrapeDuration)).Seconds()) + state := "up" + if !st.up { + state = "down" + } + fmt.Fprintf(w, `,"health":%q}`, state) + if i+1 < len(kss) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `]`) +} + +func writeLabelsJSON(w io.Writer, labels []prompbmarshal.Label) { + fmt.Fprintf(w, `{`) + for i, label := range labels { + fmt.Fprintf(w, "%q:%q", label.Name, label.Value) + if i+1 < len(labels) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `}`) +} + func (tsm *targetStatusMap) WriteHumanReadable(w io.Writer, showOriginalLabels bool) { byJob := make(map[string][]targetStatus) tsm.mu.Lock() @@ -143,3 +227,69 @@ type targetStatus struct { func (st *targetStatus) getDurationFromLastScrape() time.Duration { return time.Since(time.Unix(st.scrapeTime/1000, (st.scrapeTime%1000)*1e6)) } + +type droppedTargets struct { + mu sync.Mutex + m map[string]droppedTarget + lastCleanupTime uint64 +} + +type droppedTarget struct { + originalLabels []prompbmarshal.Label + deadline uint64 +} + +func (dt *droppedTargets) Register(originalLabels []prompbmarshal.Label) { + key := promLabelsString(originalLabels) + currentTime := fasttime.UnixTimestamp() + dt.mu.Lock() + dt.m[key] = droppedTarget{ + originalLabels: originalLabels, + deadline: currentTime + 10*60, + } + if currentTime-dt.lastCleanupTime > 60 { + for k, v := range dt.m { + if currentTime > v.deadline { + delete(dt.m, k) + } + } + dt.lastCleanupTime = currentTime + } + dt.mu.Unlock() +} + +// WriteDroppedTargetsJSON writes `droppedTargets` contents to w according to https://prometheus.io/docs/prometheus/latest/querying/api/#targets +func (dt *droppedTargets) WriteDroppedTargetsJSON(w io.Writer) { + dt.mu.Lock() + type keyStatus struct { + key string + originalLabels []prompbmarshal.Label + } + kss := make([]keyStatus, 0, len(dt.m)) + for _, v := range dt.m { + key := promLabelsString(v.originalLabels) + kss = append(kss, keyStatus{ + key: key, + originalLabels: v.originalLabels, + }) + } + dt.mu.Unlock() + + sort.Slice(kss, func(i, j int) bool { + return kss[i].key < kss[j].key + }) + fmt.Fprintf(w, `[`) + for i, ks := range kss { + fmt.Fprintf(w, `{"discoveredLabels":`) + writeLabelsJSON(w, ks.originalLabels) + fmt.Fprintf(w, `}`) + if i+1 < len(kss) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `]`) +} + +var droppedTargetsMap = &droppedTargets{ + m: make(map[string]droppedTarget), +} diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 247526b8bd..027ecc4589 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -23,7 +23,8 @@ var ( // ParseStream parses csv from req and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. 
func ParseStream(req *http.Request, callback func(rows []Row) error) error { diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 2eea19bd5a..7fa734f9f1 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -23,7 +23,8 @@ var ( // ParseStream parses Graphite lines from r and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 408d0a3445..d302e3ef01 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -24,7 +24,8 @@ var ( // ParseStream parses r with the given args and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go index 16af474e61..0e87d87879 100644 --- a/lib/protoparser/native/streamparser.go +++ b/lib/protoparser/native/streamparser.go @@ -17,7 +17,8 @@ import ( // ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold block after returning. // callback can be called in parallel from multiple concurrent goroutines. diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index cf2b4f89eb..752f1cd5e3 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -23,7 +23,8 @@ var ( // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index 9a7ffaa5f3..5a1f9012e7 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -26,7 +26,8 @@ var ( // ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. 
func ParseStream(req *http.Request, callback func(rows []Row) error) error { diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index cfee948cbd..8c915c9fd3 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -16,7 +16,8 @@ import ( // ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// It is guaranteed that the callback isn't called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error { @@ -32,11 +33,17 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(rows []Row) error { + err := callback(rows) + ctx.wg.Done() + return err + } uw.defaultTimestamp = defaultTimestamp uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } + ctx.wg.Wait() // wait for all the outstanding callback calls before returning return ctx.Error() } @@ -61,6 +68,8 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup } func (ctx *streamContext) Error() error { diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 4312310d62..0ce5632ce5 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -21,6 +21,9 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102 // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries. // +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. +// // callback shouldn't hold tss after returning. func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error { ctx := getPushCtx(req.Body) diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 915ac18e8e..66ad6c3221 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -20,10 +20,10 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. -// callback is called from multiple concurrent goroutines. func ParseStream(req *http.Request, callback func(rows []Row) error) error { r := req.Body if req.Header.Get("Content-Encoding") == "gzip" { diff --git a/lib/storage/block.go b/lib/storage/block.go index ec0b3fdeb9..c446c370e0 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -23,7 +23,7 @@ const ( type Block struct { bh blockHeader - // nextIdx is the next row index for timestamps and values. + // nextIdx is the next index for reading timestamps and values. 
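+	// During merges nextIdx may also be advanced past samples that fall outside
+	// the configured retention (see skipSamplesOutsideRetention in merge.go).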
nextIdx int timestamps []int64 diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 6f3463f916..0fc16d434d 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -15,12 +15,12 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, - dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) bsm.Init(bsrs) - err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted) + err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) bsw.MustClose() @@ -39,29 +39,10 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, - dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { - // Search for the first block to merge - var pendingBlock *Block - for bsm.NextBlock() { - select { - case <-stopCh: - return errForciblyStopped - default: - } - if dmis.Has(bsm.Block.bh.TSID.MetricID) { - // Skip blocks for deleted metrics. - *rowsDeleted += uint64(bsm.Block.bh.RowsCount) - continue - } - pendingBlock = getBlock() - pendingBlock.CopyFrom(bsm.Block) - break - } - if pendingBlock != nil { - defer putBlock(pendingBlock) - } - - // Merge blocks. + dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error { + pendingBlockIsEmpty := true + pendingBlock := getBlock() + defer putBlock(pendingBlock) tmpBlock := getBlock() defer putBlock(tmpBlock) for bsm.NextBlock() { @@ -75,6 +56,17 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc *rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue } + if bsm.Block.bh.MaxTimestamp < retentionDeadline { + // Skip blocks out of the given retention. + *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + continue + } + if pendingBlockIsEmpty { + // Load the next block if pendingBlock is empty. + pendingBlock.CopyFrom(bsm.Block) + pendingBlockIsEmpty = false + continue + } // Verify whether pendingBlock may be merged with bsm.Block (the current block). if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID { @@ -104,16 +96,20 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc tmpBlock.bh.TSID = bsm.Block.bh.TSID tmpBlock.bh.Scale = bsm.Block.bh.Scale tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits) - mergeBlocks(tmpBlock, pendingBlock, bsm.Block) + mergeBlocks(tmpBlock, pendingBlock, bsm.Block, retentionDeadline, rowsDeleted) if len(tmpBlock.timestamps) <= maxRowsPerBlock { // More entries may be added to tmpBlock. Swap it with pendingBlock, // so more entries may be added to pendingBlock on the next iteration. - tmpBlock.fixupTimestamps() + if len(tmpBlock.timestamps) > 0 { + tmpBlock.fixupTimestamps() + } else { + pendingBlockIsEmpty = true + } pendingBlock, tmpBlock = tmpBlock, pendingBlock continue } - // Write the first len(maxRowsPerBlock) of tmpBlock.timestamps to bsw, + // Write the first maxRowsPerBlock of tmpBlock.timestamps to bsw, // leave the rest in pendingBlock. 
tmpBlock.nextIdx = maxRowsPerBlock pendingBlock.CopyFrom(tmpBlock) @@ -127,18 +123,21 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc if err := bsm.Error(); err != nil { return fmt.Errorf("cannot read block to be merged: %w", err) } - if pendingBlock != nil { + if !pendingBlockIsEmpty { bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) } return nil } // mergeBlocks merges ib1 and ib2 to ob. -func mergeBlocks(ob, ib1, ib2 *Block) { +func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) { ib1.assertMergeable(ib2) ib1.assertUnmarshaled() ib2.assertUnmarshaled() + skipSamplesOutsideRetention(ib1, retentionDeadline, rowsDeleted) + skipSamplesOutsideRetention(ib2, retentionDeadline, rowsDeleted) + if ib1.bh.MaxTimestamp < ib2.bh.MinTimestamp { // Fast path - ib1 values have smaller timestamps than ib2 values. appendRows(ob, ib1) @@ -176,6 +175,16 @@ func mergeBlocks(ob, ib1, ib2 *Block) { } } +func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) { + timestamps := b.timestamps + nextIdx := b.nextIdx + for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { + nextIdx++ + } + *rowsDeleted += uint64(nextIdx - b.nextIdx) + b.nextIdx = nextIdx +} + func appendRows(ob, ib *Block) { ob.timestamps = append(ob.timestamps, ib.timestamps[ib.nextIdx:]...) ob.values = append(ob.values, ib.values[ib.nextIdx:]...) @@ -189,7 +198,7 @@ func unmarshalAndCalibrateScale(b1, b2 *Block) error { return err } - scale := decimal.CalibrateScale(b1.values, b1.bh.Scale, b2.values, b2.bh.Scale) + scale := decimal.CalibrateScale(b1.values[b1.nextIdx:], b1.bh.Scale, b2.values[b2.nextIdx:], b2.bh.Scale) b1.bh.Scale = scale b2.bh.Scale = scale return nil diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index 215af357da..dd12ccaffa 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -365,7 +365,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } if rowsMerged != 0 { @@ -385,7 +385,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.InitFromInmemoryPart(&mp) var rowsMerged, rowsDeleted uint64 - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index 73a03ea297..38d2f5ba6e 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } mpOut.Reset() bsw.InitFromInmemoryPart(&mpOut) - if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } } diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go 
index 88e71aa050..f620bd3f2a 100644
--- a/lib/storage/metric_name.go
+++ b/lib/storage/metric_name.go
@@ -196,6 +196,10 @@ func (mn *MetricName) CopyFrom(src *MetricName) {
// AddTag adds new tag to mn with the given key and value.
func (mn *MetricName) AddTag(key, value string) {
+ if key == string(metricGroupTagKey) {
+ mn.MetricGroup = append(mn.MetricGroup, value...)
+ return
+ }
tag := mn.addNextTag()
tag.Key = append(tag.Key[:0], key...)
tag.Value = append(tag.Value[:0], value...)
@@ -203,6 +207,10 @@ func (mn *MetricName) AddTag(key, value string) {
// AddTagBytes adds new tag to mn with the given key and value.
func (mn *MetricName) AddTagBytes(key, value []byte) {
+ if string(key) == string(metricGroupTagKey) {
+ mn.MetricGroup = append(mn.MetricGroup, value...)
+ return
+ }
tag := mn.addNextTag()
tag.Key = append(tag.Key[:0], key...)
tag.Value = append(tag.Value[:0], value...)
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index f159c0b988..ad5aaee2e7 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -134,6 +134,10 @@ type partition struct {
// The callback that returns deleted metric ids which must be skipped during merge.
getDeletedMetricIDs func() *uint64set.Set
+ // data retention in milliseconds.
+ // Used for deleting data outside the retention during background merge.
+ retentionMsecs int64
+
// Name is the name of the partition in the form YYYY_MM.
name string
@@ -206,7 +210,7 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions.
-func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
+func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@@ -219,7 +223,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
}
- pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs)
+ pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers()
pt.startRawRowsFlusher()
@@ -241,7 +245,7 @@ func (pt *partition) Drop() {
}
// openPartition opens the existing partition from the given paths.
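// The retentionMsecs argument is stored on the partition, so background merges
// can drop blocks and samples that are older than the configured retention (see mergeParts).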
-func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) { +func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) @@ -265,7 +269,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs) + pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) pt.smallParts = smallParts pt.bigParts = bigParts if err := pt.tr.fromPartitionName(name); err != nil { @@ -278,13 +282,14 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return pt, nil } -func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition { +func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition { p := &partition{ name: name, smallPartsPath: smallPartsPath, bigPartsPath: bigPartsPath, getDeletedMetricIDs: getDeletedMetricIDs, + retentionMsecs: retentionMsecs, mergeIdx: uint64(time.Now().UnixNano()), stopCh: make(chan struct{}), @@ -1129,7 +1134,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) } - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) + retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs + err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted) if isBigPart { atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) } else { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 9342a9cbce..42c90fe839 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -167,7 +167,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma }) // Create partition from rowss and test search on it. - pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs) + retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 + pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot create partition: %s", err) } @@ -191,7 +192,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. - pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs) + pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot open partition: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index dae3606563..a4106901c4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -27,7 +27,10 @@ import ( "github.com/VictoriaMetrics/fastcache" ) -const maxRetentionMonths = 12 * 100 +const ( + msecsPerMonth = 31 * 24 * 3600 * 1000 + maxRetentionMsecs = 100 * 12 * msecsPerMonth +) // Storage represents TSDB storage. 
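// Data older than retentionMsecs is eventually dropped: the retention watcher removes
// whole partitions, while background merges skip out-of-retention blocks and samples.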
type Storage struct { @@ -47,9 +50,9 @@ type Storage struct { slowPerDayIndexInserts uint64 slowMetricNameLoads uint64 - path string - cachePath string - retentionMonths int + path string + cachePath string + retentionMsecs int64 // lock file for exclusive access to the storage on the given path. flockF *os.File @@ -106,23 +109,19 @@ type Storage struct { snapshotLock sync.Mutex } -// OpenStorage opens storage on the given path with the given number of retention months. -func OpenStorage(path string, retentionMonths int) (*Storage, error) { - if retentionMonths > maxRetentionMonths { - return nil, fmt.Errorf("too big retentionMonths=%d; cannot exceed %d", retentionMonths, maxRetentionMonths) - } - if retentionMonths <= 0 { - retentionMonths = maxRetentionMonths - } +// OpenStorage opens storage on the given path with the given retentionMsecs. +func OpenStorage(path string, retentionMsecs int64) (*Storage, error) { path, err := filepath.Abs(path) if err != nil { return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err) } - + if retentionMsecs <= 0 { + retentionMsecs = maxRetentionMsecs + } s := &Storage{ - path: path, - cachePath: path + "/cache", - retentionMonths: retentionMonths, + path: path, + cachePath: path + "/cache", + retentionMsecs: retentionMsecs, stop: make(chan struct{}), } @@ -178,7 +177,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { // Load data tablePath := path + "/data" - tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs) + tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs) if err != nil { s.idb().MustClose() return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) @@ -473,8 +472,9 @@ func (s *Storage) startRetentionWatcher() { } func (s *Storage) retentionWatcher() { + retentionMonths := int((s.retentionMsecs + (msecsPerMonth - 1)) / msecsPerMonth) for { - d := nextRetentionDuration(s.retentionMonths) + d := nextRetentionDuration(retentionMonths) select { case <-s.stop: return diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 7e41c0fc85..f3ae424863 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -353,8 +353,8 @@ func TestStorageOpenMultipleTimes(t *testing.T) { func TestStorageRandTimestamps(t *testing.T) { path := "TestStorageRandTimestamps" - retentionMonths := 60 - s, err := OpenStorage(path, retentionMonths) + retentionMsecs := int64(60 * msecsPerMonth) + s, err := OpenStorage(path, retentionMsecs) if err != nil { t.Fatalf("cannot open storage: %s", err) } @@ -364,7 +364,7 @@ func TestStorageRandTimestamps(t *testing.T) { t.Fatal(err) } s.MustClose() - s, err = OpenStorage(path, retentionMonths) + s, err = OpenStorage(path, retentionMsecs) } }) t.Run("concurrent", func(t *testing.T) { diff --git a/lib/storage/table.go b/lib/storage/table.go index ed27803617..faea8c1a08 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -22,6 +22,7 @@ type table struct { bigPartitionsPath string getDeletedMetricIDs func() *uint64set.Set + retentionMsecs int64 ptws []*partitionWrapper ptwsLock sync.Mutex @@ -30,8 +31,7 @@ type table struct { stop chan struct{} - retentionMilliseconds int64 - retentionWatcherWG sync.WaitGroup + retentionWatcherWG sync.WaitGroup } // partitionWrapper provides refcounting mechanism for the partition. @@ -77,12 +77,12 @@ func (ptw *partitionWrapper) scheduleToDrop() { atomic.AddUint64(&ptw.mustDrop, 1) } -// openTable opens a table on the given path with the given retentionMonths. 
+// openTable opens a table on the given path with the given retentionMsecs. // // The table is created if it doesn't exist. // -// Data older than the retentionMonths may be dropped at any time. -func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) { +// Data older than the retentionMsecs may be dropped at any time. +func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -115,7 +115,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin } // Open partitions. - pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs) + pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs) if err != nil { return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) } @@ -125,6 +125,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin smallPartitionsPath: smallPartitionsPath, bigPartitionsPath: bigPartitionsPath, getDeletedMetricIDs: getDeletedMetricIDs, + retentionMsecs: retentionMsecs, flockF: flockF, @@ -133,11 +134,6 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin for _, pt := range pts { tb.addPartitionNolock(pt) } - if retentionMonths <= 0 || retentionMonths > maxRetentionMonths { - retentionMonths = maxRetentionMonths - } - tb.retentionMilliseconds = int64(retentionMonths) * 31 * 24 * 3600 * 1e3 - tb.startRetentionWatcher() return tb, nil } @@ -357,7 +353,7 @@ func (tb *table) AddRows(rows []rawRow) error { continue } - pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs) + pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs) if err != nil { errors = append(errors, err) continue @@ -376,7 +372,7 @@ func (tb *table) AddRows(rows []rawRow) error { func (tb *table) getMinMaxTimestamps() (int64, int64) { now := int64(fasttime.UnixTimestamp() * 1000) - minTimestamp := now - tb.retentionMilliseconds + minTimestamp := now - tb.retentionMsecs maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) if minTimestamp < 0 { // Negative timestamps aren't supported by the storage. @@ -406,7 +402,7 @@ func (tb *table) retentionWatcher() { case <-ticker.C: } - minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMilliseconds + minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs var ptwsDrop []*partitionWrapper tb.ptwsLock.Lock() dst := tb.ptws[:0] @@ -457,7 +453,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) { +func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) { // Certain partition directories in either `big` or `small` dir may be missing // after restoring from backup. So populate partition names from both dirs. 
 	ptNames := make(map[string]bool)
@@ -471,7 +467,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
 	for ptName := range ptNames {
 		smallPartsPath := smallPartitionsPath + "/" + ptName
 		bigPartsPath := bigPartitionsPath + "/" + ptName
-		pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs)
+		pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
 		if err != nil {
 			mustClosePartitions(pts)
 			return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)
diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go
index 50b5ea35b8..f81dc2136f 100644
--- a/lib/storage/table_search.go
+++ b/lib/storage/table_search.go
@@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
 	// Adjust tr.MinTimestamp, so it doesn't obtain data older
 	// than the tb retention.
 	now := int64(fasttime.UnixTimestamp() * 1000)
-	minTimestamp := now - tb.retentionMilliseconds
+	minTimestamp := now - tb.retentionMsecs
 	if tr.MinTimestamp < minTimestamp {
 		tr.MinTimestamp = minTimestamp
 	}
diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go
index d2f81e813e..cda14c8b82 100644
--- a/lib/storage/table_search_test.go
+++ b/lib/storage/table_search_test.go
@@ -181,7 +181,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
 	})

 	// Create a table from rowss and test search on it.
-	tb, err := openTable("./test-table", -1, nilGetDeletedMetricIDs)
+	tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot create table: %s", err)
 	}
@@ -202,7 +202,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
 	tb.MustClose()

 	// Open the created table and test search on it.
-	tb, err = openTable("./test-table", -1, nilGetDeletedMetricIDs)
+	tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot open table: %s", err)
 	}
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index 42b450681b..c45c9b334b 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -47,7 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
 		createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 		createdBenchTables[path] = true
 	}
-	tb, err := openTable(path, -1, nilGetDeletedMetricIDs)
+	tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", path, err)
 	}
@@ -70,7 +70,7 @@ var createdBenchTables = make(map[string]bool)

 func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
 	b.Helper()

-	tb, err := openTable(path, -1, nilGetDeletedMetricIDs)
+	tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", path, err)
 	}
diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go
index daeef21ee9..68a76dbc9d 100644
--- a/lib/storage/table_test.go
+++ b/lib/storage/table_test.go
@@ -7,7 +7,7 @@ import (

 func TestTableOpenClose(t *testing.T) {
 	const path = "TestTableOpenClose"
-	const retentionMonths = 123
+	const retentionMsecs = 123 * msecsPerMonth

 	if err := os.RemoveAll(path); err != nil {
 		t.Fatalf("cannot remove %q: %s", path, err)
@@ -17,7 +17,7 @@ func TestTableOpenClose(t *testing.T) {
 	}()

 	// Create a new table
-	tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+	tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot create new table: %s", err)
 	}
@@ -27,7 +27,7 @@ func TestTableOpenClose(t *testing.T) {

 	// Re-open created table multiple times.
 	for i := 0; i < 10; i++ {
-		tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+		tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 		if err != nil {
 			t.Fatalf("cannot open created table: %s", err)
 		}
@@ -37,20 +37,20 @@

 func TestTableOpenMultipleTimes(t *testing.T) {
 	const path = "TestTableOpenMultipleTimes"
-	const retentionMonths = 123
+	const retentionMsecs = 123 * msecsPerMonth

 	defer func() {
 		_ = os.RemoveAll(path)
 	}()

-	tb1, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+	tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot open table the first time: %s", err)
 	}
 	defer tb1.MustClose()

 	for i := 0; i < 10; i++ {
-		tb2, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+		tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 		if err == nil {
 			tb2.MustClose()
 			t.Fatalf("expecting non-nil error when opening already opened table")
diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go
index 2a20add410..ed1ca8bdec 100644
--- a/lib/storage/table_timing_test.go
+++ b/lib/storage/table_timing_test.go
@@ -45,7 +45,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	b.SetBytes(int64(rowsCountExpected))
 	tablePath := "./benchmarkTableAddRows"
 	for i := 0; i < b.N; i++ {
-		tb, err := openTable(tablePath, -1, nilGetDeletedMetricIDs)
+		tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
 		if err != nil {
 			b.Fatalf("cannot open table %q: %s", tablePath, err)
 		}
@@ -93,7 +93,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	tb.MustClose()

 	// Open the table from files and verify the rows count on it
-	tb, err = openTable(tablePath, -1, nilGetDeletedMetricIDs)
+	tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", tablePath, err)
 	}
diff --git a/ports/OpenBSD/README.md b/ports/OpenBSD/README.md
new file mode 100644
index 0000000000..9c98b3f9cb
--- /dev/null
+++ b/ports/OpenBSD/README.md
@@ -0,0 +1,11 @@
+# OpenBSD ports
+
+Tested with OpenBSD release 6.7.
+
+The VictoriaMetrics port must be placed in the
+`/usr/ports/sysutils` directory, and the following line should be
+added to the file `/usr/ports/infrastructure/db/user.list`:
+```
+855 _vmetrics _vmetrics sysutils/VictoriaMetrics
+```
+
diff --git a/ports/OpenBSD/VictoriaMetrics/Makefile b/ports/OpenBSD/VictoriaMetrics/Makefile
new file mode 100644
index 0000000000..57e4103d4d
--- /dev/null
+++ b/ports/OpenBSD/VictoriaMetrics/Makefile
@@ -0,0 +1,38 @@
+# $OpenBSD$
+
+COMMENT = fast, cost-effective and scalable time series database
+
+GH_ACCOUNT = VictoriaMetrics
+GH_PROJECT = VictoriaMetrics
+GH_TAGNAME = v1.44.0
+
+CATEGORIES = sysutils
+
+HOMEPAGE = https://victoriametrics.com/
+
+MAINTAINER = VictoriaMetrics
+
+# Apache License 2.0
+PERMIT_PACKAGE = Yes
+
+WANTLIB = c pthread
+
+USE_GMAKE = Yes
+
+MODULES= lang/go
+MODGO_GOPATH= ${MODGO_WORKSPACE}
+
+do-build:
+	cd ${WRKSRC} && GOOS=openbsd ${MAKE_ENV} ${MAKE_PROGRAM} victoria-metrics-pure
+	cd ${WRKSRC} && GOOS=openbsd ${MAKE_ENV} ${MAKE_PROGRAM} vmbackup
+
+do-install:
+	${INSTALL_SCRIPT} ./pkg/vmlogger.pl ${PREFIX}/bin/vmetricslogger.pl
+	${INSTALL_PROGRAM} ${WRKSRC}/bin/victoria-metrics-pure ${PREFIX}/bin/vmetrics
+	${INSTALL_PROGRAM} ${WRKSRC}/bin/vmbackup ${PREFIX}/bin/vmetricsbackup
+	${INSTALL_DATA_DIR} ${PREFIX}/share/doc/vmetrics/
+	${INSTALL_DATA} ${WRKSRC}/README.md ${PREFIX}/share/doc/vmetrics/
+	${INSTALL_DATA} ${WRKSRC}/LICENSE ${PREFIX}/share/doc/vmetrics/
+	${INSTALL_DATA} ${WRKSRC}/docs/* ${PREFIX}/share/doc/vmetrics/
+
+.include <bsd.port.mk>
diff --git a/ports/OpenBSD/VictoriaMetrics/distinfo b/ports/OpenBSD/VictoriaMetrics/distinfo
new file mode 100644
index 0000000000..b1910b2a02
--- /dev/null
+++ b/ports/OpenBSD/VictoriaMetrics/distinfo
@@ -0,0 +1,2 @@
+SHA256 (VictoriaMetrics-1.44.0.tar.gz) = OIXIyqiijWvAPDgq5wMoDpv1rENcIOWIcXmz4T5v1lU=
+SIZE (VictoriaMetrics-1.44.0.tar.gz) = 8898365
diff --git a/ports/OpenBSD/VictoriaMetrics/pkg/DESCR b/ports/OpenBSD/VictoriaMetrics/pkg/DESCR
new file mode 100644
index 0000000000..653a93e4fa
--- /dev/null
+++ b/ports/OpenBSD/VictoriaMetrics/pkg/DESCR
@@ -0,0 +1,3 @@
+VictoriaMetrics is a fast,
+cost-effective and scalable time-series database.
+
diff --git a/ports/OpenBSD/VictoriaMetrics/pkg/PLIST b/ports/OpenBSD/VictoriaMetrics/pkg/PLIST
new file mode 100644
index 0000000000..1f5ea2639c
--- /dev/null
+++ b/ports/OpenBSD/VictoriaMetrics/pkg/PLIST
@@ -0,0 +1,34 @@
+@comment $OpenBSD$
+@newgroup _vmetrics:855
+@newuser _vmetrics:855:_vmetrics:daemon:VictoriaMetrics:${VARBASE}/db/vmetrics:/sbin/nologin
+@sample ${SYSCONFDIR}/prometheus/
+@rcscript ${RCDIR}/vmetrics
+bin/vmetricslogger.pl
+@bin bin/vmetrics
+@bin bin/vmetricsbackup
+share/doc/vmetrics/
+share/doc/vmetrics/Articles.md
+share/doc/vmetrics/CaseStudies.md
+share/doc/vmetrics/Cluster-VictoriaMetrics.md
+share/doc/vmetrics/ExtendedPromQL.md
+share/doc/vmetrics/FAQ.md
+share/doc/vmetrics/Home.md
+share/doc/vmetrics/LICENSE
+share/doc/vmetrics/MetricsQL.md
+share/doc/vmetrics/Quick-Start.md
+share/doc/vmetrics/README.md
+share/doc/vmetrics/Release-Guide.md
+share/doc/vmetrics/SampleSizeCalculations.md
+share/doc/vmetrics/Single-server-VictoriaMetrics.md
+share/doc/vmetrics/logo.png
+share/doc/vmetrics/robots.txt
+share/doc/vmetrics/vmagent.md
+share/doc/vmetrics/vmagent.png
+share/doc/vmetrics/vmalert.md
+share/doc/vmetrics/vmauth.md
+share/doc/vmetrics/vmbackup.md
+share/doc/vmetrics/vmrestore.md
+@mode 0755
+@owner _vmetrics
+@group _vmetrics
+@sample ${VARBASE}/db/vmetrics
diff --git a/ports/OpenBSD/VictoriaMetrics/pkg/vmetrics.rc b/ports/OpenBSD/VictoriaMetrics/pkg/vmetrics.rc
new file mode 100644
index 0000000000..d9cdfe6ee0
--- /dev/null
+++ b/ports/OpenBSD/VictoriaMetrics/pkg/vmetrics.rc
@@ -0,0 +1,19 @@
+#!/bin/sh
+#
+# $OpenBSD$
+
+daemon="${TRUEPREFIX}/bin/vmetrics"
+daemon_flags="-storageDataPath=/var/db/vmetrics/ ${daemon_flags}"
+daemon_user=_vmetrics
+
+. /etc/rc.d/rc.subr
+
+pexp="${daemon}.*"
+rc_bg=YES
+rc_reload=NO
+
+rc_start() {
+	${rcexec} "${daemon} -loggerDisableTimestamps ${daemon_flags} < /dev/null 2>&1 | ${TRUEPREFIX}/bin/vmetricslogger.pl"
+}
+
+rc_cmd $1
diff --git a/ports/OpenBSD/VictoriaMetrics/pkg/vmlogger.pl b/ports/OpenBSD/VictoriaMetrics/pkg/vmlogger.pl
new file mode 100644
index 0000000000..02b6b87463
--- /dev/null
+++ b/ports/OpenBSD/VictoriaMetrics/pkg/vmlogger.pl
@@ -0,0 +1,19 @@
+#!/usr/bin/perl
+use Sys::Syslog qw(:standard :macros);
+
+openlog("victoria-metrics", "pid", "daemon");
+
+while (my $l = <>) {
+	my @d = split /\t/, $l;
+	# Map go log levels ("info", "warn", "error", "fatal", "panic") to syslog priorities.
+	my $lvl = $d[0];
+	$lvl = 'emerg'   if ($lvl eq 'panic');
+	$lvl = 'crit'    if ($lvl eq 'fatal');
+	$lvl = 'err'     if ($lvl eq 'error');
+	$lvl = 'warning' if ($lvl eq 'warn');
+	chomp $d[2];
+	# Pass the message via "%s" so stray "%" chars aren't treated as a format string.
+	syslog($lvl, "%s", $d[2]);
+}
+
+closelog();
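
---

For context on the retention change above: the storage-level watcher still operates on whole months (it drives the monthly `indexdb` rotation via `nextRetentionDuration`), so the new millisecond-based retention is rounded up to months there, while partition drops in `table.go` now compare against `retentionMsecs` directly. The following standalone Go sketch (not part of the diff) illustrates that rounding; it assumes `msecsPerMonth` equals 31 days in milliseconds, consistent with the `int64(retentionMonths) * 31 * 24 * 3600 * 1e3` expression removed from `table.go`:

```go
package main

import "fmt"

// Assumed value of msecsPerMonth: 31 days in milliseconds,
// matching the month-length used by the removed retentionMilliseconds computation.
const msecsPerMonth = 31 * 24 * 3600 * 1000

// retentionMonths reproduces the ceiling division used in
// Storage.retentionWatcher to map retentionMsecs back to whole months.
func retentionMonths(retentionMsecs int64) int {
	return int((retentionMsecs + (msecsPerMonth - 1)) / msecsPerMonth)
}

func main() {
	// A sub-month retention such as 5 days still rounds up to 1 month
	// for the monthly indexdb rotation cycle...
	fmt.Println(retentionMonths(5 * 24 * 3600 * 1000)) // 1
	// ...while exact multiples stay intact, e.g. the 60-month retention
	// used in TestStorageRandTimestamps.
	fmt.Println(retentionMonths(60 * msecsPerMonth)) // 60
}
```

Partition-level expiration (`table.retentionWatcher` and `getMinMaxTimestamps`) subtracts `retentionMsecs` from the current time directly, which is what makes retention periods shorter than one month take effect.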