diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 33397fade..6f01c845b 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -41,7 +41,6 @@ func (ctx *InsertCtx) Reset(rowsLen int) { } ctx.mrs = ctx.mrs[:0] ctx.metricNamesBuf = ctx.metricNamesBuf[:0] - ctx.relabelCtx.Reset() } @@ -54,23 +53,28 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) } // WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer. -func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) { +func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) error { metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels) - ctx.addRow(metricNameRaw, timestamp, value) + return ctx.addRow(metricNameRaw, timestamp, value) } // WriteDataPointExt writes (timestamp, value) with the given metricNameRaw and labels into ctx buffer. // // It returns metricNameRaw for the given labels if len(metricNameRaw) == 0. -func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) []byte { +func (ctx *InsertCtx) WriteDataPointExt(metricNameRaw []byte, labels []prompb.Label, timestamp int64, value float64) ([]byte, error) { if len(metricNameRaw) == 0 { metricNameRaw = ctx.marshalMetricNameRaw(nil, labels) } - ctx.addRow(metricNameRaw, timestamp, value) - return metricNameRaw + err := ctx.addRow(metricNameRaw, timestamp, value) + return metricNameRaw, err } -func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float64) { +func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float64) error { + if len(ctx.metricNamesBuf) > 16*1024*1024 { + if err := ctx.FlushBufs(); err != nil { + return err + } + } mrs := ctx.mrs if cap(mrs) > len(mrs) { mrs = mrs[:len(mrs)+1] @@ -82,6 +86,7 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6 mr.MetricNameRaw = metricNameRaw mr.Timestamp = timestamp mr.Value = value + return nil } // AddLabelBytes adds (name, value) label to ctx.Labels. @@ -127,11 +132,13 @@ func (ctx *InsertCtx) ApplyRelabeling() { // FlushBufs flushes buffered rows to the underlying storage. func (ctx *InsertCtx) FlushBufs() error { - if err := vmstorage.AddRows(ctx.mrs); err != nil { - return &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("cannot store metrics: %w", err), - StatusCode: http.StatusServiceUnavailable, - } + err := vmstorage.AddRows(ctx.mrs) + ctx.Reset(0) + if err == nil { + return nil + } + return &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("cannot store metrics: %w", err), + StatusCode: http.StatusServiceUnavailable, } - return nil } diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index e3034b28e..23590476d 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -45,7 +45,9 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } - ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 5b3c0e4d3..84532485c 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -45,7 +45,9 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } - ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index b6128c7d7..196117f01 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -98,7 +98,9 @@ func insertRows(db string, rows []parser.Row) error { // Skip metric without labels. continue } - ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value) + if err := ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value); err != nil { + return err + } } } else { ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) @@ -115,7 +117,9 @@ func insertRows(db string, rows []parser.Row) error { // Skip metric without labels. continue } - ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[len(ic.Labels)-1:], r.Timestamp, f.Value) + if err := ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[len(ic.Labels)-1:], r.Timestamp, f.Value); err != nil { + return err + } } } rowsTotal += len(r.Fields) diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index ae0c52c3d..852708862 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -45,7 +45,9 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } - ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index c2c888bd1..927d083b2 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -51,7 +51,9 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } - ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index e76feb87d..13e779fca 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -44,7 +44,9 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } - ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } } rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) diff --git a/app/vminsert/prompush/push.go b/app/vminsert/prompush/push.go index 15790be1e..8333c8974 100644 --- a/app/vminsert/prompush/push.go +++ b/app/vminsert/prompush/push.go @@ -53,9 +53,14 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) { continue } var metricNameRaw []byte + var err error for i := range ts.Samples { r := &ts.Samples[i] - metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value) + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value) + if err != nil { + logger.Errorf("cannot write promscape data to storage: %s", err) + return + } } rowsTotal += len(ts.Samples) } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 684038401..27a7ffb6e 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -49,9 +49,13 @@ func insertRows(timeseries []prompb.TimeSeries) error { continue } var metricNameRaw []byte + var err error for i := range ts.Samples { r := &ts.Samples[i] - metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value) + metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value) + if err != nil { + return err + } } rowsTotal += len(ts.Samples) } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 896c47e66..c015cede2 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -59,7 +59,9 @@ func insertRows(rows []parser.Row) error { _ = timestamps[len(values)-1] for j, value := range values { timestamp := timestamps[j] - ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value) + if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil { + return err + } } rowsTotal += len(values) }