mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vminsert: code prettifying
This commit is contained in:
parent
b6a976b98d
commit
c00627c103
3 changed files with 20 additions and 7 deletions
|
@ -22,10 +22,19 @@ func Push(wr *prompbmarshal.WriteRequest) {
|
||||||
tss := wr.Timeseries
|
tss := wr.Timeseries
|
||||||
for len(tss) > 0 {
|
for len(tss) > 0 {
|
||||||
// Process big tss in smaller blocks in order to reduce maxmimum memory usage
|
// Process big tss in smaller blocks in order to reduce maxmimum memory usage
|
||||||
|
samplesCount := 0
|
||||||
|
i := 0
|
||||||
|
for i < len(tss) {
|
||||||
|
samplesCount += len(tss[i].Samples)
|
||||||
|
i++
|
||||||
|
if samplesCount > maxRowsPerBlock {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
tssBlock := tss
|
tssBlock := tss
|
||||||
if len(tssBlock) > maxRowsPerBlock {
|
if i < len(tss) {
|
||||||
tssBlock = tss[:maxRowsPerBlock]
|
tssBlock = tss[:i]
|
||||||
tss = tss[maxRowsPerBlock:]
|
tss = tss[i:]
|
||||||
} else {
|
} else {
|
||||||
tss = nil
|
tss = nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,14 +50,15 @@ func insertRows(timeseries []prompb.TimeSeries) error {
|
||||||
}
|
}
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
var err error
|
var err error
|
||||||
for i := range ts.Samples {
|
samples := ts.Samples
|
||||||
r := &ts.Samples[i]
|
for i := range samples {
|
||||||
|
r := &samples[i]
|
||||||
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rowsTotal += len(ts.Samples)
|
rowsTotal += len(samples)
|
||||||
}
|
}
|
||||||
rowsInserted.Add(rowsTotal)
|
rowsInserted.Add(rowsTotal)
|
||||||
rowsPerInsert.Update(float64(rowsTotal))
|
rowsPerInsert.Update(float64(rowsTotal))
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
|
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
|
||||||
|
@ -68,7 +69,9 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||||
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
||||||
values := r.Values
|
values := r.Values
|
||||||
timestamps := r.Timestamps
|
timestamps := r.Timestamps
|
||||||
_ = timestamps[len(values)-1]
|
if len(timestamps) != len(values) {
|
||||||
|
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
|
||||||
|
}
|
||||||
for j, value := range values {
|
for j, value := range values {
|
||||||
timestamp := timestamps[j]
|
timestamp := timestamps[j]
|
||||||
if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil {
|
if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue