diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 0493758b8..d265ac0ff 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -92,9 +92,9 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
 	}
 	tlsCfg := authCfg.NewTLSConfig()
 	tr := &http.Transport{
-		Dial:                statDial,
+		DialContext:         statDial,
 		TLSClientConfig:     tlsCfg,
-		TLSHandshakeTimeout: 5 * time.Second,
+		TLSHandshakeTimeout: 10 * time.Second,
 		MaxConnsPerHost:     2 * concurrency,
 		MaxIdleConnsPerHost: 2 * concurrency,
 		IdleConnTimeout:     time.Minute,
diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go
index 5d597e9bf..787aa19f5 100644
--- a/app/vmagent/remotewrite/statconn.go
+++ b/app/vmagent/remotewrite/statconn.go
@@ -1,7 +1,9 @@
 package remotewrite
 
 import (
+	"context"
 	"net"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -9,9 +11,26 @@ import (
 	"github.com/VictoriaMetrics/metrics"
 )
 
-func statDial(networkUnused, addr string) (conn net.Conn, err error) {
+func getStdDialer() *net.Dialer {
+	stdDialerOnce.Do(func() {
+		stdDialer = &net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+			DualStack: netutil.TCP6Enabled(),
+		}
+	})
+	return stdDialer
+}
+
+var (
+	stdDialer     *net.Dialer
+	stdDialerOnce sync.Once
+)
+
+func statDial(ctx context.Context, networkUnused, addr string) (conn net.Conn, err error) {
 	network := netutil.GetTCPNetwork()
-	conn, err = net.DialTimeout(network, addr, 5*time.Second)
+	d := getStdDialer()
+	conn, err = d.DialContext(ctx, network, addr)
 	dialsTotal.Inc()
 	if err != nil {
 		dialErrors.Inc()
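The statconn.go change above replaces a per-call `net.DialTimeout` with a single lazily-built `net.Dialer` whose `DialContext` lets `http.Transport` cancel in-flight dials when the request context is done. A minimal standalone sketch of the same pattern; the plain counter and the names here are illustrative stand-ins for the metrics used by vmagent:

```go
package main

import (
	"context"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

var (
	dialer     *net.Dialer
	dialerOnce sync.Once

	dialsTotal uint64 // stands in for the metrics counters in statconn.go
)

// getDialer lazily builds one shared dialer, mirroring getStdDialer.
func getDialer() *net.Dialer {
	dialerOnce.Do(func() {
		dialer = &net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}
	})
	return dialer
}

// statDial matches the http.Transport.DialContext signature, so canceling
// the request context aborts the dial instead of blocking on a fixed timeout.
func statDial(ctx context.Context, network, addr string) (net.Conn, error) {
	conn, err := getDialer().DialContext(ctx, network, addr)
	atomic.AddUint64(&dialsTotal, 1)
	return conn, err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if conn, err := statDial(ctx, "tcp", "example.com:80"); err == nil {
		conn.Close()
	}
	fmt.Println("dials:", atomic.LoadUint64(&dialsTotal))
}
```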
diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go
index a5b929f86..0f819cbd6 100644
--- a/app/vmalert/alerting.go
+++ b/app/vmalert/alerting.go
@@ -141,6 +141,53 @@ func (ar *AlertingRule) ID() uint64 {
 	return ar.RuleID
 }
 
+type labelSet struct {
+	// origin labels from series
+	// used for templating
+	origin map[string]string
+	// processed labels with additional data
+	// used as Alert labels
+	processed map[string]string
+}
+
+// toLabels converts labels from given Metric
+// to labelSet which contains original and processed labels.
+func (ar *AlertingRule) toLabels(m datasource.Metric, qFn notifier.QueryFn) (*labelSet, error) {
+	ls := &labelSet{
+		origin:    make(map[string]string, len(m.Labels)),
+		processed: make(map[string]string),
+	}
+	for _, l := range m.Labels {
+		// drop __name__ to be consistent with Prometheus alerting
+		if l.Name == "__name__" {
+			continue
+		}
+		ls.origin[l.Name] = l.Value
+		ls.processed[l.Name] = l.Value
+	}
+
+	extraLabels, err := notifier.ExecTemplate(qFn, ar.Labels, notifier.AlertTplData{
+		Labels: ls.origin,
+		Value:  m.Values[0],
+		Expr:   ar.Expr,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to expand labels: %s", err)
+	}
+	for k, v := range extraLabels {
+		ls.processed[k] = v
+	}
+
+	// set additional labels to identify group and rule name
+	if ar.Name != "" {
+		ls.processed[alertNameLabel] = ar.Name
+	}
+	if !*disableAlertGroupLabel && ar.GroupName != "" {
+		ls.processed[alertGroupNameLabel] = ar.GroupName
+	}
+	return ls, nil
+}
+
 // ExecRange executes alerting rule on the given time range similarly to Exec.
 // It doesn't update internal states of the Rule and meant to be used just
 // to get time series for backfilling.
@@ -155,24 +202,7 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
 		return nil, fmt.Errorf("`query` template isn't supported in replay mode")
 	}
 	for _, s := range series {
-		// set additional labels to identify group and rule Name
-		if ar.Name != "" {
-			s.SetLabel(alertNameLabel, ar.Name)
-		}
-		if !*disableAlertGroupLabel && ar.GroupName != "" {
-			s.SetLabel(alertGroupNameLabel, ar.GroupName)
-		}
-		// extra labels could contain templates, so we expand them first
-		labels, err := expandLabels(s, qFn, ar)
-		if err != nil {
-			return nil, fmt.Errorf("failed to expand labels: %s", err)
-		}
-		for k, v := range labels {
-			// apply extra labels to datasource
-			// so the hash key will be consistent on restore
-			s.SetLabel(k, v)
-		}
-		a, err := ar.newAlert(s, time.Time{}, qFn) // initial alert
+		a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert
 		if err != nil {
 			return nil, fmt.Errorf("failed to create alert: %s", err)
 		}
@@ -234,28 +264,15 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
 	updated := make(map[uint64]struct{})
 	// update list of active alerts
 	for _, m := range qMetrics {
-		// set additional labels to identify group and rule name
-		if ar.Name != "" {
-			m.SetLabel(alertNameLabel, ar.Name)
-		}
-		if !*disableAlertGroupLabel && ar.GroupName != "" {
-			m.SetLabel(alertGroupNameLabel, ar.GroupName)
-		}
-		// extra labels could contain templates, so we expand them first
-		labels, err := expandLabels(m, qFn, ar)
+		ls, err := ar.toLabels(m, qFn)
 		if err != nil {
 			return nil, fmt.Errorf("failed to expand labels: %s", err)
 		}
-		for k, v := range labels {
-			// apply extra labels to datasource
-			// so the hash key will be consistent on restore
-			m.SetLabel(k, v)
-		}
-		h := hash(m)
+		h := hash(ls.processed)
 		if _, ok := updated[h]; ok {
 			// duplicate may be caused by extra labels
 			// conflicting with the metric labels
-			ar.lastExecError = fmt.Errorf("labels %v: %w", m.Labels, errDuplicate)
+			ar.lastExecError = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate)
 			return nil, ar.lastExecError
 		}
 		updated[h] = struct{}{}
@@ -272,14 +289,14 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
 				a.Value = m.Values[0]
 				// and re-exec template since Value can be used
 				// in annotations
-				a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations)
+				a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
 				if err != nil {
 					return nil, err
 				}
 			}
 			continue
 		}
-		a, err := ar.newAlert(m, ar.lastExecTime, qFn)
+		a, err := ar.newAlert(m, ls, ar.lastExecTime, qFn)
 		if err != nil {
 			ar.lastExecError = err
 			return nil, fmt.Errorf("failed to create alert: %w", err)
@@ -315,19 +332,6 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
 	return ar.toTimeSeries(ts.Unix()), nil
 }
 
-func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) {
-	metricLabels := make(map[string]string)
-	for _, l := range m.Labels {
-		metricLabels[l.Name] = l.Value
-	}
-	tpl := notifier.AlertTplData{
-		Labels: metricLabels,
-		Value:  m.Values[0],
-		Expr:   ar.Expr,
-	}
-	return notifier.ExecTemplate(q, ar.Labels, tpl)
-}
-
 func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	for _, a := range ar.alerts {
@@ -358,42 +362,43 @@ func (ar *AlertingRule) UpdateWith(r Rule) error {
 }
 
 // TODO: consider hashing algorithm in VM
-func hash(m datasource.Metric) uint64 {
+func hash(labels map[string]string) uint64 {
 	hash := fnv.New64a()
-	labels := m.Labels
-	sort.Slice(labels, func(i, j int) bool {
-		return labels[i].Name < labels[j].Name
-	})
-	for _, l := range labels {
+	keys := make([]string, 0, len(labels))
+	for k := range labels {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
 		// drop __name__ to be consistent with Prometheus alerting
-		if l.Name == "__name__" {
+		if k == "__name__" {
 			continue
 		}
-		hash.Write([]byte(l.Name))
-		hash.Write([]byte(l.Value))
+		name, value := k, labels[k]
+		hash.Write([]byte(name))
+		hash.Write([]byte(value))
 		hash.Write([]byte("\xff"))
 	}
 	return hash.Sum64()
}
 
-func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) {
+func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) {
+	var err error
+	if ls == nil {
+		ls, err = ar.toLabels(m, qFn)
+		if err != nil {
+			return nil, fmt.Errorf("failed to expand labels: %s", err)
+		}
+	}
 	a := &notifier.Alert{
 		GroupID:  ar.GroupID,
 		Name:     ar.Name,
-		Labels:   map[string]string{},
+		Labels:   ls.processed,
 		Value:    m.Values[0],
 		ActiveAt: start,
 		Expr:     ar.Expr,
 	}
-	for _, l := range m.Labels {
-		// drop __name__ to be consistent with Prometheus alerting
-		if l.Name == "__name__" {
-			continue
-		}
-		a.Labels[l.Name] = l.Value
-	}
-	var err error
-	a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations)
+	a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
 	return a, err
 }
 
@@ -560,15 +565,26 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
 	}
 
 	for _, m := range qMetrics {
-		a, err := ar.newAlert(m, time.Unix(int64(m.Values[0]), 0), qFn)
+		ls := &labelSet{
+			origin:    make(map[string]string, len(m.Labels)),
+			processed: make(map[string]string, len(m.Labels)),
+		}
+		for _, l := range m.Labels {
+			if l.Name == "__name__" {
+				continue
+			}
+			ls.origin[l.Name] = l.Value
+			ls.processed[l.Name] = l.Value
+		}
+		a, err := ar.newAlert(m, ls, time.Unix(int64(m.Values[0]), 0), qFn)
 		if err != nil {
 			return fmt.Errorf("failed to create alert: %w", err)
 		}
-		a.ID = hash(m)
+		a.ID = hash(ls.processed)
 		a.State = notifier.StatePending
 		a.Restored = true
 		ar.alerts[a.ID] = a
-		logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.Start)
+		logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt)
 	}
 	return nil
 }
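The new `hash()` fingerprints a plain label map instead of a `datasource.Metric`, sorting the keys so the same processed labels always yield the same alert ID regardless of map iteration order. A minimal runnable sketch of the scheme, using only the standard library (`hashLabels` is an illustrative name):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashLabels mirrors the new hash(): FNV-1a over key/value pairs taken
// in sorted key order, with 0xff written after each pair as a separator.
func hashLabels(labels map[string]string) uint64 {
	h := fnv.New64a()
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		if k == "__name__" { // dropped, as in Prometheus alerting
			continue
		}
		h.Write([]byte(k))
		h.Write([]byte(labels[k]))
		h.Write([]byte("\xff")) // pair separator, as in the patch
	}
	return h.Sum64()
}

func main() {
	a := map[string]string{"job": "vm", "instance": "foo"}
	b := map[string]string{"instance": "foo", "job": "vm"}
	fmt.Println(hashLabels(a) == hashLabels(b)) // true: map order is irrelevant
}
```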
diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go
index edca8254a..6bbd2dff6 100644
--- a/app/vmalert/alerting_test.go
+++ b/app/vmalert/alerting_test.go
@@ -315,10 +315,13 @@ func TestAlertingRule_Exec(t *testing.T) {
 			}
 			expAlerts := make(map[uint64]*notifier.Alert)
 			for _, ta := range tc.expAlerts {
-				labels := ta.labels
-				labels = append(labels, alertNameLabel)
-				labels = append(labels, tc.rule.Name)
-				h := hash(metricWithLabels(t, labels...))
+				labels := make(map[string]string)
+				for i := 0; i < len(ta.labels); i += 2 {
+					k, v := ta.labels[i], ta.labels[i+1]
+					labels[k] = v
+				}
+				labels[alertNameLabel] = tc.rule.Name
+				h := hash(labels)
 				expAlerts[h] = ta.alert
 			}
 			for key, exp := range expAlerts {
@@ -513,7 +516,7 @@ func TestAlertingRule_Restore(t *testing.T) {
 				),
 			},
 			map[uint64]*notifier.Alert{
-				hash(datasource.Metric{}): {State: notifier.StatePending,
+				hash(nil): {State: notifier.StatePending,
 					ActiveAt: time.Now().Truncate(time.Hour)},
 			},
 		},
@@ -529,12 +532,12 @@ func TestAlertingRule_Restore(t *testing.T) {
 				),
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t,
-					alertNameLabel, "metric labels",
-					alertGroupNameLabel, "groupID",
-					"foo", "bar",
-					"namespace", "baz",
-				)): {State: notifier.StatePending,
+				hash(map[string]string{
+					alertNameLabel:      "metric labels",
+					alertGroupNameLabel: "groupID",
+					"foo":               "bar",
+					"namespace":         "baz",
+				}): {State: notifier.StatePending,
 					ActiveAt: time.Now().Truncate(time.Hour)},
 			},
 		},
@@ -550,11 +553,11 @@ func TestAlertingRule_Restore(t *testing.T) {
 				),
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t,
-					"foo", "bar",
-					"namespace", "baz",
-					"source", "vm",
-				)): {State: notifier.StatePending,
+				hash(map[string]string{
+					"foo":       "bar",
+					"namespace": "baz",
+					"source":    "vm",
+				}): {State: notifier.StatePending,
 					ActiveAt: time.Now().Truncate(time.Hour)},
 			},
 		},
@@ -575,11 +578,11 @@ func TestAlertingRule_Restore(t *testing.T) {
 				),
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending,
+				hash(map[string]string{"host": "localhost-1"}): {State: notifier.StatePending,
 					ActiveAt: time.Now().Truncate(time.Hour)},
-				hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending,
+				hash(map[string]string{"host": "localhost-2"}): {State: notifier.StatePending,
 					ActiveAt: time.Now().Truncate(2 * time.Hour)},
-				hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending,
+				hash(map[string]string{"host": "localhost-3"}): {State: notifier.StatePending,
 					ActiveAt: time.Now().Truncate(3 * time.Hour)},
 			},
 		},
@@ -659,7 +662,7 @@ func TestAlertingRule_Template(t *testing.T) {
 				metricWithValueAndLabels(t, 1, "instance", "bar"),
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "foo")): {
+				hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): {
 					Annotations: map[string]string{},
 					Labels: map[string]string{
 						alertNameLabel: "common",
@@ -667,7 +670,7 @@ func TestAlertingRule_Template(t *testing.T) {
 						"region":       "east",
 						"instance":     "foo",
 					},
 				},
-				hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "bar")): {
+				hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "bar"}): {
 					Annotations: map[string]string{},
 					Labels: map[string]string{
 						alertNameLabel: "common",
@@ -682,11 +685,10 @@ func TestAlertingRule_Template(t *testing.T) {
 				Name: "override label",
 				Labels: map[string]string{
 					"instance": "{{ $labels.instance }}",
-					"region":   "east",
 				},
 				Annotations: map[string]string{
-					"summary":     `Too high connection number for "{{ $labels.instance }}" for region {{ $labels.region }}`,
-					"description": `It is {{ $value }} connections for "{{ $labels.instance }}"`,
+					"summary":     `Too high connection number for "{{ $labels.instance }}"`,
+					"description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`,
 				},
 				alerts: make(map[uint64]*notifier.Alert),
 			},
@@ -695,64 +697,58 @@ func TestAlertingRule_Template(t *testing.T) {
 				metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"),
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): {
+				hash(map[string]string{alertNameLabel: "override label", "instance": "foo"}): {
 					Labels: map[string]string{
 						alertNameLabel: "override label",
 						"instance":     "foo",
-						"region":       "east",
 					},
 					Annotations: map[string]string{
-						"summary":     `Too high connection number for "foo" for region east`,
-						"description": `It is 2 connections for "foo"`,
+						"summary":     `Too high connection number for "foo"`,
+						"description": `override: It is 2 connections for "foo"`,
 					},
 				},
-				hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "bar")): {
+				hash(map[string]string{alertNameLabel: "override label", "instance": "bar"}): {
 					Labels: map[string]string{
 						alertNameLabel: "override label",
 						"instance":     "bar",
-						"region":       "east",
 					},
 					Annotations: map[string]string{
-						"summary":     `Too high connection number for "bar" for region east`,
-						"description": `It is 10 connections for "bar"`,
+						"summary":     `Too high connection number for "bar"`,
+						"description": `override: It is 10 connections for "bar"`,
 					},
 				},
 			},
 		},
 		{
 			&AlertingRule{
-				Name:      "ExtraTemplating",
+				Name:      "OriginLabels",
 				GroupName: "Testing",
 				Labels: map[string]string{
-					"name":     "alert_{{ $labels.alertname }}",
-					"group":    "group_{{ $labels.alertgroup }}",
 					"instance": "{{ $labels.instance }}",
 				},
 				Annotations: map[string]string{
-					"summary":     `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
-					"description": `Alert "{{ $labels.name }}({{ $labels.group }})" for instance {{ $labels.instance }}`,
+					"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
 				},
 				alerts: make(map[uint64]*notifier.Alert),
 			},
 			[]datasource.Metric{
-				metricWithValueAndLabels(t, 1, "instance", "foo"),
+				metricWithValueAndLabels(t, 1,
+					alertNameLabel, "originAlertname",
+					alertGroupNameLabel, "originGroupname",
+					"instance", "foo"),
 			},
 			map[uint64]*notifier.Alert{
-				hash(metricWithLabels(t, alertNameLabel, "ExtraTemplating",
-					"name", "alert_ExtraTemplating",
-					alertGroupNameLabel, "Testing",
-					"group", "group_Testing",
-					"instance", "foo")): {
+				hash(map[string]string{
+					alertNameLabel:      "OriginLabels",
+					alertGroupNameLabel: "Testing",
+					"instance":          "foo"}): {
 					Labels: map[string]string{
-						alertNameLabel: "ExtraTemplating",
-						"name":         "alert_ExtraTemplating",
+						alertNameLabel:      "OriginLabels",
 						alertGroupNameLabel: "Testing",
-						"group":             "group_Testing",
 						"instance":          "foo",
 					},
 					Annotations: map[string]string{
-						"summary":     `Alert "ExtraTemplating(Testing)" for instance foo`,
-						"description": `Alert "alert_ExtraTemplating(group_Testing)" for instance foo`,
+						"summary": `Alert "originAlertname(originGroupname)" for instance foo`,
 					},
 				},
 			},
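The updated tests build expected alert IDs from plain maps, while the old helpers took flat key/value string pairs; the conversion the tests now perform inline looks like this (`pairsToMap` is a hypothetical helper, not part of the patch):

```go
package main

import "fmt"

// pairsToMap converts a flat ["k1", "v1", "k2", "v2"] slice, as used by the
// test helpers, into the map form the new hash() expects.
func pairsToMap(pairs []string) map[string]string {
	m := make(map[string]string, len(pairs)/2)
	for i := 0; i+1 < len(pairs); i += 2 {
		m[pairs[i]] = pairs[i+1]
	}
	return m
}

func main() {
	fmt.Println(pairsToMap([]string{"foo", "bar", "namespace", "baz"}))
}
```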
labels", - alertGroupNameLabel, "groupID", - "foo", "bar", - "namespace", "baz", - )): {State: notifier.StatePending, + hash(map[string]string{ + alertNameLabel: "metric labels", + alertGroupNameLabel: "groupID", + "foo": "bar", + "namespace": "baz", + }): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, }, }, @@ -550,11 +553,11 @@ func TestAlertingRule_Restore(t *testing.T) { ), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, - "foo", "bar", - "namespace", "baz", - "source", "vm", - )): {State: notifier.StatePending, + hash(map[string]string{ + "foo": "bar", + "namespace": "baz", + "source": "vm", + }): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, }, }, @@ -575,11 +578,11 @@ func TestAlertingRule_Restore(t *testing.T) { ), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending, + hash(map[string]string{"host": "localhost-1"}): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(time.Hour)}, - hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending, + hash(map[string]string{"host": "localhost-2"}): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(2 * time.Hour)}, - hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending, + hash(map[string]string{"host": "localhost-3"}): {State: notifier.StatePending, ActiveAt: time.Now().Truncate(3 * time.Hour)}, }, }, @@ -659,7 +662,7 @@ func TestAlertingRule_Template(t *testing.T) { metricWithValueAndLabels(t, 1, "instance", "bar"), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "foo")): { + hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): { Annotations: map[string]string{}, Labels: map[string]string{ alertNameLabel: "common", @@ -667,7 +670,7 @@ func TestAlertingRule_Template(t *testing.T) { "instance": "foo", }, }, - hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "bar")): { + hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "bar"}): { Annotations: map[string]string{}, Labels: map[string]string{ alertNameLabel: "common", @@ -682,11 +685,10 @@ func TestAlertingRule_Template(t *testing.T) { Name: "override label", Labels: map[string]string{ "instance": "{{ $labels.instance }}", - "region": "east", }, Annotations: map[string]string{ - "summary": `Too high connection number for "{{ $labels.instance }}" for region {{ $labels.region }}`, - "description": `It is {{ $value }} connections for "{{ $labels.instance }}"`, + "summary": `Too high connection number for "{{ $labels.instance }}"`, + "description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`, }, alerts: make(map[uint64]*notifier.Alert), }, @@ -695,64 +697,58 @@ func TestAlertingRule_Template(t *testing.T) { metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): { + hash(map[string]string{alertNameLabel: "override label", "instance": "foo"}): { Labels: map[string]string{ alertNameLabel: "override label", "instance": "foo", - "region": "east", }, Annotations: map[string]string{ - "summary": `Too high connection number for "foo" for region east`, - "description": `It is 2 connections for "foo"`, + "summary": `Too high connection number for "foo"`, + 
"description": `override: It is 2 connections for "foo"`, }, }, - hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "bar")): { + hash(map[string]string{alertNameLabel: "override label", "instance": "bar"}): { Labels: map[string]string{ alertNameLabel: "override label", "instance": "bar", - "region": "east", }, Annotations: map[string]string{ - "summary": `Too high connection number for "bar" for region east`, - "description": `It is 10 connections for "bar"`, + "summary": `Too high connection number for "bar"`, + "description": `override: It is 10 connections for "bar"`, }, }, }, }, { &AlertingRule{ - Name: "ExtraTemplating", + Name: "OriginLabels", GroupName: "Testing", Labels: map[string]string{ - "name": "alert_{{ $labels.alertname }}", - "group": "group_{{ $labels.alertgroup }}", "instance": "{{ $labels.instance }}", }, Annotations: map[string]string{ - "summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`, - "description": `Alert "{{ $labels.name }}({{ $labels.group }})" for instance {{ $labels.instance }}`, + "summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`, }, alerts: make(map[uint64]*notifier.Alert), }, []datasource.Metric{ - metricWithValueAndLabels(t, 1, "instance", "foo"), + metricWithValueAndLabels(t, 1, + alertNameLabel, "originAlertname", + alertGroupNameLabel, "originGroupname", + "instance", "foo"), }, map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, alertNameLabel, "ExtraTemplating", - "name", "alert_ExtraTemplating", - alertGroupNameLabel, "Testing", - "group", "group_Testing", - "instance", "foo")): { + hash(map[string]string{ + alertNameLabel: "OriginLabels", + alertGroupNameLabel: "Testing", + "instance": "foo"}): { Labels: map[string]string{ - alertNameLabel: "ExtraTemplating", - "name": "alert_ExtraTemplating", + alertNameLabel: "OriginLabels", alertGroupNameLabel: "Testing", - "group": "group_Testing", "instance": "foo", }, Annotations: map[string]string{ - "summary": `Alert "ExtraTemplating(Testing)" for instance foo`, - "description": `Alert "alert_ExtraTemplating(group_Testing)" for instance foo`, + "summary": `Alert "originAlertname(originGroupname)" for instance foo`, }, }, }, diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index d94838b60..8322e65d0 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -174,7 +174,7 @@ func TestGroupStart(t *testing.T) { m2 := metricWithLabels(t, "instance", inst2, "job", job) r := g.Rules[0].(*AlertingRule) - alert1, err := r.newAlert(m1, time.Now(), nil) + alert1, err := r.newAlert(m1, nil, time.Now(), nil) if err != nil { t.Fatalf("faield to create alert: %s", err) } @@ -187,13 +187,9 @@ func TestGroupStart(t *testing.T) { // add service labels alert1.Labels[alertNameLabel] = alert1.Name alert1.Labels[alertGroupNameLabel] = g.Name - var labels1 []string - for k, v := range alert1.Labels { - labels1 = append(labels1, k, v) - } - alert1.ID = hash(metricWithLabels(t, labels1...)) + alert1.ID = hash(alert1.Labels) - alert2, err := r.newAlert(m2, time.Now(), nil) + alert2, err := r.newAlert(m2, nil, time.Now(), nil) if err != nil { t.Fatalf("faield to create alert: %s", err) } @@ -206,11 +202,7 @@ func TestGroupStart(t *testing.T) { // add service labels alert2.Labels[alertNameLabel] = alert2.Name alert2.Labels[alertGroupNameLabel] = g.Name - var labels2 []string - for k, v := range alert2.Labels { - labels2 = append(labels2, k, v) - } - 
diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go
index 44b80eb4a..a1bcae7c1 100644
--- a/app/vmalert/notifier/alert.go
+++ b/app/vmalert/notifier/alert.go
@@ -88,8 +88,8 @@ var tplHeaders = []string{
 // map of annotations.
 // Every alert could have a different datasource, so function
 // requires a queryFunction as an argument.
-func (a *Alert) ExecTemplate(q QueryFn, annotations map[string]string) (map[string]string, error) {
-	tplData := AlertTplData{Value: a.Value, Labels: a.Labels, Expr: a.Expr}
+func (a *Alert) ExecTemplate(q QueryFn, labels, annotations map[string]string) (map[string]string, error) {
+	tplData := AlertTplData{Value: a.Value, Labels: labels, Expr: a.Expr}
 	return templateAnnotations(annotations, tplData, funcsWithQuery(q))
 }
diff --git a/app/vmalert/notifier/alert_test.go b/app/vmalert/notifier/alert_test.go
index f8e0c77ee..27b83aac6 100644
--- a/app/vmalert/notifier/alert_test.go
+++ b/app/vmalert/notifier/alert_test.go
@@ -130,7 +130,7 @@ func TestAlert_ExecTemplate(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			tpl, err := tc.alert.ExecTemplate(qFn, tc.annotations)
+			tpl, err := tc.alert.ExecTemplate(qFn, tc.alert.Labels, tc.annotations)
 			if err != nil {
 				t.Fatal(err)
 			}
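`ExecTemplate` now receives the template input labels explicitly, which is what lets vmalert feed the *origin* (datasource) labels to templates while the alert itself carries the *processed* labels. A rough standalone approximation using `text/template`; vmalert's real implementation lives in `templateAnnotations` and exposes the map as `$labels` via the `tplHeaders` prefix, which is shortened here to `.Labels`:

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

type labelSet struct {
	origin    map[string]string // labels as returned by the datasource; template input
	processed map[string]string // origin + expanded rule labels; alert identity
}

// expand renders one templated rule label against the origin labels only,
// so a rule label can reference e.g. the original alertname/alertgroup.
func expand(tplStr string, origin map[string]string) (string, error) {
	tpl, err := template.New("label").Parse(tplStr)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tpl.Execute(&buf, struct{ Labels map[string]string }{origin}); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	ls := labelSet{
		origin:    map[string]string{"instance": "foo:8080"},
		processed: map[string]string{"instance": "foo:8080"},
	}
	v, err := expand(`host_{{ .Labels.instance }}`, ls.origin)
	if err != nil {
		panic(err)
	}
	ls.processed["host"] = v // expanded extra label lands in processed only
	fmt.Println(ls.origin, ls.processed)
}
```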
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 85a4b010a..cf6cdb785 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -33,6 +33,7 @@ Previously the `-search.maxUniqueTimeseries` command-line flag was used as a glo
 
 When using [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), these command-line flags (including `-search.maxUniqueTimeseries`) must be passed to `vmselect` instead of `vmstorage`.
 
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html) and [vmauth](https://docs.victoriametrics.com/vmauth.html): reduce the probability of `TLS handshake error from XX.XX.XX.XX: EOF` errors when `-remoteWrite.url` points to HTTPS url at `vmauth`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1699).
 * BUGFIX: return `Content-Type: text/html` response header when requesting `/` HTTP path at VictoriaMetrics components. Previously `text/plain` response header was returned, which could lead to broken page formatting. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2323).
 * BUGFIX: [Graphite Render API](https://docs.victoriametrics.com/#graphite-render-api-usage): accept floating-point values for [maxDataPoints](https://graphite.readthedocs.io/en/stable/render_api.html#maxdatapoints) query arg, since some clients send floating-point values instead of integer values for this arg.
diff --git a/docs/Release-Guide.md b/docs/Release-Guide.md
index 21fa41b48..5f133cc11 100644
--- a/docs/Release-Guide.md
+++ b/docs/Release-Guide.md
@@ -6,6 +6,7 @@ sort: 18
 
 ## Release version and Docker images
 
+0. Make sure that the release commits have no security issues.
 1. Document all the changes for new release in [CHANGELOG.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md).
 2. Create the following release tags:
    * `git tag -s v1.xx.y` in `master` branch
@@ -13,8 +14,9 @@ sort: 18
    * `git tag -s v1.xx.y-enterprise` in `enterprise` branch
    * `git tag -s v1.xx.y-enterprise-cluster` in `enterprise-cluster` branch
 3. Run `TAG=v1.xx.y make publish-release`. It will create `*.tar.gz` release archives with the corresponding `_checksums.txt` files inside `bin` directory and publish Docker images for the given `TAG`, `TAG-cluster`, `TAG-enterprise` and `TAG-enterprise-cluster`.
-4. Push release tag to : `git push origin v1.xx.y`.
-5. Go to , create new release from the pushed tag on step 5 and upload `*.tar.gz` archive with the corresponding `_checksums.txt` from step 2.
+4. Push release tags to : `git push origin v1.xx.y` and `git push origin v1.xx.y-cluster`. Do not push `-enterprise` tags to public repository.
+5. Go to , create new release from the pushed tag on step 4 and upload `*.tar.gz` archive with the corresponding `_checksums.txt` from step 3.
+6. Copy the [CHANGELOG](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md) for this release to [releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) page.
 
 ## Building snap package
diff --git a/go.mod b/go.mod
index 76f350911..e3bb6cef6 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.17
 
 require (
 	cloud.google.com/go/storage v1.21.0
-	github.com/VictoriaMetrics/fastcache v1.9.0
+	github.com/VictoriaMetrics/fastcache v1.10.0
 
 	// Do not use the original github.com/valyala/fasthttp because of issues
 	// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
@@ -33,7 +33,7 @@ require (
 	github.com/valyala/quicktemplate v1.7.0
 	golang.org/x/net v0.0.0-20220403103023-749bd193bc2b
 	golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a
-	golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb
+	golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64
 	google.golang.org/api v0.74.0
 	gopkg.in/yaml.v2 v2.4.0
 )
diff --git a/go.sum b/go.sum
index bc2087a91..a7e67e894 100644
--- a/go.sum
+++ b/go.sum
@@ -113,8 +113,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
 github.com/SAP/go-hdb v0.14.1/go.mod h1:7fdQLVC2lER3urZLjZCm0AuMQfApof92n3aylBPEkMo=
 github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
-github.com/VictoriaMetrics/fastcache v1.9.0 h1:oMwsS6c8abz98B7ytAewQ7M1ZN/Im/iwKoE1euaFvhs=
-github.com/VictoriaMetrics/fastcache v1.9.0/go.mod h1:otoTS3xu+6IzF/qByjqzjp3rTuzM3Qf0ScU1UTj97iU=
+github.com/VictoriaMetrics/fastcache v1.10.0 h1:5hDJnLsKLpnUEToub7ETuRu8RCkb40woBZAUiKonXzY=
+github.com/VictoriaMetrics/fastcache v1.10.0/go.mod h1:tjiYeEfYXCqacuvYw/7UoDIeJaNxq6132xHICNP77w8=
 github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a/ueoLdLL0=
 github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
 github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0=
@@ -1315,12 +1315,11 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb h1:PVGECzEo9Y3uOidtkHGdd347NjLtITfJFO9BxFpmRoo=
-golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 h1:D1v9ucDTYBtbz5vNuBbAhIMAGhQhJ6Ym5ah3maMVNX4=
+golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
diff --git a/lib/cgroup/mem.go b/lib/cgroup/mem.go
index 2fa34a11a..f35d3cbad 100644
--- a/lib/cgroup/mem.go
+++ b/lib/cgroup/mem.go
@@ -19,15 +19,17 @@ func init() {
 
 func initGOGC() {
 	if v := os.Getenv("GOGC"); v != "" {
-		n, err := strconv.Atoi(v)
+		n, err := strconv.ParseFloat(v, 64)
 		if err != nil {
 			n = 100
 		}
-		gogc = n
+		gogc = int(n)
 	} else {
-		// Set GOGC to 50% by default if it isn't set yet.
-		// This should reduce memory usage for typical workloads for VictoriaMetrics components.
-		gogc = 50
+		// Use lower GOGC if it isn't set yet.
+		// This should reduce memory usage for typical workloads for VictoriaMetrics components
+		// at the cost of increased CPU usage.
+		// It is recommended increasing GOGC if go_memstats_gc_cpu_fraction exceeds 0.05 for extended periods of time.
+		gogc = 30
 		debug.SetGCPercent(gogc)
 	}
 }
diff --git a/lib/memory/memory.go b/lib/memory/memory.go
index 04e4ff0c6..d403d4a48 100644
--- a/lib/memory/memory.go
+++ b/lib/memory/memory.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
@@ -14,11 +15,15 @@ var (
 	allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage`)
 )
 
+var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
+	return float64(memoryLimit)
+})
+
 var (
 	allowedMemory   int
 	remainingMemory int
+	memoryLimit     int
 )
-
 var once sync.Once
 
 func initOnce() {
@@ -26,18 +31,18 @@ func initOnce() {
 		// Do not use logger.Panicf here, since logger may be uninitialized yet.
 		panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call"))
 	}
-	mem := sysTotalMemory()
+	memoryLimit = sysTotalMemory()
 	if allowedBytes.N <= 0 {
 		if *allowedPercent < 1 || *allowedPercent > 200 {
 			logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent)
 		}
 		percent := *allowedPercent / 100
-		allowedMemory = int(float64(mem) * percent)
-		remainingMemory = mem - allowedMemory
+		allowedMemory = int(float64(memoryLimit) * percent)
+		remainingMemory = memoryLimit - allowedMemory
 		logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent)
 	} else {
 		allowedMemory = allowedBytes.N
-		remainingMemory = mem - allowedMemory
+		remainingMemory = memoryLimit - allowedMemory
 		logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
 	}
 }
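The `initGOGC` change above tolerates fractional `GOGC` values (e.g. `GOGC=77.5`) by parsing with `ParseFloat` and truncating, falls back to Go's default of 100 on unparsable input, and lowers the unset-case default from 50 to 30. A standalone sketch of the parsing logic; unlike the real code, which only calls `debug.SetGCPercent` when the env var is unset (the runtime already honors an explicit `GOGC`), this version always applies it:

```go
package main

import (
	"fmt"
	"os"
	"runtime/debug"
	"strconv"
)

// initGOGC mirrors lib/cgroup: honor GOGC if set (accepting floats),
// otherwise trade CPU for memory with a lower default.
func initGOGC() int {
	gogc := 30 // default when the env var is absent
	if v := os.Getenv("GOGC"); v != "" {
		n, err := strconv.ParseFloat(v, 64)
		if err != nil {
			n = 100 // Go's own default on unparsable input
		}
		gogc = int(n)
	}
	debug.SetGCPercent(gogc)
	return gogc
}

func main() {
	os.Setenv("GOGC", "77.5")
	fmt.Println(initGOGC()) // 77
}
```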
diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go
index b7774d720..07de82286 100644
--- a/lib/mergeset/encoding.go
+++ b/lib/mergeset/encoding.go
@@ -28,9 +28,10 @@ type Item struct {
 //
 // The returned bytes representation belongs to data.
 func (it Item) Bytes(data []byte) []byte {
+	n := int(it.End - it.Start)
 	sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
-	sh.Cap = int(it.End - it.Start)
-	sh.Len = int(it.End - it.Start)
+	sh.Cap = n
+	sh.Len = n
 	sh.Data += uintptr(it.Start)
 	return data
 }
@@ -48,8 +49,13 @@ func (it Item) String(data []byte) string {
 func (ib *inmemoryBlock) Len() int { return len(ib.items) }
 
 func (ib *inmemoryBlock) Less(i, j int) bool {
-	data := ib.data
 	items := ib.items
+	a := items[i]
+	b := items[j]
+	cpLen := uint32(len(ib.commonPrefix))
+	a.Start += cpLen
+	b.Start += cpLen
+	data := ib.data
-	return string(items[i].Bytes(data)) < string(items[j].Bytes(data))
+	return string(a.Bytes(data)) < string(b.Bytes(data))
 }
 
@@ -59,9 +65,15 @@ func (ib *inmemoryBlock) Swap(i, j int) {
 }
 
 type inmemoryBlock struct {
+	// commonPrefix contains common prefix for all the items stored in the inmemoryBlock
 	commonPrefix []byte
-	data         []byte
-	items        []Item
+
+	// data contains source data for items
+	data []byte
+
+	// items contains items stored in inmemoryBlock.
+	// Every item contains the prefix specified at commonPrefix.
+	items []Item
 }
 
 func (ib *inmemoryBlock) SizeBytes() int {
@@ -74,17 +86,29 @@ func (ib *inmemoryBlock) Reset() {
 	ib.items = ib.items[:0]
 }
 
-func (ib *inmemoryBlock) updateCommonPrefix() {
+func (ib *inmemoryBlock) updateCommonPrefixSorted() {
 	ib.commonPrefix = ib.commonPrefix[:0]
-	if len(ib.items) == 0 {
+	items := ib.items
+	if len(items) == 0 {
 		return
 	}
-	items := ib.items
 	data := ib.data
 	cp := items[0].Bytes(data)
-	if len(cp) == 0 {
+	if len(items) > 1 {
+		cpLen := commonPrefixLen(cp, items[len(items)-1].Bytes(data))
+		cp = cp[:cpLen]
+	}
+	ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
+}
+
+func (ib *inmemoryBlock) updateCommonPrefixUnsorted() {
+	ib.commonPrefix = ib.commonPrefix[:0]
+	items := ib.items
+	if len(items) == 0 {
 		return
 	}
+	data := ib.data
+	cp := items[0].Bytes(data)
 	for _, it := range items[1:] {
 		cpLen := commonPrefixLen(cp, it.Bytes(data))
 		if cpLen == 0 {
@@ -176,9 +200,11 @@ func (ib *inmemoryBlock) isSorted() bool {
 // - returns the marshal type used for the encoding.
 func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
 	if !ib.isSorted() {
+		ib.updateCommonPrefixUnsorted()
 		sort.Sort(ib)
+	} else {
+		ib.updateCommonPrefixSorted()
 	}
-	ib.updateCommonPrefix()
 	return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
 }
 
@@ -197,7 +223,7 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo
 	if isInTest && !ib.isSorted() {
 		logger.Panicf("BUG: %d items must be sorted; items:\n%s", len(ib.items), ib.debugItemsString())
 	}
-	ib.updateCommonPrefix()
+	ib.updateCommonPrefixSorted()
 	return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
 }
 
@@ -218,7 +244,7 @@ func (ib *inmemoryBlock) debugItemsString() string {
 // Preconditions:
 // - ib.items must be sorted.
-// - updateCommonPrefix must be called.
+// - updateCommonPrefix* must be called.
 func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
 	if len(ib.items) <= 0 {
 		logger.Panicf("BUG: inmemoryBlock.marshalData must be called on non-empty blocks only")
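`updateCommonPrefixSorted` leans on a property of lexicographic order: in a sorted slice, the prefix shared by the first and last elements is shared by every element in between, so two comparisons replace a full scan. A sketch of that idea over plain strings:

```go
package main

import (
	"fmt"
	"sort"
)

func commonPrefixLen(a, b string) int {
	n := 0
	for n < len(a) && n < len(b) && a[n] == b[n] {
		n++
	}
	return n
}

// commonPrefixSorted relies on lexicographic order: every element lies
// between the first and the last, so their shared prefix is shared by all.
func commonPrefixSorted(items []string) string {
	if len(items) == 0 {
		return ""
	}
	cp := items[0]
	if len(items) > 1 {
		cp = cp[:commonPrefixLen(cp, items[len(items)-1])]
	}
	return cp
}

func main() {
	items := []string{"metric_c", "metric_a", "metric_b"}
	sort.Strings(items)
	fmt.Printf("%q\n", commonPrefixSorted(items)) // "metric_"
}
```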
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index cb942d4f2..f0473c9d4 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -125,9 +125,16 @@ type rawItemsShards struct {
 // The number of shards for rawItems per table.
 //
 // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
-var rawItemsShardsPerTable = cgroup.AvailableCPUs()
+var rawItemsShardsPerTable = func() int {
+	cpus := cgroup.AvailableCPUs()
+	multiplier := cpus
+	if multiplier > 16 {
+		multiplier = 16
+	}
+	return (cpus*multiplier + 1) / 2
+}()
 
-const maxBlocksPerShard = 512
+const maxBlocksPerShard = 256
 
 func (riss *rawItemsShards) init() {
 	riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
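The new shard count grows roughly quadratically with available CPUs, with the multiplier capped at 16; combined with the halved `maxBlocksPerShard`, this reduces contention without letting buffered memory explode on large machines. The arithmetic, extracted into a runnable form (`shardsPerTable` is an illustrative name):

```go
package main

import "fmt"

// shardsPerTable reproduces the new sizing: cpus*min(cpus,16)/2, rounded up.
func shardsPerTable(cpus int) int {
	multiplier := cpus
	if multiplier > 16 {
		multiplier = 16
	}
	return (cpus*multiplier + 1) / 2
}

func main() {
	for _, cpus := range []int{1, 2, 8, 16, 64} {
		fmt.Printf("%2d CPUs -> %d shards\n", cpus, shardsPerTable(cpus))
	}
	// 1 CPU -> 1 shard, 8 CPUs -> 32 shards, 64 CPUs -> 512 shards
}
```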
diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go
index 7f5d5ed19..45a86a35d 100644
--- a/lib/protoparser/csvimport/streamparser.go
+++ b/lib/protoparser/csvimport/streamparser.go
@@ -45,16 +45,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 	defer putStreamContext(ctx)
 	for ctx.Read() {
 		uw := getUnmarshalWork()
-		uw.callback = func(rows []Row) {
-			if err := callback(rows); err != nil {
-				ctx.callbackErrLock.Lock()
-				if ctx.callbackErr == nil {
-					ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
-				}
-				ctx.callbackErrLock.Unlock()
-			}
-			ctx.wg.Done()
-		}
+		uw.ctx = ctx
+		uw.callback = callback
 		uw.cds = cds
 		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
 		ctx.wg.Add(1)
@@ -153,18 +145,32 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
 
 type unmarshalWork struct {
 	rows     Rows
-	callback func(rows []Row)
+	ctx      *streamContext
+	callback func(rows []Row) error
 	cds      []ColumnDescriptor
 	reqBuf   []byte
 }
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.ctx = nil
 	uw.callback = nil
 	uw.cds = nil
 	uw.reqBuf = uw.reqBuf[:0]
 }
 
+func (uw *unmarshalWork) runCallback(rows []Row) {
+	ctx := uw.ctx
+	if err := uw.callback(rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
 	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds)
@@ -188,7 +194,7 @@ func (uw *unmarshalWork) Unmarshal() {
 		}
 	}
 
-	uw.callback(rows)
+	uw.runCallback(rows)
 	putUnmarshalWork(uw)
 }
diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go
index e2922548a..3a29be751 100644
--- a/lib/protoparser/graphite/parser_test.go
+++ b/lib/protoparser/graphite/parser_test.go
@@ -324,7 +324,8 @@ func Test_streamContext_Read(t *testing.T) {
 		}
 		uw := getUnmarshalWork()
 		callbackCalls := 0
-		uw.callback = func(rows []Row) {
+		uw.ctx = ctx
+		uw.callback = func(rows []Row) error {
 			callbackCalls++
 			if len(rows) != len(rowsExpected.Rows) {
 				t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
@@ -332,8 +333,10 @@ func Test_streamContext_Read(t *testing.T) {
 			if !reflect.DeepEqual(rows, rowsExpected.Rows) {
 				t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
 			}
+			return nil
 		}
 		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
+		ctx.wg.Add(1)
 		uw.Unmarshal()
 		if callbackCalls != 1 {
 			t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go
index 010771c4d..a4207b5df 100644
--- a/lib/protoparser/graphite/streamparser.go
+++ b/lib/protoparser/graphite/streamparser.go
@@ -31,16 +31,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
 
 	for ctx.Read() {
 		uw := getUnmarshalWork()
-		uw.callback = func(rows []Row) {
-			if err := callback(rows); err != nil {
-				ctx.callbackErrLock.Lock()
-				if ctx.callbackErr == nil {
-					ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
-				}
-				ctx.callbackErrLock.Unlock()
-			}
-			ctx.wg.Done()
-		}
+		uw.ctx = ctx
+		uw.callback = callback
 		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
 		ctx.wg.Add(1)
 		common.ScheduleUnmarshalWork(uw)
@@ -138,16 +130,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
 
 type unmarshalWork struct {
 	rows     Rows
-	callback func(rows []Row)
+	ctx      *streamContext
+	callback func(rows []Row) error
 	reqBuf   []byte
 }
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.ctx = nil
 	uw.callback = nil
 	uw.reqBuf = uw.reqBuf[:0]
 }
 
+func (uw *unmarshalWork) runCallback(rows []Row) {
+	ctx := uw.ctx
+	if err := uw.callback(rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
 	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() {
 		}
 	}
 
-	uw.callback(rows)
+	uw.runCallback(rows)
 	putUnmarshalWork(uw)
 }
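All the protoparsers get the same refactoring: instead of wrapping the user callback in a fresh closure per request just to capture the `streamContext`, the pooled `unmarshalWork` now stores the context pointer and the raw callback, and a `runCallback` method centralizes the error bookkeeping and `wg.Done`. A trimmed generic sketch of the shape (with `Row` simplified to `string`):

```go
package main

import (
	"fmt"
	"sync"
)

type streamContext struct {
	wg              sync.WaitGroup
	callbackErrLock sync.Mutex
	callbackErr     error
}

type unmarshalWork struct {
	ctx      *streamContext
	callback func(rows []string) error // the plain user callback, no wrapper
}

// runCallback does what each ParseStream previously re-created as an
// anonymous function per block: record the first error, signal completion.
func (uw *unmarshalWork) runCallback(rows []string) {
	ctx := uw.ctx
	if err := uw.callback(rows); err != nil {
		ctx.callbackErrLock.Lock()
		if ctx.callbackErr == nil {
			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
		}
		ctx.callbackErrLock.Unlock()
	}
	ctx.wg.Done()
}

func main() {
	ctx := &streamContext{}
	uw := &unmarshalWork{ctx: ctx, callback: func(rows []string) error {
		return fmt.Errorf("bad row %q", rows[0])
	}}
	ctx.wg.Add(1)
	uw.runCallback([]string{"x"})
	ctx.wg.Wait()
	fmt.Println(ctx.callbackErr)
}
```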
diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go
index 96f865b6a..a3820f62a 100644
--- a/lib/protoparser/influx/streamparser.go
+++ b/lib/protoparser/influx/streamparser.go
@@ -56,16 +56,8 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
 	defer putStreamContext(ctx)
 	for ctx.Read() {
 		uw := getUnmarshalWork()
-		uw.callback = func(db string, rows []Row) {
-			if err := callback(db, rows); err != nil {
-				ctx.callbackErrLock.Lock()
-				if ctx.callbackErr == nil {
-					ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
-				}
-				ctx.callbackErrLock.Unlock()
-			}
-			ctx.wg.Done()
-		}
+		uw.ctx = ctx
+		uw.callback = callback
 		uw.db = db
 		uw.tsMultiplier = tsMultiplier
 		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
@@ -165,7 +157,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
 
 type unmarshalWork struct {
 	rows         Rows
-	callback     func(db string, rows []Row)
+	ctx          *streamContext
+	callback     func(db string, rows []Row) error
 	db           string
 	tsMultiplier int64
 	reqBuf       []byte
@@ -173,12 +166,25 @@ type unmarshalWork struct {
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.ctx = nil
 	uw.callback = nil
 	uw.db = ""
 	uw.tsMultiplier = 0
 	uw.reqBuf = uw.reqBuf[:0]
 }
 
+func (uw *unmarshalWork) runCallback(rows []Row) {
+	ctx := uw.ctx
+	if err := uw.callback(uw.db, rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
 	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -225,7 +231,7 @@ func (uw *unmarshalWork) Unmarshal() {
 		}
 	}
 
-	uw.callback(uw.db, rows)
+	uw.runCallback(rows)
 	putUnmarshalWork(uw)
 }
diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go
index c66ad5fca..94e7fc5b8 100644
--- a/lib/protoparser/opentsdb/streamparser.go
+++ b/lib/protoparser/opentsdb/streamparser.go
@@ -30,16 +30,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
 	defer putStreamContext(ctx)
 	for ctx.Read() {
 		uw := getUnmarshalWork()
-		uw.callback = func(rows []Row) {
-			if err := callback(rows); err != nil {
-				ctx.callbackErrLock.Lock()
-				if ctx.callbackErr == nil {
-					ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
-				}
-				ctx.callbackErrLock.Unlock()
-			}
-			ctx.wg.Done()
-		}
+		uw.ctx = ctx
+		uw.callback = callback
 		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
 		ctx.wg.Add(1)
 		common.ScheduleUnmarshalWork(uw)
@@ -137,16 +129,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
 
 type unmarshalWork struct {
 	rows     Rows
-	callback func(rows []Row)
+	ctx      *streamContext
+	callback func(rows []Row) error
 	reqBuf   []byte
 }
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.ctx = nil
 	uw.callback = nil
 	uw.reqBuf = uw.reqBuf[:0]
 }
 
+func (uw *unmarshalWork) runCallback(rows []Row) {
+	ctx := uw.ctx
+	if err := uw.callback(rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
 	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -175,7 +181,7 @@ func (uw *unmarshalWork) Unmarshal() {
 		}
 	}
 
-	uw.callback(rows)
+	uw.runCallback(rows)
 	putUnmarshalWork(uw)
 }
diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go
index ef7781950..af093f82f 100644
--- a/lib/protoparser/prometheus/streamparser.go
+++ b/lib/protoparser/prometheus/streamparser.go
@@ -32,16 +32,8 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
 	for ctx.Read() {
 		uw := getUnmarshalWork()
 		uw.errLogger = errLogger
-		uw.callback = func(rows []Row) {
-			if err := callback(rows); err != nil {
-				ctx.callbackErrLock.Lock()
-				if ctx.callbackErr == nil {
-					ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
-				}
-				ctx.callbackErrLock.Unlock()
-			}
-			ctx.wg.Done()
-		}
+		uw.ctx = ctx
+		uw.callback = callback
 		uw.defaultTimestamp = defaultTimestamp
 		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
 		ctx.wg.Add(1)
@@ -140,7 +132,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
 
 type unmarshalWork struct {
 	rows             Rows
-	callback         func(rows []Row)
+	ctx              *streamContext
+	callback         func(rows []Row) error
 	errLogger        func(string)
 	defaultTimestamp int64
 	reqBuf           []byte
@@ -148,12 +141,25 @@ type unmarshalWork struct {
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.ctx = nil
 	uw.callback = nil
 	uw.errLogger = nil
 	uw.defaultTimestamp = 0
 	uw.reqBuf = uw.reqBuf[:0]
 }
 
+func (uw *unmarshalWork) runCallback(rows []Row) {
+	ctx := uw.ctx
+	if err := uw.callback(rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
 	if uw.errLogger != nil {
@@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() {
 		}
 	}
 
-	uw.callback(rows)
+	uw.runCallback(rows)
 	putUnmarshalWork(uw)
 }
diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go
index 497643090..dbcd84e56 100644
--- a/lib/protoparser/vmimport/streamparser.go
+++ b/lib/protoparser/vmimport/streamparser.go
@@ -34,16 +34,8 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
 	defer putStreamContext(ctx)
 	for ctx.Read() {
 		uw := getUnmarshalWork()
-		uw.callback = func(rows []Row) {
-			if err := callback(rows); err != nil {
-				ctx.callbackErrLock.Lock()
-				if ctx.callbackErr == nil {
-					ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
-				}
-				ctx.callbackErrLock.Unlock()
-			}
-			ctx.wg.Done()
-		}
+		uw.ctx = ctx
+		uw.callback = callback
 		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
 		ctx.wg.Add(1)
 		common.ScheduleUnmarshalWork(uw)
@@ -141,16 +133,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
 
 type unmarshalWork struct {
 	rows     Rows
-	callback func(rows []Row)
+	ctx      *streamContext
+	callback func(rows []Row) error
 	reqBuf   []byte
 }
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.ctx = nil
 	uw.callback = nil
 	uw.reqBuf = uw.reqBuf[:0]
 }
 
+func (uw *unmarshalWork) runCallback(rows []Row) {
+	ctx := uw.ctx
+	if err := uw.callback(rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
 // Unmarshal implements common.UnmarshalWork
 func (uw *unmarshalWork) Unmarshal() {
 	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -159,7 +165,7 @@ func (uw *unmarshalWork) Unmarshal() {
 		row := &rows[i]
 		rowsRead.Add(len(row.Timestamps))
 	}
-	uw.callback(rows)
+	uw.runCallback(rows)
 	putUnmarshalWork(uw)
 }
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index bab0783d1..9aecc4f86 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -724,7 +724,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
 		return is.searchTagKeys(tks, maxTagKeys)
 	}
 	var mu sync.Mutex
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	for date := minDate; date <= maxDate; date++ {
 		wg.Add(1)
@@ -752,6 +752,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
 		}(date)
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
 
@@ -926,7 +927,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
 		return is.searchTagValues(tvs, tagKey, maxTagValues)
 	}
 	var mu sync.Mutex
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	for date := minDate; date <= maxDate; date++ {
 		wg.Add(1)
@@ -954,6 +955,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
 		}(date)
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
 
@@ -1141,7 +1143,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
 		return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
 	}
 	// Query over multiple days in parallel.
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	var mu sync.Mutex // protects tvss + errGlobal from concurrent access below.
 	for minDate <= maxDate {
@@ -1171,6 +1173,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
 		minDate++
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
 
@@ -2446,7 +2449,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 	}
 
 	// Slower path - search for metricIDs for each day in parallel.
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
 	for minDate <= maxDate {
@@ -2473,6 +2476,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 		minDate++
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	if errGlobal != nil {
 		return errGlobal
 	}
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 9b14c139a..910019d9c 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -65,7 +65,7 @@ const finalPartsToMerge = 3
 // The number of shards for rawRow entries per partition.
 //
 // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
-var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8
+var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4
 
 // getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
 func getMaxRawRowsPerShard() int {
@@ -481,7 +481,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
 
 func (pt *partition) flushRowsToParts(rows []rawRow) {
 	maxRows := getMaxRawRowsPerShard()
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	for len(rows) > 0 {
 		n := maxRows
 		if n > len(rows) {
@@ -495,8 +495,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 		rows = rows[n:]
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 }
 
+func getWaitGroup() *sync.WaitGroup {
+	v := wgPool.Get()
+	if v == nil {
+		return &sync.WaitGroup{}
+	}
+	return v.(*sync.WaitGroup)
+}
+
+func putWaitGroup(wg *sync.WaitGroup) {
+	wgPool.Put(wg)
+}
+
+var wgPool sync.Pool
+
 func (pt *partition) addRowsPart(rows []rawRow) {
 	if len(rows) == 0 {
 		return
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 848239b6c..95d4f88a8 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -166,10 +166,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
 	if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
 		logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
-		var wg sync.WaitGroup
+		wg := getWaitGroup()
 		wg.Add(1)
 		fs.MustRemoveAllWithDoneCallback(s.cachePath, wg.Done)
 		wg.Wait()
+		putWaitGroup(wg)
 		logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
 	}
 
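`getWaitGroup`/`putWaitGroup` (added in lib/storage/partition.go above and used across index_db.go and storage.go) recycle WaitGroups through a `sync.Pool`. This is safe only because a WaitGroup whose `Wait` has returned is back at its zero state. A self-contained sketch:

```go
package main

import (
	"fmt"
	"sync"
)

var wgPool sync.Pool

func getWaitGroup() *sync.WaitGroup {
	v := wgPool.Get()
	if v == nil {
		return &sync.WaitGroup{}
	}
	return v.(*sync.WaitGroup)
}

// putWaitGroup must only be called after Wait has returned, so the
// recycled WaitGroup is guaranteed to have a zero counter.
func putWaitGroup(wg *sync.WaitGroup) {
	wgPool.Put(wg)
}

func main() {
	wg := getWaitGroup()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			fmt.Println("worker", n)
		}(i)
	}
	wg.Wait()
	putWaitGroup(wg) // safe: nothing is adding or waiting anymore
}
```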
diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go
index 6192fb515..8c4c220b0 100644
--- a/lib/workingsetcache/cache.go
+++ b/lib/workingsetcache/cache.go
@@ -144,17 +144,12 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
 			c.mu.Unlock()
 			return
 		}
-		// Expire prev cache and create fresh curr cache with the same capacity.
-		// Do not reuse prev cache, since it can occupy too big amounts of memory.
+		// Reset prev cache and swap it with the curr cache.
 		prev := c.prev.Load().(*fastcache.Cache)
-		prev.Reset()
 		curr := c.curr.Load().(*fastcache.Cache)
 		c.prev.Store(curr)
-		// Use c.maxBytes/2 instead of cs.MaxBytesSize for creating new cache,
-		// since cs.MaxBytesSize may not match c.maxBytes/2, so the created cache
-		// couldn't be loaded from file with c.maxBytes/2 limit after saving with cs.MaxBytesSize size.
-		curr = fastcache.New(c.maxBytes / 2)
-		c.curr.Store(curr)
+		prev.Reset()
+		c.curr.Store(prev)
 		c.mu.Unlock()
 	}
 }
@@ -197,9 +192,9 @@ func (c *Cache) cacheSizeWatcher() {
 	c.mu.Lock()
 	c.setMode(switching)
 	prev := c.prev.Load().(*fastcache.Cache)
-	prev.Reset()
 	curr := c.curr.Load().(*fastcache.Cache)
 	c.prev.Store(curr)
+	prev.Reset()
 	// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
 	// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
 	c.curr.Store(fastcache.New(c.maxBytes))
@@ -222,8 +217,8 @@ func (c *Cache) cacheSizeWatcher() {
 	c.mu.Lock()
 	c.setMode(whole)
 	prev = c.prev.Load().(*fastcache.Cache)
-	prev.Reset()
 	c.prev.Store(fastcache.New(1024))
+	prev.Reset()
 	c.mu.Unlock()
 }
 
diff --git a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go
index daa3db773..092ba3719 100644
--- a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go
+++ b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go
@@ -257,10 +257,7 @@ func (b *bucket) Reset() {
 		putChunk(chunks[i])
 		chunks[i] = nil
 	}
-	bm := b.m
-	for k := range bm {
-		delete(bm, k)
-	}
+	b.m = make(map[uint64]uint64)
 	b.idx = 0
 	b.gen = 1
 	atomic.StoreUint64(&b.getCalls, 0)
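The rotation above reuses the retired cache instead of allocating a fresh `fastcache.New(c.maxBytes/2)` on every cycle, and the vendored fastcache change complements it: `Reset` now swaps each bucket's map for a new one (Go maps never shrink on `delete`), so an oversized old map can actually be garbage-collected. A minimal sketch of the swap against the public fastcache API; the two `atomic.Value`s stand in for `Cache.prev`/`Cache.curr`:

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/VictoriaMetrics/fastcache"
)

// rotate mimics expirationWatcher: curr becomes prev, and the old prev
// is Reset and reused as the new curr instead of being reallocated.
func rotate(prev, curr *atomic.Value) {
	p := prev.Load().(*fastcache.Cache)
	c := curr.Load().(*fastcache.Cache)
	prev.Store(c)
	p.Reset()
	curr.Store(p)
}

func main() {
	var prev, curr atomic.Value
	prev.Store(fastcache.New(32 * 1024 * 1024))
	curr.Store(fastcache.New(32 * 1024 * 1024))

	curr.Load().(*fastcache.Cache).Set([]byte("k"), []byte("v"))
	rotate(&prev, &curr)
	// the entry survives exactly one rotation in prev — the working-set idea
	fmt.Println(string(prev.Load().(*fastcache.Cache).Get(nil, []byte("k"))))
}
```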
diff --git a/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go b/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go
index b945ab254..f5e9d6bef 100644
--- a/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go
+++ b/vendor/golang.org/x/sys/unix/syscall_linux_amd64.go
@@ -28,6 +28,7 @@ func Lstat(path string, stat *Stat_t) (err error) {
 	return Fstatat(AT_FDCWD, path, stat, AT_SYMLINK_NOFOLLOW)
 }
 
+//sys	MemfdSecret(flags int) (fd int, err error)
 //sys	Pause() (err error)
 //sys	pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64
 //sys	pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64
diff --git a/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go b/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go
index 81db4833a..d83e2c657 100644
--- a/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go
+++ b/vendor/golang.org/x/sys/unix/syscall_linux_arm64.go
@@ -22,6 +22,7 @@ import "unsafe"
 //sysnb	getrlimit(resource int, rlim *Rlimit) (err error)
 //sysnb	Getuid() (uid int)
 //sys	Listen(s int, n int) (err error)
+//sys	MemfdSecret(flags int) (fd int, err error)
 //sys	pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64
 //sys	pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64
 //sys	Renameat(olddirfd int, oldpath string, newdirfd int, newpath string) (err error)
diff --git a/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go b/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go
index 8ff7adba0..925a748a3 100644
--- a/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go
+++ b/vendor/golang.org/x/sys/unix/syscall_linux_riscv64.go
@@ -22,6 +22,7 @@ import "unsafe"
 //sysnb	Getrlimit(resource int, rlim *Rlimit) (err error)
 //sysnb	Getuid() (uid int)
 //sys	Listen(s int, n int) (err error)
+//sys	MemfdSecret(flags int) (fd int, err error)
 //sys	pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64
 //sys	pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64
 //sys	Seek(fd int, offset int64, whence int) (off int64, err error) = SYS_LSEEK
diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go b/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go
index c947a4d10..2a0c4aa6a 100644
--- a/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go
+++ b/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go
@@ -215,6 +215,17 @@ func Listen(s int, n int) (err error) {
 
 // THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
 
+func MemfdSecret(flags int) (fd int, err error) {
+	r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0)
+	fd = int(r0)
+	if e1 != 0 {
+		err = errnoErr(e1)
+	}
+	return
+}
+
+// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
+
 func Pause() (err error) {
 	_, _, e1 := Syscall(SYS_PAUSE, 0, 0, 0)
 	if e1 != 0 {
diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go b/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go
index dd15284d8..9f8c24e43 100644
--- a/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go
+++ b/vendor/golang.org/x/sys/unix/zsyscall_linux_arm64.go
@@ -180,6 +180,17 @@ func Listen(s int, n int) (err error) {
 
 // THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
 
+func MemfdSecret(flags int) (fd int, err error) {
+	r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0)
+	fd = int(r0)
+	if e1 != 0 {
+		err = errnoErr(e1)
+	}
+	return
+}
+
+// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
+
 func pread(fd int, p []byte, offset int64) (n int, err error) {
 	var _p0 unsafe.Pointer
 	if len(p) > 0 {
diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go b/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go
index a1a9bcbbd..1239cc2de 100644
--- a/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go
+++ b/vendor/golang.org/x/sys/unix/zsyscall_linux_riscv64.go
@@ -180,6 +180,17 @@ func Listen(s int, n int) (err error) {
 
 // THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
 
+func MemfdSecret(flags int) (fd int, err error) {
+	r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0)
+	fd = int(r0)
+	if e1 != 0 {
+		err = errnoErr(e1)
+	}
+	return
+}
+
+// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
+
 func pread(fd int, p []byte, offset int64) (n int, err error) {
 	var _p0 unsafe.Pointer
 	if len(p) > 0 {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5e739fe31..738a6f750 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -16,7 +16,7 @@ cloud.google.com/go/iam
 cloud.google.com/go/storage
 cloud.google.com/go/storage/internal
 cloud.google.com/go/storage/internal/apiv2
-# github.com/VictoriaMetrics/fastcache v1.9.0
+# github.com/VictoriaMetrics/fastcache v1.10.0
 ## explicit; go 1.13
 github.com/VictoriaMetrics/fastcache
 # github.com/VictoriaMetrics/fasthttp v1.1.0
@@ -290,7 +290,7 @@ golang.org/x/oauth2/jwt
 # golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 ## explicit
 golang.org/x/sync/errgroup
-# golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb
+# golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64
 ## explicit; go 1.17
 golang.org/x/sys/internal/unsafeheader
 golang.org/x/sys/unix
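The x/sys bump vendors wrappers for the `memfd_secret(2)` syscall (Linux 5.14+, amd64/arm64/riscv64 only). VictoriaMetrics does not call it; a hypothetical usage sketch of the newly vendored wrapper:

```go
//go:build linux

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	// MemfdSecret returns an fd whose pages are hidden from the kernel's
	// direct map and other processes; needs Linux 5.14+ with secretmem enabled.
	fd, err := unix.MemfdSecret(0)
	if err != nil {
		fmt.Println("memfd_secret unavailable:", err)
		return
	}
	f := os.NewFile(uintptr(fd), "secret")
	defer f.Close()
	fmt.Println("secret memory fd:", f.Fd())
}
```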