Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2022-12-02 19:18:34 -08:00
commit 079953b4ea
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
16 changed files with 128 additions and 82 deletions

View file

@@ -2,7 +2,7 @@ PKG_PREFIX := github.com/VictoriaMetrics/VictoriaMetrics
DATEINFO_TAG ?= $(shell date -u +'%Y%m%d-%H%M%S')
BUILDINFO_TAG ?= $(shell echo $$(git describe --long --all | tr '/' '-')$$( \
git diff-index --quiet HEAD -- || echo '-dirty-'$$(git diff-index -u HEAD | openssl sha1 | cut -c 10-17)))
git diff-index --quiet HEAD -- || echo '-dirty-'$$(git diff-index -u HEAD | openssl sha1 | cut -d' ' -f2 | cut -c 1-8)))
PKG_TAG ?= $(shell git tag -l --points-at HEAD)
ifeq ($(PKG_TAG),)

View file

@@ -1050,6 +1050,8 @@ The shortlist of configuration flags is the following:
Optional OAuth2 scopes to use for -notifier.url. Scopes must be delimited by ';'.
-remoteWrite.oauth2.tokenUrl string
Optional OAuth2 tokenURL to use for -notifier.url.
-remoteWrite.sendTimeout duration
Timeout for sending data to the configured -remoteWrite.url. (default 30s)
-remoteWrite.showURL
Whether to show -remoteWrite.url in the exported metrics. It is hidden by default, since it can contain sensitive info such as auth key
-remoteWrite.tlsCAFile string

View file

@@ -149,6 +149,16 @@ func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string,
timestamp = timestamp.Truncate(s.evaluationInterval)
}
q.Set("time", fmt.Sprintf("%d", timestamp.Unix()))
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
if s.queryStep > 0 { // override step with user-specified value
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
@@ -163,6 +173,11 @@ func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, s
q := r.URL.Query()
q.Add("start", fmt.Sprintf("%d", start.Unix()))
q.Add("end", fmt.Sprintf("%d", end.Unix()))
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
@@ -178,15 +193,5 @@ func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
}
}
q.Set("query", query)
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
if s.queryStep > 0 { // override step with user-specified value
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds())))
}
r.URL.RawQuery = q.Encode()
}
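The repeated comment about compatibility with older Prometheus versions refers to duration formatting: rendering the step with time.Duration's String method yields composite values such as "1m30s", which older Prometheus releases fail to parse as a step value, while plain integer seconds are accepted everywhere. A standalone sketch of the difference (not part of this diff):

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 90 * time.Second
	// Duration.String() yields a composite value that older Prometheus
	// releases reject as a step parameter.
	fmt.Println(d.String()) // "1m30s"
	// Integer seconds, as produced by the code above, work everywhere.
	fmt.Printf("%ds\n", int(d.Seconds())) // "90s"
}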

View file

@@ -199,6 +199,10 @@ func TestVMRangeQuery(t *testing.T) {
if _, err := strconv.ParseInt(endTS, 10, 64); err != nil {
t.Errorf("failed to parse 'end' query param: %s", err)
}
step := r.URL.Query().Get("step")
if step != "15s" {
t.Errorf("expected 'step' query param to be 15s; got %q instead", step)
}
switch c {
case 0:
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"vm_rows"},"values":[[1583786142,"13763"]]}]}}`))
@@ -212,7 +216,7 @@
if err != nil {
t.Fatalf("unexpected: %s", err)
}
s := NewVMStorage(srv.URL, authCfg, time.Minute, 0, false, srv.Client())
s := NewVMStorage(srv.URL, authCfg, time.Minute, *queryStep, false, srv.Client())
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus), EvaluationInterval: 15 * time.Second})

View file

@@ -168,9 +168,12 @@ func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, lookb
if rr.For < 1 {
continue
}
// ignore g.ExtraFilterLabels on purpose, so it
// won't affect the restore procedure.
q := qb.BuildWithParams(datasource.QuerierParams{})
// ignore QueryParams on purpose, because they could contain
// query filters. This may affect the restore procedure.
q := qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: g.Type.String(),
Headers: g.Headers,
})
if err := rr.Restore(ctx, q, lookback, labels); err != nil {
return fmt.Errorf("error while restoring rule %q: %w", rule, err)
}
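The fix here is that restore queries are now built with the group's DataSourceType and Headers instead of empty QuerierParams. A simplified sketch of why the headers matter; the types below are hypothetical stand-ins for the vmalert datasource package, which is assumed to copy Headers onto each outgoing request:

package main

import (
	"fmt"
	"net/http"
)

// QuerierParams is a simplified stand-in for datasource.QuerierParams.
type QuerierParams struct {
	DataSourceType string
	Headers        map[string]string
}

// buildRestoreRequest copies every configured header onto the restore
// query. Before this fix the params were empty, so headers such as
// Authorization never reached the datasource.
func buildRestoreRequest(url string, p QuerierParams) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	for k, v := range p.Headers {
		req.Header.Set(k, v)
	}
	return req, nil
}

func main() {
	req, _ := buildRestoreRequest("http://localhost:8428/api/v1/query", QuerierParams{
		Headers: map[string]string{"Authorization": "Bearer <token>"},
	})
	fmt.Println(req.Header.Get("Authorization"))
}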

View file

@@ -77,13 +77,12 @@ func Init(ctx context.Context) (*Client, error) {
}
return NewClient(ctx, Config{
Addr: *addr,
AuthCfg: authCfg,
Concurrency: *concurrency,
MaxQueueSize: *maxQueueSize,
MaxBatchSize: *maxBatchSize,
FlushInterval: *flushInterval,
DisablePathAppend: *disablePathAppend,
Transport: t,
Addr: *addr,
AuthCfg: authCfg,
Concurrency: *concurrency,
MaxQueueSize: *maxQueueSize,
MaxBatchSize: *maxBatchSize,
FlushInterval: *flushInterval,
Transport: t,
})
}

View file

@@ -22,6 +22,7 @@ import (
var (
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
)
// Client is an asynchronous HTTP client for writing
@@ -57,13 +58,8 @@ type Config struct {
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// WriteTimeout defines timeout for HTTP write request
// to remote storage
WriteTimeout time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
// DisablePathAppend can be used to not automatically append '/api/v1/write' to the remote write url
DisablePathAppend bool
}
const (
@@ -89,9 +85,6 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.WriteTimeout == 0 {
cfg.WriteTimeout = defaultWriteTimeout
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
@@ -101,7 +94,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
}
c := &Client{
c: &http.Client{
Timeout: cfg.WriteTimeout,
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),

View file

@@ -4436,7 +4436,7 @@
{
"targetBlank": true,
"title": "Drilldown",
"url": "/d/oS7Bi_0Wz?viewPanel=196&${__url_time_range}${__all_variables}"
"url": "/d/oS7Bi_0Wz?viewPanel=196&${__url_time_range}&${__all_variables}"
}
],
"mappings": [],

View file

@@ -110,7 +110,7 @@ See also [case studies](https://docs.victoriametrics.com/CaseStudies.html).
* [How ClickHouse inspired us to build a high performance time series database](https://www.youtube.com/watch?v=p9qjb_yoBro). See also [slides](https://docs.google.com/presentation/d/1SdFrwsyR-HMXfbzrY8xfDZH_Dg6E7E5NJ84tQozMn3w/edit?usp=sharing)
* [OSA Con 2022: Specifics of data analysis in Time Series Databases](https://www.youtube.com/watch?v=_zORxrgLtec)
* [OSMC 2022 | VictoriaMetrics: scaling to 100 million metrics per second](https://www.slideshare.net/NETWAYS/osmc-2022-victoriametrics-scaling-to-100-million-metrics-per-second-by-aliaksandr-valialkin)
* [OSMC 2022. VictoriaMetrics: scaling to 100 million metrics per second](https://www.slideshare.net/NETWAYS/osmc-2022-victoriametrics-scaling-to-100-million-metrics-per-second-by-aliaksandr-valialkin)
* [CNCF Paris Meetup 2022-09-15 - VictoriaMetrics - The cost of scale in Prometheus ecosystem](https://www.youtube.com/watch?v=gcZYHpri2Hw). See also [slides](https://docs.google.com/presentation/d/1jhZuKnAXi15M-mdBP5a4ZAiyrMeHhYmzO8xcZ6pMyLc/edit?usp=sharing)
* [Comparing Thanos to VictoriaMetrics cluster](https://faun.pub/comparing-thanos-to-victoriametrics-cluster-b193bea1683)
* [Evaluation performance and correctness: VictoriaMetrics response](https://valyala.medium.com/evaluating-performance-and-correctness-victoriametrics-response-e27315627e87)

View file

@@ -19,6 +19,10 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents corruption of the automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to copy data from sources via Prometheus `remote_read` protocol. See [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-by-remote-read-protocol). The related issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3132) and [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1101).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-remoteWrite.sendTimeout` command-line flag, which allows configuring timeout for sending data to `-remoteWrite.url`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3408).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly pass HTTP headers during the alert state restore procedure. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3418).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly specify rule evaluation step during the [replay mode](https://docs.victoriametrics.com/vmalert.html#rules-backfilling). The `step` value was previously overridden by the `-datasource.queryStep` command-line flag.
## [v1.84.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.84.0)

View file

@@ -209,8 +209,8 @@ Additionally, all the VictoriaMetrics components allow setting flag values via e
## Automatic vmstorage discovery
[Enterprise version of VictoriaMetrics](https://docs.victoriametrics.com/enterprise.html) supports [dns+srv](https://en.wikipedia.org/wiki/SRV_record) names
at `-storageNode` command-line flag passed to `vminsert` and `vmstorage`. In this case the provided `dns+srv` names are resolved
into tcp addresses of `vmstorage` nodes to connect to. The list of discovered `vmstorage` nodes is automatically updated at `vminsert` and `vmstorage`
at `-storageNode` command-line flag passed to `vminsert` and `vmselect`. In this case the provided `dns+srv` names are resolved
into tcp addresses of `vmstorage` nodes to connect to. The list of discovered `vmstorage` nodes is automatically updated at `vminsert` and `vmselect`
when it changes behind the corresponding `dns+srv` names. The `dns+srv` names must start with the `dns+srv:` prefix.
It is possible to pass multiple `dns+srv` names to the `-storageNode` command-line flag. In this case all these names are resolved to tcp addresses of `vmstorage` nodes to connect to.
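A rough illustration of the resolution step in plain Go; the SRV name below is hypothetical, and the actual enterprise discovery logic is not part of this commit:

package main

import (
	"fmt"
	"net"
)

func main() {
	// With empty service and proto, LookupSRV queries the name directly,
	// which is how a flag value like -storageNode=dns+srv:vmstorage.addrs.example.com
	// could be turned into a list of vmstorage TCP addresses.
	_, addrs, err := net.LookupSRV("", "", "vmstorage.addrs.example.com")
	if err != nil {
		panic(err)
	}
	for _, a := range addrs {
		fmt.Printf("%s:%d\n", a.Target, a.Port)
	}
}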

View file

@@ -1054,6 +1054,8 @@ The shortlist of configuration flags is the following:
Optional OAuth2 scopes to use for -notifier.url. Scopes must be delimited by ';'.
-remoteWrite.oauth2.tokenUrl string
Optional OAuth2 tokenURL to use for -notifier.url.
-remoteWrite.sendTimeout duration
Timeout for sending data to the configured -remoteWrite.url. (default 30s)
-remoteWrite.showURL
Whether to show -remoteWrite.url in the exported metrics. It is hidden by default, since it can contain sensitive info such as auth key
-remoteWrite.tlsCAFile string

View file

@@ -586,21 +586,13 @@ func (tb *Table) convertToV1280() {
}
func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
defer func() {
// Remove isInMerge flag from pws.
tb.partsLock.Lock()
for _, pw := range pws {
// Do not check for pws.isInMerge set to false,
// since it may be set to false in mergeParts below.
pw.isInMerge = false
}
tb.partsLock.Unlock()
}()
for len(pws) > defaultPartsToMerge {
if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil {
pwsChunk := pws[:defaultPartsToMerge]
pws = pws[defaultPartsToMerge:]
if err := tb.mergeParts(pwsChunk, stopCh, false); err != nil {
tb.releasePartsToMerge(pws)
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
}
pws = pws[defaultPartsToMerge:]
}
if len(pws) == 0 {
return nil
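The net effect of this refactor: mergeParts now always clears the isInMerge flags for the chunk it receives (via the deferred releasePartsToMerge added below), so mergePartsOptimal only releases the parts it has not yet handed off. A condensed sketch of that ownership rule, with function values standing in for the real methods:

// mergeChunks hands fixed-size chunks to merge; merge is assumed to
// release its own chunk even on error, mirroring mergeParts.
func mergeChunks(pws []*partWrapper, chunkSize int,
	merge func([]*partWrapper) error,
	release func([]*partWrapper)) error {
	for len(pws) > chunkSize {
		pwsChunk := pws[:chunkSize]
		pws = pws[chunkSize:] // hand off the chunk before merging
		if err := merge(pwsChunk); err != nil {
			release(pws) // release only the parts still owned here
			return err
		}
	}
	if len(pws) == 0 {
		return nil
	}
	return merge(pws)
}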
@@ -874,6 +866,17 @@ func (tb *Table) partMerger() error {
var errNothingToMerge = fmt.Errorf("nothing to merge")
func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
tb.partsLock.Lock()
for _, pw := range pws {
if !pw.isInMerge {
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
}
pw.isInMerge = false
}
tb.partsLock.Unlock()
}
// mergeParts merges pws.
//
// Merging is immediately stopped if stopCh is closed.
@@ -884,6 +887,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
// Nothing to merge.
return errNothingToMerge
}
defer tb.releasePartsToMerge(pws)
atomic.AddUint64(&tb.mergesCount, 1)
atomic.AddUint64(&tb.activeMerges, 1)
@@ -891,18 +895,6 @@
startTime := time.Now()
defer func() {
// Remove isInMerge flag from pws.
tb.partsLock.Lock()
for _, pw := range pws {
if !pw.isInMerge {
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
}
pw.isInMerge = false
}
tb.partsLock.Unlock()
}()
// Prepare blockStreamReaders for source parts.
bsrs := make([]*blockStreamReader, 0, len(pws))
defer func() {

View file

@@ -800,21 +800,13 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p
}
func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
defer func() {
// Remove isInMerge flag from pws.
pt.partsLock.Lock()
for _, pw := range pws {
// Do not check for pws.isInMerge set to false,
// since it may be set to false in mergeParts below.
pw.isInMerge = false
}
pt.partsLock.Unlock()
}()
for len(pws) > defaultPartsToMerge {
if err := pt.mergeParts(pws[:defaultPartsToMerge], stopCh); err != nil {
pwsChunk := pws[:defaultPartsToMerge]
pws = pws[defaultPartsToMerge:]
if err := pt.mergeParts(pwsChunk, stopCh); err != nil {
pt.releasePartsToMerge(pws)
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
}
pws = pws[defaultPartsToMerge:]
}
if len(pws) == 0 {
return nil

View file

@@ -2309,7 +2309,7 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
if e.date == date {
pendingMetricIDs.Union(&e.v)
} else {
// Do not add pendingMetricIDs from the previous day to the cyrrent day,
// Do not add pendingMetricIDs from the previous day to the current day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
pendingMetricIDs = &uint64set.Set{}
@@ -2337,14 +2337,15 @@ func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
var m *uint64set.Set
if hm.hour == hour {
m = hm.m.Clone()
} else {
m = &uint64set.Set{}
}
if hour%24 != 0 {
// Do not add pending metricIDs from the previous hour to the current hour on the next day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
m.Union(newMetricIDs)
} else {
m = newMetricIDs
if hour%24 == 0 {
// Do not add pending metricIDs from the previous hour to the current hour on the next day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
m = &uint64set.Set{}
}
}
hmNew := &hourMetricIDs{
m: m,
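The restructured branch relies on hour counting whole hours since the Unix epoch, so hour%24 == 0 identifies the first hour of a new UTC day, the case where pending metricIDs from the previous hour must not be carried over. A tiny standalone check (the timestamp is just an example):

package main

import "fmt"

func main() {
	// 2022-12-02 00:00:00 UTC is an exact multiple of 24 hours since
	// the Unix epoch, i.e. the first hour of a new UTC day.
	hour := uint64(1669939200 / 3600)
	fmt.Println(hour % 24) // 0 -> drop pending metricIDs from the previous day
}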

View file

@@ -157,9 +157,12 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := fasttime.UnixHour()
if hour%24 == 0 {
hour++
}
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: 123,
hour: hour - 1,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
@@ -220,9 +223,12 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
if hour%24 == 0 {
hour++
}
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: 123,
hour: hour - 1,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
@@ -287,6 +293,49 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour_start_of_day", func(t *testing.T) {
s := newStorage()
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
hour -= hour % 24
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
m := pendingHourEntries.Clone()
hmOrig.m.ForEach(func(part []uint64) bool {
for _, metricID := range part {
m.Add(metricID)
}
return true
})
if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
hmEmpty := &hourMetricIDs{}
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
s := newStorage()