Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2022-04-07 15:25:52 +03:00
commit ec01a188fd
36 changed files with 423 additions and 271 deletions


@@ -92,9 +92,9 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
tlsCfg := authCfg.NewTLSConfig()
tr := &http.Transport{
Dial: statDial,
DialContext: statDial,
TLSClientConfig: tlsCfg,
TLSHandshakeTimeout: 5 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
MaxConnsPerHost: 2 * concurrency,
MaxIdleConnsPerHost: 2 * concurrency,
IdleConnTimeout: time.Minute,


@@ -1,7 +1,9 @@
package remotewrite
import (
"context"
"net"
"sync"
"sync/atomic"
"time"
@@ -9,9 +11,26 @@ import (
"github.com/VictoriaMetrics/metrics"
)
func statDial(networkUnused, addr string) (conn net.Conn, err error) {
func getStdDialer() *net.Dialer {
stdDialerOnce.Do(func() {
stdDialer = &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: netutil.TCP6Enabled(),
}
})
return stdDialer
}
var (
stdDialer *net.Dialer
stdDialerOnce sync.Once
)
func statDial(ctx context.Context, networkUnused, addr string) (conn net.Conn, err error) {
network := netutil.GetTCPNetwork()
conn, err = net.DialTimeout(network, addr, 5*time.Second)
d := getStdDialer()
conn, err = d.DialContext(ctx, network, addr)
dialsTotal.Inc()
if err != nil {
dialErrors.Inc()
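The move from Dial to DialContext is more than a rename: the context-aware variant aborts in-flight TCP dials as soon as the HTTP request that triggered them is canceled or hits its deadline, and the sync.Once wrapper builds the shared dialer lazily on first use. Below is a minimal, self-contained sketch of the same pattern, illustrative only and not the vmagent code itself:

package main

import (
	"context"
	"net"
	"net/http"
	"time"
)

func main() {
	// Shared dialer, mirroring the getStdDialer pattern above.
	d := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	tr := &http.Transport{
		// DialContext (unlike the deprecated Dial field) receives the request
		// context, so pending dials die together with the canceled request.
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return d.DialContext(ctx, network, addr)
		},
		TLSHandshakeTimeout: 10 * time.Second,
	}
	client := &http.Client{Transport: tr, Timeout: time.Minute}
	_ = client
}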


@@ -141,6 +141,53 @@ func (ar *AlertingRule) ID() uint64 {
return ar.RuleID
}
type labelSet struct {
// origin labels from series
// used for templating
origin map[string]string
// processed labels with additional data
// used as Alert labels
processed map[string]string
}
// toLabels converts labels from given Metric
// to labelSet which contains original and processed labels.
func (ar *AlertingRule) toLabels(m datasource.Metric, qFn notifier.QueryFn) (*labelSet, error) {
ls := &labelSet{
origin: make(map[string]string, len(m.Labels)),
processed: make(map[string]string),
}
for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
continue
}
ls.origin[l.Name] = l.Value
ls.processed[l.Name] = l.Value
}
extraLabels, err := notifier.ExecTemplate(qFn, ar.Labels, notifier.AlertTplData{
Labels: ls.origin,
Value: m.Values[0],
Expr: ar.Expr,
})
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %s", err)
}
for k, v := range extraLabels {
ls.processed[k] = v
}
// set additional labels to identify group and rule name
if ar.Name != "" {
ls.processed[alertNameLabel] = ar.Name
}
if !*disableAlertGroupLabel && ar.GroupName != "" {
ls.processed[alertGroupNameLabel] = ar.GroupName
}
return ls, nil
}
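The split between origin and processed is what the rest of the commit leans on: origin preserves the series labels exactly as returned by the datasource (so $labels in annotations keeps referring to the raw series), while processed additionally carries the expanded rule labels plus alertname/alertgroup and is the map the alert identity is hashed from. A toy illustration of the split, with hypothetical label names and without the vmalert types:

package main

import (
	"bytes"
	"fmt"
	"text/template"
)

func main() {
	origin := map[string]string{"instance": "foo", "region": "east"}
	processed := map[string]string{"instance": "foo", "region": "east"}

	// A rule-level label whose value is a template, expanded against the
	// origin labels (conceptually what notifier.ExecTemplate does above).
	tpl := template.Must(template.New("l").Parse("alert_{{ .Labels.instance }}"))
	var buf bytes.Buffer
	if err := tpl.Execute(&buf, struct{ Labels map[string]string }{origin}); err != nil {
		panic(err)
	}
	processed["name"] = buf.String() // the extra label lands only in processed

	fmt.Println(origin)    // map[instance:foo region:east]
	fmt.Println(processed) // map[instance:foo name:alert_foo region:east]
}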
// ExecRange executes alerting rule on the given time range similarly to Exec.
// It doesn't update internal states of the Rule and is meant to be used just
// to get time series for backfilling.
@@ -155,24 +202,7 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
return nil, fmt.Errorf("`query` template isn't supported in replay mode")
}
for _, s := range series {
// set additional labels to identify group and rule Name
if ar.Name != "" {
s.SetLabel(alertNameLabel, ar.Name)
}
if !*disableAlertGroupLabel && ar.GroupName != "" {
s.SetLabel(alertGroupNameLabel, ar.GroupName)
}
// extra labels could contain templates, so we expand them first
labels, err := expandLabels(s, qFn, ar)
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %s", err)
}
for k, v := range labels {
// apply extra labels to datasource
// so the hash key will be consistent on restore
s.SetLabel(k, v)
}
a, err := ar.newAlert(s, time.Time{}, qFn) // initial alert
a, err := ar.newAlert(s, nil, time.Time{}, qFn) // initial alert
if err != nil {
return nil, fmt.Errorf("failed to create alert: %s", err)
}
@@ -234,28 +264,15 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
updated := make(map[uint64]struct{})
// update list of active alerts
for _, m := range qMetrics {
// set additional labels to identify group and rule name
if ar.Name != "" {
m.SetLabel(alertNameLabel, ar.Name)
}
if !*disableAlertGroupLabel && ar.GroupName != "" {
m.SetLabel(alertGroupNameLabel, ar.GroupName)
}
// extra labels could contain templates, so we expand them first
labels, err := expandLabels(m, qFn, ar)
ls, err := ar.toLabels(m, qFn)
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %s", err)
}
for k, v := range labels {
// apply extra labels to datasource
// so the hash key will be consistent on restore
m.SetLabel(k, v)
}
h := hash(m)
h := hash(ls.processed)
if _, ok := updated[h]; ok {
// duplicate may be caused by extra labels
// conflicting with the metric labels
ar.lastExecError = fmt.Errorf("labels %v: %w", m.Labels, errDuplicate)
ar.lastExecError = fmt.Errorf("labels %v: %w", ls.processed, errDuplicate)
return nil, ar.lastExecError
}
updated[h] = struct{}{}
@@ -272,14 +289,14 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
a.Value = m.Values[0]
// and re-exec template since Value can be used
// in annotations
a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations)
a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
if err != nil {
return nil, err
}
}
continue
}
a, err := ar.newAlert(m, ar.lastExecTime, qFn)
a, err := ar.newAlert(m, ls, ar.lastExecTime, qFn)
if err != nil {
ar.lastExecError = err
return nil, fmt.Errorf("failed to create alert: %w", err)
@@ -315,19 +332,6 @@ func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal
return ar.toTimeSeries(ts.Unix()), nil
}
func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) {
metricLabels := make(map[string]string)
for _, l := range m.Labels {
metricLabels[l.Name] = l.Value
}
tpl := notifier.AlertTplData{
Labels: metricLabels,
Value: m.Values[0],
Expr: ar.Expr,
}
return notifier.ExecTemplate(q, ar.Labels, tpl)
}
func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries
for _, a := range ar.alerts {
@@ -358,42 +362,43 @@ func (ar *AlertingRule) UpdateWith(r Rule) error {
}
// TODO: consider hashing algorithm in VM
func hash(m datasource.Metric) uint64 {
func hash(labels map[string]string) uint64 {
hash := fnv.New64a()
labels := m.Labels
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
for _, l := range labels {
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
if k == "__name__" {
continue
}
hash.Write([]byte(l.Name))
hash.Write([]byte(l.Value))
name, value := k, labels[k]
hash.Write([]byte(name))
hash.Write([]byte(value))
hash.Write([]byte("\xff"))
}
return hash.Sum64()
}
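Switching hash from a datasource.Metric to a plain map[string]string forces the explicit key sort above: Go randomizes map iteration order, so without sorting, the same label set could hash differently between evaluations and alert state would not survive restores. A standalone sketch of the same FNV-1a scheme:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashLabels mirrors the map-based hash above: keys must be sorted because
// Go randomizes map iteration order between runs.
func hashLabels(labels map[string]string) uint64 {
	h := fnv.New64a()
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte(labels[k]))
		// The \xff separator keeps {"a":"bc"} and {"ab":"c"} from colliding.
		h.Write([]byte("\xff"))
	}
	return h.Sum64()
}

func main() {
	fmt.Println(hashLabels(map[string]string{"instance": "foo", "job": "vm"}))
}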
func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) {
func (ar *AlertingRule) newAlert(m datasource.Metric, ls *labelSet, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) {
var err error
if ls == nil {
ls, err = ar.toLabels(m, qFn)
if err != nil {
return nil, fmt.Errorf("failed to expand labels: %s", err)
}
}
a := &notifier.Alert{
GroupID: ar.GroupID,
Name: ar.Name,
Labels: map[string]string{},
Labels: ls.processed,
Value: m.Values[0],
ActiveAt: start,
Expr: ar.Expr,
}
for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting
if l.Name == "__name__" {
continue
}
a.Labels[l.Name] = l.Value
}
var err error
a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations)
a.Annotations, err = a.ExecTemplate(qFn, ls.origin, ar.Annotations)
return a, err
}
@@ -560,15 +565,26 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
}
for _, m := range qMetrics {
a, err := ar.newAlert(m, time.Unix(int64(m.Values[0]), 0), qFn)
ls := &labelSet{
origin: make(map[string]string, len(m.Labels)),
processed: make(map[string]string, len(m.Labels)),
}
for _, l := range m.Labels {
if l.Name == "__name__" {
continue
}
ls.origin[l.Name] = l.Value
ls.processed[l.Name] = l.Value
}
a, err := ar.newAlert(m, ls, time.Unix(int64(m.Values[0]), 0), qFn)
if err != nil {
return fmt.Errorf("failed to create alert: %w", err)
}
a.ID = hash(m)
a.ID = hash(ls.processed)
a.State = notifier.StatePending
a.Restored = true
ar.alerts[a.ID] = a
logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.Start)
logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt)
}
return nil
}


@@ -315,10 +315,13 @@ func TestAlertingRule_Exec(t *testing.T) {
}
expAlerts := make(map[uint64]*notifier.Alert)
for _, ta := range tc.expAlerts {
labels := ta.labels
labels = append(labels, alertNameLabel)
labels = append(labels, tc.rule.Name)
h := hash(metricWithLabels(t, labels...))
labels := make(map[string]string)
for i := 0; i < len(ta.labels); i += 2 {
k, v := ta.labels[i], ta.labels[i+1]
labels[k] = v
}
labels[alertNameLabel] = tc.rule.Name
h := hash(labels)
expAlerts[h] = ta.alert
}
for key, exp := range expAlerts {
@@ -513,7 +516,7 @@ func TestAlertingRule_Restore(t *testing.T) {
),
},
map[uint64]*notifier.Alert{
hash(datasource.Metric{}): {State: notifier.StatePending,
hash(nil): {State: notifier.StatePending,
ActiveAt: time.Now().Truncate(time.Hour)},
},
},
@@ -529,12 +532,12 @@ func TestAlertingRule_Restore(t *testing.T) {
),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t,
alertNameLabel, "metric labels",
alertGroupNameLabel, "groupID",
"foo", "bar",
"namespace", "baz",
)): {State: notifier.StatePending,
hash(map[string]string{
alertNameLabel: "metric labels",
alertGroupNameLabel: "groupID",
"foo": "bar",
"namespace": "baz",
}): {State: notifier.StatePending,
ActiveAt: time.Now().Truncate(time.Hour)},
},
},
@@ -550,11 +553,11 @@ func TestAlertingRule_Restore(t *testing.T) {
),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t,
"foo", "bar",
"namespace", "baz",
"source", "vm",
)): {State: notifier.StatePending,
hash(map[string]string{
"foo": "bar",
"namespace": "baz",
"source": "vm",
}): {State: notifier.StatePending,
ActiveAt: time.Now().Truncate(time.Hour)},
},
},
@@ -575,11 +578,11 @@ func TestAlertingRule_Restore(t *testing.T) {
),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending,
hash(map[string]string{"host": "localhost-1"}): {State: notifier.StatePending,
ActiveAt: time.Now().Truncate(time.Hour)},
hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending,
hash(map[string]string{"host": "localhost-2"}): {State: notifier.StatePending,
ActiveAt: time.Now().Truncate(2 * time.Hour)},
hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending,
hash(map[string]string{"host": "localhost-3"}): {State: notifier.StatePending,
ActiveAt: time.Now().Truncate(3 * time.Hour)},
},
},
@@ -659,7 +662,7 @@ func TestAlertingRule_Template(t *testing.T) {
metricWithValueAndLabels(t, 1, "instance", "bar"),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "foo")): {
hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): {
Annotations: map[string]string{},
Labels: map[string]string{
alertNameLabel: "common",
@@ -667,7 +670,7 @@
"instance": "foo",
},
},
hash(metricWithLabels(t, alertNameLabel, "common", "region", "east", "instance", "bar")): {
hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "bar"}): {
Annotations: map[string]string{},
Labels: map[string]string{
alertNameLabel: "common",
@@ -682,11 +685,10 @@
Name: "override label",
Labels: map[string]string{
"instance": "{{ $labels.instance }}",
"region": "east",
},
Annotations: map[string]string{
"summary": `Too high connection number for "{{ $labels.instance }}" for region {{ $labels.region }}`,
"description": `It is {{ $value }} connections for "{{ $labels.instance }}"`,
"summary": `Too high connection number for "{{ $labels.instance }}"`,
"description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`,
},
alerts: make(map[uint64]*notifier.Alert),
},
@@ -695,64 +697,58 @@
metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): {
hash(map[string]string{alertNameLabel: "override label", "instance": "foo"}): {
Labels: map[string]string{
alertNameLabel: "override label",
"instance": "foo",
"region": "east",
},
Annotations: map[string]string{
"summary": `Too high connection number for "foo" for region east`,
"description": `It is 2 connections for "foo"`,
"summary": `Too high connection number for "foo"`,
"description": `override: It is 2 connections for "foo"`,
},
},
hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "bar")): {
hash(map[string]string{alertNameLabel: "override label", "instance": "bar"}): {
Labels: map[string]string{
alertNameLabel: "override label",
"instance": "bar",
"region": "east",
},
Annotations: map[string]string{
"summary": `Too high connection number for "bar" for region east`,
"description": `It is 10 connections for "bar"`,
"summary": `Too high connection number for "bar"`,
"description": `override: It is 10 connections for "bar"`,
},
},
},
},
{
&AlertingRule{
Name: "ExtraTemplating",
Name: "OriginLabels",
GroupName: "Testing",
Labels: map[string]string{
"name": "alert_{{ $labels.alertname }}",
"group": "group_{{ $labels.alertgroup }}",
"instance": "{{ $labels.instance }}",
},
Annotations: map[string]string{
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
"description": `Alert "{{ $labels.name }}({{ $labels.group }})" for instance {{ $labels.instance }}`,
},
alerts: make(map[uint64]*notifier.Alert),
},
[]datasource.Metric{
metricWithValueAndLabels(t, 1, "instance", "foo"),
metricWithValueAndLabels(t, 1,
alertNameLabel, "originAlertname",
alertGroupNameLabel, "originGroupname",
"instance", "foo"),
},
map[uint64]*notifier.Alert{
hash(metricWithLabels(t, alertNameLabel, "ExtraTemplating",
"name", "alert_ExtraTemplating",
alertGroupNameLabel, "Testing",
"group", "group_Testing",
"instance", "foo")): {
Labels: map[string]string{
alertNameLabel: "ExtraTemplating",
"name": "alert_ExtraTemplating",
hash(map[string]string{
alertNameLabel: "OriginLabels",
alertGroupNameLabel: "Testing",
"instance": "foo"}): {
Labels: map[string]string{
alertNameLabel: "OriginLabels",
alertGroupNameLabel: "Testing",
"group": "group_Testing",
"instance": "foo",
},
Annotations: map[string]string{
"summary": `Alert "ExtraTemplating(Testing)" for instance foo`,
"description": `Alert "alert_ExtraTemplating(group_Testing)" for instance foo`,
"summary": `Alert "originAlertname(originGroupname)" for instance foo`,
},
},
},


@@ -174,7 +174,7 @@ func TestGroupStart(t *testing.T) {
m2 := metricWithLabels(t, "instance", inst2, "job", job)
r := g.Rules[0].(*AlertingRule)
alert1, err := r.newAlert(m1, time.Now(), nil)
alert1, err := r.newAlert(m1, nil, time.Now(), nil)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
@@ -187,13 +187,9 @@
// add service labels
alert1.Labels[alertNameLabel] = alert1.Name
alert1.Labels[alertGroupNameLabel] = g.Name
var labels1 []string
for k, v := range alert1.Labels {
labels1 = append(labels1, k, v)
}
alert1.ID = hash(metricWithLabels(t, labels1...))
alert1.ID = hash(alert1.Labels)
alert2, err := r.newAlert(m2, time.Now(), nil)
alert2, err := r.newAlert(m2, nil, time.Now(), nil)
if err != nil {
t.Fatalf("faield to create alert: %s", err)
}
@@ -206,11 +202,7 @@ func TestGroupStart(t *testing.T) {
// add service labels
alert2.Labels[alertNameLabel] = alert2.Name
alert2.Labels[alertGroupNameLabel] = g.Name
var labels2 []string
for k, v := range alert2.Labels {
labels2 = append(labels2, k, v)
}
alert2.ID = hash(metricWithLabels(t, labels2...))
alert2.ID = hash(alert2.Labels)
finished := make(chan struct{})
fs.add(m1)


@@ -243,7 +243,7 @@ func getAlertURLGenerator(externalURL *url.URL, externalAlertSource string, vali
"tpl": externalAlertSource,
}
return func(alert notifier.Alert) string {
templated, err := alert.ExecTemplate(nil, m)
templated, err := alert.ExecTemplate(nil, nil, m)
if err != nil {
logger.Errorf("can not exec source template %s", err)
}


@@ -37,7 +37,7 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
g, ok := m.groups[gID]
if !ok {
return nil, fmt.Errorf("can't find group with id %q", gID)
return nil, fmt.Errorf("can't find group with id %d", gID)
}
for _, rule := range g.Rules {
ar, ok := rule.(*AlertingRule)
@@ -48,7 +48,7 @@ func (m *manager) AlertAPI(gID, aID uint64) (*APIAlert, error) {
return apiAlert, nil
}
}
return nil, fmt.Errorf("can't find alert with id %q in group %q", aID, g.Name)
return nil, fmt.Errorf("can't find alert with id %d in group %q", aID, g.Name)
}
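The %q to %d change fixes a real formatting bug: gID and aID are uint64 values, and fmt treats %q on integers as a request to quote a Unicode code point, so IDs above the rune range render as formatting errors rather than numbers. A quick demonstration, with a hypothetical ID value:

package main

import "fmt"

func main() {
	gID := uint64(16486439481173672521)
	// %q treats integers as code points; values outside the rune range
	// render as a fmt error string instead of the ID itself.
	fmt.Printf("%q\n", gID) // %!q(uint64=16486439481173672521)
	fmt.Printf("%d\n", gID) // 16486439481173672521
}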
func (m *manager) start(ctx context.Context, groupsCfg []config.Group) error {


@@ -88,8 +88,8 @@ var tplHeaders = []string{
// map of annotations.
// Every alert could have a different datasource, so function
// requires a queryFunction as an argument.
func (a *Alert) ExecTemplate(q QueryFn, annotations map[string]string) (map[string]string, error) {
tplData := AlertTplData{Value: a.Value, Labels: a.Labels, Expr: a.Expr}
func (a *Alert) ExecTemplate(q QueryFn, labels, annotations map[string]string) (map[string]string, error) {
tplData := AlertTplData{Value: a.Value, Labels: labels, Expr: a.Expr}
return templateAnnotations(annotations, tplData, funcsWithQuery(q))
}


@@ -130,7 +130,7 @@ func TestAlert_ExecTemplate(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tpl, err := tc.alert.ExecTemplate(qFn, tc.annotations)
tpl, err := tc.alert.ExecTemplate(qFn, tc.alert.Labels, tc.annotations)
if err != nil {
t.Fatal(err)
}


@@ -33,6 +33,7 @@ Previously the `-search.maxUniqueTimeseries` command-line flag was used as a glo
When using [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), these command-line flags (including `-search.maxUniqueTimeseries`) must be passed to `vmselect` instead of `vmstorage`.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html) and [vmauth](https://docs.victoriametrics.com/vmauth.html): reduce the probability of `TLS handshake error from XX.XX.XX.XX: EOF` errors when `-remoteWrite.url` points to an HTTPS URL at `vmauth`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1699).
* BUGFIX: return `Content-Type: text/html` response header when requesting `/` HTTP path at VictoriaMetrics components. Previously `text/plain` response header was returned, which could lead to broken page formatting. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2323).
* BUGFIX: [Graphite Render API](https://docs.victoriametrics.com/#graphite-render-api-usage): accept floating-point values for [maxDataPoints](https://graphite.readthedocs.io/en/stable/render_api.html#maxdatapoints) query arg, since some clients send floating-point values instead of integer values for this arg.


@@ -6,6 +6,7 @@ sort: 18
## Release version and Docker images
0. Make sure that the release commits have no security issues.
1. Document all the changes for new release in [CHANGELOG.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md).
2. Create the following release tags:
* `git tag -s v1.xx.y` in `master` branch
@@ -13,8 +14,9 @@ sort: 18
* `git tag -s v1.xx.y-enterprise` in `enterprise` branch
* `git tag -s v1.xx.y-enterprise-cluster` in `enterprise-cluster` branch
3. Run `TAG=v1.xx.y make publish-release`. It will create `*.tar.gz` release archives with the corresponding `_checksums.txt` files inside `bin` directory and publish Docker images for the given `TAG`, `TAG-cluster`, `TAG-enterprise` and `TAG-enterprise-cluster`.
4. Push release tag to <https://github.com/VictoriaMetrics/VictoriaMetrics> : `git push origin v1.xx.y`.
5. Go to <https://github.com/VictoriaMetrics/VictoriaMetrics/releases> , create new release from the pushed tag on step 5 and upload `*.tar.gz` archive with the corresponding `_checksums.txt` from step 2.
4. Push release tags to <https://github.com/VictoriaMetrics/VictoriaMetrics> : `git push origin v1.xx.y` and `git push origin v1.xx.y-cluster`. Do not push `-enterprise` tags to the public repository.
5. Go to <https://github.com/VictoriaMetrics/VictoriaMetrics/releases> , create a new release from the tag pushed in step 4 and upload the `*.tar.gz` archive with the corresponding `_checksums.txt` from step 3.
6. Copy the [CHANGELOG](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md) for this release to [releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) page.
## Building snap package

go.mod

@@ -4,7 +4,7 @@ go 1.17
require (
cloud.google.com/go/storage v1.21.0
github.com/VictoriaMetrics/fastcache v1.9.0
github.com/VictoriaMetrics/fastcache v1.10.0
// Do not use the original github.com/valyala/fasthttp because of issues
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
@@ -33,7 +33,7 @@ require (
github.com/valyala/quicktemplate v1.7.0
golang.org/x/net v0.0.0-20220403103023-749bd193bc2b
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64
google.golang.org/api v0.74.0
gopkg.in/yaml.v2 v2.4.0
)

go.sum

@@ -113,8 +113,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
github.com/SAP/go-hdb v0.14.1/go.mod h1:7fdQLVC2lER3urZLjZCm0AuMQfApof92n3aylBPEkMo=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/VictoriaMetrics/fastcache v1.9.0 h1:oMwsS6c8abz98B7ytAewQ7M1ZN/Im/iwKoE1euaFvhs=
github.com/VictoriaMetrics/fastcache v1.9.0/go.mod h1:otoTS3xu+6IzF/qByjqzjp3rTuzM3Qf0ScU1UTj97iU=
github.com/VictoriaMetrics/fastcache v1.10.0 h1:5hDJnLsKLpnUEToub7ETuRu8RCkb40woBZAUiKonXzY=
github.com/VictoriaMetrics/fastcache v1.10.0/go.mod h1:tjiYeEfYXCqacuvYw/7UoDIeJaNxq6132xHICNP77w8=
github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a/ueoLdLL0=
github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0=
@@ -1315,12 +1315,11 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb h1:PVGECzEo9Y3uOidtkHGdd347NjLtITfJFO9BxFpmRoo=
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 h1:D1v9ucDTYBtbz5vNuBbAhIMAGhQhJ6Ym5ah3maMVNX4=
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=


@@ -19,15 +19,17 @@ func init() {
func initGOGC() {
if v := os.Getenv("GOGC"); v != "" {
n, err := strconv.Atoi(v)
n, err := strconv.ParseFloat(v, 64)
if err != nil {
n = 100
}
gogc = n
gogc = int(n)
} else {
// Set GOGC to 50% by default if it isn't set yet.
// This should reduce memory usage for typical workloads for VictoriaMetrics components.
gogc = 50
// Use lower GOGC if it isn't set yet.
// This should reduce memory usage for typical workloads for VictoriaMetrics components
// at the cost of increased CPU usage.
// It is recommended to increase GOGC if go_memstats_gc_cpu_fraction exceeds 0.05 for extended periods of time.
gogc = 30
debug.SetGCPercent(gogc)
}
}
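strconv.Atoi rejects fractional values, so a setting like GOGC=50.5 previously fell back to the default; strconv.ParseFloat accepts both forms, and the result is truncated to an int before being handed to debug.SetGCPercent. A small demonstration:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Atoi fails on fractional input, which is why GOGC=50.5 used to be ignored.
	if _, err := strconv.Atoi("50.5"); err != nil {
		fmt.Println("Atoi:", err)
	}
	// ParseFloat accepts both "50" and "50.5"; the result is truncated to int.
	n, _ := strconv.ParseFloat("50.5", 64)
	fmt.Println("ParseFloat:", int(n)) // 50
}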


@@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -14,11 +15,15 @@ var (
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage`)
)
var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
return float64(memoryLimit)
})
var (
allowedMemory int
remainingMemory int
memoryLimit int
)
var once sync.Once
func initOnce() {
@@ -26,18 +31,18 @@ func initOnce() {
// Do not use logger.Panicf here, since logger may be uninitialized yet.
panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call"))
}
mem := sysTotalMemory()
memoryLimit = sysTotalMemory()
if allowedBytes.N <= 0 {
if *allowedPercent < 1 || *allowedPercent > 200 {
logger.Panicf("FATAL: -memory.allowedPercent must be in the range [1...200]; got %g", *allowedPercent)
}
percent := *allowedPercent / 100
allowedMemory = int(float64(mem) * percent)
remainingMemory = mem - allowedMemory
allowedMemory = int(float64(memoryLimit) * percent)
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedPercent)
} else {
allowedMemory = allowedBytes.N
remainingMemory = mem - allowedMemory
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
}
}
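Recording sysTotalMemory() in the new memoryLimit variable lets the process_memory_limit_bytes gauge expose the detected limit while the allowed/remaining split stays unchanged. A worked example of that split, assuming a 64GiB host and -memory.allowedPercent=60 (illustrative numbers only):

package main

import "fmt"

func main() {
	// Same arithmetic as initOnce above, for a hypothetical 64GiB host.
	memoryLimit := 64 << 30 // 68719476736 bytes
	percent := 60.0 / 100
	allowedMemory := int(float64(memoryLimit) * percent)
	remainingMemory := memoryLimit - allowedMemory
	fmt.Println(allowedMemory, remainingMemory) // 41231686041 27487790695
}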


@@ -28,9 +28,10 @@ type Item struct {
//
// The returned bytes representation belongs to data.
func (it Item) Bytes(data []byte) []byte {
n := int(it.End - it.Start)
sh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
sh.Cap = int(it.End - it.Start)
sh.Len = int(it.End - it.Start)
sh.Cap = n
sh.Len = n
sh.Data += uintptr(it.Start)
return data
}
@@ -48,8 +49,13 @@ func (it Item) String(data []byte) string {
func (ib *inmemoryBlock) Len() int { return len(ib.items) }
func (ib *inmemoryBlock) Less(i, j int) bool {
data := ib.data
items := ib.items
a := items[i]
b := items[j]
cpLen := uint32(len(ib.commonPrefix))
a.Start += cpLen
b.Start += cpLen
data := ib.data
return string(items[i].Bytes(data)) < string(items[j].Bytes(data))
return string(a.Bytes(data)) < string(b.Bytes(data))
}
@@ -59,8 +65,14 @@ func (ib *inmemoryBlock) Swap(i, j int) {
}
type inmemoryBlock struct {
// commonPrefix contains common prefix for all the items stored in the inmemoryBlock
commonPrefix []byte
// data contains source data for items
data []byte
// items contains items stored in inmemoryBlock.
// Every item contains the prefix specified at commonPrefix.
items []Item
}
@@ -74,17 +86,29 @@ func (ib *inmemoryBlock) Reset() {
ib.items = ib.items[:0]
}
func (ib *inmemoryBlock) updateCommonPrefix() {
func (ib *inmemoryBlock) updateCommonPrefixSorted() {
ib.commonPrefix = ib.commonPrefix[:0]
if len(ib.items) == 0 {
items := ib.items
if len(items) == 0 {
return
}
items := ib.items
data := ib.data
cp := items[0].Bytes(data)
if len(cp) == 0 {
if len(items) > 1 {
cpLen := commonPrefixLen(cp, items[len(items)-1].Bytes(data))
cp = cp[:cpLen]
}
ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
}
func (ib *inmemoryBlock) updateCommonPrefixUnsorted() {
ib.commonPrefix = ib.commonPrefix[:0]
items := ib.items
if len(items) == 0 {
return
}
data := ib.data
cp := items[0].Bytes(data)
for _, it := range items[1:] {
cpLen := commonPrefixLen(cp, it.Bytes(data))
if cpLen == 0 {
@@ -176,9 +200,11 @@ func (ib *inmemoryBlock) isSorted() bool {
// - returns the marshal type used for the encoding.
func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
if !ib.isSorted() {
ib.updateCommonPrefixUnsorted()
sort.Sort(ib)
} else {
ib.updateCommonPrefixSorted()
}
ib.updateCommonPrefix()
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
}
@@ -197,7 +223,7 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo
if isInTest && !ib.isSorted() {
logger.Panicf("BUG: %d items must be sorted; items:\n%s", len(ib.items), ib.debugItemsString())
}
ib.updateCommonPrefix()
ib.updateCommonPrefixSorted()
return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
}
@@ -218,7 +244,7 @@ func (ib *inmemoryBlock) debugItemsString() string {
// Preconditions:
// - ib.items must be sorted.
// - updateCommonPrefix must be called.
// - updateCommonPrefix* must be called.
func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {
if len(ib.items) <= 0 {
logger.Panicf("BUG: inmemoryBlock.marshalData must be called on non-empty blocks only")
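Splitting updateCommonPrefix into sorted and unsorted variants is an optimization: in lexicographically sorted items, any prefix shared by the first and last item is necessarily shared by every item between them, so the sorted path needs a single comparison instead of a full scan. commonPrefixLen itself isn't shown in this diff; a minimal sketch of what such a helper does (the real implementation may differ):

package main

import "fmt"

// commonPrefixLen returns the length of the longest common prefix of a and b.
func commonPrefixLen(a, b []byte) int {
	i := 0
	for i < len(a) && i < len(b) && a[i] == b[i] {
		i++
	}
	return i
}

func main() {
	// For sorted items, the prefix shared by the first and last item is shared
	// by all items in between, so one comparison replaces a full scan.
	fmt.Println(commonPrefixLen([]byte("metric_aaa"), []byte("metric_zzz"))) // 7
}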


@@ -125,9 +125,16 @@ type rawItemsShards struct {
// The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = cgroup.AvailableCPUs()
var rawItemsShardsPerTable = func() int {
cpus := cgroup.AvailableCPUs()
multiplier := cpus
if multiplier > 16 {
multiplier = 16
}
return (cpus*multiplier + 1) / 2
}()
const maxBlocksPerShard = 512
const maxBlocksPerShard = 256
func (riss *rawItemsShards) init() {
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
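The new shard-count formula scales roughly quadratically up to 16 CPUs and linearly beyond, while maxBlocksPerShard is halved so the total buffered memory stays bounded. Evaluating the formula for a few CPU counts:

package main

import "fmt"

func main() {
	// The shard-count formula above, evaluated for several CPU counts.
	shards := func(cpus int) int {
		multiplier := cpus
		if multiplier > 16 {
			multiplier = 16
		}
		return (cpus*multiplier + 1) / 2
	}
	for _, cpus := range []int{1, 4, 16, 64} {
		fmt.Println(cpus, "->", shards(cpus)) // 1->1, 4->8, 16->128, 64->512
	}
}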


@@ -45,16 +45,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.cds = cds
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
@@ -153,18 +145,32 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
cds []ColumnDescriptor
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.cds = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds)
@@ -188,7 +194,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}
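This refactoring, repeated in the other ingestion protocols below, replaces a fresh closure allocated on every ctx.Read() iteration with two pooled fields: the streamContext pointer and the caller's plain callback. Error aggregation then lives in one runCallback method instead of N copies of the same closure. A stripped-down sketch of the pattern with simplified, illustrative types (streamCtx and work are made-up names):

package main

import (
	"fmt"
	"sync"
)

type streamCtx struct {
	wg          sync.WaitGroup
	errLock     sync.Mutex
	callbackErr error
}

// work is the pooled item: it carries the shared context instead of
// capturing it in a new closure on every read-loop iteration.
type work struct {
	ctx      *streamCtx
	callback func(rows []int) error
}

func (w *work) run(rows []int) {
	if err := w.callback(rows); err != nil {
		w.ctx.errLock.Lock()
		if w.ctx.callbackErr == nil {
			w.ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
		}
		w.ctx.errLock.Unlock()
	}
	w.ctx.wg.Done()
}

func main() {
	ctx := &streamCtx{}
	w := &work{ctx: ctx, callback: func(rows []int) error { return nil }}
	ctx.wg.Add(1)
	w.run([]int{1, 2, 3})
	ctx.wg.Wait()
	fmt.Println("done, err:", ctx.callbackErr)
}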


@@ -324,7 +324,8 @@ func Test_streamContext_Read(t *testing.T) {
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.callback = func(rows []Row) {
uw.ctx = ctx
uw.callback = func(rows []Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
@@ -332,8 +333,10 @@
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)


@@ -31,16 +31,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
@@ -138,16 +130,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}


@@ -56,16 +56,8 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(db string, rows []Row) {
if err := callback(db, rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.db = db
uw.tsMultiplier = tsMultiplier
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
@@ -165,7 +157,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(db string, rows []Row)
ctx *streamContext
callback func(db string, rows []Row) error
db string
tsMultiplier int64
reqBuf []byte
@@ -173,12 +166,25 @@ type unmarshalWork struct {
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.db = ""
uw.tsMultiplier = 0
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(uw.db, rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -225,7 +231,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(uw.db, rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}


@@ -30,16 +30,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
@@ -137,16 +129,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -175,7 +181,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}


@@ -32,16 +32,8 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
for ctx.Read() {
uw := getUnmarshalWork()
uw.errLogger = errLogger
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.defaultTimestamp = defaultTimestamp
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
@@ -140,7 +132,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
errLogger func(string)
defaultTimestamp int64
reqBuf []byte
@@ -148,12 +141,25 @@ type unmarshalWork struct {
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.errLogger = nil
uw.defaultTimestamp = 0
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
if uw.errLogger != nil {
@@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}


@@ -34,16 +34,8 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
@@ -141,16 +133,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@@ -159,7 +165,7 @@ func (uw *unmarshalWork) Unmarshal() {
row := &rows[i]
rowsRead.Add(len(row.Timestamps))
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}


@@ -724,7 +724,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
return is.searchTagKeys(tks, maxTagKeys)
}
var mu sync.Mutex
var wg sync.WaitGroup
wg := getWaitGroup()
var errGlobal error
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
@@ -752,6 +752,7 @@
}(date)
}
wg.Wait()
putWaitGroup(wg)
return errGlobal
}
@@ -926,7 +927,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
return is.searchTagValues(tvs, tagKey, maxTagValues)
}
var mu sync.Mutex
var wg sync.WaitGroup
wg := getWaitGroup()
var errGlobal error
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
@@ -954,6 +955,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
}(date)
}
wg.Wait()
putWaitGroup(wg)
return errGlobal
}
@@ -1141,7 +1143,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
}
// Query over multiple days in parallel.
var wg sync.WaitGroup
wg := getWaitGroup()
var errGlobal error
var mu sync.Mutex // protects tvss + errGlobal from concurrent access below.
for minDate <= maxDate {
@@ -1171,6 +1173,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
minDate++
}
wg.Wait()
putWaitGroup(wg)
return errGlobal
}
@@ -2446,7 +2449,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
}
// Slower path - search for metricIDs for each day in parallel.
var wg sync.WaitGroup
wg := getWaitGroup()
var errGlobal error
var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
for minDate <= maxDate {
@@ -2473,6 +2476,7 @@
minDate++
}
wg.Wait()
putWaitGroup(wg)
if errGlobal != nil {
return errGlobal
}


@@ -65,7 +65,7 @@ const finalPartsToMerge = 3
// The number of shards for rawRow entries per partition.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 3) / 4
// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerShard() int {
@@ -481,7 +481,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
func (pt *partition) flushRowsToParts(rows []rawRow) {
maxRows := getMaxRawRowsPerShard()
var wg sync.WaitGroup
wg := getWaitGroup()
for len(rows) > 0 {
n := maxRows
if n > len(rows) {
@@ -495,8 +495,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
rows = rows[n:]
}
wg.Wait()
putWaitGroup(wg)
}
func getWaitGroup() *sync.WaitGroup {
v := wgPool.Get()
if v == nil {
return &sync.WaitGroup{}
}
return v.(*sync.WaitGroup)
}
func putWaitGroup(wg *sync.WaitGroup) {
wgPool.Put(wg)
}
var wgPool sync.Pool
func (pt *partition) addRowsPart(rows []rawRow) {
if len(rows) == 0 {
return


@@ -166,10 +166,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
var wg sync.WaitGroup
wg := getWaitGroup()
wg.Add(1)
fs.MustRemoveAllWithDoneCallback(s.cachePath, wg.Done)
wg.Wait()
putWaitGroup(wg)
logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
}


@@ -144,17 +144,12 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
c.mu.Unlock()
return
}
// Expire prev cache and create fresh curr cache with the same capacity.
// Do not reuse prev cache, since it can occupy too big amounts of memory.
// Reset prev cache and swap it with the curr cache.
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
c.prev.Store(curr)
// Use c.maxBytes/2 instead of cs.MaxBytesSize for creating new cache,
// since cs.MaxBytesSize may not match c.maxBytes/2, so the created cache
// couldn't be loaded from file with c.maxBytes/2 limit after saving with cs.MaxBytesSize size.
curr = fastcache.New(c.maxBytes / 2)
c.curr.Store(curr)
prev.Reset()
c.curr.Store(prev)
c.mu.Unlock()
}
}
@@ -197,9 +192,9 @@ func (c *Cache) cacheSizeWatcher() {
c.mu.Lock()
c.setMode(switching)
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
c.prev.Store(curr)
prev.Reset()
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
c.curr.Store(fastcache.New(c.maxBytes))
@@ -222,8 +217,8 @@ func (c *Cache) cacheSizeWatcher() {
c.mu.Lock()
c.setMode(whole)
prev = c.prev.Load().(*fastcache.Cache)
prev.Reset()
c.prev.Store(fastcache.New(1024))
prev.Reset()
c.mu.Unlock()
}
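In all three rotation sites the change is the same: instead of discarding prev and allocating a brand-new fastcache.Cache for curr (a potentially very large allocation on every cycle), the stale prev cache is Reset() and reused as the fresh curr. A toy sketch of the swap with a stand-in cache type (illustrative only, not fastcache):

package main

import (
	"fmt"
	"sync/atomic"
)

// cache stands in for fastcache.Cache just to show the rotation.
type cache struct{ m map[string]string }

func newCache() *cache { return &cache{m: map[string]string{}} }

// Reset drops all entries but keeps the value reusable, like fastcache's Reset.
func (c *cache) Reset() {
	for k := range c.m {
		delete(c.m, k)
	}
}

func main() {
	var prevV, currV atomic.Value
	prevV.Store(newCache())
	currV.Store(newCache())

	// The new rotation: reset the stale prev cache and reuse it as the fresh
	// curr, instead of allocating a new cache every expiration cycle.
	prev := prevV.Load().(*cache)
	curr := currV.Load().(*cache)
	prevV.Store(curr)
	prev.Reset()
	currV.Store(prev)

	fmt.Println("rotated")
}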


@@ -257,10 +257,7 @@ func (b *bucket) Reset() {
putChunk(chunks[i])
chunks[i] = nil
}
bm := b.m
for k := range bm {
delete(bm, k)
}
b.m = make(map[uint64]uint64)
b.idx = 0
b.gen = 1
atomic.StoreUint64(&b.getCalls, 0)
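Replacing the delete-everything loop with a fresh map trades a small allocation for memory release: Go compiles the range-and-delete loop into a fast internal map clear, but the map keeps its grown bucket array afterwards, so a bucket that once held millions of entries would keep that memory reserved. A fresh map lets the GC reclaim it. Both variants side by side:

package main

import "fmt"

func main() {
	m := make(map[uint64]uint64)
	for i := uint64(0); i < 1_000_000; i++ {
		m[i] = i
	}

	// Variant 1: clear in place. Fast (recognized by the compiler), but the
	// map keeps its grown bucket array, so the memory stays reserved.
	for k := range m {
		delete(m, k)
	}

	// Variant 2 (what bucket.Reset switched to): drop the map entirely so the
	// garbage collector can reclaim the buckets of a formerly huge map.
	m = make(map[uint64]uint64)

	fmt.Println(len(m))
}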


@@ -28,6 +28,7 @@ func Lstat(path string, stat *Stat_t) (err error) {
return Fstatat(AT_FDCWD, path, stat, AT_SYMLINK_NOFOLLOW)
}
//sys MemfdSecret(flags int) (fd int, err error)
//sys Pause() (err error)
//sys pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64
//sys pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64


@@ -22,6 +22,7 @@ import "unsafe"
//sysnb getrlimit(resource int, rlim *Rlimit) (err error)
//sysnb Getuid() (uid int)
//sys Listen(s int, n int) (err error)
//sys MemfdSecret(flags int) (fd int, err error)
//sys pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64
//sys pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64
//sys Renameat(olddirfd int, oldpath string, newdirfd int, newpath string) (err error)


@@ -22,6 +22,7 @@ import "unsafe"
//sysnb Getrlimit(resource int, rlim *Rlimit) (err error)
//sysnb Getuid() (uid int)
//sys Listen(s int, n int) (err error)
//sys MemfdSecret(flags int) (fd int, err error)
//sys pread(fd int, p []byte, offset int64) (n int, err error) = SYS_PREAD64
//sys pwrite(fd int, p []byte, offset int64) (n int, err error) = SYS_PWRITE64
//sys Seek(fd int, offset int64, whence int) (off int64, err error) = SYS_LSEEK


@@ -215,6 +215,17 @@ func Listen(s int, n int) (err error) {
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func MemfdSecret(flags int) (fd int, err error) {
r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0)
fd = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func Pause() (err error) {
_, _, e1 := Syscall(SYS_PAUSE, 0, 0, 0)
if e1 != 0 {


@@ -180,6 +180,17 @@ func Listen(s int, n int) (err error) {
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func MemfdSecret(flags int) (fd int, err error) {
r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0)
fd = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func pread(fd int, p []byte, offset int64) (n int, err error) {
var _p0 unsafe.Pointer
if len(p) > 0 {


@@ -180,6 +180,17 @@ func Listen(s int, n int) (err error) {
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func MemfdSecret(flags int) (fd int, err error) {
r0, _, e1 := Syscall(SYS_MEMFD_SECRET, uintptr(flags), 0, 0)
fd = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func pread(fd int, p []byte, offset int64) (n int, err error) {
var _p0 unsafe.Pointer
if len(p) > 0 {

vendor/modules.txt

@@ -16,7 +16,7 @@ cloud.google.com/go/iam
cloud.google.com/go/storage
cloud.google.com/go/storage/internal
cloud.google.com/go/storage/internal/apiv2
# github.com/VictoriaMetrics/fastcache v1.9.0
# github.com/VictoriaMetrics/fastcache v1.10.0
## explicit; go 1.13
github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/fasthttp v1.1.0
@@ -290,7 +290,7 @@ golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
## explicit
golang.org/x/sync/errgroup
# golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb
# golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64
## explicit; go 1.17
golang.org/x/sys/internal/unsafeheader
golang.org/x/sys/unix