diff --git a/app/vminsert/prompush/push.go b/app/vminsert/prompush/push.go index 8333c8974..6291114f4 100644 --- a/app/vminsert/prompush/push.go +++ b/app/vminsert/prompush/push.go @@ -22,10 +22,19 @@ func Push(wr *prompbmarshal.WriteRequest) { tss := wr.Timeseries for len(tss) > 0 { // Process big tss in smaller blocks in order to reduce maxmimum memory usage + samplesCount := 0 + i := 0 + for i < len(tss) { + samplesCount += len(tss[i].Samples) + i++ + if samplesCount > maxRowsPerBlock { + break + } + } tssBlock := tss - if len(tssBlock) > maxRowsPerBlock { - tssBlock = tss[:maxRowsPerBlock] - tss = tss[maxRowsPerBlock:] + if i < len(tss) { + tssBlock = tss[:i] + tss = tss[i:] } else { tss = nil } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 27a7ffb6e..74568920c 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -50,14 +50,15 @@ func insertRows(timeseries []prompb.TimeSeries) error { } var metricNameRaw []byte var err error - for i := range ts.Samples { - r := &ts.Samples[i] + samples := ts.Samples + for i := range samples { + r := &samples[i] metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value) if err != nil { return err } } - rowsTotal += len(ts.Samples) + rowsTotal += len(samples) } rowsInserted.Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index c5ccac205..3094409ec 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" @@ -68,7 +69,9 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) values := r.Values timestamps := r.Timestamps - _ = timestamps[len(values)-1] + if len(timestamps) != len(values) { + logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values)) + } for j, value := range values { timestamp := timestamps[j] if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil {