package promscrape import ( "bytes" "flag" "fmt" "io" "math" "math/bits" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" "github.com/cespare/xxhash/v2" ) var ( suppressScrapeErrors = flag.Bool("promscrape.suppressScrapeErrors", false, "Whether to suppress scrape errors logging. "+ "The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed. "+ "See also -promscrape.suppressScrapeErrorsDelay") suppressScrapeErrorsDelay = flag.Duration("promscrape.suppressScrapeErrorsDelay", 0, "The delay for suppressing repeated scrape errors logging per each scrape targets. "+ "This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors") minResponseSizeForStreamParse = flagutil.NewBytes("promscrape.minResponseSizeForStreamParse", 1e6, "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode") ) // ScrapeWork represents a unit of work for scraping Prometheus metrics. // // It must be immutable during its lifetime, since it is read from concurrently running goroutines. type ScrapeWork struct { // Full URL (including query args) for the scrape. ScrapeURL string // Interval for scraping the ScrapeURL. ScrapeInterval time.Duration // Timeout for scraping the ScrapeURL. ScrapeTimeout time.Duration // How to deal with conflicting labels. // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config HonorLabels bool // How to deal with scraped timestamps. // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config HonorTimestamps bool // Whether to deny redirects during requests to scrape config. DenyRedirects bool // Do not support enable_http2 option because of the following reasons: // // - http2 is used very rarely comparing to http for Prometheus metrics exposition and service discovery // - http2 is much harder to debug than http // - http2 has very bad security record because of its complexity - see https://portswigger.net/research/http2 // // VictoriaMetrics components are compiled with nethttpomithttp2 tag because of these issues. // // EnableHTTP2 bool // OriginalLabels contains original labels before relabeling. // // These labels are needed for relabeling troubleshooting at /targets page. // // OriginalLabels are sorted by name. OriginalLabels *promutils.Labels // Labels to add to the scraped metrics. // // The list contains at least the following labels according to https://www.robustperception.io/life-of-a-label/ // // * job // * instance // * user-defined labels set via `relabel_configs` section in `scrape_config` // // See also https://prometheus.io/docs/concepts/jobs_instances/ // // Labels are sorted by name. Labels *promutils.Labels // ExternalLabels contains labels from global->external_labels section of -promscrape.config // // These labels are added to scraped metrics after the relabeling. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3137 // // ExternalLabels are sorted by name. ExternalLabels *promutils.Labels // ProxyURL HTTP proxy url ProxyURL *proxy.URL // Auth config for ProxyUR: ProxyAuthConfig *promauth.Config // Auth config AuthConfig *promauth.Config // Optional `relabel_configs`. RelabelConfigs *promrelabel.ParsedConfigs // Optional `metric_relabel_configs`. MetricRelabelConfigs *promrelabel.ParsedConfigs // The maximum number of metrics to scrape after relabeling. SampleLimit int // Whether to disable response compression when querying ScrapeURL. DisableCompression bool // Whether to disable HTTP keep-alive when querying ScrapeURL. DisableKeepAlive bool // Whether to parse target responses in a streaming manner. StreamParse bool // The interval for aligning the first scrape. ScrapeAlignInterval time.Duration // The offset for the first scrape. ScrapeOffset time.Duration // Optional limit on the number of unique series the scrape target can expose. SeriesLimit int // Whether to process stale markers for the given target. // See https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers NoStaleMarkers bool // The Tenant Info AuthToken *auth.Token // The original 'job_name' jobNameOriginal string } func (sw *ScrapeWork) canSwitchToStreamParseMode() bool { // Deny switching to stream parse mode if `sample_limit` or `series_limit` options are set, // since these limits cannot be applied in stream parsing mode. return sw.SampleLimit <= 0 && sw.SeriesLimit <= 0 } // key returns unique identifier for the given sw. // // It can be used for comparing for equality for two ScrapeWork objects. func (sw *ScrapeWork) key() string { // Do not take into account OriginalLabels, since they can be changed with relabeling. // Do not take into account RelabelConfigs, since it is already applied to Labels. // Take into account JobNameOriginal in order to capture the case when the original job_name is changed via relabeling. key := fmt.Sprintf("JobNameOriginal=%s, ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, DenyRedirects=%v, Labels=%s, "+ "ExternalLabels=%s, "+ "ProxyURL=%s, ProxyAuthConfig=%s, AuthConfig=%s, MetricRelabelConfigs=%q, "+ "SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, "+ "ScrapeAlignInterval=%s, ScrapeOffset=%s, SeriesLimit=%d, NoStaleMarkers=%v", sw.jobNameOriginal, sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.DenyRedirects, sw.Labels.String(), sw.ExternalLabels.String(), sw.ProxyURL.String(), sw.ProxyAuthConfig.String(), sw.AuthConfig.String(), sw.MetricRelabelConfigs.String(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse, sw.ScrapeAlignInterval, sw.ScrapeOffset, sw.SeriesLimit, sw.NoStaleMarkers) return key } // Job returns job for the ScrapeWork func (sw *ScrapeWork) Job() string { return sw.Labels.Get("job") } type scrapeWork struct { // Config for the scrape. Config *ScrapeWork // ReadData is called for reading the data. ReadData func(dst []byte) ([]byte, error) // GetStreamReader is called if Config.StreamParse is set. GetStreamReader func() (*streamReader, error) // PushData is called for pushing collected data. PushData func(at *auth.Token, wr *prompbmarshal.WriteRequest) // ScrapeGroup is name of ScrapeGroup that // scrapeWork belongs to ScrapeGroup string tmpRow parser.Row // This flag is set to true if series_limit is exceeded. seriesLimitExceeded bool // labelsHashBuf is used for calculating the hash on series labels labelsHashBuf []byte // Optional limiter on the number of unique series per scrape target. seriesLimiter *bloomfilter.Limiter // prevBodyLen contains the previous response body length for the given scrape work. // It is used as a hint in order to reduce memory usage for body buffers. prevBodyLen int // prevLabelsLen contains the number labels scraped during the previous scrape. // It is used as a hint in order to reduce memory usage when parsing scrape responses. prevLabelsLen int // lastScrape holds the last response from scrape target. // It is used for staleness tracking and for populating scrape_series_added metric. // The lastScrape isn't populated if -promscrape.noStaleMarkers is set. This reduces memory usage. lastScrape []byte // lastScrapeCompressed is used for storing the compressed lastScrape between scrapes // in stream parsing mode in order to reduce memory usage when the lastScrape size // equals to or exceeds -promscrape.minResponseSizeForStreamParse lastScrapeCompressed []byte // nextErrorLogTime is the timestamp in millisecond when the next scrape error should be logged. nextErrorLogTime int64 // failureRequestsCount is the number of suppressed scrape errors during the last suppressScrapeErrorsDelay failureRequestsCount int // successRequestsCount is the number of success requests during the last suppressScrapeErrorsDelay successRequestsCount int } func (sw *scrapeWork) loadLastScrape() string { if len(sw.lastScrapeCompressed) > 0 { b, err := encoding.DecompressZSTD(sw.lastScrape[:0], sw.lastScrapeCompressed) if err != nil { logger.Panicf("BUG: cannot unpack compressed previous response: %s", err) } sw.lastScrape = b } return bytesutil.ToUnsafeString(sw.lastScrape) } func (sw *scrapeWork) storeLastScrape(lastScrape []byte) { mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.IntN() if mustCompress { sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1) sw.lastScrape = nil } else { sw.lastScrape = append(sw.lastScrape[:0], lastScrape...) sw.lastScrapeCompressed = nil } } func (sw *scrapeWork) finalizeLastScrape() { if len(sw.lastScrapeCompressed) > 0 { // The compressed lastScrape is available in sw.lastScrapeCompressed. // Release the memory occupied by sw.lastScrape, so it won't be occupied between scrapes. sw.lastScrape = nil } if len(sw.lastScrape) > 0 { // Release the memory occupied by sw.lastScrapeCompressed, so it won't be occupied between scrapes. sw.lastScrapeCompressed = nil } } func (sw *scrapeWork) run(stopCh <-chan struct{}, globalStopCh <-chan struct{}) { var randSleep uint64 scrapeInterval := sw.Config.ScrapeInterval scrapeAlignInterval := sw.Config.ScrapeAlignInterval scrapeOffset := sw.Config.ScrapeOffset if scrapeOffset > 0 { scrapeAlignInterval = scrapeInterval } if scrapeAlignInterval <= 0 { // Calculate start time for the first scrape from ScrapeURL and labels. // This should spread load when scraping many targets with different // scrape urls and labels. // This also makes consistent scrape times across restarts // for a target with the same ScrapeURL and labels. // // Include clusterName to the key in order to guarantee that the same // scrape target is scraped at different offsets per each cluster. // This guarantees that the deduplication consistently leaves samples received from the same vmagent. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2679 // // Include clusterMemberID to the key in order to guarantee that each member in vmagent cluster // scrapes replicated targets at different time offsets. This guarantees that the deduplication consistently leaves samples // received from the same vmagent replica. // See https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets key := fmt.Sprintf("clusterName=%s, clusterMemberID=%d, ScrapeURL=%s, Labels=%s", *clusterName, clusterMemberID, sw.Config.ScrapeURL, sw.Config.Labels.String()) h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key)) randSleep = uint64(float64(scrapeInterval) * (float64(h) / (1 << 64))) sleepOffset := uint64(time.Now().UnixNano()) % uint64(scrapeInterval) if randSleep < sleepOffset { randSleep += uint64(scrapeInterval) } randSleep -= sleepOffset } else { d := uint64(scrapeAlignInterval) randSleep = d - uint64(time.Now().UnixNano())%d if scrapeOffset > 0 { randSleep += uint64(scrapeOffset) } randSleep %= uint64(scrapeInterval) } timer := timerpool.Get(time.Duration(randSleep)) var timestamp int64 var ticker *time.Ticker select { case <-stopCh: timerpool.Put(timer) return case <-timer.C: timerpool.Put(timer) ticker = time.NewTicker(scrapeInterval) timestamp = time.Now().UnixNano() / 1e6 sw.scrapeAndLogError(timestamp, timestamp) } defer ticker.Stop() for { timestamp += scrapeInterval.Milliseconds() select { case <-stopCh: t := time.Now().UnixNano() / 1e6 lastScrape := sw.loadLastScrape() select { case <-globalStopCh: // Do not send staleness markers on graceful shutdown as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079 default: // Send staleness markers to all the metrics scraped last time from the target // when the given target disappears as Prometheus does. // Use the current real timestamp for staleness markers, so queries // stop returning data just after the time the target disappears. sw.sendStaleSeries(lastScrape, "", t, true) } if sw.seriesLimiter != nil { sw.seriesLimiter.MustStop() sw.seriesLimiter = nil } return case tt := <-ticker.C: t := tt.UnixNano() / 1e6 if d := math.Abs(float64(t - timestamp)); d > 0 && d/float64(scrapeInterval.Milliseconds()) > 0.1 { // Too big jitter. Adjust timestamp timestamp = t } sw.scrapeAndLogError(timestamp, t) } } } func (sw *scrapeWork) logError(s string) { if !*suppressScrapeErrors { logger.ErrorfSkipframes(1, "error when scraping %q from job %q with labels %s: %s; "+ "scrape errors can be disabled by -promscrape.suppressScrapeErrors command-line flag", sw.Config.ScrapeURL, sw.Config.Job(), sw.Config.Labels.String(), s) } } func (sw *scrapeWork) scrapeAndLogError(scrapeTimestamp, realTimestamp int64) { err := sw.scrapeInternal(scrapeTimestamp, realTimestamp) if *suppressScrapeErrors { return } if err == nil { sw.successRequestsCount++ return } sw.failureRequestsCount++ if sw.nextErrorLogTime == 0 { sw.nextErrorLogTime = realTimestamp + suppressScrapeErrorsDelay.Milliseconds() } if realTimestamp < sw.nextErrorLogTime { return } totalRequests := sw.failureRequestsCount + sw.successRequestsCount logger.Warnf("cannot scrape target %q (%s) %d out of %d times during -promscrape.suppressScrapeErrorsDelay=%s; the last error: %s", sw.Config.ScrapeURL, sw.Config.Labels.String(), sw.failureRequestsCount, totalRequests, *suppressScrapeErrorsDelay, err) sw.nextErrorLogTime = realTimestamp + suppressScrapeErrorsDelay.Milliseconds() sw.failureRequestsCount = 0 sw.successRequestsCount = 0 } var ( scrapeDuration = metrics.NewHistogram("vm_promscrape_scrape_duration_seconds") scrapeResponseSize = metrics.NewHistogram("vm_promscrape_scrape_response_size_bytes") scrapedSamples = metrics.NewHistogram("vm_promscrape_scraped_samples") scrapesSkippedBySampleLimit = metrics.NewCounter("vm_promscrape_scrapes_skipped_by_sample_limit_total") scrapesFailed = metrics.NewCounter("vm_promscrape_scrapes_failed_total") pushDataDuration = metrics.NewHistogram("vm_promscrape_push_data_duration_seconds") ) func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool { if minResponseSizeForStreamParse.N <= 0 { return false } return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.IntN() } // getTargetResponse() fetches response from sw target in the same way as when scraping the target. func (sw *scrapeWork) getTargetResponse() ([]byte, error) { // use stream reader when stream mode enabled if *streamParse || sw.Config.StreamParse || sw.mustSwitchToStreamParseMode(sw.prevBodyLen) { // Read the response in stream mode. sr, err := sw.GetStreamReader() if err != nil { return nil, err } data, err := io.ReadAll(sr) sr.MustClose() return data, err } // Read the response in usual mode. return sw.ReadData(nil) } func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error { if *streamParse || sw.Config.StreamParse || sw.mustSwitchToStreamParseMode(sw.prevBodyLen) { // Read data from scrape targets in streaming manner. // This case is optimized for targets exposing more than ten thousand of metrics per target. return sw.scrapeStream(scrapeTimestamp, realTimestamp) } // Common case: read all the data from scrape target to memory (body) and then process it. // This case should work more optimally than stream parse code for common case when scrape target exposes // up to a few thousand metrics. body := leveledbytebufferpool.Get(sw.prevBodyLen) var err error body.B, err = sw.ReadData(body.B[:0]) releaseBody, err := sw.processScrapedData(scrapeTimestamp, realTimestamp, body, err) if releaseBody { leveledbytebufferpool.Put(body) } return err } var processScrapedDataConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) { // This function is CPU-bound, while it may allocate big amounts of memory. // That's why it is a good idea to limit the number of concurrent calls to this function // in order to limit memory usage under high load without sacrificing the performance. processScrapedDataConcurrencyLimitCh <- struct{}{} defer func() { <-processScrapedDataConcurrencyLimitCh }() endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) scrapeResponseSize.Update(float64(len(body.B))) up := 1 wc := writeRequestCtxPool.Get(sw.prevLabelsLen) lastScrape := sw.loadLastScrape() bodyString := bytesutil.ToUnsafeString(body.B) areIdenticalSeries := sw.areIdenticalSeries(lastScrape, bodyString) if err != nil { up = 0 scrapesFailed.Inc() } else { wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError) } srcRows := wc.rows.Rows samplesScraped := len(srcRows) scrapedSamples.Update(float64(samplesScraped)) for i := range srcRows { sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true) } samplesPostRelabeling := len(wc.writeRequest.Timeseries) if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { wc.resetNoRows() up = 0 scrapesSkippedBySampleLimit.Inc() err = fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } if up == 0 { bodyString = "" } seriesAdded := 0 if !areIdenticalSeries { // The returned value for seriesAdded may be bigger than the real number of added series // if some series were removed during relabeling. // This is a trade-off between performance and accuracy. seriesAdded = sw.getSeriesAdded(lastScrape, bodyString) } samplesDropped := 0 if sw.seriesLimitExceeded || !areIdenticalSeries { samplesDropped = sw.applySeriesLimit(wc) } am := &autoMetrics{ up: up, scrapeDurationSeconds: duration, samplesScraped: samplesScraped, samplesPostRelabeling: samplesPostRelabeling, seriesAdded: seriesAdded, seriesLimitSamplesDropped: samplesDropped, } sw.addAutoMetrics(am, wc, scrapeTimestamp) sw.pushData(sw.Config.AuthToken, &wc.writeRequest) sw.prevLabelsLen = len(wc.labels) sw.prevBodyLen = len(bodyString) wc.reset() mustSwitchToStreamParse := sw.mustSwitchToStreamParseMode(len(bodyString)) if !mustSwitchToStreamParse { // Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse // This should reduce memory usage when scraping targets with big responses. writeRequestCtxPool.Put(wc) } // body must be released only after wc is released, since wc refers to body. if !areIdenticalSeries { // Send stale markers for disappeared metrics with the real scrape timestamp // in order to guarantee that query doesn't return data after this time for the disappeared metrics. sw.sendStaleSeries(lastScrape, bodyString, realTimestamp, false) sw.storeLastScrape(body.B) } sw.finalizeLastScrape() tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) return !mustSwitchToStreamParse, err } func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) { startTime := time.Now() sw.PushData(at, wr) pushDataDuration.UpdateDuration(startTime) } type streamBodyReader struct { body []byte bodyLen int readOffset int } func (sbr *streamBodyReader) Init(sr *streamReader) error { sbr.body = nil sbr.bodyLen = 0 sbr.readOffset = 0 // Read the whole response body in memory before parsing it in stream mode. // This minimizes the time needed for reading response body from scrape target. startTime := fasttime.UnixTimestamp() body, err := io.ReadAll(sr) if err != nil { d := fasttime.UnixTimestamp() - startTime return fmt.Errorf("cannot read stream body in %d seconds: %w", d, err) } sbr.body = body sbr.bodyLen = len(body) return nil } func (sbr *streamBodyReader) Read(b []byte) (int, error) { if sbr.readOffset >= len(sbr.body) { return 0, io.EOF } n := copy(b, sbr.body[sbr.readOffset:]) sbr.readOffset += n return n, nil } func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { samplesScraped := 0 samplesPostRelabeling := 0 wc := writeRequestCtxPool.Get(sw.prevLabelsLen) // Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses. var sbr streamBodyReader lastScrape := sw.loadLastScrape() bodyString := "" areIdenticalSeries := true samplesDropped := 0 sr, err := sw.GetStreamReader() if err != nil { err = fmt.Errorf("cannot read data: %w", err) } else { var mu sync.Mutex err = sbr.Init(sr) if err == nil { bodyString = bytesutil.ToUnsafeString(sbr.body) areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString) err = stream.Parse(&sbr, scrapeTimestamp, false, false, func(rows []parser.Row) error { mu.Lock() defer mu.Unlock() samplesScraped += len(rows) for i := range rows { sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) } samplesPostRelabeling += len(wc.writeRequest.Timeseries) if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { wc.resetNoRows() scrapesSkippedBySampleLimit.Inc() return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } if sw.seriesLimitExceeded || !areIdenticalSeries { samplesDropped += sw.applySeriesLimit(wc) } // Push the collected rows to sw before returning from the callback, since they cannot be held // after returning from the callback - this will result in data race. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 sw.pushData(sw.Config.AuthToken, &wc.writeRequest) wc.resetNoRows() return nil }, sw.logError) } sr.MustClose() } scrapedSamples.Update(float64(samplesScraped)) endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) scrapeResponseSize.Update(float64(sbr.bodyLen)) up := 1 if err != nil { // Mark the scrape as failed even if it already read and pushed some samples // to remote storage. This makes the logic compatible with Prometheus. up = 0 scrapesFailed.Inc() } seriesAdded := 0 if !areIdenticalSeries { // The returned value for seriesAdded may be bigger than the real number of added series // if some series were removed during relabeling. // This is a trade-off between performance and accuracy. seriesAdded = sw.getSeriesAdded(lastScrape, bodyString) } am := &autoMetrics{ up: up, scrapeDurationSeconds: duration, samplesScraped: samplesScraped, samplesPostRelabeling: samplesPostRelabeling, seriesAdded: seriesAdded, seriesLimitSamplesDropped: samplesDropped, } sw.addAutoMetrics(am, wc, scrapeTimestamp) sw.pushData(sw.Config.AuthToken, &wc.writeRequest) sw.prevLabelsLen = len(wc.labels) sw.prevBodyLen = sbr.bodyLen wc.reset() writeRequestCtxPool.Put(wc) if !areIdenticalSeries { // Send stale markers for disappeared metrics with the real scrape timestamp // in order to guarantee that query doesn't return data after this time for the disappeared metrics. sw.sendStaleSeries(lastScrape, bodyString, realTimestamp, false) sw.storeLastScrape(sbr.body) } sw.finalizeLastScrape() tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) // Do not track active series in streaming mode, since this may need too big amounts of memory // when the target exports too big number of metrics. return err } func (sw *scrapeWork) areIdenticalSeries(prevData, currData string) bool { if sw.Config.NoStaleMarkers && sw.Config.SeriesLimit <= 0 { // Do not spend CPU time on tracking the changes in series if stale markers are disabled. // The check for series_limit is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 return true } return parser.AreIdenticalSeriesFast(prevData, currData) } // leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx // structs contain mixed number of labels. // // Its logic has been copied from leveledbytebufferpool. type leveledWriteRequestCtxPool struct { pools [13]sync.Pool } func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx { id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity) for i := 0; i < 2; i++ { if id < 0 || id >= len(lwp.pools) { break } if v := lwp.pools[id].Get(); v != nil { return v.(*writeRequestCtx) } id++ } return &writeRequestCtx{ labels: make([]prompbmarshal.Label, 0, capacityNeeded), } } func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) { capacity := cap(wc.labels) id, poolCapacity := lwp.getPoolIDAndCapacity(capacity) if capacity <= poolCapacity { wc.reset() lwp.pools[id].Put(wc) } } func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) { size-- if size < 0 { size = 0 } size >>= 3 id := bits.Len(uint(size)) if id >= len(lwp.pools) { id = len(lwp.pools) - 1 } return id, (1 << (id + 3)) } type writeRequestCtx struct { rows parser.Rows writeRequest prompbmarshal.WriteRequest labels []prompbmarshal.Label samples []prompbmarshal.Sample } func (wc *writeRequestCtx) reset() { wc.rows.Reset() wc.resetNoRows() } func (wc *writeRequestCtx) resetNoRows() { wc.writeRequest.Reset() labels := wc.labels for i := range labels { labels[i] = prompbmarshal.Label{} } wc.labels = labels[:0] wc.samples = wc.samples[:0] } var writeRequestCtxPool leveledWriteRequestCtxPool func (sw *scrapeWork) getSeriesAdded(lastScrape, currScrape string) int { if currScrape == "" { return 0 } bodyString := parser.GetRowsDiff(currScrape, lastScrape) return strings.Count(bodyString, "\n") } func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int { if sw.Config.SeriesLimit <= 0 { return 0 } if sw.seriesLimiter == nil { sw.seriesLimiter = bloomfilter.NewLimiter(sw.Config.SeriesLimit, 24*time.Hour) } sl := sw.seriesLimiter dstSeries := wc.writeRequest.Timeseries[:0] samplesDropped := 0 for _, ts := range wc.writeRequest.Timeseries { h := sw.getLabelsHash(ts.Labels) if !sl.Add(h) { samplesDropped++ continue } dstSeries = append(dstSeries, ts) } prompbmarshal.ResetTimeSeries(wc.writeRequest.Timeseries[len(dstSeries):]) wc.writeRequest.Timeseries = dstSeries if samplesDropped > 0 && !sw.seriesLimitExceeded { sw.seriesLimitExceeded = true } return samplesDropped } var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) { // This function is CPU-bound, while it may allocate big amounts of memory. // That's why it is a good idea to limit the number of concurrent calls to this function // in order to limit memory usage under high load without sacrificing the performance. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668 sendStaleSeriesConcurrencyLimitCh <- struct{}{} defer func() { <-sendStaleSeriesConcurrencyLimitCh }() if sw.Config.NoStaleMarkers { return } bodyString := lastScrape if currScrape != "" { bodyString = parser.GetRowsDiff(lastScrape, currScrape) } wc := writeRequestCtxPool.Get(sw.prevLabelsLen) defer func() { wc.reset() writeRequestCtxPool.Put(wc) }() if bodyString != "" { // Send stale markers in streaming mode in order to reduce memory usage // when stale markers for targets exposing big number of metrics must be generated. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668 // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 var mu sync.Mutex br := bytes.NewBufferString(bodyString) err := stream.Parse(br, timestamp, false, false, func(rows []parser.Row) error { mu.Lock() defer mu.Unlock() for i := range rows { sw.addRowToTimeseries(wc, &rows[i], timestamp, true) } // Apply series limit to stale markers in order to prevent sending stale markers for newly created series. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 if sw.seriesLimitExceeded { sw.applySeriesLimit(wc) } // Push the collected rows to sw before returning from the callback, since they cannot be held // after returning from the callback - this will result in data race. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 setStaleMarkersForRows(wc.writeRequest.Timeseries) sw.pushData(sw.Config.AuthToken, &wc.writeRequest) wc.resetNoRows() return nil }, sw.logError) if err != nil { sw.logError(fmt.Errorf("cannot send stale markers: %w", err).Error()) } } if addAutoSeries { am := &autoMetrics{} sw.addAutoMetrics(am, wc, timestamp) setStaleMarkersForRows(wc.writeRequest.Timeseries) sw.pushData(sw.Config.AuthToken, &wc.writeRequest) } } func setStaleMarkersForRows(series []prompbmarshal.TimeSeries) { for _, tss := range series { samples := tss.Samples for i := range samples { samples[i].Value = decimal.StaleNaN } staleSamplesCreated.Add(len(samples)) } } var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`) func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 { // It is OK if there will be hash collisions for distinct sets of labels, // since the accuracy for `scrape_series_added` metric may be lower than 100%. b := sw.labelsHashBuf[:0] for _, label := range labels { b = append(b, label.Name...) b = append(b, label.Value...) } sw.labelsHashBuf = b return xxhash.Sum64(b) } type autoMetrics struct { up int scrapeDurationSeconds float64 samplesScraped int samplesPostRelabeling int seriesAdded int seriesLimitSamplesDropped int } func isAutoMetric(s string) bool { switch s { case "up", "scrape_duration_seconds", "scrape_samples_scraped", "scrape_samples_post_metric_relabeling", "scrape_series_added", "scrape_timeout_seconds", "scrape_samples_limit", "scrape_series_limit_samples_dropped", "scrape_series_limit", "scrape_series_current": return true } return false } func (sw *scrapeWork) addAutoMetrics(am *autoMetrics, wc *writeRequestCtx, timestamp int64) { sw.addAutoTimeseries(wc, "up", float64(am.up), timestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", am.scrapeDurationSeconds, timestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(am.samplesScraped), timestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(am.samplesPostRelabeling), timestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(am.seriesAdded), timestamp) sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), timestamp) if sampleLimit := sw.Config.SampleLimit; sampleLimit > 0 { // Expose scrape_samples_limit metric if sample_limit config is set for the target. // See https://github.com/VictoriaMetrics/operator/issues/497 sw.addAutoTimeseries(wc, "scrape_samples_limit", float64(sampleLimit), timestamp) } if sl := sw.seriesLimiter; sl != nil { sw.addAutoTimeseries(wc, "scrape_series_limit_samples_dropped", float64(am.seriesLimitSamplesDropped), timestamp) sw.addAutoTimeseries(wc, "scrape_series_limit", float64(sl.MaxItems()), timestamp) sw.addAutoTimeseries(wc, "scrape_series_current", float64(sl.CurrentItems()), timestamp) } } // addAutoTimeseries adds automatically generated time series with the given name, value and timestamp. // // See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series func (sw *scrapeWork) addAutoTimeseries(wc *writeRequestCtx, name string, value float64, timestamp int64) { sw.tmpRow.Metric = name sw.tmpRow.Tags = nil sw.tmpRow.Value = value sw.tmpRow.Timestamp = timestamp sw.addRowToTimeseries(wc, &sw.tmpRow, timestamp, false) } func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, timestamp int64, needRelabel bool) { metric := r.Metric // Add `exported_` prefix to metrics, which clash with the automatically generated // metric names only if the following conditions are met: // // - The `honor_labels` option isn't set to true in the scrape_config. // If `honor_labels: true`, then the scraped metric name must remain unchanged // because the user explicitly asked about it in the config. // - The metric has no labels (tags). If it has labels, then the metric value // will be written into a separate time series comparing to automatically generated time series. // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3557 // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406 if needRelabel && !sw.Config.HonorLabels && len(r.Tags) == 0 && isAutoMetric(metric) { bb := bbPool.Get() bb.B = append(bb.B, "exported_"...) bb.B = append(bb.B, metric...) metric = bytesutil.InternBytes(bb.B) bbPool.Put(bb) } labelsLen := len(wc.labels) targetLabels := sw.Config.Labels.GetLabels() wc.labels = appendLabels(wc.labels, metric, r.Tags, targetLabels, sw.Config.HonorLabels) if needRelabel { wc.labels = sw.Config.MetricRelabelConfigs.Apply(wc.labels, labelsLen) } wc.labels = promrelabel.FinalizeLabels(wc.labels[:labelsLen], wc.labels[labelsLen:]) if len(wc.labels) == labelsLen { // Skip row without labels. return } // Add labels from `global->external_labels` section after the relabeling like Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3137 externalLabels := sw.Config.ExternalLabels.GetLabels() wc.labels = appendExtraLabels(wc.labels, externalLabels, labelsLen, sw.Config.HonorLabels) sampleTimestamp := r.Timestamp if !sw.Config.HonorTimestamps || sampleTimestamp == 0 { sampleTimestamp = timestamp } wc.samples = append(wc.samples, prompbmarshal.Sample{ Value: r.Value, Timestamp: sampleTimestamp, }) wr := &wc.writeRequest wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{ Labels: wc.labels[labelsLen:], Samples: wc.samples[len(wc.samples)-1:], }) } var bbPool bytesutil.ByteBufferPool func appendLabels(dst []prompbmarshal.Label, metric string, src []parser.Tag, extraLabels []prompbmarshal.Label, honorLabels bool) []prompbmarshal.Label { dstLen := len(dst) dst = append(dst, prompbmarshal.Label{ Name: "__name__", Value: metric, }) for i := range src { tag := &src[i] dst = append(dst, prompbmarshal.Label{ Name: tag.Key, Value: tag.Value, }) } return appendExtraLabels(dst, extraLabels, dstLen, honorLabels) } func appendExtraLabels(dst, extraLabels []prompbmarshal.Label, offset int, honorLabels bool) []prompbmarshal.Label { // Add extraLabels to labels. // Handle duplicates in the same way as Prometheus does. if len(dst) == offset { // Fast path - add extraLabels to dst without the need to de-duplicate. dst = append(dst, extraLabels...) return dst } offsetEnd := len(dst) for _, label := range extraLabels { labels := dst[offset:offsetEnd] prevLabel := promrelabel.GetLabelByName(labels, label.Name) if prevLabel == nil { // Fast path - the label doesn't exist in labels, so just add it to dst. dst = append(dst, label) continue } if honorLabels { // Skip the extra label with the same name. continue } // Rename the prevLabel to "exported_" + label.Name // See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config exportedName := "exported_" + label.Name exportedLabel := promrelabel.GetLabelByName(labels, exportedName) if exportedLabel != nil { // The label with the name exported_ already exists. // Add yet another 'exported_' prefix to it. exportedLabel.Name = "exported_" + exportedName } prevLabel.Name = exportedName dst = append(dst, label) } return dst }