mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vminsert/prompush: limit memory usage by pushing promscrape data in smaller blocks
This commit is contained in:
parent
76036c1897
commit
032c88561b
1 changed files with 22 additions and 6 deletions
|
@ -17,22 +17,38 @@ var (
|
||||||
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promscrape"}`)
|
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promscrape"}`)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Push pushes wr to to storage.
|
const maxRowsPerBlock = 10000
|
||||||
|
|
||||||
|
// Push pushes wr to storage.
|
||||||
func Push(wr *prompbmarshal.WriteRequest) {
|
func Push(wr *prompbmarshal.WriteRequest) {
|
||||||
ctx := getPushCtx()
|
ctx := getPushCtx()
|
||||||
defer putPushCtx(ctx)
|
defer putPushCtx(ctx)
|
||||||
|
|
||||||
timeseries := wr.Timeseries
|
tss := wr.Timeseries
|
||||||
|
for len(tss) > 0 {
|
||||||
|
// Process big tss in smaller blocks in order to reduce maxmimum memory usage
|
||||||
|
tssBlock := tss
|
||||||
|
if len(tssBlock) > maxRowsPerBlock {
|
||||||
|
tssBlock = tss[:maxRowsPerBlock]
|
||||||
|
tss = tss[maxRowsPerBlock:]
|
||||||
|
} else {
|
||||||
|
tss = nil
|
||||||
|
}
|
||||||
|
ctx.push(tssBlock)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *pushCtx) push(tss []prompbmarshal.TimeSeries) {
|
||||||
rowsLen := 0
|
rowsLen := 0
|
||||||
for i := range timeseries {
|
for i := range tss {
|
||||||
rowsLen += len(timeseries[i].Samples)
|
rowsLen += len(tss[i].Samples)
|
||||||
}
|
}
|
||||||
ic := &ctx.Common
|
ic := &ctx.Common
|
||||||
ic.Reset(rowsLen)
|
ic.Reset(rowsLen)
|
||||||
rowsTotal := 0
|
rowsTotal := 0
|
||||||
labels := ctx.labels[:0]
|
labels := ctx.labels[:0]
|
||||||
for i := range timeseries {
|
for i := range tss {
|
||||||
ts := ×eries[i]
|
ts := &tss[i]
|
||||||
labels = labels[:0]
|
labels = labels[:0]
|
||||||
for j := range ts.Labels {
|
for j := range ts.Labels {
|
||||||
label := &ts.Labels[j]
|
label := &ts.Labels[j]
|
||||||
|
|
Loading…
Reference in a new issue