lib/promscrape: remove ID field from ScrapeWork struct. Use a pointer to ScrapeWork as a key in targetStatusMap

This simplifies the code a bit.
Aliaksandr Valialkin 2020-12-17 14:30:33 +02:00
parent 9abb2d6c74
commit 2dfa746c91
7 changed files with 29 additions and 30 deletions
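
The core idea, as a minimal illustrative sketch (the stripped-down ScrapeWork and targetStatus below are stand-ins for the real structs, and the URL is made up): instead of minting a unique uint64 ID per target via an atomic counter, the status map is keyed by the *ScrapeWork pointer itself, whose identity is stable for the object's lifetime.

package main

import "fmt"

// Stripped-down stand-ins for the real promscrape structs.
type ScrapeWork struct {
    ScrapeURL string
}

type targetStatus struct {
    up bool
}

func main() {
    // Old scheme: key = atomic.AddUint64(&nextScrapeWorkID, 1).
    // New scheme: the pointer itself is the key; pointer identity is
    // stable for the lifetime of the object, so no counter is needed.
    m := make(map[*ScrapeWork]*targetStatus)

    sw := &ScrapeWork{ScrapeURL: "http://host:8428/metrics"}
    m[sw] = &targetStatus{up: true} // Register
    fmt.Println(m[sw].up)           // the same pointer always finds its entry
    delete(m, sw)                   // Unregister needs no ID lookup either
}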

lib/promscrape/config.go

@@ -8,7 +8,6 @@ import (
     "path/filepath"
     "strings"
     "sync"
-    "sync/atomic"
     "time"
 
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
@@ -744,7 +743,6 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
     // Reduce memory usage by interning all the strings in labels.
     internLabelStrings(labels)
     dst = append(dst, &ScrapeWork{
-        ID:             atomic.AddUint64(&nextScrapeWorkID, 1),
         ScrapeURL:      scrapeURL,
         ScrapeInterval: swc.scrapeInterval,
         ScrapeTimeout:  swc.scrapeTimeout,
@@ -764,9 +762,6 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
     return dst, nil
 }
 
-// Each ScrapeWork has an ID, which is used for locating it when updating its status.
-var nextScrapeWorkID uint64
-
 func internLabelStrings(labels []prompbmarshal.Label) {
     for i := range labels {
         label := &labels[i]

lib/promscrape/config_test.go

@@ -442,7 +442,6 @@ scrape_configs:
 
 func resetNonEssentialFields(sws []*ScrapeWork) {
     for i := range sws {
-        sws[i].ID = 0
         sws[i].OriginalLabels = nil
     }
 }

lib/promscrape/scraper.go

@@ -355,7 +355,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.Wr
         stopCh: make(chan struct{}),
     }
     c := newClient(sw)
-    sc.sw.Config = *sw
+    sc.sw.Config = sw
     sc.sw.ScrapeGroup = group
     sc.sw.ReadData = c.ReadData
     sc.sw.GetStreamReader = c.GetStreamReader

lib/promscrape/scrapework.go

@@ -27,10 +27,9 @@ var (
 )
 
 // ScrapeWork represents a unit of work for scraping Prometheus metrics.
+//
+// It must be immutable during its lifetime, since it is read from concurrently running goroutines.
 type ScrapeWork struct {
-    // Unique ID for the ScrapeWork.
-    ID uint64
-
     // Full URL (including query args) for the scrape.
     ScrapeURL string
 
@@ -144,7 +143,7 @@ func promLabelsString(labels []prompbmarshal.Label) string {
 
 type scrapeWork struct {
     // Config for the scrape.
-    Config ScrapeWork
+    Config *ScrapeWork
 
     // ReadData is called for reading the data.
     ReadData func(dst []byte) ([]byte, error)
@@ -308,7 +307,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
     // body must be released only after wc is released, since wc refers to body.
     sw.prevBodyLen = len(body.B)
     leveledbytebufferpool.Put(body)
-    tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
+    tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
     return err
 }
 
@@ -369,7 +368,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
     sw.prevRowsLen = len(wc.rows.Rows)
     wc.reset()
     writeRequestCtxPool.Put(wc)
-    tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
+    tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
     return nil
 }
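
The doc comment added to ScrapeWork above is the contract that makes the pointer sharing safe: with Config now a *ScrapeWork, the scrape loop and the status map read the same struct concurrently, so it must not be mutated after construction. A minimal sketch of that pattern, with illustrative names:

package main

import (
    "fmt"
    "sync"
)

// Illustrative stand-in: published once, read-only afterwards.
type ScrapeWork struct {
    ScrapeURL string
}

func main() {
    sw := &ScrapeWork{ScrapeURL: "http://host:8428/metrics"}

    // Several goroutines read the same *ScrapeWork, as the scraper and
    // the status map now do. This is race-free only because nobody
    // writes to sw after it is published.
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = sw.ScrapeURL
        }()
    }
    wg.Wait()
    fmt.Println("all readers done")
}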

lib/promscrape/scrapework_test.go

@@ -49,6 +49,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
     timeseriesExpected := parseData(dataExpected)
 
     var sw scrapeWork
+    sw.Config = &ScrapeWork{}
 
     readDataCalls := 0
     sw.ReadData = func(dst []byte) ([]byte, error) {
@@ -87,7 +88,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
     timeseriesExpected := parseData(dataExpected)
 
     var sw scrapeWork
-    sw.Config = *cfg
+    sw.Config = cfg
 
     readDataCalls := 0
     sw.ReadData = func(dst []byte) ([]byte, error) {

lib/promscrape/scrapework_timing_test.go

@@ -37,6 +37,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
     b.SetBytes(int64(len(data)))
     b.RunParallel(func(pb *testing.PB) {
         var sw scrapeWork
+        sw.Config = &ScrapeWork{}
         sw.ReadData = readDataFunc
         sw.PushData = func(wr *prompbmarshal.WriteRequest) {}
         timestamp := int64(0)

lib/promscrape/targetstatus.go

@@ -59,45 +59,49 @@ func WriteAPIV1Targets(w io.Writer, state string) {
 
 type targetStatusMap struct {
     mu sync.Mutex
-    m  map[uint64]*targetStatus
+    m  map[*ScrapeWork]*targetStatus
 }
 
 func newTargetStatusMap() *targetStatusMap {
     return &targetStatusMap{
-        m: make(map[uint64]*targetStatus),
+        m: make(map[*ScrapeWork]*targetStatus),
     }
 }
 
 func (tsm *targetStatusMap) Reset() {
     tsm.mu.Lock()
-    tsm.m = make(map[uint64]*targetStatus)
+    tsm.m = make(map[*ScrapeWork]*targetStatus)
     tsm.mu.Unlock()
 }
 
 func (tsm *targetStatusMap) Register(sw *ScrapeWork) {
     tsm.mu.Lock()
-    tsm.m[sw.ID] = &targetStatus{
-        sw: *sw,
+    tsm.m[sw] = &targetStatus{
+        sw: sw,
     }
     tsm.mu.Unlock()
 }
 
 func (tsm *targetStatusMap) Unregister(sw *ScrapeWork) {
     tsm.mu.Lock()
-    delete(tsm.m, sw.ID)
+    delete(tsm.m, sw)
     tsm.mu.Unlock()
 }
 
 func (tsm *targetStatusMap) Update(sw *ScrapeWork, group string, up bool, scrapeTime, scrapeDuration int64, err error) {
     tsm.mu.Lock()
-    tsm.m[sw.ID] = &targetStatus{
-        sw:             *sw,
-        up:             up,
-        scrapeGroup:    group,
-        scrapeTime:     scrapeTime,
-        scrapeDuration: scrapeDuration,
-        err:            err,
+    ts := tsm.m[sw]
+    if ts == nil {
+        ts = &targetStatus{
+            sw: sw,
+        }
+        tsm.m[sw] = ts
     }
+    ts.up = up
+    ts.scrapeGroup = group
+    ts.scrapeTime = scrapeTime
+    ts.scrapeDuration = scrapeDuration
+    ts.err = err
     tsm.mu.Unlock()
 }
 
@@ -123,8 +127,8 @@ func (tsm *targetStatusMap) WriteActiveTargetsJSON(w io.Writer) {
         st  targetStatus
     }
     kss := make([]keyStatus, 0, len(tsm.m))
-    for _, st := range tsm.m {
-        key := promLabelsString(st.sw.OriginalLabels)
+    for sw, st := range tsm.m {
+        key := promLabelsString(sw.OriginalLabels)
         kss = append(kss, keyStatus{
             key: key,
             st:  *st,
@@ -176,7 +180,7 @@ func writeLabelsJSON(w io.Writer, labels []prompbmarshal.Label) {
 }
 
 type targetStatus struct {
-    sw             ScrapeWork
+    sw             *ScrapeWork
     up             bool
     scrapeGroup    string
     scrapeTime     int64
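
One behavioral detail of the rewritten Update: the old code replaced the map entry with a freshly allocated targetStatus on every scrape, while the new code allocates only when a target is first seen and then mutates the entry in place under the mutex. A self-contained sketch of that get-or-create shape (the signature is simplified relative to the real method):

package main

import (
    "fmt"
    "sync"
)

type ScrapeWork struct{ ScrapeURL string }

type targetStatus struct {
    sw *ScrapeWork
    up bool
}

type targetStatusMap struct {
    mu sync.Mutex
    m  map[*ScrapeWork]*targetStatus
}

// Update creates the entry on first use and mutates it in place afterwards,
// so steady-state scrapes do not allocate a new targetStatus.
func (tsm *targetStatusMap) Update(sw *ScrapeWork, up bool) {
    tsm.mu.Lock()
    ts := tsm.m[sw]
    if ts == nil {
        ts = &targetStatus{sw: sw}
        tsm.m[sw] = ts
    }
    ts.up = up
    tsm.mu.Unlock()
}

func main() {
    tsm := &targetStatusMap{m: make(map[*ScrapeWork]*targetStatus)}
    sw := &ScrapeWork{ScrapeURL: "http://host:8428/metrics"}
    tsm.Update(sw, true)  // first scrape: allocates the entry
    tsm.Update(sw, false) // later scrapes: in-place update
    fmt.Println(tsm.m[sw].up)
}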