mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vminsert: limit memory usage when ingesting data in big packets
This commit is contained in:
parent
53c87ba341
commit
a090627059
10 changed files with 55 additions and 23 deletions
|
@ -41,7 +41,6 @@ func (ctx *InsertCtx) Reset(rowsLen int) {
|
||||||
}
|
}
|
||||||
ctx.mrs = ctx.mrs[:0]
|
ctx.mrs = ctx.mrs[:0]
|
||||||
ctx.metricNamesBuf = ctx.metricNamesBuf[:0]
|
ctx.metricNamesBuf = ctx.metricNamesBuf[:0]
|
||||||
|
|
||||||
ctx.relabelCtx.Reset()
|
ctx.relabelCtx.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,23 +53,28 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer.
|
// WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer.
|
||||||
func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) {
|
func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) error {
|
||||||
metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels)
|
metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels)
|
||||||
ctx.addRow(metricNameRaw, timestamp, value)
|
return ctx.addRow(metricNameRaw, timestamp, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer.
|
// WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer.
|
||||||
//
|
//
|
||||||
// It returns metricNameRaw for the given labels if len(metricNameRaw) == 0.
|
// It returns metricNameRaw for the given labels if len(metricNameRaw) == 0.
|
||||||
func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) []byte {
|
func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) ([]byte, error) {
|
||||||
if len(metricNameRaw) == 0 {
|
if len(metricNameRaw) == 0 {
|
||||||
metricNameRaw = ctx.marshalMetricNameRaw(nil, labels)
|
metricNameRaw = ctx.marshalMetricNameRaw(nil, labels)
|
||||||
}
|
}
|
||||||
ctx.addRow(metricNameRaw, timestamp, value)
|
err := ctx.addRow(metricNameRaw, timestamp, value)
|
||||||
return metricNameRaw
|
return metricNameRaw, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float64) {
|
func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float64) error {
|
||||||
|
if len(ctx.metricNamesBuf) > 16*1024*1024 {
|
||||||
|
if err := ctx.FlushBufs(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
mrs := ctx.mrs
|
mrs := ctx.mrs
|
||||||
if cap(mrs) > len(mrs) {
|
if cap(mrs) > len(mrs) {
|
||||||
mrs = mrs[:len(mrs)+1]
|
mrs = mrs[:len(mrs)+1]
|
||||||
|
@ -82,6 +86,7 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6
|
||||||
mr.MetricNameRaw = metricNameRaw
|
mr.MetricNameRaw = metricNameRaw
|
||||||
mr.Timestamp = timestamp
|
mr.Timestamp = timestamp
|
||||||
mr.Value = value
|
mr.Value = value
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddLabelBytes adds (name, value) label to ctx.Labels.
|
// AddLabelBytes adds (name, value) label to ctx.Labels.
|
||||||
|
@ -127,11 +132,13 @@ func (ctx *InsertCtx) ApplyRelabeling() {
|
||||||
|
|
||||||
// FlushBufs flushes buffered rows to the underlying storage.
|
// FlushBufs flushes buffered rows to the underlying storage.
|
||||||
func (ctx *InsertCtx) FlushBufs() error {
|
func (ctx *InsertCtx) FlushBufs() error {
|
||||||
if err := vmstorage.AddRows(ctx.mrs); err != nil {
|
err := vmstorage.AddRows(ctx.mrs)
|
||||||
return &httpserver.ErrorWithStatusCode{
|
ctx.Reset(0)
|
||||||
Err: fmt.Errorf("cannot store metrics: %w", err),
|
if err == nil {
|
||||||
StatusCode: http.StatusServiceUnavailable,
|
return nil
|
||||||
}
|
}
|
||||||
|
return &httpserver.ErrorWithStatusCode{
|
||||||
|
Err: fmt.Errorf("cannot store metrics: %w", err),
|
||||||
|
StatusCode: http.StatusServiceUnavailable,
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,9 @@ func insertRows(rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
rowsPerInsert.Update(float64(len(rows)))
|
rowsPerInsert.Update(float64(len(rows)))
|
||||||
|
|
|
@ -45,7 +45,9 @@ func insertRows(rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
rowsPerInsert.Update(float64(len(rows)))
|
rowsPerInsert.Update(float64(len(rows)))
|
||||||
|
|
|
@ -98,7 +98,9 @@ func insertRows(db string, rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value)
|
if err := ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
||||||
|
@ -115,7 +117,9 @@ func insertRows(db string, rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[len(ic.Labels)-1:], r.Timestamp, f.Value)
|
if err := ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[len(ic.Labels)-1:], r.Timestamp, f.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rowsTotal += len(r.Fields)
|
rowsTotal += len(r.Fields)
|
||||||
|
|
|
@ -45,7 +45,9 @@ func insertRows(rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
rowsPerInsert.Update(float64(len(rows)))
|
rowsPerInsert.Update(float64(len(rows)))
|
||||||
|
|
|
@ -51,7 +51,9 @@ func insertRows(rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
rowsPerInsert.Update(float64(len(rows)))
|
rowsPerInsert.Update(float64(len(rows)))
|
||||||
|
|
|
@ -44,7 +44,9 @@ func insertRows(rows []parser.Row) error {
|
||||||
// Skip metric without labels.
|
// Skip metric without labels.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
rowsPerInsert.Update(float64(len(rows)))
|
rowsPerInsert.Update(float64(len(rows)))
|
||||||
|
|
|
@ -53,9 +53,14 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
|
var err error
|
||||||
for i := range ts.Samples {
|
for i := range ts.Samples {
|
||||||
r := &ts.Samples[i]
|
r := &ts.Samples[i]
|
||||||
metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("cannot write promscape data to storage: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsTotal += len(ts.Samples)
|
rowsTotal += len(ts.Samples)
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,9 +49,13 @@ func insertRows(timeseries []prompb.TimeSeries) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
|
var err error
|
||||||
for i := range ts.Samples {
|
for i := range ts.Samples {
|
||||||
r := &ts.Samples[i]
|
r := &ts.Samples[i]
|
||||||
metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsTotal += len(ts.Samples)
|
rowsTotal += len(ts.Samples)
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,9 @@ func insertRows(rows []parser.Row) error {
|
||||||
_ = timestamps[len(values)-1]
|
_ = timestamps[len(values)-1]
|
||||||
for j, value := range values {
|
for j, value := range values {
|
||||||
timestamp := timestamps[j]
|
timestamp := timestamps[j]
|
||||||
ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value)
|
if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rowsTotal += len(values)
|
rowsTotal += len(values)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue