Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2023-01-18 00:01:57 -08:00
commit 7737321133
4 changed files with 78 additions and 8 deletions


@@ -86,6 +86,61 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("duration-constant", func(t *testing.T) {
t.Parallel()
q := `1h23m5S`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4985, 4985, 4985, 4985, 4985, 4985},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("num-with-suffix-1", func(t *testing.T) {
t.Parallel()
q := `123M`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{123e6, 123e6, 123e6, 123e6, 123e6, 123e6},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("num-with-suffix-2", func(t *testing.T) {
t.Parallel()
q := `1.23TB`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1.23e12, 1.23e12, 1.23e12, 1.23e12, 1.23e12, 1.23e12},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("num-with-suffix-3", func(t *testing.T) {
t.Parallel()
q := `1.23Mib`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20)},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("num-with-suffix-4", func(t *testing.T) {
t.Parallel()
q := `1.23mib`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20)},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("simple-arithmetic", func(t *testing.T) {
t.Parallel()
q := `-1+2 *3 ^ 4+5%6`
@@ -7357,7 +7412,7 @@ func TestExecSuccess(t *testing.T) {
})
t.Run(`rollup_scrape_interval()`, func(t *testing.T) {
t.Parallel()
- q := `sort_by_label(rollup_scrape_interval(1[5M:10s]), "rollup")`
+ q := `sort_by_label(rollup_scrape_interval(1[5m:10S]), "rollup")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},

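The new test cases above pin down MetricsQL constant parsing: `1h23m5S` is a duration constant equal to 4985 seconds, decimal suffixes such as `M` and `TB` scale by powers of 10, binary suffixes such as `Mib` scale by powers of 2, and suffix matching is case-insensitive, so `1.23Mib` and `1.23mib` yield the same value. A minimal standalone Go sketch of those multipliers (the helper below is illustrative only and is not the MetricsQL parser):

```go
package main

import (
	"fmt"
	"strings"
)

// suffixMultiplier mirrors the constants exercised by the tests above:
// decimal suffixes scale by powers of 10, binary ("Ki"/"Mi"/... with an
// optional trailing "b") suffixes scale by powers of 2. Matching is
// case-insensitive. Illustrative only; not the MetricsQL parser.
func suffixMultiplier(suffix string) float64 {
	s := strings.TrimSuffix(strings.ToLower(suffix), "b")
	switch s {
	case "k":
		return 1e3
	case "m":
		return 1e6
	case "g":
		return 1e9
	case "t":
		return 1e12
	case "ki":
		return 1 << 10
	case "mi":
		return 1 << 20
	case "gi":
		return 1 << 30
	case "ti":
		return 1 << 40
	default:
		return 1
	}
}

func main() {
	fmt.Println(123 * suffixMultiplier("M"))    // 1.23e+08, i.e. 123e6
	fmt.Println(1.23 * suffixMultiplier("TB"))  // 1.23e+12
	fmt.Println(1.23 * suffixMultiplier("Mib")) // 1.28974848e+06, i.e. 1.23 * (1 << 20)
	fmt.Println(1.23 * suffixMultiplier("mib")) // same value: suffixes are case-insensitive
	fmt.Println(3600 + 23*60 + 5)               // 4985, the value of the 1h23m5S duration constant
}
```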

@@ -28,6 +28,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently include the scrape URL together with scrape target labels in all error logs for failed scrapes. Previously some failed scrapes were logged without this information.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send stale markers to remote storage for series exceeding the configured [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) when [staleness tracking](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) is disabled.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage spikes when a large number of scrape targets disappear at once. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668). Thanks to @lzfhust for [the initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3669).
* BUGFIX: [Pushgateway import](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format): properly return `200 OK` HTTP response code. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636). A minimal example push request is sketched after this list.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly parse `M` and `Mi` suffixes as `1e6` multipliers in `1M` and `1Mi` numeric constants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3664). The issue was introduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): properly display range query results at `Table` view. For example, `up[5m]` query now shows all the raw samples for the last 5 minutes for the `up` metric at the `Table` view. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3516).

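For the Pushgateway import fix noted above, here is a minimal hedged Go sketch of pushing metrics in Prometheus exposition format and inspecting the HTTP status. The `localhost:8428` address assumes a locally running single-node VictoriaMetrics; `/api/v1/import/prometheus` is the import endpoint linked from the changelog entry:

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// A couple of samples in Prometheus exposition format.
	body := strings.NewReader("foo{bar=\"baz\"} 123\nqux 456\n")

	// Assumes a single-node VictoriaMetrics instance listening on localhost:8428.
	resp, err := http.Post("http://localhost:8428/api/v1/import/prometheus", "text/plain", body)
	if err != nil {
		log.Fatalf("cannot push metrics: %s", err)
	}
	defer resp.Body.Close()

	// With the fix above the server is expected to reply with 200 OK.
	fmt.Println("response status:", resp.Status)
}
```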

@@ -431,15 +431,15 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
return err
}
- var concurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+ var processScrapedDataConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) {
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
// in order to limit memory usage under high load without sacrificing the performance.
- concurrencyLimitCh <- struct{}{}
+ processScrapedDataConcurrencyLimitCh <- struct{}{}
defer func() {
- <-concurrencyLimitCh
+ <-processScrapedDataConcurrencyLimitCh
}()
endTimestamp := time.Now().UnixNano() / 1e6
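The renamed `processScrapedDataConcurrencyLimitCh` above is a counting semaphore built from a buffered channel: its capacity is the number of available CPUs, so at most that many goroutines run the CPU-bound, allocation-heavy section at once, which caps memory usage under load. A self-contained sketch of the same pattern, using `runtime.NumCPU` in place of the repository's `cgroup.AvailableCPUs`:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// limitCh acts as a counting semaphore: its capacity bounds how many
// goroutines may execute the guarded section concurrently.
var limitCh = make(chan struct{}, runtime.NumCPU())

func process(id int) {
	limitCh <- struct{}{} // acquire a slot; blocks while all slots are taken
	defer func() {
		<-limitCh // release the slot
	}()
	// The CPU-bound, memory-hungry work would go here.
	fmt.Println("processing", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			process(id)
		}(i)
	}
	wg.Wait()
}
```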
@@ -765,7 +765,17 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
return samplesDropped
}
var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) {
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
// in order to limit memory usage under high load without sacrificing the performance.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
sendStaleSeriesConcurrencyLimitCh <- struct{}{}
defer func() {
<-sendStaleSeriesConcurrencyLimitCh
}()
if sw.Config.NoStaleMarkers {
return
}
@@ -773,7 +783,11 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
if currScrape != "" {
bodyString = parser.GetRowsDiff(lastScrape, currScrape)
}
- wc := &writeRequestCtx{}
+ wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
defer func() {
wc.reset()
writeRequestCtxPool.Put(wc)
}()
if bodyString != "" {
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
srcRows := wc.rows.Rows

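The last hunk replaces the per-call `&writeRequestCtx{}` allocation in `sendStaleSeries` with an object taken from `writeRequestCtxPool` and returned after a `reset()`, so sending stale markers for many disappearing targets reuses buffers instead of allocating fresh ones (the memory-spike issue referenced above). A generic sketch of that get/reset/put discipline with `sync.Pool`; the real pool is sized via `sw.prevLabelsLen`, which this sketch omits:

```go
package main

import (
	"fmt"
	"sync"
)

// ctx stands in for writeRequestCtx: a reusable buffer-like object.
type ctx struct {
	rows []string
}

// reset truncates the buffer so the next user of the pooled object starts clean.
func (c *ctx) reset() { c.rows = c.rows[:0] }

var ctxPool = sync.Pool{
	New: func() any { return &ctx{} },
}

// sendStaleSeries borrows a pooled ctx instead of allocating a fresh one per call.
func sendStaleSeries(series []string) {
	c := ctxPool.Get().(*ctx)
	defer func() {
		c.reset()
		ctxPool.Put(c)
	}()
	c.rows = append(c.rows, series...)
	fmt.Println("sending", len(c.rows), "stale markers")
}

func main() {
	sendStaleSeries([]string{"up", "process_cpu_seconds_total"})
	sendStaleSeries([]string{"go_goroutines"})
}
```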

@@ -497,7 +497,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
rrs.mu.Lock()
if cap(rrs.rows) == 0 {
- rrs.rows = newRawRowsBlock()
+ rrs.rows = newRawRows()
}
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
rrs.rows = rrs.rows[:len(rrs.rows)+n]
@@ -524,7 +524,7 @@ type rawRowsBlock struct {
rows []rawRow
}
- func newRawRowsBlock() []rawRow {
+ func newRawRows() []rawRow {
n := getMaxRawRowsPerShard()
return make([]rawRow, 0, n)
}
@@ -533,7 +533,7 @@ func getRawRowsBlock() *rawRowsBlock {
v := rawRowsBlockPool.Get()
if v == nil {
return &rawRowsBlock{
- rows: newRawRowsBlock(),
+ rows: newRawRows(),
}
}
return v.(*rawRowsBlock)
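The `addRows` hunk above fills the spare capacity of a pre-sized shard slice: `copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)` writes into the unused tail, the length is then extended by the number of rows actually copied, and rows that did not fit are handled by the rest of the function, which lies outside this hunk. A small standalone illustration of that idiom with plain ints in place of `rawRow`:

```go
package main

import "fmt"

// appendUpToCap copies src into the spare capacity of dst without growing it,
// extends dst's length by the number of copied elements, and returns the
// updated slice together with the elements that did not fit.
func appendUpToCap(dst, src []int) (updated, rest []int) {
	n := copy(dst[len(dst):cap(dst)], src)
	return dst[:len(dst)+n], src[n:]
}

func main() {
	shard := make([]int, 0, 4) // a fixed-capacity shard, like the slice from newRawRows()
	shard, rest := appendUpToCap(shard, []int{1, 2, 3, 4, 5, 6})
	fmt.Println(shard) // [1 2 3 4]
	fmt.Println(rest)  // [5 6] (the caller would handle the overflow)
}
```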