// Patch summary: adds the automatically generated `scrape_series_added` time series to promscrape.
//
// lib/promscrape/scrapework.go:
//   - scrapeWork gains two fields: prevSeriesMap (a set of uint64 label hashes) and lh (a reusable
//     *xxhash.Digest).
//   - scrapeInternal calls sw.getSeriesAdded() right after samplesPostRelabeling is computed and emits
//     the result via sw.addAutoTimeseries("scrape_series_added", ...) after the other auto series.
//   - getSeriesAdded lazily creates sw.lh, hashes the labels of every entry in
//     sw.writeRequest.Timeseries and counts the hashes that are missing from sw.prevSeriesMap.
//     When the count is 0 it returns 0 and leaves sw.prevSeriesMap untouched (fast path); otherwise it
//     builds a fresh map from the current scrape, stores it in sw.prevSeriesMap and returns the count.
//   - getLabelsHash resets the digest, writes each label's Name followed by its Value and returns
//     Sum64().
//
// lib/promscrape/scrapework_test.go:
//   - every expected-output fixture touched by the patch gains a `scrape_series_added` line.
//
// NOTE(review): on the fast path sw.prevSeriesMap is not refreshed, so hashes of series that vanished
// from the target stay in the set. A series that disappears and later comes back is then not counted
// as added until some other new series forces the slow path. Confirm this inaccuracy is acceptable.
// NOTE(review): getLabelsHash writes Name and Value back to back without a delimiter, so label sets
// such as {a="bc"} and {ab="c"} produce the same hash. The in-code comment accepts collisions, but
// this one is systematic rather than random; consider whether a separator byte is worth adding.
// NOTE(review): the slow path calls getLabelsHash a second time for every series; the hashes from the
// first loop could presumably be reused.
// NOTE(review): the hunks below appear to have been flattened onto a few long lines; the original line
// breaks look necessary for the hunk headers' line counts to match before the patch can be applied.
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index adf2cf85fe..e096d8e6e5 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -131,6 +131,10 @@ type scrapeWork struct { writeRequest prompbmarshal.WriteRequest labels []prompbmarshal.Label samples []prompbmarshal.Sample + + // the prevSeriesMap and lh are used for fast calculation of `scrape_series_added` metric. + prevSeriesMap map[uint64]struct{} + lh *xxhash.Digest } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -226,10 +230,12 @@ func (sw *scrapeWork) scrapeInternal(timestamp int64) error { scrapesSkippedBySampleLimit.Inc() } samplesPostRelabeling := len(sw.writeRequest.Timeseries) + seriesAdded := sw.getSeriesAdded() sw.addAutoTimeseries("up", float64(up), timestamp) sw.addAutoTimeseries("scrape_duration_seconds", duration, timestamp) sw.addAutoTimeseries("scrape_samples_scraped", float64(samplesScraped), timestamp) sw.addAutoTimeseries("scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), timestamp) + sw.addAutoTimeseries("scrape_series_added", float64(seriesAdded), timestamp) startTime := time.Now() sw.PushData(&sw.writeRequest) pushDataDuration.UpdateDuration(startTime) @@ -240,6 +246,44 @@ func (sw *scrapeWork) scrapeInternal(timestamp int64) error { return err } +func (sw *scrapeWork) getSeriesAdded() int { + if sw.lh == nil { + sw.lh = xxhash.New() + } + mPrev := sw.prevSeriesMap + seriesAdded := 0 + for _, ts := range sw.writeRequest.Timeseries { + h := getLabelsHash(sw.lh, ts.Labels) + if _, ok := mPrev[h]; !ok { + seriesAdded++ + } + } + if seriesAdded == 0 { + // Fast path: no new time series added during the last scrape. + return 0 + } + + // Slow path: update the sw.prevSeriesMap, since new time series were added. 
+ m := make(map[uint64]struct{}, len(sw.writeRequest.Timeseries)) + for _, ts := range sw.writeRequest.Timeseries { + h := getLabelsHash(sw.lh, ts.Labels) + m[h] = struct{}{} + } + sw.prevSeriesMap = m + return seriesAdded +} + +func getLabelsHash(lh *xxhash.Digest, labels []prompbmarshal.Label) uint64 { + // It is OK if there will be hash collisions for distinct sets of labels, + // since the accuracy for `scrape_series_added` metric may be lower than 100%. + lh.Reset() + for _, label := range labels { + lh.WriteString(label.Name) + lh.WriteString(label.Value) + } + return lh.Sum64() +} + // addAutoTimeseries adds automatically generated time series with the given name, value and timestamp. // // See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 3fd1eb4a7c..a755994169 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -17,6 +17,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) { scrape_samples_scraped 0 123 scrape_duration_seconds 0 123 scrape_samples_post_metric_relabeling 0 123 + scrape_series_added 0 123 ` timeseriesExpected := parseData(dataExpected) @@ -97,6 +98,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped 0 123 scrape_duration_seconds 0 123 scrape_samples_post_metric_relabeling 0 123 + scrape_series_added 0 123 `) f(` foo{bar="baz",empty_label=""} 34.45 3 @@ -108,6 +110,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped 2 123 scrape_duration_seconds 0 123 scrape_samples_post_metric_relabeling 2 123 + scrape_series_added 2 123 `) f(` foo{bar="baz"} 34.45 3 @@ -127,6 +130,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{foo="x"} 2 123 scrape_duration_seconds{foo="x"} 0 123 scrape_samples_post_metric_relabeling{foo="x"} 2 123 + scrape_series_added{foo="x"} 2 123 `) 
f(` foo{job="orig",bar="baz"} 34.45 @@ -146,6 +150,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{job="override"} 2 123 scrape_duration_seconds{job="override"} 0 123 scrape_samples_post_metric_relabeling{job="override"} 2 123 + scrape_series_added{job="override"} 2 123 `) // Empty instance override. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/453 f(` @@ -170,6 +175,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{instance="foobar",job="xxx"} 2 123 scrape_duration_seconds{instance="foobar",job="xxx"} 0 123 scrape_samples_post_metric_relabeling{instance="foobar",job="xxx"} 2 123 + scrape_series_added{instance="foobar",job="xxx"} 2 123 `) f(` no_instance{instance="",job="some_job",label="val1",test=""} 5555 @@ -193,6 +199,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{instance="foobar",job="xxx"} 2 123 scrape_duration_seconds{instance="foobar",job="xxx"} 0 123 scrape_samples_post_metric_relabeling{instance="foobar",job="xxx"} 2 123 + scrape_series_added{instance="foobar",job="xxx"} 2 123 `) f(` foo{job="orig",bar="baz"} 34.45 @@ -212,6 +219,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{job="override"} 2 123 scrape_duration_seconds{job="override"} 0 123 scrape_samples_post_metric_relabeling{job="override"} 2 123 + scrape_series_added{job="override"} 2 123 `) f(` foo{bar="baz"} 34.44 @@ -249,6 +257,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{job="xx"} 2 123 scrape_duration_seconds{job="xx"} 0 123 scrape_samples_post_metric_relabeling{job="xx"} 2 123 + scrape_series_added{job="xx"} 2 123 `) f(` foo{bar="baz"} 34.44 @@ -285,6 +294,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{job="xx",instance="foo.com"} 4 123 scrape_duration_seconds{job="xx",instance="foo.com"} 0 123 
scrape_samples_post_metric_relabeling{job="xx",instance="foo.com"} 1 123 + scrape_series_added{job="xx",instance="foo.com"} 1 123 `) f(` foo{bar="baz"} 34.44 @@ -297,6 +307,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped 2 123 scrape_duration_seconds 0 123 scrape_samples_post_metric_relabeling 0 123 + scrape_series_added 0 123 `) }