Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Author: Aliaksandr Valialkin
Date:   2021-04-30 10:11:24 +03:00
Commit: bceb8082f6
42 changed files with 1754 additions and 402 deletions


@@ -3,10 +3,8 @@ package main
 import (
     "flag"
     "fmt"
-    "io"
     "net/http"
     "os"
-    "path"
     "time"

     "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
@@ -95,15 +93,21 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
         if r.Method != "GET" {
             return false
         }
-        fmt.Fprintf(w, "<h2>Single-node VictoriaMetrics.</h2></br>")
+        fmt.Fprintf(w, "<h2>Single-node VictoriaMetrics</h2></br>")
         fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/'>https://docs.victoriametrics.com/</a></br>")
-        fmt.Fprintf(w, "Useful endpoints: </br>")
-        writeAPIHelp(w, [][]string{
+        fmt.Fprintf(w, "Useful endpoints:</br>")
+        httpserver.WriteAPIHelp(w, [][2]string{
             {"/targets", "discovered targets list"},
             {"/api/v1/targets", "advanced information about discovered targets in JSON format"},
             {"/metrics", "available service metrics"},
             {"/api/v1/status/tsdb", "tsdb status page"},
+            {"/api/v1/status/top_queries", "top queries"},
+            {"/api/v1/status/active_queries", "active queries"},
         })
+        for _, p := range customAPIPathList {
+            p, doc := p[0], p[1]
+            fmt.Fprintf(w, "<a href=%q>%s</a> - %s<br/>", p, p, doc)
+        }
         return true
     }
     if vminsert.RequestHandler(w, r) {
@@ -118,19 +122,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
     return false
 }

-func writeAPIHelp(w io.Writer, pathList [][]string) {
-    pathPrefix := httpserver.GetPathPrefix()
-    for _, p := range pathList {
-        p, doc := p[0], p[1]
-        p = path.Join(pathPrefix, p)
-        fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
-    }
-    for _, p := range customAPIPathList {
-        p, doc := p[0], p[1]
-        fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
-    }
-}
-
 func usage() {
     const s = `
 victoria-metrics is a time series database and monitoring solution.


@@ -148,7 +148,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
         if r.Method != "GET" {
             return false
         }
-        fmt.Fprintf(w, "vmagent - see docs at https://docs.victoriametrics.com/vmagent.html")
+        fmt.Fprintf(w, "<h2>vmagent</h2>")
+        fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/vmagent.html'>https://docs.victoriametrics.com/vmagent.html</a></br>")
+        fmt.Fprintf(w, "Useful endpoints:</br>")
+        httpserver.WriteAPIHelp(w, [][2]string{
+            {"/targets", "discovered targets list"},
+            {"/api/v1/targets", "advanced information about discovered targets in JSON format"},
+            {"/metrics", "available service metrics"},
+            {"/-/reload", "reload configuration"},
+        })
         return true
     }
     path := strings.Replace(r.URL.Path, "//", "/", -1)


@@ -29,6 +29,8 @@ type AlertingRule struct {
     GroupID     uint64
     GroupName   string

+    q datasource.Querier
+
     // guard status fields
     mu sync.RWMutex
     // stores list of active alerts
@@ -49,7 +51,7 @@ type alertingRuleMetrics struct {
     active *gauge
 }

-func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule {
+func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *AlertingRule {
     ar := &AlertingRule{
         Type:   cfg.Type,
         RuleID: cfg.ID,
@@ -60,8 +62,12 @@ func newAlertingRule(group *Group, cfg config.Rule) *AlertingRule {
         Annotations: cfg.Annotations,
         GroupID:     group.ID(),
         GroupName:   group.Name,
-        alerts:      make(map[uint64]*notifier.Alert),
-        metrics:     &alertingRuleMetrics{},
+        q: qb.BuildWithParams(datasource.QuerierParams{
+            DataSourceType:     &cfg.Type,
+            EvaluationInterval: group.Interval,
+        }),
+        alerts:  make(map[uint64]*notifier.Alert),
+        metrics: &alertingRuleMetrics{},
     }

     labels := fmt.Sprintf(`alertname=%q, group=%q, id="%d"`, ar.Name, group.Name, ar.ID())
@@ -121,8 +127,8 @@ func (ar *AlertingRule) ID() uint64 {
 // Exec executes AlertingRule expression via the given Querier.
 // Based on the Querier results AlertingRule maintains notifier.Alerts
-func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) {
-    qMetrics, err := q.Query(ctx, ar.Expr, ar.Type)
+func (ar *AlertingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
+    qMetrics, err := ar.q.Query(ctx, ar.Expr)
     ar.mu.Lock()
     defer ar.mu.Unlock()
@@ -139,7 +145,7 @@ func (ar *AlertingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
         }
     }

-    qFn := func(query string) ([]datasource.Metric, error) { return q.Query(ctx, query, ar.Type) }
+    qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) }
     updated := make(map[uint64]struct{})
     // update list of active alerts
     for _, m := range qMetrics {
@@ -407,7 +413,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
         return fmt.Errorf("querier is nil")
     }

-    qFn := func(query string) ([]datasource.Metric, error) { return q.Query(ctx, query, ar.Type) }
+    qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) }

     // account for external labels in filter
     var labelsFilter string
@@ -420,7 +426,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
     // remote write protocol which is used for state persistence in vmalert.
     expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])",
         alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds()))
-    qMetrics, err := q.Query(ctx, expr, ar.Type)
+    qMetrics, err := q.Query(ctx, expr)
     if err != nil {
         return err
     }


@@ -294,11 +294,12 @@ func TestAlertingRule_Exec(t *testing.T) {
     for _, tc := range testCases {
         t.Run(tc.rule.Name, func(t *testing.T) {
             fq := &fakeQuerier{}
+            tc.rule.q = fq
             tc.rule.GroupID = fakeGroup.ID()
             for _, step := range tc.steps {
                 fq.reset()
                 fq.add(step...)
-                if _, err := tc.rule.Exec(context.TODO(), fq, false); err != nil {
+                if _, err := tc.rule.Exec(context.TODO(), false); err != nil {
                     t.Fatalf("unexpected err: %s", err)
                 }
                 // artificial delay between applying steps
@@ -410,6 +411,7 @@ func TestAlertingRule_Restore(t *testing.T) {
         t.Run(tc.rule.Name, func(t *testing.T) {
             fq := &fakeQuerier{}
             tc.rule.GroupID = fakeGroup.ID()
+            tc.rule.q = fq
             fq.add(tc.metrics...)
             if err := tc.rule.Restore(context.TODO(), fq, time.Hour, nil); err != nil {
                 t.Fatalf("unexpected err: %s", err)
@@ -437,17 +439,18 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
     fq := &fakeQuerier{}
     ar := newTestAlertingRule("test", 0)
     ar.Labels = map[string]string{"job": "test"}
+    ar.q = fq

     // successful attempt
     fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
-    _, err := ar.Exec(context.TODO(), fq, false)
+    _, err := ar.Exec(context.TODO(), false)
     if err != nil {
         t.Fatal(err)
     }

     // label `job` will collide with rule extra label and will make both time series equal
     fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
-    _, err = ar.Exec(context.TODO(), fq, false)
+    _, err = ar.Exec(context.TODO(), false)
     if !errors.Is(err, errDuplicate) {
         t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
     }
@@ -456,7 +459,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
     expErr := "connection reset by peer"
     fq.setErr(errors.New(expErr))
-    _, err = ar.Exec(context.TODO(), fq, false)
+    _, err = ar.Exec(context.TODO(), false)
     if err == nil {
         t.Fatalf("expected to get err; got nil")
     }
@@ -544,8 +547,9 @@ func TestAlertingRule_Template(t *testing.T) {
         t.Run(tc.rule.Name, func(t *testing.T) {
             fq := &fakeQuerier{}
             tc.rule.GroupID = fakeGroup.ID()
+            tc.rule.q = fq
             fq.add(tc.metrics...)
-            if _, err := tc.rule.Exec(context.TODO(), fq, false); err != nil {
+            if _, err := tc.rule.Exec(context.TODO(), false); err != nil {
                 t.Fatalf("unexpected err: %s", err)
             }
             for hash, expAlert := range tc.expAlerts {


@@ -4,11 +4,16 @@ import (
     "context"
 )

+// QuerierBuilder builds Querier with given params.
+type QuerierBuilder interface {
+    BuildWithParams(params QuerierParams) Querier
+}
+
 // Querier interface wraps Query method which
 // executes given query and returns list of Metrics
 // as result
 type Querier interface {
-    Query(ctx context.Context, query string, engine Type) ([]Metric, error)
+    Query(ctx context.Context, query string) ([]Metric, error)
 }

 // Metric is the basic entity which should be return by datasource
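The QuerierBuilder split means a rule obtains a pre-configured Querier once at construction time, instead of threading a querier plus a datasource type through every Exec call. For illustration, here is a minimal self-contained sketch of the same contract; the types below are toy stand-ins, not the vmalert code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Toy stand-ins for datasource.Metric and datasource.QuerierParams.
type Metric struct{ Value float64 }

type QuerierParams struct{ EvaluationInterval time.Duration }

type Querier interface {
	Query(ctx context.Context, query string) ([]Metric, error)
}

type QuerierBuilder interface {
	BuildWithParams(params QuerierParams) Querier
}

// boundQuerier captures its params at build time, so Query needs no
// per-call engine/type arguments anymore.
type boundQuerier struct{ step time.Duration }

func (q *boundQuerier) Query(_ context.Context, query string) ([]Metric, error) {
	fmt.Printf("executing %q with step=%v\n", query, q.step)
	return []Metric{{Value: 1}}, nil
}

type builder struct{}

func (builder) BuildWithParams(p QuerierParams) Querier {
	return &boundQuerier{step: p.EvaluationInterval}
}

func main() {
	var qb QuerierBuilder = builder{}
	q := qb.BuildWithParams(QuerierParams{EvaluationInterval: 15 * time.Second})
	q.Query(context.Background(), "up")
}
```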


@@ -23,12 +23,13 @@ var (
     lookBack = flag.Duration("datasource.lookback", 0, `Lookback defines how far into the past to look when evaluating queries. For example, if the datasource.lookback=5m then param "time" with value now()-5m will be added to every query.`)
     queryStep = flag.Duration("datasource.queryStep", 0, "queryStep defines how far a value can fallback to when evaluating queries. "+
-        "For example, if datasource.queryStep=15s then param \"step\" with value \"15s\" will be added to every query.")
+        "For example, if datasource.queryStep=15s then param \"step\" with value \"15s\" will be added to every query."+
+        "If queryStep isn't specified, rule's evaluationInterval will be used instead.")
     maxIdleConnections = flag.Int("datasource.maxIdleConnections", 100, `Defines the number of idle (keep-alive connections) to each configured datasource. Consider setting this value equal to the value: groups_total * group.concurrency. Too low a value may result in a high number of sockets in TIME_WAIT state.`)
 )

 // Init creates a Querier from provided flag values.
-func Init() (Querier, error) {
+func Init() (QuerierBuilder, error) {
     if *addr == "" {
         return nil, fmt.Errorf("datasource.url is empty")
     }


@@ -81,6 +81,9 @@ type VMStorage struct {
     appendTypePrefix bool
     lookBack         time.Duration
     queryStep        time.Duration
+
+    dataSourceType     Type
+    evaluationInterval time.Duration
 }

 const queryPath = "/api/v1/query"
@@ -89,6 +92,40 @@ const graphitePath = "/render"
 const prometheusPrefix = "/prometheus"
 const graphitePrefix = "/graphite"

+// QuerierParams params for Querier.
+type QuerierParams struct {
+    DataSourceType     *Type
+    EvaluationInterval time.Duration
+}
+
+// Clone makes clone of VMStorage, shares http client.
+func (s *VMStorage) Clone() *VMStorage {
+    return &VMStorage{
+        c:                s.c,
+        datasourceURL:    s.datasourceURL,
+        basicAuthUser:    s.basicAuthUser,
+        basicAuthPass:    s.basicAuthPass,
+        lookBack:         s.lookBack,
+        queryStep:        s.queryStep,
+        appendTypePrefix: s.appendTypePrefix,
+        dataSourceType:   s.dataSourceType,
+    }
+}
+
+// ApplyParams - changes given querier params.
+func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
+    if params.DataSourceType != nil {
+        s.dataSourceType = *params.DataSourceType
+        s.evaluationInterval = params.EvaluationInterval
+    }
+    return s
+}
+
+// BuildWithParams - implements interface.
+func (s *VMStorage) BuildWithParams(params QuerierParams) Querier {
+    return s.Clone().ApplyParams(params)
+}
+
 // NewVMStorage is a constructor for VMStorage
 func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Duration, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage {
     return &VMStorage{
@@ -99,27 +136,33 @@ func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Du
         appendTypePrefix: appendTypePrefix,
         lookBack:         lookBack,
         queryStep:        queryStep,
+        dataSourceType:   NewPrometheusType(),
     }
 }

-// Query reads metrics from datasource by given query and type
-func (s *VMStorage) Query(ctx context.Context, query string, dataSourceType Type) ([]Metric, error) {
-    switch dataSourceType.name {
-    case "", prometheusType:
-        return s.queryDataSource(ctx, query, s.setPrometheusReqParams, parsePrometheusResponse)
-    case graphiteType:
-        return s.queryDataSource(ctx, query, s.setGraphiteReqParams, parseGraphiteResponse)
-    default:
-        return nil, fmt.Errorf("engine not found: %q", dataSourceType)
+// Query executes the given query and returns parsed response
+func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
+    req, err := s.prepareReq(query, time.Now())
+    if err != nil {
+        return nil, err
     }
+    resp, err := s.do(ctx, req)
+    if err != nil {
+        return nil, err
+    }
+    defer func() {
+        _ = resp.Body.Close()
+    }()
+    parseFn := parsePrometheusResponse
+    if s.dataSourceType.name != prometheusType {
+        parseFn = parseGraphiteResponse
+    }
+    return parseFn(req, resp)
 }

-func (s *VMStorage) queryDataSource(
-    ctx context.Context,
-    query string,
-    setReqParams func(r *http.Request, query string),
-    processResponse func(r *http.Request, resp *http.Response) ([]Metric, error),
-) ([]Metric, error) {
+func (s *VMStorage) prepareReq(query string, timestamp time.Time) (*http.Request, error) {
     req, err := http.NewRequest("POST", s.datasourceURL, nil)
     if err != nil {
         return nil, err
@@ -128,20 +171,32 @@ func (s *VMStorage) prepareReq(query string, timestamp time.Time) (*http.Request, error) {
     if s.basicAuthPass != "" {
         req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
     }
-    setReqParams(req, query)
+    switch s.dataSourceType.name {
+    case "", prometheusType:
+        s.setPrometheusReqParams(req, query, timestamp)
+    case graphiteType:
+        s.setGraphiteReqParams(req, query, timestamp)
+    default:
+        return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name)
+    }
+    return req, nil
+}
+
+func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {
     resp, err := s.c.Do(req.WithContext(ctx))
     if err != nil {
         return nil, fmt.Errorf("error getting response from %s: %w", req.URL, err)
     }
-    defer func() { _ = resp.Body.Close() }()
     if resp.StatusCode != http.StatusOK {
         body, _ := ioutil.ReadAll(resp.Body)
-        return nil, fmt.Errorf("datasource returns unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
+        _ = resp.Body.Close()
+        return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
     }
-    return processResponse(req, resp)
+    return resp, nil
 }

-func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
+func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string, timestamp time.Time) {
     if s.appendTypePrefix {
         r.URL.Path += prometheusPrefix
     }
@@ -149,16 +204,24 @@ func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string, timestamp time.Time) {
     q := r.URL.Query()
     q.Set("query", query)
     if s.lookBack > 0 {
-        lookBack := time.Now().Add(-s.lookBack)
-        q.Set("time", fmt.Sprintf("%d", lookBack.Unix()))
+        timestamp = timestamp.Add(-s.lookBack)
     }
+    if s.evaluationInterval > 0 {
+        // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
+        timestamp = timestamp.Truncate(s.evaluationInterval)
+        // set step as evaluationInterval by default
+        q.Set("step", s.evaluationInterval.String())
+    }
+    q.Set("time", fmt.Sprintf("%d", timestamp.Unix()))
     if s.queryStep > 0 {
+        // override step with user-specified value
         q.Set("step", s.queryStep.String())
     }
     r.URL.RawQuery = q.Encode()
 }

-func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string) {
+func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string, timestamp time.Time) {
     if s.appendTypePrefix {
         r.URL.Path += graphitePrefix
     }
@@ -168,7 +231,7 @@ func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string, timestamp time.Time) {
     q.Set("target", query)
     from := "-5min"
     if s.lookBack > 0 {
-        lookBack := time.Now().Add(-s.lookBack)
+        lookBack := timestamp.Add(-s.lookBack)
         from = strconv.FormatInt(lookBack.Unix(), 10)
     }
     q.Set("from", from)


@@ -2,8 +2,10 @@ package datasource
 import (
     "context"
+    "fmt"
     "net/http"
     "net/http/httptest"
+    "reflect"
     "strconv"
     "testing"
     "time"
@@ -69,26 +71,31 @@ func TestVMSelectQuery(t *testing.T) {
     srv := httptest.NewServer(mux)
     defer srv.Close()

-    am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, false, srv.Client())
-    if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil {
+    s := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, false, srv.Client())
+
+    p := NewPrometheusType()
+    pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second})
+
+    if _, err := pq.Query(ctx, query); err == nil {
         t.Fatalf("expected connection error got nil")
     }
-    if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil {
+    if _, err := pq.Query(ctx, query); err == nil {
         t.Fatalf("expected invalid response status error got nil")
     }
-    if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil {
+    if _, err := pq.Query(ctx, query); err == nil {
         t.Fatalf("expected response body error got nil")
     }
-    if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil {
+    if _, err := pq.Query(ctx, query); err == nil {
         t.Fatalf("expected error status got nil")
     }
-    if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil {
+    if _, err := pq.Query(ctx, query); err == nil {
         t.Fatalf("expected unknown status got nil")
     }
-    if _, err := am.Query(ctx, query, NewPrometheusType()); err == nil {
+    if _, err := pq.Query(ctx, query); err == nil {
         t.Fatalf("expected non-vector resultType error got nil")
     }
-    m, err := am.Query(ctx, query, NewPrometheusType())
+    m, err := pq.Query(ctx, query)
     if err != nil {
         t.Fatalf("unexpected %s", err)
     }
@@ -100,13 +107,14 @@ func TestVMSelectQuery(t *testing.T) {
         Timestamp: 1583786142,
         Value:     13763,
     }
-    if m[0].Timestamp != expected.Timestamp &&
-        m[0].Value != expected.Value &&
-        m[0].Labels[0].Value != expected.Labels[0].Value &&
-        m[0].Labels[0].Name != expected.Labels[0].Name {
+    if !reflect.DeepEqual(m[0], expected) {
         t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
     }
-    m, err = am.Query(ctx, queryRender, NewGraphiteType())
+
+    g := NewGraphiteType()
+    gq := s.BuildWithParams(QuerierParams{DataSourceType: &g})
+
+    m, err = gq.Query(ctx, queryRender)
     if err != nil {
         t.Fatalf("unexpected %s", err)
     }
@@ -118,10 +126,139 @@ func TestVMSelectQuery(t *testing.T) {
         Timestamp: 1611758403,
         Value:     10,
     }
-    if m[0].Timestamp != expected.Timestamp &&
-        m[0].Value != expected.Value &&
-        m[0].Labels[0].Value != expected.Labels[0].Value &&
-        m[0].Labels[0].Name != expected.Labels[0].Name {
+    if !reflect.DeepEqual(m[0], expected) {
         t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
     }
 }
+
+func TestPrepareReq(t *testing.T) {
+    query := "up"
+    timestamp := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
+    testCases := []struct {
+        name    string
+        vm      *VMStorage
+        checkFn func(t *testing.T, r *http.Request)
+    }{
+        {
+            "prometheus path",
+            &VMStorage{
+                dataSourceType: NewPrometheusType(),
+            },
+            func(t *testing.T, r *http.Request) {
+                checkEqualString(t, queryPath, r.URL.Path)
+            },
+        },
+        {
+            "prometheus prefix",
+            &VMStorage{
+                dataSourceType:   NewPrometheusType(),
+                appendTypePrefix: true,
+            },
+            func(t *testing.T, r *http.Request) {
+                checkEqualString(t, prometheusPrefix+queryPath, r.URL.Path)
+            },
+        },
+        {
+            "graphite path",
+            &VMStorage{
+                dataSourceType: NewGraphiteType(),
+            },
+            func(t *testing.T, r *http.Request) {
+                checkEqualString(t, graphitePath, r.URL.Path)
+            },
+        },
+        {
+            "graphite prefix",
+            &VMStorage{
+                dataSourceType:   NewGraphiteType(),
+                appendTypePrefix: true,
+            },
+            func(t *testing.T, r *http.Request) {
+                checkEqualString(t, graphitePrefix+graphitePath, r.URL.Path)
+            },
+        },
+        {
+            "default params",
+            &VMStorage{},
+            func(t *testing.T, r *http.Request) {
+                exp := fmt.Sprintf("query=%s&time=%d", query, timestamp.Unix())
+                checkEqualString(t, exp, r.URL.RawQuery)
+            },
+        },
+        {
+            "basic auth",
+            &VMStorage{
+                basicAuthUser: "foo",
+                basicAuthPass: "bar",
+            },
+            func(t *testing.T, r *http.Request) {
+                u, p, _ := r.BasicAuth()
+                checkEqualString(t, "foo", u)
+                checkEqualString(t, "bar", p)
+            },
+        },
+        {
+            "lookback",
+            &VMStorage{
+                lookBack: time.Minute,
+            },
+            func(t *testing.T, r *http.Request) {
+                exp := fmt.Sprintf("query=%s&time=%d", query, timestamp.Add(-time.Minute).Unix())
+                checkEqualString(t, exp, r.URL.RawQuery)
+            },
+        },
+        {
+            "evaluation interval",
+            &VMStorage{
+                evaluationInterval: 15 * time.Second,
+            },
+            func(t *testing.T, r *http.Request) {
+                evalInterval := 15 * time.Second
+                tt := timestamp.Truncate(evalInterval)
+                exp := fmt.Sprintf("query=%s&step=%v&time=%d", query, evalInterval, tt.Unix())
+                checkEqualString(t, exp, r.URL.RawQuery)
+            },
+        },
+        {
+            "lookback + evaluation interval",
+            &VMStorage{
+                lookBack:           time.Minute,
+                evaluationInterval: 15 * time.Second,
+            },
+            func(t *testing.T, r *http.Request) {
+                evalInterval := 15 * time.Second
+                tt := timestamp.Add(-time.Minute)
+                tt = tt.Truncate(evalInterval)
+                exp := fmt.Sprintf("query=%s&step=%v&time=%d", query, evalInterval, tt.Unix())
+                checkEqualString(t, exp, r.URL.RawQuery)
+            },
+        },
+        {
+            "step override",
+            &VMStorage{
+                queryStep: time.Minute,
+            },
+            func(t *testing.T, r *http.Request) {
+                exp := fmt.Sprintf("query=%s&step=%v&time=%d", query, time.Minute, timestamp.Unix())
+                checkEqualString(t, exp, r.URL.RawQuery)
+            },
+        },
+    }
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            req, err := tc.vm.prepareReq(query, timestamp)
+            if err != nil {
+                t.Fatalf("unexpected error: %s", err)
+            }
+            tc.checkFn(t, req)
+        })
+    }
+}
+
+func checkEqualString(t *testing.T, exp, got string) {
+    t.Helper()
+    if got != exp {
+        t.Errorf("expected to get %q; got %q", exp, got)
+    }
+}


@@ -49,7 +49,7 @@ func newGroupMetrics(name, file string) *groupMetrics {
     return m
 }

-func newGroup(cfg config.Group, defaultInterval time.Duration, labels map[string]string) *Group {
+func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group {
     g := &Group{
         Type: cfg.Type,
         Name: cfg.Name,
@@ -81,17 +81,17 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval time.Duration, labels map[string]string) *Group {
             }
             r.Labels[k] = v
         }
-        rules[i] = g.newRule(r)
+        rules[i] = g.newRule(qb, r)
     }
     g.Rules = rules
     return g
 }

-func (g *Group) newRule(rule config.Rule) Rule {
+func (g *Group) newRule(qb datasource.QuerierBuilder, rule config.Rule) Rule {
     if rule.Alert != "" {
-        return newAlertingRule(g, rule)
+        return newAlertingRule(qb, g, rule)
     }
-    return newRecordingRule(g, rule)
+    return newRecordingRule(qb, g, rule)
 }

 // ID return unique group ID that consists of
@@ -106,7 +106,7 @@ func (g *Group) ID() uint64 {
 }

 // Restore restores alerts state for group rules
-func (g *Group) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error {
+func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, lookback time.Duration, labels map[string]string) error {
     for _, rule := range g.Rules {
         rr, ok := rule.(*AlertingRule)
         if !ok {
@@ -115,6 +115,7 @@ func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, lookback time.Duration, labels map[string]string) error {
         if rr.For < 1 {
             continue
         }
+        q := qb.BuildWithParams(datasource.QuerierParams{})
         if err := rr.Restore(ctx, q, lookback, labels); err != nil {
             return fmt.Errorf("error while restoring rule %q: %w", rule, err)
         }
@@ -189,7 +190,7 @@ func (g *Group) close() {
 var skipRandSleepOnGroupStart bool

-func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
+func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewrite.Client) {
     defer func() { close(g.finishedCh) }()

     // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
@@ -213,7 +214,7 @@ func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewrite.Client) {
     }

     logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
-    e := &executor{querier, nts, rw}
+    e := &executor{nts, rw}
     t := time.NewTicker(g.Interval)
     defer t.Stop()
     for {
@@ -256,7 +257,6 @@ func (g *Group) start(ctx context.Context, nts []notifier.Notifier, rw *remotewrite.Client) {
 }

 type executor struct {
-    querier   datasource.Querier
     notifiers []notifier.Notifier
     rw        *remotewrite.Client
 }
@@ -310,7 +310,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter
         execDuration.UpdateDuration(execStart)
     }()

-    tss, err := rule.Exec(ctx, e.querier, returnSeries)
+    tss, err := rule.Exec(ctx, returnSeries)
     if err != nil {
         execErrors.Inc()
         return fmt.Errorf("rule %q: failed to execute: %w", rule, err)


@@ -6,6 +6,8 @@ import (
     "testing"
     "time"

+    "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
+
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
 )
@@ -105,20 +107,32 @@ func TestUpdateWith(t *testing.T) {
                 {Record: "foo5"},
             },
         },
+        {
+            "update datasource type",
+            []config.Rule{
+                {Alert: "foo1", Type: datasource.NewPrometheusType()},
+                {Alert: "foo3", Type: datasource.NewGraphiteType()},
+            },
+            []config.Rule{
+                {Alert: "foo1", Type: datasource.NewGraphiteType()},
+                {Alert: "foo10", Type: datasource.NewPrometheusType()},
+            },
+        },
     }

     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
             g := &Group{Name: "test"}
+            qb := &fakeQuerier{}
             for _, r := range tc.currentRules {
                 r.ID = config.HashRule(r)
-                g.Rules = append(g.Rules, g.newRule(r))
+                g.Rules = append(g.Rules, g.newRule(qb, r))
             }

             ng := &Group{Name: "test"}
             for _, r := range tc.newRules {
                 r.ID = config.HashRule(r)
-                ng.Rules = append(ng.Rules, ng.newRule(r))
+                ng.Rules = append(ng.Rules, ng.newRule(qb, r))
             }

             err := g.updateWith(ng)
@@ -156,11 +170,11 @@ func TestGroupStart(t *testing.T) {
         t.Fatalf("failed to parse rules: %s", err)
     }
     const evalInterval = time.Millisecond
-    g := newGroup(groups[0], evalInterval, map[string]string{"cluster": "east-1"})
-    g.Concurrency = 2
-    fn := &fakeNotifier{}
     fs := &fakeQuerier{}
+    fn := &fakeNotifier{}
+
+    g := newGroup(groups[0], fs, evalInterval, map[string]string{"cluster": "east-1"})
+    g.Concurrency = 2

     const inst1, inst2, job = "foo", "bar", "baz"
     m1 := metricWithLabels(t, "instance", inst1, "job", job)
@@ -195,7 +209,7 @@ func TestGroupStart(t *testing.T) {
     fs.add(m1)
     fs.add(m2)
     go func() {
-        g.start(context.Background(), fs, []notifier.Notifier{fn}, nil)
+        g.start(context.Background(), []notifier.Notifier{fn}, nil)
         close(finished)
     }()


@@ -38,7 +38,11 @@ func (fq *fakeQuerier) add(metrics ...datasource.Metric) {
     fq.Unlock()
 }

-func (fq *fakeQuerier) Query(_ context.Context, _ string, _ datasource.Type) ([]datasource.Metric, error) {
+func (fq *fakeQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Querier {
+    return fq
+}
+
+func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
     fq.Lock()
     defer fq.Unlock()
     if fq.err != nil {
@@ -160,6 +164,9 @@ func compareAlertingRules(t *testing.T, a, b *AlertingRule) error {
     if !reflect.DeepEqual(a.Labels, b.Labels) {
         return fmt.Errorf("expected to have labels %#v; got %#v", a.Labels, b.Labels)
     }
+    if a.Type.String() != b.Type.String() {
+        return fmt.Errorf("expected to have Type %#v; got %#v", a.Type.String(), b.Type.String())
+    }
     return nil
 }
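Worth noting: fakeQuerier now satisfies both interfaces at once, since BuildWithParams simply returns the receiver, so one test double can be injected wherever a builder or a ready querier is expected. A stripped-down sketch of that trick, with hypothetical names:

```go
package main

import "context"

type querier interface {
	Query(ctx context.Context, q string) ([]string, error)
}

type querierBuilder interface {
	BuildWithParams(step string) querier
}

// fake is its own builder: BuildWithParams returns the receiver.
type fake struct{ out []string }

func (f *fake) BuildWithParams(_ string) querier { return f }

func (f *fake) Query(_ context.Context, _ string) ([]string, error) {
	return f.out, nil
}

// Compile-time checks that fake satisfies both roles.
var (
	_ querier        = (*fake)(nil)
	_ querierBuilder = (*fake)(nil)
)

func main() {}
```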


@@ -140,10 +140,10 @@ func newManager(ctx context.Context) (*manager, error) {
     }
     manager := &manager{
-        groups:    make(map[uint64]*Group),
-        querier:   q,
-        notifiers: nts,
-        labels:    map[string]string{},
+        groups:         make(map[uint64]*Group),
+        querierBuilder: q,
+        notifiers:      nts,
+        labels:         map[string]string{},
     }
     rw, err := remotewrite.Init(ctx)
     if err != nil {


@@ -15,11 +15,12 @@ import (

 // manager controls group states
 type manager struct {
-    querier   datasource.Querier
-    notifiers []notifier.Notifier
+    querierBuilder datasource.QuerierBuilder
+    notifiers      []notifier.Notifier

     rw *remotewrite.Client
-    rr datasource.Querier
+    // remote read builder.
+    rr datasource.QuerierBuilder

     wg     sync.WaitGroup
     labels map[string]string
@@ -74,7 +75,7 @@ func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) {
     m.wg.Add(1)
     id := group.ID()
     go func() {
-        group.start(ctx, m.querier, m.notifiers, m.rw)
+        group.start(ctx, m.notifiers, m.rw)
         m.wg.Done()
     }()
     m.groups[id] = group
@@ -89,7 +90,7 @@ func (m *manager) update(ctx context.Context, path []string, validateTpl, valida
     groupsRegistry := make(map[uint64]*Group)
     for _, cfg := range groupsCfg {
-        ng := newGroup(cfg, *evaluationInterval, m.labels)
+        ng := newGroup(cfg, m.querierBuilder, *evaluationInterval, m.labels)
         groupsRegistry[ng.ID()] = ng
     }


@@ -37,9 +37,9 @@ func TestManagerEmptyRulesDir(t *testing.T) {
 // Should be executed with -race flag
 func TestManagerUpdateConcurrent(t *testing.T) {
     m := &manager{
-        groups:    make(map[uint64]*Group),
-        querier:   &fakeQuerier{},
-        notifiers: []notifier.Notifier{&fakeNotifier{}},
+        groups:         make(map[uint64]*Group),
+        querierBuilder: &fakeQuerier{},
+        notifiers:      []notifier.Notifier{&fakeNotifier{}},
     }
     paths := []string{
         "config/testdata/dir/rules0-good.rules",
@@ -242,7 +242,7 @@ func TestManagerUpdate(t *testing.T) {
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
             ctx, cancel := context.WithCancel(context.TODO())
-            m := &manager{groups: make(map[uint64]*Group), querier: &fakeQuerier{}}
+            m := &manager{groups: make(map[uint64]*Group), querierBuilder: &fakeQuerier{}}
             path := []string{tc.initPath}
             if err := m.update(ctx, path, true, true, false); err != nil {
                 t.Fatalf("failed to complete initial rules update: %s", err)


@@ -25,6 +25,8 @@ type RecordingRule struct {
     Labels  map[string]string
     GroupID uint64

+    q datasource.Querier
+
     // guard status fields
     mu sync.RWMutex
     // stores last moment of time Exec was called
@@ -52,7 +54,7 @@ func (rr *RecordingRule) ID() uint64 {
     return rr.RuleID
 }

-func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule {
+func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *RecordingRule {
     rr := &RecordingRule{
         Type:   cfg.Type,
         RuleID: cfg.ID,
@@ -61,6 +63,10 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *RecordingRule {
         Labels:  cfg.Labels,
         GroupID: group.ID(),
         metrics: &recordingRuleMetrics{},
+        q: qb.BuildWithParams(datasource.QuerierParams{
+            DataSourceType:     &cfg.Type,
+            EvaluationInterval: group.Interval,
+        }),
     }

     labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
@@ -82,12 +88,12 @@ func (rr *RecordingRule) Close() {
 }

 // Exec executes RecordingRule expression via the given Querier.
-func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) {
+func (rr *RecordingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
     if !series {
         return nil, nil
     }

-    qMetrics, err := q.Query(ctx, rr.Expr, rr.Type)
+    qMetrics, err := rr.q.Query(ctx, rr.Expr)
     rr.mu.Lock()
     defer rr.mu.Unlock()
@@ -100,7 +106,7 @@ func (rr *RecordingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
     duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics))
     var tss []prompbmarshal.TimeSeries
     for _, r := range qMetrics {
-        ts := rr.toTimeSeries(r, rr.lastExecTime)
+        ts := rr.toTimeSeries(r, time.Unix(r.Timestamp, 0))
         h := hashTimeSeries(ts)
         if _, ok := duplicates[h]; ok {
             rr.lastExecError = errDuplicate


@@ -76,7 +76,8 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) {
         t.Run(tc.rule.Name, func(t *testing.T) {
             fq := &fakeQuerier{}
             fq.add(tc.metrics...)
-            tss, err := tc.rule.Exec(context.TODO(), fq, true)
+            tc.rule.q = fq
+            tss, err := tc.rule.Exec(context.TODO(), true)
             if err != nil {
                 t.Fatalf("unexpected Exec err: %s", err)
             }
@@ -95,8 +96,8 @@ func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) {
     fq := &fakeQuerier{}
     expErr := "connection reset by peer"
     fq.setErr(errors.New(expErr))
-    _, err := rr.Exec(context.TODO(), fq, true)
+    rr.q = fq
+    _, err := rr.Exec(context.TODO(), true)
     if err == nil {
         t.Fatalf("expected to get err; got nil")
     }
@@ -111,7 +112,7 @@ func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) {
     fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
     fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))

-    _, err = rr.Exec(context.TODO(), fq, true)
+    _, err = rr.Exec(context.TODO(), true)
     if err == nil {
         t.Fatalf("expected to get err; got nil")
     }


@@ -26,7 +26,7 @@ var (

 // Init creates a Querier from provided flag values.
 // Returns nil if addr flag wasn't set.
-func Init() (datasource.Querier, error) {
+func Init() (datasource.QuerierBuilder, error) {
     if *addr == "" {
         return nil, nil
     }


@@ -4,7 +4,6 @@ import (
     "context"
     "errors"

-    "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 )
@@ -18,7 +17,7 @@ type Rule interface {
     // Exec executes the rule with given context
     // and Querier. If returnSeries is true, Exec
     // may return TimeSeries as result of execution
-    Exec(ctx context.Context, q datasource.Querier, returnSeries bool) ([]prompbmarshal.TimeSeries, error)
+    Exec(ctx context.Context, returnSeries bool) ([]prompbmarshal.TimeSeries, error)
     // UpdateWith performs modification of current Rule
     // with fields of the given Rule.
     UpdateWith(Rule) error


@@ -17,25 +17,19 @@ type requestHandler struct {
     m *manager
 }

-var pathList = [][]string{
-    {"/api/v1/groups", "list all loaded groups and rules"},
-    {"/api/v1/alerts", "list all active alerts"},
-    {"/api/v1/groupID/alertID/status", "get alert status by ID"},
-    // /metrics is served by httpserver by default
-    {"/metrics", "list of application metrics"},
-    {"/-/reload", "reload configuration"},
-}
-
 func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
     switch r.URL.Path {
     case "/":
         if r.Method != "GET" {
             return false
         }
-        for _, path := range pathList {
-            p, doc := path[0], path[1]
-            fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
-        }
+        httpserver.WriteAPIHelp(w, [][2]string{
+            {"/api/v1/groups", "list all loaded groups and rules"},
+            {"/api/v1/alerts", "list all active alerts"},
+            {"/api/v1/groupID/alertID/status", "get alert status by ID"},
+            {"/metrics", "list of application metrics"},
+            {"/-/reload", "reload configuration"},
+        })
         return true
     case "/api/v1/groups":
         data, err := rh.listGroups()


@@ -1,5 +1,5 @@
 ---
-sort: 18
+sort: 16
 ---

 # Articles


@@ -9,9 +9,13 @@ sort: 15
 * FEATURE: vmauth: add ability to set madatory query args in `url_prefix`. For example, `url_prefix: http://vm:8428/?extra_label=team=dev` would add `extra_label=team=dev` query arg to all the incoming requests. See [the example](https://docs.victoriametrics.com/vmauth.html#auth-config) for more details.
 * FEATURE: add OpenTSDB migration option to vmctl. See more details [here](https://docs.victoriametrics.com/vmctl#migrating-data-from-opentsdb).
   Thanks to @johnseekins!
+* FEATURE: improved new time series registration speed on systems with many CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244). Thanks to @waldoweng for the idea and [draft implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243).
+* FEATURE: vmagent: list user-visible endpoints at `http://vmagent:8429/`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1251).
+* FEATURE: vmalert: use the same technique as Grafana for determining evaluation timestamps for recording rules. This should make consistent graphs for series generated by recording rules compared to graphs generated for queries from recording rules in Grafana. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232).

 * BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240).
 * BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when scrape target exposes millions of metrics as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047).
+* BUGFIX: vmagent: eliminate possible data race when obtaining value for the metric `vm_persistentqueue_bytes_pending`. The data race could result in incorrect value for this metric.
 * BUGFIX: vmstorage: remove empty directories on startup. Such directories can be left after unclean shutdown on NFS storage. Previously such directories could lead to crashloop until manually removed. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142).


@@ -1,5 +1,5 @@
 ---
-sort: 20
+sort: 21
 ---

 # Docs


@@ -2,31 +2,35 @@
 sort: 19
 ---

-## VictoriaMetrics Cluster Per Tenant Statistic
+# VictoriaMetrics Cluster Per Tenant Statistic

 <img alt="cluster-per-tenant-stat" src="per-tenant-stats.jpg">

-The enterprise version of VictoriaMetrics cluster exposes the usage statistics for each tenant.
-When the next statistic is exposed:
+VictoriaMetrics cluster for enterprise provides various metrics and statistics usage per tenant:

 - `vminsert`
-  * `vm_tenant_inserted_rows_total` - the ingestion rate by tenant
+  * `vm_tenant_inserted_rows_total` - total number of inserted rows. Find out which tenant
+  puts the most of the pressure on the storage.

 - `vmselect`
-  * `vm_tenant_select_requests_duration_ms_total` - query latency by tenant. It can be useful to identify the tenant with the heaviest queries
-  * `vm_tenant_select_requests_total` - total requests. You can calculate request rate (qps) with this metric
+  * `vm_tenant_select_requests_duration_ms_total` - query latency.
+  Helps to identify tenants with the heaviest queries.
+  * `vm_tenant_select_requests_total` - total number of requests.
+  Discover which tenant sends the most of the queries and how it changes with time.

 - `vmstorage`
-  * `vm_tenant_active_timeseries` - the number of active timeseries
-  * `vm_tenant_used_tenant_bytes` - the disk space consumed by the metrics for a particular tenant
-  * `vm_tenant_timeseries_created_total` - the total number for timeseries by tenant
+  * `vm_tenant_active_timeseries` - number of active time series.
+  This metric correlates with memory usage, so can be used to find the most expensive
+  tenant in terms of memory.
+  * `vm_tenant_used_tenant_bytes` - disk space usage. Helps to track disk space usage
+  per tenant.
+  * `vm_tenant_timeseries_created_total` - number of new time series created. Helps to track
+  the churn rate per tenant, or identify inefficient usage of the system.

-The information should be scraped by the agent (`vmagent`, `victoriametrics`, prometheus, etc) and stored in the TSDB. This can be the same cluster but a different tenant however, we encourage the use of one more instance of TSDB (more lightweight, eg. VM single) for the monitoring of monitoring.
-the config example for statistic scraping
+Collect the metrics by any scrape agent you like (`vmagent`, `victoriametrics`, Prometheus, etc) and put into TSDB.
+It is ok to use existing cluster for storing such metrics, but make sure to use a different tenant for it to avoid collisions.
+Or just run a separate TSDB (VM single, Promethes, etc.) to keep the data isolated from the main cluster.
+
+Example of the scraping configuration for statistic is the following:

 ```yaml
 scrape_configs:
@@ -36,20 +40,23 @@ scrape_configs:
     - targets: ['vmselect:8481','vmstorage:8482','vminsert:8480']
 ```

-### Visualization
+## Visualization

-Visualisation of statistics can be done in Grafana using this dashboard [link](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster/dashboards/clusterbytenant.json)
+Visualisation of statistics can be done in Grafana using the following
+[dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster/dashboards/clusterbytenant.json).

-### Integration with vmgateway
+## Integration with vmgateway

-Per Tenant Statistics are the source data for the `vmgateway` rate limiter. More information can be found [here](https://docs.victoriametrics.com/vmgateway.html)
+`vmgateway` supports integration with Per Tenant Statistics data for rate limiting purposes.
+More information can be found [here](https://docs.victoriametrics.com/vmgateway.html)

-### Integration with vmalert
+## Integration with vmalert

-You can generate alerts based on each tenants' resource usage and notify the system/users that they are reaching the limits.
+You can generate alerts based on each tenant's resource usage and send notifications
+to prevent limits exhaustion.

-Here is an example of an alert for high churn rate by the tenant
+Here is an alert example for high churn rate by the tenant:

 ```yaml

@@ -1,5 +1,5 @@
 ---
-sort: 16
+sort: 18
 ---

 # Release process guidance

go.mod

@@ -17,7 +17,7 @@ require (
     github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
     github.com/golang/snappy v0.0.3
     github.com/influxdata/influxdb v1.8.5
-    github.com/klauspost/compress v1.12.1
+    github.com/klauspost/compress v1.12.2
     github.com/oklog/ulid v1.3.1
     github.com/prometheus/client_golang v1.10.0 // indirect
     github.com/prometheus/common v0.21.0 // indirect

go.sum

@@ -517,8 +517,8 @@ github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
 github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.12.1 h1:/+xsCsk06wE38cyiqOR/o7U2fSftcH72xD+BQXmja/g=
-github.com/klauspost/compress v1.12.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
+github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
+github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
 github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
 github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
 github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=


@@ -11,6 +11,7 @@ import (
     "net"
     "net/http"
     "net/http/pprof"
+    "path"
     "runtime"
     "strconv"
     "strings"
@@ -589,3 +590,12 @@ func IsTLS() bool {
 func GetPathPrefix() string {
     return *pathPrefix
 }
+
+// WriteAPIHelp writes pathList to w in HTML format.
+func WriteAPIHelp(w io.Writer, pathList [][2]string) {
+    for _, p := range pathList {
+        p, doc := p[0], p[1]
+        p = path.Join(*pathPrefix, p)
+        fmt.Fprintf(w, "<a href=%q>%s</a> - %s<br/>", p, p, doc)
+    }
+}
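With WriteAPIHelp living in lib/httpserver, every binary (victoria-metrics, vmagent, vmalert) can render the same root-page endpoint index. A hedged usage sketch follows; the local re-implementation takes the path prefix as an explicit argument instead of reading the package-level flag, purely for illustration:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"path"
)

// writeAPIHelp mirrors httpserver.WriteAPIHelp: each [2]string entry is
// (relative path, description), joined with the server's path prefix.
func writeAPIHelp(w io.Writer, pathPrefix string, pathList [][2]string) {
	for _, p := range pathList {
		p, doc := p[0], p[1]
		p = path.Join(pathPrefix, p)
		fmt.Fprintf(w, "<a href=%q>%s</a> - %s<br/>", p, p, doc)
	}
}

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		writeAPIHelp(w, "/", [][2]string{
			{"/metrics", "available service metrics"},
			{"/-/reload", "reload configuration"},
		})
	})
	// http.ListenAndServe(":8428", nil) // left commented: sketch only
}
```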


@@ -103,9 +103,10 @@ type Table struct {
     partsLock sync.Mutex
     parts     []*partWrapper

-    rawItemsBlocks        []*inmemoryBlock
-    rawItemsLock          sync.Mutex
-    rawItemsLastFlushTime uint64
+    // rawItems contains recently added items that haven't been converted to parts yet.
+    //
+    // rawItems aren't used in search for performance reasons
+    rawItems rawItemsShards

     snapshotLock sync.RWMutex
@@ -124,6 +125,97 @@ type Table struct {
     rawItemsPendingFlushesWG syncwg.WaitGroup
 }

+type rawItemsShards struct {
+    shardIdx uint32
+
+    // shards reduce lock contention when adding rows on multi-CPU systems.
+    shards []rawItemsShard
+}
+
+// The number of shards for rawItems per table.
+//
+// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
+var rawItemsShardsPerTable = cgroup.AvailableCPUs()
+
+const maxBlocksPerShard = 512
+
+func (riss *rawItemsShards) init() {
+    riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
+}
+
+func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error {
+    n := atomic.AddUint32(&riss.shardIdx, 1)
+    shards := riss.shards
+    idx := n % uint32(len(shards))
+    shard := &shards[idx]
+    return shard.addItems(tb, items)
+}
+
+func (riss *rawItemsShards) Len() int {
+    n := 0
+    for i := range riss.shards {
+        n += riss.shards[i].Len()
+    }
+    return n
+}
+
+type rawItemsShard struct {
+    mu            sync.Mutex
+    ibs           []*inmemoryBlock
+    lastFlushTime uint64
+}
+
+func (ris *rawItemsShard) Len() int {
+    ris.mu.Lock()
+    n := 0
+    for _, ib := range ris.ibs {
+        n += len(ib.items)
+    }
+    ris.mu.Unlock()
+    return n
+}
+
+func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
+    var err error
+    var blocksToMerge []*inmemoryBlock
+
+    ris.mu.Lock()
+    ibs := ris.ibs
+    if len(ibs) == 0 {
+        ib := getInmemoryBlock()
+        ibs = append(ibs, ib)
+        ris.ibs = ibs
+    }
+    ib := ibs[len(ibs)-1]
+    for _, item := range items {
+        if !ib.Add(item) {
+            ib = getInmemoryBlock()
+            if !ib.Add(item) {
+                putInmemoryBlock(ib)
+                err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
+                break
+            }
+            ibs = append(ibs, ib)
+            ris.ibs = ibs
+        }
+    }
+    if len(ibs) >= maxBlocksPerShard {
+        blocksToMerge = ibs
+        ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
+        ris.lastFlushTime = fasttime.UnixTimestamp()
+    }
+    ris.mu.Unlock()
+
+    if blocksToMerge == nil {
+        // Fast path.
+        return err
+    }
+
+    // Slow path: merge blocksToMerge.
+    tb.mergeRawItemsBlocks(blocksToMerge)
+    return err
+}
+
 type partWrapper struct {
     p *part
@@ -195,6 +287,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
         flockF: flockF,
         stopCh: make(chan struct{}),
     }
+    tb.rawItems.init()
     tb.startPartMergers()
     tb.startRawItemsFlusher()
@@ -314,11 +407,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
     m.ItemsMerged += atomic.LoadUint64(&tb.itemsMerged)
     m.AssistedMerges += atomic.LoadUint64(&tb.assistedMerges)

-    tb.rawItemsLock.Lock()
-    for _, ib := range tb.rawItemsBlocks {
-        m.PendingItems += uint64(len(ib.items))
-    }
-    tb.rawItemsLock.Unlock()
+    m.PendingItems += uint64(tb.rawItems.Len())

     tb.partsLock.Lock()
     m.PartsCount += uint64(len(tb.parts))
@@ -352,42 +441,10 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {

 // AddItems adds the given items to the tb.
 func (tb *Table) AddItems(items [][]byte) error {
-    var err error
-    var blocksToMerge []*inmemoryBlock
-
-    tb.rawItemsLock.Lock()
-    if len(tb.rawItemsBlocks) == 0 {
-        ib := getInmemoryBlock()
-        tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib)
-    }
-    ib := tb.rawItemsBlocks[len(tb.rawItemsBlocks)-1]
-    for _, item := range items {
-        if !ib.Add(item) {
-            ib = getInmemoryBlock()
-            if !ib.Add(item) {
-                putInmemoryBlock(ib)
-                err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock on %q; it looks like the item is too large? len(item)=%d",
-                    item, tb.path, len(item))
-                break
-            }
-            tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib)
-        }
-    }
-    if len(tb.rawItemsBlocks) >= 512 {
-        blocksToMerge = tb.rawItemsBlocks
-        tb.rawItemsBlocks = nil
-        tb.rawItemsLastFlushTime = fasttime.UnixTimestamp()
-    }
-    tb.rawItemsLock.Unlock()
-
-    if blocksToMerge == nil {
-        // Fast path.
-        return err
-    }
-
-    // Slow path: merge blocksToMerge.
-    tb.mergeRawItemsBlocks(blocksToMerge)
-    return err
+    if err := tb.rawItems.addItems(tb, items); err != nil {
+        return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)
+    }
+    return nil
 }

 // getParts appends parts snapshot to dst and returns it.
@@ -522,9 +579,25 @@ func (tb *Table) DebugFlush() {
 }

 func (tb *Table) flushRawItems(isFinal bool) {
+    tb.rawItems.flush(tb, isFinal)
+}
+
+func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
     tb.rawItemsPendingFlushesWG.Add(1)
     defer tb.rawItemsPendingFlushesWG.Done()
+
+    var wg sync.WaitGroup
+    wg.Add(len(riss.shards))
+    for i := range riss.shards {
+        go func(ris *rawItemsShard) {
+            ris.flush(tb, isFinal)
+            wg.Done()
+        }(&riss.shards[i])
+    }
+    wg.Wait()
+}
+
+func (ris *rawItemsShard) flush(tb *Table, isFinal bool) {
     mustFlush := false
     currentTime := fasttime.UnixTimestamp()
     flushSeconds := int64(rawItemsFlushInterval.Seconds())
@@ -533,14 +606,14 @@ func (tb *Table) flushRawItems(isFinal bool) {
} }
var blocksToMerge []*inmemoryBlock var blocksToMerge []*inmemoryBlock
tb.rawItemsLock.Lock() ris.mu.Lock()
if isFinal || currentTime-tb.rawItemsLastFlushTime > uint64(flushSeconds) { if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) {
mustFlush = true mustFlush = true
blocksToMerge = tb.rawItemsBlocks blocksToMerge = ris.ibs
tb.rawItemsBlocks = nil ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
tb.rawItemsLastFlushTime = currentTime ris.lastFlushTime = currentTime
} }
tb.rawItemsLock.Unlock() ris.mu.Unlock()
if mustFlush { if mustFlush {
tb.mergeRawItemsBlocks(blocksToMerge) tb.mergeRawItemsBlocks(blocksToMerge)


@ -1,11 +1,13 @@
package persistentqueue package persistentqueue
import ( import (
"fmt"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
) )
// FastQueue is fast persistent queue, which prefers sending data via memory. // FastQueue is fast persistent queue, which prefers sending data via memory.
@ -47,6 +49,12 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
} }
fq.cond.L = &fq.mu fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
fq.mu.Lock()
n := fq.pq.GetPendingBytes()
fq.mu.Unlock()
return float64(n)
})
pendingBytes := fq.GetPendingBytes() pendingBytes := fq.GetPendingBytes()
logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes) logger.Infof("opened fast persistent queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes", path, maxInmemoryBlocks, pendingBytes)
return fq return fq
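The vm_persistentqueue_bytes_pending gauge registered above is callback-based: github.com/VictoriaMetrics/metrics calls the function on every scrape, so the write path never has to update a counter. A minimal sketch of the same idiom (metric name and state are illustrative):

	var mu sync.Mutex
	var pendingBytes uint64
	_ = metrics.GetOrCreateGauge(`example_bytes_pending{path="/tmp/q"}`, func() float64 {
		mu.Lock() // take the same lock the writers take, as FastQueue does above
		n := pendingBytes
		mu.Unlock()
		return float64(n)
	})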


@ -61,8 +61,6 @@ type queue struct {
blocksRead *metrics.Counter blocksRead *metrics.Counter
bytesRead *metrics.Counter bytesRead *metrics.Counter
bytesPending *metrics.Gauge
} }
// ResetIfEmpty resets q if it is empty. // ResetIfEmpty resets q if it is empty.
@ -173,9 +171,6 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path)) q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path))
q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path)) q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path))
q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path)) q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path))
q.bytesPending = metrics.GetOrCreateGauge(fmt.Sprintf(`vm_persistentqueue_bytes_pending{path=%q}`, path), func() float64 {
return float64(q.GetPendingBytes())
})
cleanOnError := func() { cleanOnError := func() {
if q.reader != nil { if q.reader != nil {


@ -33,6 +33,8 @@ type WatchEvent struct {
// object is any Kubernetes object. // object is any Kubernetes object.
type object interface { type object interface {
key() string key() string
// getTargetLabels must be called under gw.mu lock.
getTargetLabels(gw *groupWatcher) []map[string]string getTargetLabels(gw *groupWatcher) []map[string]string
} }
@ -74,6 +76,11 @@ func newAPIWatcher(apiServer string, ac *promauth.Config, sdc *SDConfig, swcFunc
func (aw *apiWatcher) mustStart() { func (aw *apiWatcher) mustStart() {
aw.gw.startWatchersForRole(aw.role, aw) aw.gw.startWatchersForRole(aw.role, aw)
if aw.role == "endpoints" || aw.role == "endpointslices" {
// endpoints and endpointslices watchers query pod and service objects. So start watchers for these roles as well.
aw.gw.startWatchersForRole("pod", nil)
aw.gw.startWatchersForRole("service", nil)
}
} }
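The branch above encodes a role dependency: target labels for endpoints and endpointslices are built from pod and service objects, so watchers for those roles must run too. Expressed as data (a sketch, not code from this diff):

	// dependentRoles mirrors the mustStart branch above: starting a watcher
	// for a key role also starts watchers for the roles its labels depend on.
	var dependentRoles = map[string][]string{
		"endpoints":      {"pod", "service"},
		"endpointslices": {"pod", "service"},
	}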
func (aw *apiWatcher) mustStop() { func (aw *apiWatcher) mustStop() {
@ -226,63 +233,13 @@ var (
}) })
) )
// getObjectByRole returns an object with the given (namespace, name) key and the given role. func (gw *groupWatcher) getObjectByRoleLocked(role, namespace, name string) object {
func (gw *groupWatcher) getObjectByRole(role, namespace, name string) object {
if gw == nil { if gw == nil {
// this is needed for testing // this is needed for testing
return nil return nil
} }
o := gw.getCachedObjectByRole(role, namespace, name)
if o != nil {
// Fast path: the object has been found in the cache.
return o
}
// The object wasn't found in the cache. Try querying it directly from API server.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1182#issuecomment-813353359 for details.
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_loads_total{role=%q}`, role)).Inc()
objectType := getObjectTypeByRole(role)
path := getAPIPath(objectType, namespace, "")
path += "/" + name
requestURL := gw.apiServer + path
resp, err := gw.doRequest(requestURL)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("cannot obtain data for object %s (namespace=%q, name=%q): %s", role, namespace, name, err)
return nil
}
data, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("cannot read response from %q: %s", requestURL, err)
return nil
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_misses_total{role=%q}`, role)).Inc()
return nil
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("unexpected status code when reading response from %q; got %d; want %d; response body: %q", requestURL, resp.StatusCode, http.StatusOK, data)
return nil
}
parseObject, _ := getObjectParsersForRole(role)
o, err = parseObject(data)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_direct_object_load_errors_total{role=%q}`, role)).Inc()
logger.Errorf("cannot parse object obtained from %q: %s; response body: %q", requestURL, err, data)
return nil
}
// There is no need in storing the object in urlWatcher cache, since it should be eventually populated there by urlWatcher itself.
return o
}
func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) object {
key := namespace + "/" + name key := namespace + "/" + name
gw.startWatchersForRole(role, nil) for _, uw := range gw.m {
uws := gw.getURLWatchers()
for _, uw := range uws {
if uw.role != role { if uw.role != role {
// Role mismatch // Role mismatch
continue continue
@ -291,29 +248,26 @@ func (gw *groupWatcher) getCachedObjectByRole(role, namespace, name string) obje
// Namespace mismatch // Namespace mismatch
continue continue
} }
uw.mu.Lock() if o := uw.objectsByKey[key]; o != nil {
o := uw.objectsByKey[key]
uw.mu.Unlock()
if o != nil {
return o return o
} }
} }
return nil return nil
} }
func (gw *groupWatcher) refreshEndpointsLabels(namespace, key string) { func (gw *groupWatcher) refreshEndpointsLabelsLocked(namespace, key string) {
// Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does. // Refresh endpoints and endpointslices labels for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
gw.refreshObjectLabels("endpoints", namespace, key) gw.refreshObjectLabelsLocked("endpoints", namespace, key)
gw.refreshObjectLabels("endpointslices", namespace, key) gw.refreshObjectLabelsLocked("endpointslices", namespace, key)
} }
func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) { func (gw *groupWatcher) refreshObjectLabelsLocked(role, namespace, key string) {
// There is no need in starting url watcher for the given role, for _, uw := range gw.m {
// since there is no (namespace, key) object yet for this role. if len(uw.aws) == 0 {
// gw.startWatchersForRole(role, nil) // No apiWatchers to notify
uws := gw.getURLWatchers() continue
for _, uw := range uws { }
if uw.role != role { if uw.role != role {
// Role mismatch // Role mismatch
continue continue
@ -322,16 +276,9 @@ func (gw *groupWatcher) refreshObjectLabels(role, namespace, key string) {
// Namespace mismatch // Namespace mismatch
continue continue
} }
var aws []*apiWatcher if o := uw.objectsByKey[key]; o != nil {
uw.mu.Lock()
o := uw.objectsByKey[key]
if o != nil {
aws = uw.getAPIWatchersLocked()
}
uw.mu.Unlock()
if len(aws) > 0 {
labels := o.getTargetLabels(gw) labels := o.getTargetLabels(gw)
for _, aw := range aws { for aw := range uw.aws {
aw.setScrapeWorks(namespace, key, labels) aw.setScrapeWorks(namespace, key, labels)
} }
} }
@ -349,14 +296,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
uw = newURLWatcher(role, namespaces[i], apiURL, gw) uw = newURLWatcher(role, namespaces[i], apiURL, gw)
gw.m[apiURL] = uw gw.m[apiURL] = uw
} }
if aw != nil {
uw.subscribeAPIWatcherLocked(aw)
}
gw.mu.Unlock() gw.mu.Unlock()
if needStart { if needStart {
uw.reloadObjects() uw.reloadObjects()
go uw.watchForUpdates() go uw.watchForUpdates()
} }
if aw != nil {
uw.subscribeAPIWatcher(aw)
}
} }
} }
@ -373,30 +320,24 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
} }
func (gw *groupWatcher) registerPendingAPIWatchers() { func (gw *groupWatcher) registerPendingAPIWatchers() {
uws := gw.getURLWatchers()
for _, uw := range uws {
uw.registerPendingAPIWatchers()
}
}
func (gw *groupWatcher) getURLWatchers() []*urlWatcher {
gw.mu.Lock() gw.mu.Lock()
uws := make([]*urlWatcher, 0, len(gw.m)) defer gw.mu.Unlock()
for _, uw := range gw.m { for _, uw := range gw.m {
uws = append(uws, uw) uw.registerPendingAPIWatchersLocked()
} }
gw.mu.Unlock()
return uws
} }
func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
uws := gw.getURLWatchers() gw.mu.Lock()
for _, uw := range uws { defer gw.mu.Unlock()
uw.unsubscribeAPIWatcher(aw) for _, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw)
} }
} }
// urlWatcher watches for an apiURL and updates object states in objectsByKey. // urlWatcher watches for an apiURL and updates object states in objectsByKey.
//
// urlWatcher fields must be accessed under gw.mu lock.
type urlWatcher struct { type urlWatcher struct {
role string role string
namespace string namespace string
@ -406,14 +347,11 @@ type urlWatcher struct {
parseObject parseObjectFunc parseObject parseObjectFunc
parseObjectList parseObjectListFunc parseObjectList parseObjectListFunc
// mu protects aws, awsPending, objectsByKey and resourceVersion
mu sync.Mutex
// awsPending contains pending apiWatcher objects, which are registered in a batch. // awsPending contains pending apiWatcher objects, which are registered in a batch.
// Batch registering saves CPU time needed for registering big number of Kubernetes objects // Batch registering saves CPU time needed for registering big number of Kubernetes objects
// shared among big number of scrape jobs, since per-object labels are generated only once // shared among big number of scrape jobs, since per-object labels are generated only once
// for all the scrape jobs (each scrape job is associated with a single apiWatcher). // for all the scrape jobs (each scrape job is associated with a single apiWatcher).
// See reloadScrapeWorksForAPIWatchers for details. // See reloadScrapeWorksForAPIWatchersLocked for details.
awsPending map[*apiWatcher]struct{} awsPending map[*apiWatcher]struct{}
// aws contains registered apiWatcher objects // aws contains registered apiWatcher objects
@ -457,33 +395,31 @@ func newURLWatcher(role, namespace, apiURL string, gw *groupWatcher) *urlWatcher
return uw return uw
} }
func (uw *urlWatcher) subscribeAPIWatcher(aw *apiWatcher) { func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.aws[aw]; !ok { if _, ok := uw.aws[aw]; !ok {
if _, ok := uw.awsPending[aw]; !ok { if _, ok := uw.awsPending[aw]; !ok {
uw.awsPending[aw] = struct{}{} uw.awsPending[aw] = struct{}{}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Inc()
} }
} }
uw.mu.Unlock()
} }
func (uw *urlWatcher) registerPendingAPIWatchers() { func (uw *urlWatcher) registerPendingAPIWatchersLocked() {
uw.mu.Lock() if len(uw.awsPending) == 0 {
return
}
awsPending := make([]*apiWatcher, 0, len(uw.awsPending)) awsPending := make([]*apiWatcher, 0, len(uw.awsPending))
for aw := range uw.awsPending { for aw := range uw.awsPending {
awsPending = append(awsPending, aw) awsPending = append(awsPending, aw)
delete(uw.awsPending, aw)
uw.aws[aw] = struct{}{} uw.aws[aw] = struct{}{}
} }
uw.reloadScrapeWorksForAPIWatchers(awsPending, uw.objectsByKey) uw.reloadScrapeWorksForAPIWatchersLocked(uw.awsPending)
uw.mu.Unlock() uw.awsPending = make(map[*apiWatcher]struct{})
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending)) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Add(len(awsPending))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending)) metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Add(-len(awsPending))
} }
func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) { func (uw *urlWatcher) unsubscribeAPIWatcherLocked(aw *apiWatcher) {
uw.mu.Lock()
if _, ok := uw.awsPending[aw]; ok { if _, ok := uw.awsPending[aw]; ok {
delete(uw.awsPending, aw) delete(uw.awsPending, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec() metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="pending"}`, uw.role)).Dec()
@ -492,20 +428,19 @@ func (uw *urlWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
delete(uw.aws, aw) delete(uw.aws, aw)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec() metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_subscribers{role=%q,status="working"}`, uw.role)).Dec()
} }
uw.mu.Unlock()
} }
func (uw *urlWatcher) setResourceVersion(resourceVersion string) { func (uw *urlWatcher) setResourceVersion(resourceVersion string) {
uw.mu.Lock() uw.gw.mu.Lock()
uw.resourceVersion = resourceVersion uw.resourceVersion = resourceVersion
uw.mu.Unlock() uw.gw.mu.Unlock()
} }
// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. // reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state.
func (uw *urlWatcher) reloadObjects() string { func (uw *urlWatcher) reloadObjects() string {
uw.mu.Lock() uw.gw.mu.Lock()
resourceVersion := uw.resourceVersion resourceVersion := uw.resourceVersion
uw.mu.Unlock() uw.gw.mu.Unlock()
if resourceVersion != "" { if resourceVersion != "" {
// Fast path - there is no need in reloading the objects. // Fast path - there is no need in reloading the objects.
return resourceVersion return resourceVersion
@ -530,7 +465,7 @@ func (uw *urlWatcher) reloadObjects() string {
return "" return ""
} }
uw.mu.Lock() uw.gw.mu.Lock()
var updated, removed, added int var updated, removed, added int
for key := range uw.objectsByKey { for key := range uw.objectsByKey {
if o, ok := objectsByKey[key]; ok { if o, ok := objectsByKey[key]; ok {
@ -551,31 +486,34 @@ func (uw *urlWatcher) reloadObjects() string {
uw.objectsRemoved.Add(removed) uw.objectsRemoved.Add(removed)
uw.objectsAdded.Add(added) uw.objectsAdded.Add(added)
uw.objectsCount.Add(added - removed) uw.objectsCount.Add(added - removed)
uw.reloadScrapeWorksForAPIWatchersLocked(uw.aws)
uw.resourceVersion = metadata.ResourceVersion uw.resourceVersion = metadata.ResourceVersion
aws := uw.getAPIWatchersLocked()
uw.mu.Unlock()
uw.reloadScrapeWorksForAPIWatchers(aws, objectsByKey)
if uw.role == "service" { if uw.role == "service" {
// Refresh endpoints labels for the corresponding services as Prometheus does. // Refresh endpoints labels for the corresponding services as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
for key := range objectsByKey { for key := range objectsByKey {
uw.gw.refreshEndpointsLabels(uw.namespace, key) uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
} }
} }
uw.gw.mu.Unlock()
logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL) logger.Infof("reloaded %d objects from %q", len(objectsByKey), requestURL)
return uw.resourceVersion return uw.resourceVersion
} }
func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objectsByKey map[string]object) { func (uw *urlWatcher) reloadScrapeWorksForAPIWatchersLocked(awsMap map[*apiWatcher]struct{}) {
if len(aws) == 0 { if len(awsMap) == 0 {
return return
} }
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
}
swosByKey := make([]map[string][]interface{}, len(aws)) swosByKey := make([]map[string][]interface{}, len(aws))
for i := range aws { for i := range aws {
swosByKey[i] = make(map[string][]interface{}) swosByKey[i] = make(map[string][]interface{})
} }
for key, o := range objectsByKey { for key, o := range uw.objectsByKey {
labels := o.getTargetLabels(uw.gw) labels := o.getTargetLabels(uw.gw)
for i, aw := range aws { for i, aw := range aws {
swos := aw.getScrapeWorkObjectsForLabels(labels) swos := aw.getScrapeWorkObjectsForLabels(labels)
@ -589,15 +527,6 @@ func (uw *urlWatcher) reloadScrapeWorksForAPIWatchers(aws []*apiWatcher, objects
} }
} }
func (uw *urlWatcher) getAPIWatchersLocked() []*apiWatcher {
awsMap := uw.aws
aws := make([]*apiWatcher, 0, len(awsMap))
for aw := range awsMap {
aws = append(aws, aw)
}
return aws
}
// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. // watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state.
// //
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
@ -673,7 +602,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
return err return err
} }
key := o.key() key := o.key()
uw.mu.Lock() uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; !ok { if _, ok := uw.objectsByKey[key]; !ok {
uw.objectsCount.Inc() uw.objectsCount.Inc()
uw.objectsAdded.Inc() uw.objectsAdded.Inc()
@ -681,41 +610,39 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
uw.objectsUpdated.Inc() uw.objectsUpdated.Inc()
} }
uw.objectsByKey[key] = o uw.objectsByKey[key] = o
aws := uw.getAPIWatchersLocked() if len(uw.aws) > 0 {
uw.mu.Unlock()
if len(aws) > 0 {
labels := o.getTargetLabels(uw.gw) labels := o.getTargetLabels(uw.gw)
for _, aw := range aws { for aw := range uw.aws {
aw.setScrapeWorks(uw.namespace, key, labels) aw.setScrapeWorks(uw.namespace, key, labels)
} }
} }
if uw.role == "service" { if uw.role == "service" {
// Refresh endpoints labels for the corresponding service as Prometheus does. // Refresh endpoints labels for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
uw.gw.refreshEndpointsLabels(uw.namespace, key) uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
} }
uw.gw.mu.Unlock()
case "DELETED": case "DELETED":
o, err := uw.parseObject(we.Object) o, err := uw.parseObject(we.Object)
if err != nil { if err != nil {
return err return err
} }
key := o.key() key := o.key()
uw.mu.Lock() uw.gw.mu.Lock()
if _, ok := uw.objectsByKey[key]; ok { if _, ok := uw.objectsByKey[key]; ok {
uw.objectsCount.Dec() uw.objectsCount.Dec()
uw.objectsRemoved.Inc() uw.objectsRemoved.Inc()
delete(uw.objectsByKey, key) delete(uw.objectsByKey, key)
} }
aws := uw.getAPIWatchersLocked() for aw := range uw.aws {
uw.mu.Unlock()
for _, aw := range aws {
aw.removeScrapeWorks(uw.namespace, key) aw.removeScrapeWorks(uw.namespace, key)
} }
if uw.role == "service" { if uw.role == "service" {
// Refresh endpoints labels for the corresponding service as Prometheus does. // Refresh endpoints labels for the corresponding service as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
uw.gw.refreshEndpointsLabels(uw.namespace, key) uw.gw.refreshEndpointsLabelsLocked(uw.namespace, key)
} }
uw.gw.mu.Unlock()
case "BOOKMARK": case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
bm, err := parseBookmark(we.Object) bm, err := parseBookmark(we.Object)


@ -1,8 +1,13 @@
package kubernetes package kubernetes
import ( import (
"encoding/json"
"net/http"
"net/http/httptest"
"reflect" "reflect"
"sync"
"testing" "testing"
"time"
) )
func TestGetAPIPathsWithNamespaces(t *testing.T) { func TestGetAPIPathsWithNamespaces(t *testing.T) {
@ -175,3 +180,849 @@ func TestParseBookmark(t *testing.T) {
t.Fatalf("unexpected resourceVersion; got %q; want %q", bm.Metadata.ResourceVersion, expectedResourceVersion) t.Fatalf("unexpected resourceVersion; got %q; want %q", bm.Metadata.ResourceVersion, expectedResourceVersion)
} }
} }
func TestGetScrapeWorkObjects(t *testing.T) {
type testCase struct {
name string
sdc *SDConfig
expectedTargetsLen int
initAPIObjectsByRole map[string][]byte
// Objects to be added later via the watch API.
watchAPIMustAddObjectsByRole map[string][][]byte
}
cases := []testCase{
{
name: "simple 1 pod with update 1",
sdc: &SDConfig{
Role: "pod",
},
expectedTargetsLen: 2,
initAPIObjectsByRole: map[string][]byte{
"pod": []byte(`{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "72425"
},
"items": [
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"labels": {
"app.kubernetes.io/instance": "stack",
"pod-template-hash": "5b9c6cf775"
},
"name": "stack-name-1",
"namespace": "default"
},
"spec": {
"containers": [
{
"name": "generic-pod"
}
]
},
"status": {
"podIP": "10.10.2.2",
"phase": "Running"
}
}]}`),
},
watchAPIMustAddObjectsByRole: map[string][][]byte{
"pod": {
[]byte(`{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"labels": {
"app.kubernetes.io/instance": "stack",
"pod-template-hash": "5b9c6cf775"
},
"name": "stack-next-2",
"namespace": "default"
},
"spec": {
"containers": [
{
"name": "generic-pod-2"
}
]
},
"status": {
"podIP": "10.10.2.5",
"phase": "Running"
}
}`),
},
},
},
{
name: "endpoints with service update",
sdc: &SDConfig{
Role: "endpoints",
},
expectedTargetsLen: 2,
initAPIObjectsByRole: map[string][]byte{
"service": []byte(`{
"kind": "ServiceList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "72425"
},
"items": []}`),
"endpoints": []byte(`{
"kind": "EndpointsList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "72425"
},
"items": [
{
"apiVersion": "v1",
"kind": "Endpoints",
"metadata": {
"annotations": {
"endpoints.kubernetes.io/last-change-trigger-time": "2021-04-27T02:06:55Z"
},
"labels": {
"app.kubernetes.io/managed-by": "Helm"
},
"name": "stack-kube-state-metrics",
"namespace": "default"
},
"subsets": [
{
"addresses": [
{
"ip": "10.244.0.5",
"nodeName": "kind-control-plane",
"targetRef": {
"kind": "Pod",
"name": "stack-kube-state-metrics-db5879bf8-bg78p",
"namespace": "default"
}
}
],
"ports": [
{
"name": "http",
"port": 8080,
"protocol": "TCP"
}
]
}
]
}
]}`),
"pod": []byte(`{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "72425"
},
"items": [
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"labels": {
"app.kubernetes.io/instance": "stack"
},
"name": "stack-kube-state-metrics-db5879bf8-bg78p",
"namespace": "default"
},
"spec": {
"containers": [
{
"image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8",
"name": "kube-state-metrics",
"ports": [
{
"containerPort": 8080,
"protocol": "TCP"
}
]
},
{
"image": "k8s.gcr.io/kube-state-metrics/kube-state-metrics:v1.9.8",
"name": "kube-state-metrics-2",
"ports": [
{
"containerPort": 8085,
"protocol": "TCP"
}
]
}
]
},
"status": {
"phase": "Running",
"podIP": "10.244.0.5"
}
}
]}`),
},
watchAPIMustAddObjectsByRole: map[string][][]byte{
"service": {
[]byte(`{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"annotations": {
"meta.helm.sh/release-name": "stack"
},
"labels": {
"app.kubernetes.io/managed-by": "Helm",
"app.kubernetes.io/name": "kube-state-metrics"
},
"name": "stack-kube-state-metrics",
"namespace": "default"
},
"spec": {
"clusterIP": "10.97.109.249",
"ports": [
{
"name": "http",
"port": 8080,
"protocol": "TCP",
"targetPort": 8080
}
],
"selector": {
"app.kubernetes.io/instance": "stack",
"app.kubernetes.io/name": "kube-state-metrics"
},
"type": "ClusterIP"
}
}`),
},
},
},
{
name: "get nodes",
sdc: &SDConfig{Role: "node"},
expectedTargetsLen: 2,
initAPIObjectsByRole: map[string][]byte{
"node": []byte(`{
"kind": "NodeList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/nodes",
"resourceVersion": "22627"
},
"items": [
{
"apiVersion": "v1",
"kind": "Node",
"metadata": {
"annotations": {
"kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock"
},
"labels": {
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux"
},
"name": "kind-control-plane-new"
},
"status": {
"addresses": [
{
"address": "10.10.2.5",
"type": "InternalIP"
},
{
"address": "kind-control-plane",
"type": "Hostname"
}
]
}
}
]}`),
},
watchAPIMustAddObjectsByRole: map[string][][]byte{
"node": {
[]byte(`{
"apiVersion": "v1",
"kind": "Node",
"metadata": {
"annotations": {
"kubeadm.alpha.kubernetes.io/cri-socket": "/run/containerd/containerd.sock"
},
"labels": {
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux"
},
"name": "kind-control-plane"
},
"status": {
"addresses": [
{
"address": "10.10.2.2",
"type": "InternalIP"
},
{
"address": "kind-control-plane",
"type": "Hostname"
}
]
}
}`),
},
},
},
{
name: "2 service with 2 added",
sdc: &SDConfig{Role: "service"},
expectedTargetsLen: 4,
initAPIObjectsByRole: map[string][]byte{
"service": []byte(`{
"kind": "ServiceList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/services",
"resourceVersion": "60485"
},
"items": [
{
"metadata": {
"name": "kube-dns",
"namespace": "kube-system",
"labels": {
"k8s-app": "kube-dns"
}
},
"spec": {
"ports": [
{
"name": "dns",
"protocol": "UDP",
"port": 53,
"targetPort": 53
},
{
"name": "dns-tcp",
"protocol": "TCP",
"port": 53,
"targetPort": 53
}
],
"selector": {
"k8s-app": "kube-dns"
},
"clusterIP": "10.96.0.10",
"type": "ClusterIP",
"sessionAffinity": "None"
}
}
]
}`),
},
watchAPIMustAddObjectsByRole: map[string][][]byte{
"service": {
[]byte(`{
"metadata": {
"name": "another-service-1",
"namespace": "default",
"labels": {
"k8s-app": "kube-dns"
}
},
"spec": {
"ports": [
{
"name": "some-app-1-tcp",
"protocol": "TCP",
"port": 1053,
"targetPort": 1053
}
],
"selector": {
"k8s-app": "some-app-1"
},
"clusterIP": "10.96.0.10",
"type": "ClusterIP"
}
}`),
[]byte(`{
"metadata": {
"name": "another-service-2",
"namespace": "default",
"labels": {
"k8s-app": "kube-dns"
}
},
"spec": {
"ports": [
{
"name": "some-app-2-tcp",
"protocol": "TCP",
"port": 1053,
"targetPort": 1053
}
],
"selector": {
"k8s-app": "some-app-2"
},
"clusterIP": "10.96.0.15",
"type": "ClusterIP"
}
}`),
},
},
},
{
name: "1 ingress with 2 add",
expectedTargetsLen: 3,
sdc: &SDConfig{
Role: "ingress",
},
initAPIObjectsByRole: map[string][]byte{
"ingress": []byte(`{
"kind": "IngressList",
"apiVersion": "extensions/v1beta1",
"metadata": {
"selfLink": "/apis/extensions/v1beta1/ingresses",
"resourceVersion": "351452"
},
"items": [
{
"metadata": {
"name": "test-ingress",
"namespace": "default"
},
"spec": {
"backend": {
"serviceName": "testsvc",
"servicePort": 80
},
"rules": [
{
"host": "foobar"
}
]
},
"status": {
"loadBalancer": {
"ingress": [
{
"ip": "172.17.0.2"
}
]
}
}
}
]
}`),
},
watchAPIMustAddObjectsByRole: map[string][][]byte{
"ingress": {
[]byte(`{
"metadata": {
"name": "test-ingress-1",
"namespace": "default"
},
"spec": {
"backend": {
"serviceName": "testsvc",
"servicePort": 801
},
"rules": [
{
"host": "foobar"
}
]
},
"status": {
"loadBalancer": {
"ingress": [
{
"ip": "172.17.0.3"
}
]
}
}
}`),
[]byte(`{
"metadata": {
"name": "test-ingress-2",
"namespace": "default"
},
"spec": {
"backend": {
"serviceName": "testsvc",
"servicePort": 802
},
"rules": [
{
"host": "foobar"
}
]
},
"status": {
"loadBalancer": {
"ingress": [
{
"ip": "172.17.0.3"
}
]
}
}
}`),
},
},
},
{
name: "7 endpointslices slice with 1 service update",
sdc: &SDConfig{
Role: "endpointslices",
},
expectedTargetsLen: 7,
initAPIObjectsByRole: map[string][]byte{
"endpointslices": []byte(`{
"kind": "EndpointSliceList",
"apiVersion": "discovery.k8s.io/v1beta1",
"metadata": {
"selfLink": "/apis/discovery.k8s.io/v1beta1/endpointslices",
"resourceVersion": "1177"
},
"items": [
{
"metadata": {
"name": "kubernetes",
"namespace": "default",
"labels": {
"kubernetes.io/service-name": "kubernetes"
}
},
"addressType": "IPv4",
"endpoints": [
{
"addresses": [
"172.18.0.2"
],
"conditions": {
"ready": true
}
}
],
"ports": [
{
"name": "https",
"protocol": "TCP",
"port": 6443
}
]
},
{
"metadata": {
"name": "kube-dns",
"namespace": "kube-system",
"labels": {
"kubernetes.io/service-name": "kube-dns"
}
},
"addressType": "IPv4",
"endpoints": [
{
"addresses": [
"10.244.0.3"
],
"conditions": {
"ready": true
},
"targetRef": {
"kind": "Pod",
"namespace": "kube-system",
"name": "coredns-66bff467f8-z8czk",
"uid": "36a545ff-dbba-4192-a5f6-1dbb0c21c73d",
"resourceVersion": "603"
},
"topology": {
"kubernetes.io/hostname": "kind-control-plane"
}
},
{
"addresses": [
"10.244.0.4"
],
"conditions": {
"ready": true
},
"targetRef": {
"kind": "Pod",
"namespace": "kube-system",
"name": "coredns-66bff467f8-kpbhk",
"uid": "db38d8b4-847a-4e82-874c-fe444fba2718",
"resourceVersion": "576"
},
"topology": {
"kubernetes.io/hostname": "kind-control-plane"
}
}
],
"ports": [
{
"name": "dns-tcp",
"protocol": "TCP",
"port": 53
},
{
"name": "metrics",
"protocol": "TCP",
"port": 9153
},
{
"name": "dns",
"protocol": "UDP",
"port": 53
}
]
}
]
}`),
"pod": []byte(`{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "72425"
},
"items": [
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"labels": {
"app.kubernetes.io/instance": "stack",
"pod-template-hash": "5b9c6cf775"
},
"name": "coredns-66bff467f8-kpbhk",
"namespace": "kube-system"
},
"spec": {
"containers": [
{
"name": "generic-pod"
}
]
},
"status": {
"podIP": "10.10.2.2",
"phase": "Running"
}
},
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"labels": {
"app.kubernetes.io/instance": "stack",
"pod-template-hash": "5b9c6cf775"
},
"name": "coredns-66bff467f8-z8czk",
"namespace": "kube-system"
},
"spec": {
"containers": [
{
"name": "generic-pod"
}
]
},
"status": {
"podIP": "10.10.2.3",
"phase": "Running"
}
}
]}`),
"service": []byte(`{
"kind": "ServiceList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/services",
"resourceVersion": "60485"
},
"items": [
{
"metadata": {
"name": "kube-dns",
"namespace": "kube-system",
"labels": {
"k8s-app": "kube-dns"
}
},
"spec": {
"ports": [
{
"name": "dns",
"protocol": "UDP",
"port": 53,
"targetPort": 53
},
{
"name": "dns-tcp",
"protocol": "TCP",
"port": 53,
"targetPort": 53
}
],
"selector": {
"k8s-app": "kube-dns"
},
"clusterIP": "10.96.0.10",
"type": "ClusterIP",
"sessionAffinity": "None"
}
}
]
}`),
},
watchAPIMustAddObjectsByRole: map[string][][]byte{
"service": {
[]byte(` {
"metadata": {
"name": "kube-dns",
"namespace": "kube-system",
"labels": {
"k8s-app": "kube-dns",
"some-new": "label-value"
}
},
"spec": {
"ports": [
{
"name": "dns-tcp",
"protocol": "TCP",
"port": 53,
"targetPort": 53
}
],
"selector": {
"k8s-app": "kube-dns"
},
"clusterIP": "10.96.0.10",
"type": "ClusterIP",
"sessionAffinity": "None"
}
}
`),
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
watchPublishersByRole := make(map[string]*watchObjectBroadcast)
mux := http.NewServeMux()
for role, obj := range tc.initAPIObjectsByRole {
watchBroadCaster := &watchObjectBroadcast{}
watchPublishersByRole[role] = watchBroadCaster
apiPath := getAPIPath(getObjectTypeByRole(role), "", "")
addAPIURLHandler(t, mux, apiPath, obj, watchBroadCaster)
}
testAPIServer := httptest.NewServer(mux)
tc.sdc.APIServer = testAPIServer.URL
ac, err := newAPIConfig(tc.sdc, "", func(metaLabels map[string]string) interface{} {
var res []interface{}
for k := range metaLabels {
res = append(res, k)
}
return res
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
tc.sdc.cfg = ac
ac.aw.mustStart()
defer ac.aw.mustStop()
_, err = tc.sdc.GetScrapeWorkObjects()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Wait for the watch subscribers to start.
time.Sleep(80 * time.Millisecond)
for role, objs := range tc.watchAPIMustAddObjectsByRole {
for _, obj := range objs {
watchPublishersByRole[role].pub(obj)
}
}
for _, ch := range watchPublishersByRole {
ch.shutdown()
}
if len(tc.watchAPIMustAddObjectsByRole) > 0 {
// Updates arrive asynchronously, so wait a bit;
// polling is not reliable here, hence the fixed sleep.
time.Sleep(80 * time.Millisecond)
}
got, err := tc.sdc.GetScrapeWorkObjects()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(got) != tc.expectedTargetsLen {
t.Fatalf("unexpected count of objects, got: %d, want: %d", len(got), tc.expectedTargetsLen)
}
})
}
}
type watchObjectBroadcast struct {
mu sync.Mutex
subscribers []chan []byte
}
func (o *watchObjectBroadcast) pub(msg []byte) {
o.mu.Lock()
defer o.mu.Unlock()
for i := range o.subscribers {
c := o.subscribers[i]
select {
case c <- msg:
default:
}
}
}
func (o *watchObjectBroadcast) sub() <-chan []byte {
c := make(chan []byte, 5)
o.mu.Lock()
o.subscribers = append(o.subscribers, c)
o.mu.Unlock()
return c
}
func (o *watchObjectBroadcast) shutdown() {
o.mu.Lock()
defer o.mu.Unlock()
for i := range o.subscribers {
c := o.subscribers[i]
close(c)
}
}
func addAPIURLHandler(t *testing.T, mux *http.ServeMux, apiURL string, initObjects []byte, notifier *watchObjectBroadcast) {
t.Helper()
mux.HandleFunc(apiURL, func(w http.ResponseWriter, r *http.Request) {
if needWatch := r.URL.Query().Get("watch"); len(needWatch) > 0 {
// start watch handler
w.WriteHeader(200)
flusher := w.(http.Flusher)
flusher.Flush()
updateC := notifier.sub()
for obj := range updateC {
we := WatchEvent{
Type: "ADDED",
Object: obj,
}
szd, err := json.Marshal(we)
if err != nil {
t.Fatalf("cannot serialize: %v", err)
}
_, _ = w.Write(szd)
flusher.Flush()
}
return
}
w.WriteHeader(200)
_, _ = w.Write(initObjects)
})
}


@ -92,7 +92,7 @@ type EndpointPort struct {
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpoints
func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string { func (eps *Endpoints) getTargetLabels(gw *groupWatcher) []map[string]string {
var svc *Service var svc *Service
if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service) svc = o.(*Service)
} }
podPortsSeen := make(map[*Pod][]int) podPortsSeen := make(map[*Pod][]int)
@ -140,7 +140,7 @@ func appendEndpointLabelsForAddresses(ms []map[string]string, gw *groupWatcher,
for _, ea := range eas { for _, ea := range eas {
var p *Pod var p *Pod
if ea.TargetRef.Name != "" { if ea.TargetRef.Name != "" {
if o := gw.getObjectByRole("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil { if o := gw.getObjectByRoleLocked("pod", ea.TargetRef.Namespace, ea.TargetRef.Name); o != nil {
p = o.(*Pod) p = o.(*Pod)
} }
} }


@ -39,14 +39,14 @@ func parseEndpointSlice(data []byte) (object, error) {
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#endpointslices
func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string { func (eps *EndpointSlice) getTargetLabels(gw *groupWatcher) []map[string]string {
var svc *Service var svc *Service
if o := gw.getObjectByRole("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil { if o := gw.getObjectByRoleLocked("service", eps.Metadata.Namespace, eps.Metadata.Name); o != nil {
svc = o.(*Service) svc = o.(*Service)
} }
podPortsSeen := make(map[*Pod][]int) podPortsSeen := make(map[*Pod][]int)
var ms []map[string]string var ms []map[string]string
for _, ess := range eps.Endpoints { for _, ess := range eps.Endpoints {
var p *Pod var p *Pod
if o := gw.getObjectByRole("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil { if o := gw.getObjectByRoleLocked("pod", ess.TargetRef.Namespace, ess.TargetRef.Name); o != nil {
p = o.(*Pod) p = o.(*Pod)
} }
for _, epp := range eps.Ports { for _, epp := range eps.Ports {


@ -437,54 +437,49 @@ func (pt *partition) AddRows(rows []rawRow) {
} }
type rawRowsShards struct { type rawRowsShards struct {
lock sync.Mutex shardIdx uint32
shardIdx int
// Shards reduce lock contention when adding rows on multi-CPU systems. // Shards reduce lock contention when adding rows on multi-CPU systems.
shards []rawRowsShard shards []rawRowsShard
} }
func (rrs *rawRowsShards) init() { func (rrss *rawRowsShards) init() {
rrs.shards = make([]rawRowsShard, rawRowsShardsPerPartition) rrss.shards = make([]rawRowsShard, rawRowsShardsPerPartition)
} }
func (rrs *rawRowsShards) addRows(pt *partition, rows []rawRow) { func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
rrs.lock.Lock() n := atomic.AddUint32(&rrss.shardIdx, 1)
rrs.shardIdx++ shards := rrss.shards
if rrs.shardIdx >= len(rrs.shards) { idx := n % uint32(len(shards))
rrs.shardIdx = 0 shard := &shards[idx]
}
shard := &rrs.shards[rrs.shardIdx]
rrs.lock.Unlock()
shard.addRows(pt, rows) shard.addRows(pt, rows)
} }
func (rrs *rawRowsShards) Len() int { func (rrss *rawRowsShards) Len() int {
n := 0 n := 0
for i := range rrs.shards[:] { for i := range rrss.shards[:] {
n += rrs.shards[i].Len() n += rrss.shards[i].Len()
} }
return n return n
} }
type rawRowsShard struct { type rawRowsShard struct {
lock sync.Mutex mu sync.Mutex
rows []rawRow rows []rawRow
lastFlushTime uint64 lastFlushTime uint64
} }
func (rrs *rawRowsShard) Len() int { func (rrs *rawRowsShard) Len() int {
rrs.lock.Lock() rrs.mu.Lock()
n := len(rrs.rows) n := len(rrs.rows)
rrs.lock.Unlock() rrs.mu.Unlock()
return n return n
} }
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
var rrss []*rawRows var rrss []*rawRows
rrs.lock.Lock() rrs.mu.Lock()
if cap(rrs.rows) == 0 { if cap(rrs.rows) == 0 {
rrs.rows = getRawRowsMaxSize().rows rrs.rows = getRawRowsMaxSize().rows
} }
@ -506,7 +501,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
rrss = append(rrss, rr) rrss = append(rrss, rr)
rrs.lastFlushTime = fasttime.UnixTimestamp() rrs.lastFlushTime = fasttime.UnixTimestamp()
} }
rrs.lock.Unlock() rrs.mu.Unlock()
for _, rr := range rrss { for _, rr := range rrss {
pt.addRowsPart(rr.rows) pt.addRowsPart(rr.rows)
@ -752,10 +747,16 @@ func (pt *partition) flushRawRows(isFinal bool) {
pt.rawRows.flush(pt, isFinal) pt.rawRows.flush(pt, isFinal)
} }
func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) { func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
for i := range rrs.shards[:] { var wg sync.WaitGroup
rrs.shards[i].flush(pt, isFinal) wg.Add(len(rrss.shards))
for i := range rrss.shards {
go func(rrs *rawRowsShard) {
rrs.flush(pt, isFinal)
wg.Done()
}(&rrss.shards[i])
} }
wg.Wait()
} }
func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) { func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
@ -766,12 +767,12 @@ func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
flushSeconds = 1 flushSeconds = 1
} }
rrs.lock.Lock() rrs.mu.Lock()
if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) { if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) {
rr = getRawRowsMaxSize() rr = getRawRowsMaxSize()
rrs.rows, rr.rows = rr.rows, rrs.rows rrs.rows, rr.rows = rr.rows, rrs.rows
} }
rrs.lock.Unlock() rrs.mu.Unlock()
if rr != nil { if rr != nil {
pt.addRowsPart(rr.rows) pt.addRowsPart(rr.rows)
@ -1363,7 +1364,7 @@ func (pt *partition) removeStaleParts() {
pt.snapshotLock.RLock() pt.snapshotLock.RLock()
var removeWG sync.WaitGroup var removeWG sync.WaitGroup
for pw := range m { for pw := range m {
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000) logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000)
removeWG.Add(1) removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done) fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done)
} }


@ -182,12 +182,27 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) {
// match. But, prior to the match, src[nextEmit:s] are unmatched. Emit // match. But, prior to the match, src[nextEmit:s] are unmatched. Emit
// them as literal bytes. // them as literal bytes.
// Extend the 4-byte match as long as possible.
if l == 0 { if l == 0 {
// Extend the 4-byte match as long as possible.
l = e.matchlenLong(s+4, t+4, src) + 4 l = e.matchlenLong(s+4, t+4, src) + 4
} else if l == maxMatchLength { } else if l == maxMatchLength {
l += e.matchlenLong(s+l, t+l, src) l += e.matchlenLong(s+l, t+l, src)
} }
// Try to locate a better match by checking the end of best match...
if sAt := s + l; l < 30 && sAt < sLimit {
eLong := e.bTable[hash7(load6432(src, sAt), tableBits)].Cur.offset
// Test current
t2 := eLong - e.cur - l
off := s - t2
if t2 >= 0 && off < maxMatchOffset && off > 0 {
if l2 := e.matchlenLong(s, t2, src); l2 > l {
t = t2
l = l2
}
}
}
// Extend backwards // Extend backwards
for t > 0 && s > nextEmit && src[t-1] == src[s-1] { for t > 0 && s > nextEmit && src[t-1] == src[s-1] {
s-- s--


@ -211,6 +211,31 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) {
l += e.matchlenLong(s+l, t+l, src) l += e.matchlenLong(s+l, t+l, src)
} }
// Try to locate a better match by checking the end-of-match...
if sAt := s + l; sAt < sLimit {
eLong := &e.bTable[hash7(load6432(src, sAt), tableBits)]
// Test current
t2 := eLong.Cur.offset - e.cur - l
off := s - t2
if off < maxMatchOffset {
if off > 0 && t2 >= 0 {
if l2 := e.matchlenLong(s, t2, src); l2 > l {
t = t2
l = l2
}
}
// Test next:
t2 = eLong.Prev.offset - e.cur - l
off := s - t2
if off > 0 && off < maxMatchOffset && t2 >= 0 {
if l2 := e.matchlenLong(s, t2, src); l2 > l {
t = t2
l = l2
}
}
}
}
// Extend backwards // Extend backwards
for t > 0 && s > nextEmit && src[t-1] == src[s-1] { for t > 0 && s > nextEmit && src[t-1] == src[s-1] {
s-- s--


@ -152,8 +152,8 @@ This package:
file out level insize outsize millis mb/s file out level insize outsize millis mb/s
silesia.tar zskp 1 211947520 73101992 643 313.87 silesia.tar zskp 1 211947520 73101992 643 313.87
silesia.tar zskp 2 211947520 67504318 969 208.38 silesia.tar zskp 2 211947520 67504318 969 208.38
silesia.tar zskp 3 211947520 65177448 1899 106.44 silesia.tar zskp 3 211947520 64595893 2007 100.68
silesia.tar zskp 4 211947520 61381950 8115 24.91 silesia.tar zskp 4 211947520 60995370 7691 26.28
cgo zstd: cgo zstd:
silesia.tar zstd 1 211947520 73605392 543 371.56 silesia.tar zstd 1 211947520 73605392 543 371.56
@ -171,8 +171,8 @@ https://files.klauspost.com/compress/gob-stream.7z
file out level insize outsize millis mb/s file out level insize outsize millis mb/s
gob-stream zskp 1 1911399616 235022249 3088 590.30 gob-stream zskp 1 1911399616 235022249 3088 590.30
gob-stream zskp 2 1911399616 205669791 3786 481.34 gob-stream zskp 2 1911399616 205669791 3786 481.34
gob-stream zskp 3 1911399616 185792019 9324 195.48 gob-stream zskp 3 1911399616 175034659 9636 189.17
gob-stream zskp 4 1911399616 171537212 32113 56.76 gob-stream zskp 4 1911399616 167273881 29337 62.13
gob-stream zstd 1 1911399616 249810424 2637 691.26 gob-stream zstd 1 1911399616 249810424 2637 691.26
gob-stream zstd 3 1911399616 208192146 3490 522.31 gob-stream zstd 3 1911399616 208192146 3490 522.31
gob-stream zstd 6 1911399616 193632038 6687 272.56 gob-stream zstd 6 1911399616 193632038 6687 272.56
@ -187,8 +187,8 @@ http://mattmahoney.net/dc/textdata.html
file out level insize outsize millis mb/s file out level insize outsize millis mb/s
enwik9 zskp 1 1000000000 343848582 3609 264.18 enwik9 zskp 1 1000000000 343848582 3609 264.18
enwik9 zskp 2 1000000000 317276632 5746 165.97 enwik9 zskp 2 1000000000 317276632 5746 165.97
enwik9 zskp 3 1000000000 294540704 11725 81.34 enwik9 zskp 3 1000000000 292243069 12162 78.41
enwik9 zskp 4 1000000000 276609671 44029 21.66 enwik9 zskp 4 1000000000 275241169 36430 26.18
enwik9 zstd 1 1000000000 358072021 3110 306.65 enwik9 zstd 1 1000000000 358072021 3110 306.65
enwik9 zstd 3 1000000000 313734672 4784 199.35 enwik9 zstd 3 1000000000 313734672 4784 199.35
enwik9 zstd 6 1000000000 295138875 10290 92.68 enwik9 zstd 6 1000000000 295138875 10290 92.68
@ -202,8 +202,8 @@ https://files.klauspost.com/compress/github-june-2days-2019.json.zst
file out level insize outsize millis mb/s file out level insize outsize millis mb/s
github-june-2days-2019.json zskp 1 6273951764 699045015 10620 563.40 github-june-2days-2019.json zskp 1 6273951764 699045015 10620 563.40
github-june-2days-2019.json zskp 2 6273951764 617881763 11687 511.96 github-june-2days-2019.json zskp 2 6273951764 617881763 11687 511.96
github-june-2days-2019.json zskp 3 6273951764 537511906 29252 204.54 github-june-2days-2019.json zskp 3 6273951764 524340691 34043 175.75
github-june-2days-2019.json zskp 4 6273951764 512796117 97791 61.18 github-june-2days-2019.json zskp 4 6273951764 503314661 93811 63.78
github-june-2days-2019.json zstd 1 6273951764 766284037 8450 708.00 github-june-2days-2019.json zstd 1 6273951764 766284037 8450 708.00
github-june-2days-2019.json zstd 3 6273951764 661889476 10927 547.57 github-june-2days-2019.json zstd 3 6273951764 661889476 10927 547.57
github-june-2days-2019.json zstd 6 6273951764 642756859 22996 260.18 github-june-2days-2019.json zstd 6 6273951764 642756859 22996 260.18
@ -217,8 +217,8 @@ https://files.klauspost.com/compress/rawstudio-mint14.7z
file out level insize outsize millis mb/s file out level insize outsize millis mb/s
rawstudio-mint14.tar zskp 1 8558382592 3667489370 20210 403.84 rawstudio-mint14.tar zskp 1 8558382592 3667489370 20210 403.84
rawstudio-mint14.tar zskp 2 8558382592 3364592300 31873 256.07 rawstudio-mint14.tar zskp 2 8558382592 3364592300 31873 256.07
rawstudio-mint14.tar zskp 3 8558382592 3224594213 71751 113.75 rawstudio-mint14.tar zskp 3 8558382592 3158085214 77675 105.08
rawstudio-mint14.tar zskp 4 8558382592 3027332295 486243 16.79 rawstudio-mint14.tar zskp 4 8558382592 3020370044 404956 20.16
rawstudio-mint14.tar zstd 1 8558382592 3609250104 17136 476.27 rawstudio-mint14.tar zstd 1 8558382592 3609250104 17136 476.27
rawstudio-mint14.tar zstd 3 8558382592 3341679997 29262 278.92 rawstudio-mint14.tar zstd 3 8558382592 3341679997 29262 278.92
rawstudio-mint14.tar zstd 6 8558382592 3235846406 77904 104.77 rawstudio-mint14.tar zstd 6 8558382592 3235846406 77904 104.77
@ -232,8 +232,8 @@ https://files.klauspost.com/compress/nyc-taxi-data-10M.csv.zst
file out level insize outsize millis mb/s file out level insize outsize millis mb/s
nyc-taxi-data-10M.csv zskp 1 3325605752 641339945 8925 355.35 nyc-taxi-data-10M.csv zskp 1 3325605752 641339945 8925 355.35
nyc-taxi-data-10M.csv zskp 2 3325605752 591748091 11268 281.44 nyc-taxi-data-10M.csv zskp 2 3325605752 591748091 11268 281.44
nyc-taxi-data-10M.csv zskp 3 3325605752 538490114 19880 159.53 nyc-taxi-data-10M.csv zskp 3 3325605752 530289687 25239 125.66
nyc-taxi-data-10M.csv zskp 4 3325605752 495986829 89368 35.49 nyc-taxi-data-10M.csv zskp 4 3325605752 490907191 65939 48.10
nyc-taxi-data-10M.csv zstd 1 3325605752 687399637 8233 385.18 nyc-taxi-data-10M.csv zstd 1 3325605752 687399637 8233 385.18
nyc-taxi-data-10M.csv zstd 3 3325605752 598514411 10065 315.07 nyc-taxi-data-10M.csv zstd 3 3325605752 598514411 10065 315.07
nyc-taxi-data-10M.csv zstd 6 3325605752 570522953 20038 158.27 nyc-taxi-data-10M.csv zstd 6 3325605752 570522953 20038 158.27


@ -220,6 +220,20 @@ encodeLoop:
best = bestOf(best, matchAt(candidateL.prev-e.cur, s, uint32(cv), -1)) best = bestOf(best, matchAt(candidateL.prev-e.cur, s, uint32(cv), -1))
best = bestOf(best, matchAt(candidateL2.offset-e.cur, s+1, uint32(cv2), -1)) best = bestOf(best, matchAt(candidateL2.offset-e.cur, s+1, uint32(cv2), -1))
best = bestOf(best, matchAt(candidateL2.prev-e.cur, s+1, uint32(cv2), -1)) best = bestOf(best, matchAt(candidateL2.prev-e.cur, s+1, uint32(cv2), -1))
// See if we can find a better match by checking where the current best ends.
// Use that offset to see if we can find a better full match.
if sAt := best.s + best.length; sAt < sLimit {
nextHashL := hash8(load6432(src, sAt), bestLongTableBits)
candidateEnd := e.longTable[nextHashL]
if pos := candidateEnd.offset - e.cur - best.length; pos >= 0 {
bestEnd := bestOf(best, matchAt(pos, best.s, load3232(src, best.s), -1))
if pos := candidateEnd.prev - e.cur - best.length; pos >= 0 {
bestEnd = bestOf(bestEnd, matchAt(pos, best.s, load3232(src, best.s), -1))
}
best = bestEnd
}
}
} }
// We have a match, we can store the forward value // We have a match, we can store the forward value


@ -412,8 +412,41 @@ encodeLoop:
cv = load6432(src, s) cv = load6432(src, s)
} }
// A 4-byte match has been found. Update recent offsets. // Try to find a better match by searching for a long match at the end of the current best match
// We'll later see if more than 4 bytes. if true && s+matched < sLimit {
nextHashL := hash8(load6432(src, s+matched), betterLongTableBits)
cv := load3232(src, s)
candidateL := e.longTable[nextHashL]
coffsetL := candidateL.offset - e.cur - matched
if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) {
// Found a long match, at least 4 bytes.
matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4
if matchedNext > matched {
t = coffsetL
matched = matchedNext
if debugMatches {
println("long match at end-of-match")
}
}
}
// Check prev long...
if true {
coffsetL = candidateL.prev - e.cur - matched
if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) {
// Found a long match, at least 4 bytes.
matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4
if matchedNext > matched {
t = coffsetL
matched = matchedNext
if debugMatches {
println("prev long match at end-of-match")
}
}
}
}
}
// A match has been found. Update recent offsets.
offset2 = offset1 offset2 = offset1
offset1 = s - t offset1 = s - t
@ -905,9 +938,41 @@ encodeLoop:
} }
cv = load6432(src, s) cv = load6432(src, s)
} }
// Try to find a better match by searching for a long match at the end of the current best match
if s+matched < sLimit {
nextHashL := hash8(load6432(src, s+matched), betterLongTableBits)
cv := load3232(src, s)
candidateL := e.longTable[nextHashL]
coffsetL := candidateL.offset - e.cur - matched
if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) {
// Found a long match, at least 4 bytes.
matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4
if matchedNext > matched {
t = coffsetL
matched = matchedNext
if debugMatches {
println("long match at end-of-match")
}
}
}
// A 4-byte match has been found. Update recent offsets. // Check prev long...
// We'll later see if more than 4 bytes. if true {
coffsetL = candidateL.prev - e.cur - matched
if coffsetL >= 0 && coffsetL < s && s-coffsetL < e.maxMatchOff && cv == load3232(src, coffsetL) {
// Found a long match, at least 4 bytes.
matchedNext := e.matchlen(s+4, coffsetL+4, src) + 4
if matchedNext > matched {
t = coffsetL
matched = matchedNext
if debugMatches {
println("prev long match at end-of-match")
}
}
}
}
}
// A match has been found. Update recent offsets.
offset2 = offset1
offset1 = s - t

vendor/github.com/klauspost/compress/zstd/zip.go generated vendored Normal file

@@ -0,0 +1,120 @@
// Copyright 2019+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
package zstd
import (
"errors"
"io"
"sync"
)
// ZipMethodWinZip is the method for Zstandard compressed data inside Zip files for WinZip.
// See https://www.winzip.com/win/en/comp_info.html
const ZipMethodWinZip = 93
// ZipMethodPKWare is the method number used by PKWARE to indicate Zstandard compression.
// See https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.7.TXT
const ZipMethodPKWare = 20
var zipReaderPool sync.Pool
// newZipReader pools decoders: creating a fresh Decoder per zip entry would leak goroutines, so decoders are created once (with concurrency 1) and reused.
func newZipReader(r io.Reader) io.ReadCloser {
dec, ok := zipReaderPool.Get().(*Decoder)
if ok {
dec.Reset(r)
} else {
d, err := NewReader(r, WithDecoderConcurrency(1), WithDecoderLowmem(true))
if err != nil {
panic(err)
}
dec = d
}
return &pooledZipReader{dec: dec}
}
type pooledZipReader struct {
mu sync.Mutex // guards Close and Read
dec *Decoder
}
func (r *pooledZipReader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.dec == nil {
return 0, errors.New("Read after Close")
}
return r.dec.Read(p)
}
func (r *pooledZipReader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
var err error
if r.dec != nil {
err = r.dec.Reset(nil)
zipReaderPool.Put(r.dec)
r.dec = nil
}
return err
}
type pooledZipWriter struct {
	mu   sync.Mutex // guards Close and Write
	enc  *Encoder
	pool *sync.Pool // pool to return the encoder to on Close
}
func (w *pooledZipWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.enc == nil {
return 0, errors.New("Write after Close")
}
return w.enc.Write(p)
}
func (w *pooledZipWriter) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	var err error
	if w.enc != nil {
		err = w.enc.Close()
		// Return the encoder to its own pool; zipReaderPool holds decoders only.
		w.pool.Put(w.enc)
		w.enc = nil
	}
	return err
}
// ZipCompressor returns a compressor that can be registered with zip libraries.
// The provided encoder options will be used on all encodes.
func ZipCompressor(opts ...EOption) func(w io.Writer) (io.WriteCloser, error) {
var pool sync.Pool
return func(w io.Writer) (io.WriteCloser, error) {
enc, ok := pool.Get().(*Encoder)
if ok {
enc.Reset(w)
} else {
var err error
enc, err = NewWriter(w, opts...)
if err != nil {
return nil, err
}
}
return &pooledZipWriter{enc: enc, pool: &pool}, nil
}
}
// ZipDecompressor returns a decompressor that can be registered with zip libraries.
// See ZipCompressor for example.
func ZipDecompressor() func(r io.Reader) io.ReadCloser {
return func(r io.Reader) io.ReadCloser {
d, err := NewReader(r, WithDecoderConcurrency(1), WithDecoderLowmem(true))
if err != nil {
panic(err)
}
return d.IOReadCloser()
}
}
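ZipCompressor and ZipDecompressor are meant to be registered with the standard library's archive/zip package. A minimal round-trip sketch under that assumption — only stdlib APIs plus the identifiers defined above, with error handling reduced to log.Fatal:

package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Register zstd under the WinZip method number for both writing and reading.
	zip.RegisterCompressor(zstd.ZipMethodWinZip, zstd.ZipCompressor())
	zip.RegisterDecompressor(zstd.ZipMethodWinZip, zstd.ZipDecompressor())

	// Write an archive with a single zstd-compressed entry.
	var buf bytes.Buffer
	zw := zip.NewWriter(&buf)
	w, err := zw.CreateHeader(&zip.FileHeader{Name: "hello.txt", Method: zstd.ZipMethodWinZip})
	if err != nil {
		log.Fatal(err)
	}
	if _, err := w.Write([]byte("hello, zstd-in-zip")); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	// Read the entry back; the registered decompressor handles method 93.
	zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
	if err != nil {
		log.Fatal(err)
	}
	rc, err := zr.File[0].Open()
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()
	data, err := io.ReadAll(rc)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data))
}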

vendor/modules.txt vendored

@@ -126,7 +126,7 @@ github.com/jmespath/go-jmespath
github.com/jstemmer/go-junit-report
github.com/jstemmer/go-junit-report/formatter
github.com/jstemmer/go-junit-report/parser
-# github.com/klauspost/compress v1.12.1
+# github.com/klauspost/compress v1.12.2
## explicit
github.com/klauspost/compress/flate
github.com/klauspost/compress/fse