Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-11-25 23:03:44 +02:00
commit afe6d2e736
43 changed files with 562 additions and 262 deletions

View file

@ -566,6 +566,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/
* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/autoComplete/tags](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support)
* [/tags/autoComplete/values](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support)
* [/tags/delSeries](https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb)
## How to build from sources

View file

@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
@ -25,6 +26,8 @@ var (
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
"Deduplication is disabled if the -dedup.minScrapeInterval is 0")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
)
func main() {
@ -34,6 +37,18 @@ func main() {
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if promscrape.IsDryRun() {
*dryRun = true
}
if *dryRun {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
logger.Infof("-promscrape.config is ok; exitting with 0 status code")
return
}
logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
startTime := time.Now()
storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)

View file

@ -48,7 +48,8 @@ var (
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . See also -promscrape.config.dryRun")
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
)
var (
@ -68,15 +69,19 @@ func main() {
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if *dryRun {
if err := flag.Set("promscrape.config.strictParse", "true"); err != nil {
logger.Panicf("BUG: cannot set promscrape.config.strictParse=true: %s", err)
if promscrape.IsDryRun() {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
logger.Infof("-promscrape.config is ok; exitting with 0 status code")
return
}
if *dryRun {
if err := remotewrite.CheckRelabelConfigs(); err != nil {
logger.Fatalf("error when checking relabel configs: %s", err)
}
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking Prometheus config: %s", err)
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
logger.Infof("all the configs are ok; exitting with 0 status code")
return

View file

@ -43,7 +43,7 @@ func main() {
cgroup.UpdateGOMAXPROCSToCPUQuota()
if len(*snapshotCreateURL) > 0 {
logger.Infof("%s", "Snapshots enabled")
logger.Infof("Snapshots enabled")
logger.Infof("Snapshot create url %s", *snapshotCreateURL)
if len(*snapshotDeleteURL) <= 0 {
err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
@ -55,17 +55,17 @@ func main() {
name, err := snapshot.Create(*snapshotCreateURL)
if err != nil {
logger.Fatalf("%s", err)
logger.Fatalf("cannot create snapshot: %s", err)
}
err = flag.Set("snapshotName", name)
if err != nil {
logger.Fatalf("Failed to set snapshotName flag: %v", err)
logger.Fatalf("cannot set snapshotName flag: %v", err)
}
defer func() {
err := snapshot.Delete(*snapshotDeleteURL, name)
if err != nil {
logger.Fatalf("%s", err)
logger.Fatalf("cannot delete snapshot: %s", err)
}
}()
}

View file

@ -20,7 +20,7 @@ type snapshot struct {
// Create creates a snapshot and the provided api endpoint and returns
// the snapshot name
func Create(createSnapshotURL string) (string, error) {
logger.Infof("%s", "Creating snapshot")
logger.Infof("Creating snapshot")
u, err := url.Parse(createSnapshotURL)
if err != nil {
return "", err

View file

@ -1,102 +0,0 @@
package graphite
import (
"fmt"
"net/http"
"sort"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// TagsTagSeriesHandler implements /tags/tagSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
func TagsTagSeriesHandler(w http.ResponseWriter, r *http.Request) error {
return registerMetrics(w, r, false)
}
// TagsTagMultiSeriesHandler implements /tags/tagMultiSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
func TagsTagMultiSeriesHandler(w http.ResponseWriter, r *http.Request) error {
return registerMetrics(w, r, true)
}
func registerMetrics(w http.ResponseWriter, r *http.Request, isJSONResponse bool) error {
startTime := time.Now()
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
paths := r.Form["path"]
var row graphiteparser.Row
var labels []prompb.Label
var b []byte
var tagsPool []graphiteparser.Tag
mrs := make([]storage.MetricRow, len(paths))
ct := time.Now().UnixNano() / 1e6
canonicalPaths := make([]string, len(paths))
for i, path := range paths {
var err error
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
if err != nil {
return fmt.Errorf("cannot parse path=%q: %w", path, err)
}
// Construct canonical path according to https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
sort.Slice(row.Tags, func(i, j int) bool {
return row.Tags[i].Key < row.Tags[j].Key
})
b = append(b[:0], row.Metric...)
for _, tag := range row.Tags {
b = append(b, ';')
b = append(b, tag.Key...)
b = append(b, '=')
b = append(b, tag.Value...)
}
canonicalPaths[i] = string(b)
// Convert parsed metric and tags to labels.
labels = append(labels[:0], prompb.Label{
Name: []byte("__name__"),
Value: []byte(row.Metric),
})
for _, tag := range row.Tags {
labels = append(labels, prompb.Label{
Name: []byte(tag.Key),
Value: []byte(tag.Value),
})
}
// Put labels with the current timestamp to MetricRow
mr := &mrs[i]
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
mr.Timestamp = ct
}
if err := vmstorage.RegisterMetricNames(mrs); err != nil {
return fmt.Errorf("cannot register paths: %w", err)
}
// Return response
contentType := "text/plain; charset=utf-8"
if isJSONResponse {
contentType = "application/json; charset=utf-8"
}
w.Header().Set("Content-Type", contentType)
WriteTagsTagMultiSeriesResponse(w, canonicalPaths, isJSONResponse)
if isJSONResponse {
tagsTagMultiSeriesDuration.UpdateDuration(startTime)
} else {
tagsTagSeriesDuration.UpdateDuration(startTime)
}
return nil
}
var (
tagsTagSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagSeries"}`)
tagsTagMultiSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagMultiSeries"}`)
)

View file

@ -1,75 +0,0 @@
// Code generated by qtc from "tags_tag_multi_series_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
package graphite
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5
func StreamTagsTagMultiSeriesResponse(qw422016 *qt422016.Writer, canonicalPaths []string, isJSONResponse bool) {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6
if isJSONResponse {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6
qw422016.N().S(`[`)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6
}
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:7
for i, path := range canonicalPaths {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:8
qw422016.N().Q(path)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9
if i+1 < len(canonicalPaths) {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9
qw422016.N().S(`,`)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9
}
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:10
}
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11
if isJSONResponse {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11
qw422016.N().S(`]`)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11
}
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
}
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
func WriteTagsTagMultiSeriesResponse(qq422016 qtio422016.Writer, canonicalPaths []string, isJSONResponse bool) {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
StreamTagsTagMultiSeriesResponse(qw422016, canonicalPaths, isJSONResponse)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
qt422016.ReleaseWriter(qw422016)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
}
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) string {
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
qb422016 := qt422016.AcquireByteBuffer()
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
WriteTagsTagMultiSeriesResponse(qb422016, canonicalPaths, isJSONResponse)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
qs422016 := string(qb422016.B)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
qt422016.ReleaseByteBuffer(qb422016)
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
return qs422016
//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12
}

View file

@ -40,7 +40,7 @@ var (
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superflouos labels are dropped")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped")
)
var (
@ -153,22 +153,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
influxQueryRequests.Inc()
fmt.Fprintf(w, `{"results":[{"series":[{"values":[]}]}]}`)
return true
case "/tags/tagSeries":
graphiteTagsTagSeriesRequests.Inc()
if err := graphite.TagsTagSeriesHandler(w, r); err != nil {
graphiteTagsTagSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/tags/tagMultiSeries":
graphiteTagsTagMultiSeriesRequests.Inc()
if err := graphite.TagsTagMultiSeriesHandler(w, r); err != nil {
graphiteTagsTagMultiSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/targets":
promscrapeTargetsRequests.Inc()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
@ -223,12 +207,6 @@ var (
influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/query", protocol="influx"}`)
graphiteTagsTagSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagSeries", protocol="graphite"}`)
graphiteTagsTagSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagSeries", protocol="graphite"}`)
graphiteTagsTagMultiSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagMultiSeries", protocol="graphite"}`)
graphiteTagsTagMultiSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagMultiSeries", protocol="graphite"}`)
promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`)
promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/targets"}`)

View file

@ -12,10 +12,146 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// TagsDelSeriesHandler implements /tags/delSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb
func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
paths := r.Form["path"]
totalDeleted := 0
var row graphiteparser.Row
var tagsPool []graphiteparser.Tag
ct := time.Now().UnixNano() / 1e6
for _, path := range paths {
var err error
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
if err != nil {
return fmt.Errorf("cannot parse path=%q: %w", path, err)
}
tfs := make([]storage.TagFilter, 0, 1+len(row.Tags))
tfs = append(tfs, storage.TagFilter{
Key: nil,
Value: []byte(row.Metric),
})
for _, tag := range row.Tags {
tfs = append(tfs, storage.TagFilter{
Key: []byte(tag.Key),
Value: []byte(tag.Value),
})
}
sq := storage.NewSearchQuery(0, ct, [][]storage.TagFilter{tfs})
n, err := netstorage.DeleteSeries(sq)
if err != nil {
return fmt.Errorf("cannot delete series for %q: %w", sq, err)
}
totalDeleted += n
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if totalDeleted > 0 {
fmt.Fprintf(w, "true")
} else {
fmt.Fprintf(w, "false")
}
return nil
}
// TagsTagSeriesHandler implements /tags/tagSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
func TagsTagSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
return registerMetrics(startTime, w, r, false)
}
// TagsTagMultiSeriesHandler implements /tags/tagMultiSeries handler.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
func TagsTagMultiSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
return registerMetrics(startTime, w, r, true)
}
func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request, isJSONResponse bool) error {
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
paths := r.Form["path"]
var row graphiteparser.Row
var labels []prompb.Label
var b []byte
var tagsPool []graphiteparser.Tag
mrs := make([]storage.MetricRow, len(paths))
ct := time.Now().UnixNano() / 1e6
canonicalPaths := make([]string, len(paths))
for i, path := range paths {
var err error
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
if err != nil {
return fmt.Errorf("cannot parse path=%q: %w", path, err)
}
// Construct canonical path according to https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
sort.Slice(row.Tags, func(i, j int) bool {
return row.Tags[i].Key < row.Tags[j].Key
})
b = append(b[:0], row.Metric...)
for _, tag := range row.Tags {
b = append(b, ';')
b = append(b, tag.Key...)
b = append(b, '=')
b = append(b, tag.Value...)
}
canonicalPaths[i] = string(b)
// Convert parsed metric and tags to labels.
labels = append(labels[:0], prompb.Label{
Name: []byte("__name__"),
Value: []byte(row.Metric),
})
for _, tag := range row.Tags {
labels = append(labels, prompb.Label{
Name: []byte(tag.Key),
Value: []byte(tag.Value),
})
}
// Put labels with the current timestamp to MetricRow
mr := &mrs[i]
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
mr.Timestamp = ct
}
if err := vmstorage.RegisterMetricNames(mrs); err != nil {
return fmt.Errorf("cannot register paths: %w", err)
}
// Return response
contentType := "text/plain; charset=utf-8"
if isJSONResponse {
contentType = "application/json; charset=utf-8"
}
w.Header().Set("Content-Type", contentType)
WriteTagsTagMultiSeriesResponse(w, canonicalPaths, isJSONResponse)
if isJSONResponse {
tagsTagMultiSeriesDuration.UpdateDuration(startTime)
} else {
tagsTagSeriesDuration.UpdateDuration(startTime)
}
return nil
}
var (
tagsTagSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagSeries"}`)
tagsTagMultiSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagMultiSeries"}`)
)
// TagsAutoCompleteValuesHandler implements /tags/autoComplete/values endpoint from Graphite Tags API.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support

View file

@ -0,0 +1,75 @@
// Code generated by qtc from "tags_tag_multi_series_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
package graphite
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:5
func StreamTagsTagMultiSeriesResponse(qw422016 *qt422016.Writer, canonicalPaths []string, isJSONResponse bool) {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:6
if isJSONResponse {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:6
qw422016.N().S(`[`)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:6
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:7
for i, path := range canonicalPaths {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:8
qw422016.N().Q(path)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:9
if i+1 < len(canonicalPaths) {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:9
qw422016.N().S(`,`)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:9
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:10
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:11
if isJSONResponse {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:11
qw422016.N().S(`]`)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:11
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
func WriteTagsTagMultiSeriesResponse(qq422016 qtio422016.Writer, canonicalPaths []string, isJSONResponse bool) {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
StreamTagsTagMultiSeriesResponse(qw422016, canonicalPaths, isJSONResponse)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
}
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) string {
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
WriteTagsTagMultiSeriesResponse(qb422016, canonicalPaths, isJSONResponse)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qs422016 := string(qb422016.B)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
return qs422016
//line app/vmselect/graphite/tags_tag_multi_series_response.qtpl:12
}

View file

@ -23,7 +23,7 @@ import (
)
var (
deleteAuthKey = flag.String("deleteAuthKey", "", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series")
deleteAuthKey = flag.String("deleteAuthKey", "", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
@ -269,6 +269,22 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
return true
case "/tags/tagSeries":
graphiteTagsTagSeriesRequests.Inc()
if err := graphite.TagsTagSeriesHandler(startTime, w, r); err != nil {
graphiteTagsTagSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/tags/tagMultiSeries":
graphiteTagsTagMultiSeriesRequests.Inc()
if err := graphite.TagsTagMultiSeriesHandler(startTime, w, r); err != nil {
graphiteTagsTagMultiSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/tags":
graphiteTagsRequests.Inc()
if err := graphite.TagsHandler(startTime, w, r); err != nil {
@ -303,6 +319,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
return true
case "/tags/delSeries":
graphiteTagsDelSeriesRequests.Inc()
authKey := r.FormValue("authKey")
if authKey != *deleteAuthKey {
httpserver.Errorf(w, r, "invalid authKey %q. It must match the value from -deleteAuthKey command line flag", authKey)
return true
}
if err := graphite.TagsDelSeriesHandler(startTime, w, r); err != nil {
graphiteTagsDelSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/api/v1/rules":
// Return dumb placeholder
rulesRequests.Inc()
@ -416,6 +445,12 @@ var (
graphiteMetricsIndexRequests = metrics.NewCounter(`vm_http_requests_total{path="/metrics/index.json"}`)
graphiteMetricsIndexErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/metrics/index.json"}`)
graphiteTagsTagSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagSeries"}`)
graphiteTagsTagSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagSeries"}`)
graphiteTagsTagMultiSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagMultiSeries"}`)
graphiteTagsTagMultiSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagMultiSeries"}`)
graphiteTagsRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags"}`)
graphiteTagsErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags"}`)
@ -431,6 +466,9 @@ var (
graphiteTagsAutoCompleteValuesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/autoComplete/values"}`)
graphiteTagsAutoCompleteValuesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/autoComplete/values"}`)
graphiteTagsDelSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/delSeries"}`)
graphiteTagsDelSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/delSeries"}`)
rulesRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/rules"}`)
alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/alerts"}`)
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/metadata"}`)

View file

@ -19,6 +19,7 @@
* [Calculating the Error of Quantile Estimation with Histograms](https://linuxczar.net/blog/2020/08/13/histogram-error/)
* [Monitoring private clouds with VictoriaMetrics at LeroyMerlin](https://www.youtube.com/watch?v=74swsWqf0Uc)
* [Monitoring Kubernetes with VictoriaMetrics+Prometheus](https://speakerdeck.com/bo0km4n/victoriametrics-plus-prometheusdegou-zhu-surufu-shu-kubernetesfalsejian-shi-ji-pan)
* [High-performance Graphite storage solution on top of VictoriaMetrics](https://golangexample.com/a-high-performance-graphite-storage-solution/)
## Our articles
@ -48,3 +49,4 @@
* [Filtering and modifying time series during import to VictoriaMetrics](https://medium.com/@romanhavronenko/victoriametrics-how-to-migrate-data-from-prometheus-filtering-and-modifying-time-series-6d40cea4bf21)
* [Anomaly Detection in VictoriaMetrics](https://medium.com/@VictoriaMetrics/anomaly-detection-in-victoriametrics-9528538786a7)
* [How to use relabeling in Prometheus and VictoriaMetrics](https://valyala.medium.com/how-to-use-relabeling-in-prometheus-and-victoriametrics-8b90fc22c4b2)
* [First look at performance comparison between InfluxDB IOx and VictoriaMetrics](https://medium.com/@VictoriaMetrics/first-look-at-perfomance-comparassion-between-influxdb-iox-and-victoriametrics-e590f847935b)

View file

@ -11,11 +11,20 @@
* FEATURE: vminsert: export `vm_rpc_vmstorage_is_reachable` metric, which can be used for monitoring reachability of vmstorage nodes from vminsert nodes.
* FEATURE: vmagent: add Netflix Eureka service discovery (aka [eureka_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#eureka_sd_config)).
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/851
* FEATURE: add `filters` option to `dockerswarm_sd_config` like Prometheus did in v2.23.0 - see https://github.com/prometheus/prometheus/pull/8074
* FEATURE: expose `__meta_ec2_ipv6_addresses` label for `ec2_sd_config` like Prometheus will do in the next release.
* FEATURE: add `-loggerWarnsPerSecondLimit` command-line flag for rate limiting of WARN messages in logs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/905
* FEATURE: apply `loggerErrorsPerSecondLimit` and `-loggerWarnsPerSecondLimit` rate limit per caller. I.e. log messages are suppressed if the same caller logs the same message
at the rate exceeding the given limit. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/905#issuecomment-729395855
* FEATURE: add remoteAddr to slow query log in order to simplify identifying the client that sends slow queries to VictoriaMetrics.
Slow query logging is controlled with `-search.logSlowQueryDuration` command-line flag.
* FEATURE: add `/tags/delSeries` handler from Graphite Tags API. See https://victoriametrics.github.io/#graphite-tags-api-usage
* FEATURE: log metric name plus all its labels when the metric timestamp is out of the configured retention. This should simplify detecting the source of metrics with unexpected timestamps.
* FEATURE: add `-dryRun` command-line flag to single-node VictoriaMetrics in order to check config file pointed by `-promscrape.config`.
* BUGFIX: properly parse Prometheus metrics with [exemplars](https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1) such as `foo 123 # {bar="baz"} 1`.
* BUGFIX: properly parse "infinity" values in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#abnf).
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/924
# [v1.47.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.47.0)

View file

@ -205,11 +205,14 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `metrics/find` - searches Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find).
- `metrics/expand` - expands Graphite metrics. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-expand).
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
- `tags/tagSeries` - registers time series. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb).
- `tags/tagMultiSeries` - register multiple time series. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb).
- `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/autoComplete/tags` - returns tags matching the given `tagPrefix` and/or `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support).
- `tags/autoComplete/values` - returns tag values matching the given `valuePrefix` and/or `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support).
- `tags/delSeries` - deletes series matching the given `path`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb).
* URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`.
Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't

View file

@ -566,6 +566,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/
* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/autoComplete/tags](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support)
* [/tags/autoComplete/values](https://graphite.readthedocs.io/en/stable/tags.html#auto-complete-support)
* [/tags/delSeries](https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb)
## How to build from sources

2
go.mod
View file

@ -16,7 +16,7 @@ require (
github.com/golang/snappy v0.0.2
github.com/klauspost/compress v1.11.3
github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
github.com/valyala/fastjson v1.6.1
github.com/valyala/fastjson v1.6.3
github.com/valyala/fastrand v1.0.0
github.com/valyala/fasttemplate v1.2.1
github.com/valyala/gozstd v1.8.3

4
go.sum
View file

@ -717,8 +717,8 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.16.0/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA=
github.com/valyala/fastjson v1.6.1 h1:qJs/Kz/HebWzk8LmhOrSm7kdOyJBr1XB+zSkYtEEfQE=
github.com/valyala/fastjson v1.6.1/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fastjson v1.6.3 h1:tAKFnnwmeMGPbwJ7IwxcTPCNr3uIzoIj3/Fh90ra4xc=
github.com/valyala/fastjson v1.6.3/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI=
github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4=

View file

@ -22,9 +22,9 @@ var (
loggerOutput = flag.String("loggerOutput", "stderr", "Output for the logs. Supported values: stderr, stdout")
disableTimestamps = flag.Bool("loggerDisableTimestamps", false, "Whether to disable writing timestamps in logs")
errorsPerSecondLimit = flag.Int("loggerErrorsPerSecondLimit", 10, "Per-second limit on the number of ERROR messages. If more than the given number of errors "+
errorsPerSecondLimit = flag.Int("loggerErrorsPerSecondLimit", 0, "Per-second limit on the number of ERROR messages. If more than the given number of errors "+
"are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit")
warnsPerSecondLimit = flag.Int("loggerWarnsPerSecondLimit", 10, "Per-second limit on the number of WARN messages. If more than the given number of warns "+
warnsPerSecondLimit = flag.Int("loggerWarnsPerSecondLimit", 0, "Per-second limit on the number of WARN messages. If more than the given number of warns "+
"are emitted per second, then the remaining warns are suppressed. Zero value disables the rate limit")
)

View file

@ -31,6 +31,12 @@ type inmemoryBlock struct {
func (ib *inmemoryBlock) Reset() {
ib.commonPrefix = ib.commonPrefix[:0]
ib.data = ib.data[:0]
items := ib.items
for i := range items {
// Remove reference to by slice, so GC could free the byte slice.
items[i] = nil
}
ib.items = ib.items[:0]
}

View file

@ -245,7 +245,7 @@ func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) {
idxbc.mu.Lock()
// Remove superflouos entries.
// Remove superfluous entries.
if overflow := len(idxbc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 {
// Remove 10% of items from the cache.
overflow = int(float64(len(idxbc.m)) * 0.1)
@ -393,7 +393,7 @@ func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) {
ibc.mu.Lock()
// Clean superflouos entries in cache.
// Clean superfluous entries in cache.
if overflow := len(ibc.m) - getMaxCachedInmemoryBlocksPerPart(); overflow > 0 {
// Remove 10% of items from the cache.
overflow = int(float64(len(ibc.m)) * 0.1)

View file

@ -369,7 +369,7 @@ func (tb *Table) AddItems(items [][]byte) error {
tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib)
}
}
if len(tb.rawItemsBlocks) >= 1024 {
if len(tb.rawItemsBlocks) >= 512 {
blocksToMerge = tb.rawItemsBlocks
tb.rawItemsBlocks = nil
tb.rawItemsLastFlushTime = fasttime.UnixTimestamp()

View file

@ -136,7 +136,7 @@ func testTableSearchSerial(tb *Table, items []string) error {
n++
}
if ts.NextItem() {
return fmt.Errorf("superflouos item found at position %d when searching for %q: %q", n, key, ts.Item)
return fmt.Errorf("superfluous item found at position %d when searching for %q: %q", n, key, ts.Item)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("unexpected error when searching for %q: %w", key, err)

View file

@ -5,7 +5,6 @@ import (
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"sync/atomic"
@ -134,16 +133,14 @@ func loadConfig(path string) (cfg *Config, data []byte, err error) {
if err := cfgObj.parse(data, path); err != nil {
return nil, nil, fmt.Errorf("cannot parse Prometheus config from %q: %w", path, err)
}
if *dryRun {
// This is a dirty hack for checking Prometheus config only.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/362
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/508 for details.
logger.Infof("Success: the config at %q has no errors; exitting with 0 status code", path)
os.Exit(0)
}
return &cfgObj, data, nil
}
// IsDryRun returns true if -promscrape.config.dryRun command-line flag is set
func IsDryRun() bool {
return *dryRun
}
func (cfg *Config) parse(data []byte, path string) error {
if err := unmarshalMaybeStrict(data, cfg); err != nil {
return fmt.Errorf("cannot unmarshal data: %w", err)

View file

@ -1,8 +1,12 @@
package dockerswarm
import (
"encoding/json"
"fmt"
"net/url"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
@ -12,6 +16,9 @@ var configMap = discoveryutils.NewConfigMap()
type apiConfig struct {
client *discoveryutils.Client
port int
// filtersQueryArg contains escaped `filters` query arg to add to each request to Docker Swarm API.
filtersQueryArg string
}
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
@ -24,7 +31,8 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
cfg := &apiConfig{
port: sdc.Port,
port: sdc.Port,
filtersQueryArg: getFiltersQueryArg(sdc.Filters),
}
ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig)
if err != nil {
@ -37,3 +45,36 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
cfg.client = client
return cfg, nil
}
func (cfg *apiConfig) getAPIResponse(path string) ([]byte, error) {
if len(cfg.filtersQueryArg) > 0 {
separator := "?"
if strings.Contains(path, "?") {
separator = "&"
}
path += separator + "filters=" + cfg.filtersQueryArg
}
return cfg.client.GetAPIResponse(path)
}
func getFiltersQueryArg(filters []Filter) string {
if len(filters) == 0 {
return ""
}
m := make(map[string]map[string]bool)
for _, f := range filters {
x := m[f.Name]
if x == nil {
x = make(map[string]bool)
m[f.Name] = x
}
for _, value := range f.Values {
x[value] = true
}
}
buf, err := json.Marshal(m)
if err != nil {
logger.Panicf("BUG: unexpected error in json.Marshal: %s", err)
}
return url.QueryEscape(string(buf))
}

View file

@ -0,0 +1,26 @@
package dockerswarm
import (
"testing"
)
func TestGetFiltersQueryArg(t *testing.T) {
f := func(filters []Filter, queryArgExpected string) {
t.Helper()
queryArg := getFiltersQueryArg(filters)
if queryArg != queryArgExpected {
t.Fatalf("unexpected query arg; got %s; want %s", queryArg, queryArgExpected)
}
}
f(nil, "")
f([]Filter{
{
Name: "name",
Values: []string{"foo", "bar"},
},
{
Name: "xxx",
Values: []string{"aa"},
},
}, "%7B%22name%22%3A%7B%22bar%22%3Atrue%2C%22foo%22%3Atrue%7D%2C%22xxx%22%3A%7B%22aa%22%3Atrue%7D%7D")
}

View file

@ -10,17 +10,25 @@ import (
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config
type SDConfig struct {
Host string `yaml:"host"`
Host string `yaml:"host"`
Role string `yaml:"role"`
Port int `yaml:"port,omitempty"`
Filters []Filter `yaml:"filters,omitempty"`
// TODO: add support for proxy_url
TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"`
Role string `yaml:"role"`
Port int `yaml:"port,omitempty"`
// refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option
BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
}
// Filter is a filter, which can be passed to SDConfig.
type Filter struct {
Name string `yaml:"name"`
Values []string `yaml:"values"`
}
// GetLabels returns dockerswarm labels according to sdc.
func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
cfg, err := getAPIConfig(sdc, baseDir)

View file

@ -27,7 +27,7 @@ func getNetworksLabelsByNetworkID(cfg *apiConfig) (map[string]map[string]string,
}
func getNetworks(cfg *apiConfig) ([]network, error) {
resp, err := cfg.client.GetAPIResponse("/networks")
resp, err := cfg.getAPIResponse("/networks")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for networks: %w", err)
}

View file

@ -46,7 +46,7 @@ func getNodesLabels(cfg *apiConfig) ([]map[string]string, error) {
}
func getNodes(cfg *apiConfig) ([]node, error) {
resp, err := cfg.client.GetAPIResponse("/nodes")
resp, err := cfg.getAPIResponse("/nodes")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for nodes: %w", err)
}

View file

@ -59,7 +59,7 @@ func getServicesLabels(cfg *apiConfig) ([]map[string]string, error) {
}
func getServices(cfg *apiConfig) ([]service, error) {
data, err := cfg.client.GetAPIResponse("/services")
data, err := cfg.getAPIResponse("/services")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for services: %w", err)
}

View file

@ -58,7 +58,7 @@ func getTasksLabels(cfg *apiConfig) ([]map[string]string, error) {
}
func getTasks(cfg *apiConfig) ([]task, error) {
resp, err := cfg.client.GetAPIResponse("/tasks")
resp, err := cfg.getAPIResponse("/tasks")
if err != nil {
return nil, fmt.Errorf("cannot query dockerswarm api for tasks: %w", err)
}

View file

@ -104,7 +104,13 @@ type NetworkInterfaceSet struct {
// NetworkInterface represents NetworkInterface from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceNetworkInterface.html
type NetworkInterface struct {
SubnetID string `xml:"subnetId"`
SubnetID string `xml:"subnetId"`
IPv6AddressesSet Ipv6AddressesSet `xml:"ipv6AddressesSet"`
}
// Ipv6AddressesSet represents ipv6AddressesSet from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceNetworkInterface.html
type Ipv6AddressesSet struct {
Items []string `xml:"item"`
}
// TagSet represents TagSet from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Instance.html
@ -151,21 +157,27 @@ func (inst *Instance) appendTargetLabels(ms []map[string]string, ownerID string,
"__meta_ec2_vpc_id": inst.VPCID,
}
if len(inst.VPCID) > 0 {
// Deduplicate VPC Subnet IDs maintaining the order of the network interfaces returned by EC2.
subnets := make([]string, 0, len(inst.NetworkInterfaceSet.Items))
seenSubnets := make(map[string]bool, len(inst.NetworkInterfaceSet.Items))
var ipv6Addrs []string
for _, ni := range inst.NetworkInterfaceSet.Items {
if len(ni.SubnetID) == 0 {
continue
}
// Deduplicate VPC Subnet IDs maintaining the order of the network interfaces returned by EC2.
if !seenSubnets[ni.SubnetID] {
seenSubnets[ni.SubnetID] = true
subnets = append(subnets, ni.SubnetID)
}
// Collect ipv6 addresses
ipv6Addrs = append(ipv6Addrs, ni.IPv6AddressesSet.Items...)
}
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
m["__meta_ec2_subnet_id"] = "," + strings.Join(subnets, ",") + ","
if len(ipv6Addrs) > 0 {
m["__meta_ec2_ipv6_addresses"] = "," + strings.Join(ipv6Addrs, ",") + ","
}
}
for _, t := range inst.TagSet.Items {
if len(t.Key) == 0 || len(t.Value) == 0 {

View file

@ -83,7 +83,7 @@ func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []Colum
tagsLen := len(tags)
for sc.NextColumn() {
if col >= uint(len(cds)) {
// Skip superflouous column.
// Skip superfluous column.
continue
}
cd := &cds[col]

View file

@ -165,7 +165,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
},
})
// Superflouos columns
// Superfluous columns
f("1:metric:foo", `123,456,foo,bar`, []Row{
{
Metric: "foo",

View file

@ -70,6 +70,14 @@ func (r *Row) reset() {
r.Timestamp = 0
}
func skipTrailingComment(s string) string {
n := strings.IndexByte(s, '#')
if n < 0 {
return s
}
return s[:n]
}
func skipLeadingWhitespace(s string) string {
// Prometheus treats ' ' and '\t' as whitespace
// according to https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details
@ -133,6 +141,7 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error)
return tagsPool, fmt.Errorf("metric cannot be empty")
}
s = skipLeadingWhitespace(s)
s = skipTrailingComment(s)
if len(s) == 0 {
return tagsPool, fmt.Errorf("value cannot be empty")
}
@ -146,17 +155,21 @@ func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error)
r.Value = v
return tagsPool, nil
}
// There is timestamp.
// There is a timestamp.
v, err := fastfloat.Parse(s[:n])
if err != nil {
return tagsPool, fmt.Errorf("cannot parse value %q: %w", s[:n], err)
}
r.Value = v
s = skipLeadingWhitespace(s[n+1:])
if len(s) == 0 {
// There is no timestamp - just a whitespace after the value.
return tagsPool, nil
}
ts, err := fastfloat.ParseInt64(s)
if err != nil {
return tagsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
r.Value = v
r.Timestamp = ts
return tagsPool, nil
}

View file

@ -1,6 +1,7 @@
package prometheus
import (
"math"
"reflect"
"testing"
)
@ -202,6 +203,77 @@ cassandra_token_ownership_ratio 78.9`, &Rows{
}},
})
// Exemplars - see https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1
f(`foo_bucket{le="10",a="#b"} 17 # {trace_id="oHg5SJ#YRHA0"} 9.8 1520879607.789
abc 123 456#foobar
foo 344#bar`, &Rows{
Rows: []Row{
{
Metric: "foo_bucket",
Tags: []Tag{
{
Key: "le",
Value: "10",
},
{
Key: "a",
Value: "#b",
},
},
Value: 17,
},
{
Metric: "abc",
Value: 123,
Timestamp: 456,
},
{
Metric: "foo",
Value: 344,
},
},
})
// "Infinity" word - this has been added in OpenMetrics.
// See https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md
// Checks for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/924
inf := math.Inf(1)
f(`
foo Infinity
bar +Infinity
baz -infinity
aaa +inf
bbb -INF
ccc INF
`, &Rows{
Rows: []Row{
{
Metric: "foo",
Value: inf,
},
{
Metric: "bar",
Value: inf,
},
{
Metric: "baz",
Value: -inf,
},
{
Metric: "aaa",
Value: inf,
},
{
Metric: "bbb",
Value: -inf,
},
{
Metric: "ccc",
Value: inf,
},
},
})
// Timestamp bigger than 1<<31
f("aaa 1123 429496729600", &Rows{
Rows: []Row{{

View file

@ -44,7 +44,7 @@ func TestDeduplicateSamples(t *testing.T) {
}
}
if j != len(timestampsCopy) {
t.Fatalf("superflouos timestamps found starting from index %d: %v", j, timestampsCopy[j:])
t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
}
}
f(time.Millisecond, nil, []int64{})
@ -94,7 +94,7 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) {
}
}
if j != len(timestampsCopy) {
t.Fatalf("superflouos timestamps found starting from index %d: %v", j, timestampsCopy[j:])
t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
}
}
f(time.Millisecond, nil, []int64{})

View file

@ -343,17 +343,17 @@ func hasTag(tags []string, key []byte) bool {
}
// String returns user-readable representation of the metric name.
//
// Use this function only for debug logging.
func (mn *MetricName) String() string {
mn.sortTags()
var mnCopy MetricName
mnCopy.CopyFrom(mn)
mnCopy.sortTags()
var tags []string
for i := range mn.Tags {
t := &mn.Tags[i]
tags = append(tags, fmt.Sprintf("%q=%q", t.Key, t.Value))
for i := range mnCopy.Tags {
t := &mnCopy.Tags[i]
tags = append(tags, fmt.Sprintf("%s=%q", t.Key, t.Value))
}
tagsStr := strings.Join(tags, ", ")
return fmt.Sprintf("MetricGroup=%q, tags=[%s]", mn.MetricGroup, tagsStr)
tagsStr := strings.Join(tags, ",")
return fmt.Sprintf("%s{%s}", mnCopy.MetricGroup, tagsStr)
}
// SortAndMarshal sorts mn tags and then marshals them to dst.
@ -425,7 +425,7 @@ var maxLabelsPerTimeseries = 30
// SetMaxLabelsPerTimeseries sets the limit on the number of labels
// per each time series.
//
// Superfouos labels are dropped.
// Superfluous labels are dropped.
func SetMaxLabelsPerTimeseries(maxLabels int) {
if maxLabels <= 0 {
logger.Panicf("BUG: maxLabels must be positive; got %d", maxLabels)

View file

@ -6,6 +6,32 @@ import (
"testing"
)
func TestMetricNameString(t *testing.T) {
f := func(mn *MetricName, resultExpected string) {
t.Helper()
result := mn.String()
if result != resultExpected {
t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
f(&MetricName{
MetricGroup: []byte("foobar"),
}, "foobar{}")
f(&MetricName{
MetricGroup: []byte("abc"),
Tags: []Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("baz"),
Value: []byte("123"),
},
},
}, `abc{baz="123",foo="bar"}`)
}
func TestMetricNameSortTags(t *testing.T) {
testMetricNameSortTags(t, []string{}, []string{})
testMetricNameSortTags(t, []string{"foo"}, []string{"foo"})

View file

@ -249,7 +249,7 @@ func (ibc *indexBlockCache) Get(k uint64) *indexBlock {
func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) {
ibc.mu.Lock()
// Clean superflouos cache entries.
// Clean superfluous cache entries.
if overflow := len(ibc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 {
// Remove 10% of items from the cache.
overflow = int(float64(len(ibc.m)) * 0.1)

View file

@ -1238,9 +1238,10 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if mr.Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
if firstWarn == nil {
metricName := getUserReadableMetricName(mr.MetricNameRaw)
firstWarn = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d; "+
"probably you need updating -retentionPeriod command-line flag",
mr.Timestamp, minTimestamp)
"probably you need updating -retentionPeriod command-line flag; metricName: %s",
mr.Timestamp, minTimestamp, metricName)
}
atomic.AddUint64(&s.tooSmallTimestampRows, 1)
continue
@ -1248,9 +1249,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if mr.Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d; "+
"propbably you need updating -retentionPeriod command-line flag",
mr.Timestamp, maxTimestamp)
metricName := getUserReadableMetricName(mr.MetricNameRaw)
firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowed timestamp is %d; metricName: %s",
mr.Timestamp, maxTimestamp, metricName)
}
atomic.AddUint64(&s.tooBigTimestampRows, 1)
continue
@ -1359,6 +1360,14 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
return rows, nil
}
func getUserReadableMetricName(metricNameRaw []byte) string {
var mn MetricName
if err := mn.unmarshalRaw(metricNameRaw); err != nil {
return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err)
}
return mn.String()
}
type pendingMetricRow struct {
MetricName []byte
mr MetricRow

View file

@ -237,7 +237,9 @@ func ParseBestEffort(s string) float64 {
if strings.HasPrefix(s, "+") {
s = s[1:]
}
if strings.EqualFold(s, "inf") {
// "infinity" is needed for OpenMetrics support.
// See https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md
if strings.EqualFold(s, "inf") || strings.EqualFold(s, "infinity") {
if minus {
return -inf
}
@ -385,7 +387,9 @@ func Parse(s string) (float64, error) {
if strings.HasPrefix(ss, "+") {
ss = ss[1:]
}
if strings.EqualFold(ss, "inf") {
// "infinity" is needed for OpenMetrics support.
// See https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md
if strings.EqualFold(ss, "inf") || strings.EqualFold(ss, "infinity") {
if minus {
return -inf, nil
}

2
vendor/modules.txt vendored
View file

@ -141,7 +141,7 @@ github.com/prometheus/prometheus/tsdb/tsdbutil
github.com/prometheus/prometheus/tsdb/wal
# github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/bytebufferpool
# github.com/valyala/fastjson v1.6.1
# github.com/valyala/fastjson v1.6.3
github.com/valyala/fastjson
github.com/valyala/fastjson/fastfloat
# github.com/valyala/fastrand v1.0.0