mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
9cfdbc582f
### Describe Your Changes doing similar changes for both vmagent and vminsert (like one in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7399) ends up with almost same implementations for each of packages instead of having this shared code in one place. one of the reasons is the same Timeseries and Labels structure from different prompb and prompbmarshal packages. My proposal is to use structures from prompb package only to marshal/unmarshal sent/received data, but for internal transformations use only structures from prompbmarshal package Another example, where it already can help to simplify code is streaming aggregation pipeline for vmsingle (now it first marshals prompb.Timeseries to storage.MetricRow and then if streaming aggregation or deduplication is enabled it unmarshals all the series back but to prompbmarshal.Timeseries) ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).
112 lines
3.1 KiB
Go
112 lines
3.1 KiB
Go
package main
|
|
|
|
import (
|
|
"flag"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
)
|
|
|
|
var (
|
|
selfScrapeInterval = flag.Duration("selfScrapeInterval", 0, "Interval for self-scraping own metrics at /metrics page")
|
|
selfScrapeInstance = flag.String("selfScrapeInstance", "self", "Value for 'instance' label, which is added to self-scraped metrics")
|
|
selfScrapeJob = flag.String("selfScrapeJob", "victoria-metrics", "Value for 'job' label, which is added to self-scraped metrics")
|
|
)
|
|
|
|
var selfScraperStopCh chan struct{}
|
|
var selfScraperWG sync.WaitGroup
|
|
|
|
func startSelfScraper() {
|
|
selfScraperStopCh = make(chan struct{})
|
|
selfScraperWG.Add(1)
|
|
go func() {
|
|
defer selfScraperWG.Done()
|
|
selfScraper(*selfScrapeInterval)
|
|
}()
|
|
}
|
|
|
|
func stopSelfScraper() {
|
|
close(selfScraperStopCh)
|
|
selfScraperWG.Wait()
|
|
}
|
|
|
|
func selfScraper(scrapeInterval time.Duration) {
|
|
if scrapeInterval <= 0 {
|
|
// Self-scrape is disabled.
|
|
return
|
|
}
|
|
logger.Infof("started self-scraping `/metrics` page with interval %.3f seconds", scrapeInterval.Seconds())
|
|
|
|
var bb bytesutil.ByteBuffer
|
|
var rows prometheus.Rows
|
|
var mrs []storage.MetricRow
|
|
var labels []prompbmarshal.Label
|
|
t := time.NewTicker(scrapeInterval)
|
|
f := func(currentTime time.Time, sendStaleMarkers bool) {
|
|
currentTimestamp := currentTime.UnixNano() / 1e6
|
|
bb.Reset()
|
|
appmetrics.WritePrometheusMetrics(&bb)
|
|
s := bytesutil.ToUnsafeString(bb.B)
|
|
rows.Reset()
|
|
rows.Unmarshal(s)
|
|
mrs = mrs[:0]
|
|
for i := range rows.Rows {
|
|
r := &rows.Rows[i]
|
|
labels = labels[:0]
|
|
labels = addLabel(labels, "", r.Metric)
|
|
labels = addLabel(labels, "job", *selfScrapeJob)
|
|
labels = addLabel(labels, "instance", *selfScrapeInstance)
|
|
for j := range r.Tags {
|
|
t := &r.Tags[j]
|
|
labels = addLabel(labels, t.Key, t.Value)
|
|
}
|
|
if len(mrs) < cap(mrs) {
|
|
mrs = mrs[:len(mrs)+1]
|
|
} else {
|
|
mrs = append(mrs, storage.MetricRow{})
|
|
}
|
|
mr := &mrs[len(mrs)-1]
|
|
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
|
|
mr.Timestamp = currentTimestamp
|
|
if sendStaleMarkers {
|
|
mr.Value = decimal.StaleNaN
|
|
} else {
|
|
mr.Value = r.Value
|
|
}
|
|
}
|
|
if err := vmstorage.AddRows(mrs); err != nil {
|
|
logger.Errorf("cannot store self-scraped metrics: %s", err)
|
|
}
|
|
}
|
|
for {
|
|
select {
|
|
case <-selfScraperStopCh:
|
|
f(time.Now(), true)
|
|
t.Stop()
|
|
logger.Infof("stopped self-scraping `/metrics` page")
|
|
return
|
|
case currentTime := <-t.C:
|
|
f(currentTime, false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func addLabel(dst []prompbmarshal.Label, key, value string) []prompbmarshal.Label {
|
|
if len(dst) < cap(dst) {
|
|
dst = dst[:len(dst)+1]
|
|
} else {
|
|
dst = append(dst, prompbmarshal.Label{})
|
|
}
|
|
lb := &dst[len(dst)-1]
|
|
lb.Name = key
|
|
lb.Value = value
|
|
return dst
|
|
}
|