VictoriaMetrics/lib/protoparser/promremotewrite/stream/streamparser.go
Aliaksandr Valialkin 2c334ed953
app/{vmagent,vminsert}: follow-up for NewRelic data ingestion protocol support
This is a follow-up for f60c08a7bd

Changes:

- Make sure all the URLs related to the NewRelic protocol start with /newrelic . Previously some URLs started with /api/v1/newrelic

- Remove the /api/v1 part from NewRelic URLs, since it makes no sense

- Remove the automatic transformation from CamelCase to snake_case for NewRelic labels and metric names,
  since it may complicate the transition from NewRelic to VictoriaMetrics. Preserve all the metric names and label names,
  so users can query metrics and labels by the same names they use in NewRelic.
  The automatic transformation from CamelCase to snake_case can be added later as a special relabeling action if needed.

- Properly update per-tenant data ingestion stats at app/vmagent/newrelic/request_handler.go . Previously they were always zero.

- Fix NewRelic URLs in vmagent when multitenant data ingestion is enabled. Previously they mistakenly started with `/`.

- Document the NewRelic data ingestion URL at docs/Cluster-VictoriaMetrics.md

- Remove superfluous memory allocations at lib/protoparser/newrelic

- Improve tests at lib/protoparser/newrelic/*

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3520
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4712
2023-10-16 00:25:25 +02:00

package stream

import (
	"bufio"
	"fmt"
	"io"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
	"github.com/VictoriaMetrics/metrics"
	"github.com/golang/snappy"
)

var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")

// Parse parses a Prometheus remote_write message from r and calls callback for the parsed time series.
//
// callback shouldn't hold tss after returning.
func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSeries) error) error {
	wcr := writeconcurrencylimiter.GetReader(r)
	defer writeconcurrencylimiter.PutReader(wcr)
	r = wcr

	ctx := getPushCtx(r)
	defer putPushCtx(ctx)
	if err := ctx.Read(); err != nil {
		return err
	}

	// Synchronously process the request in order to properly return errors to Parse caller,
	// so it could properly return HTTP 503 status code in response.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
	bb := bodyBufferPool.Get()
	defer bodyBufferPool.Put(bb)
	var err error
	if isVMRemoteWrite {
		bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B)
		if err != nil {
			return fmt.Errorf("cannot decompress zstd-encoded request with length %d: %w", len(ctx.reqBuf.B), err)
		}
	} else {
		bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
		if err != nil {
			return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), err)
		}
	}
	if int64(len(bb.B)) > maxInsertRequestSize.N {
		return fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
	}
	wr := getWriteRequest()
	defer putWriteRequest(wr)
	if err := wr.Unmarshal(bb.B); err != nil {
		unmarshalErrors.Inc()
		return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(bb.B), err)
	}

	rows := 0
	tss := wr.Timeseries
	for i := range tss {
		rows += len(tss[i].Samples)
	}
	rowsRead.Add(rows)

	if err := callback(tss); err != nil {
		return fmt.Errorf("error when processing imported data: %w", err)
	}
	return nil
}
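
// handleRequest is a hypothetical usage sketch (not part of the original file):
// it shows how a caller might feed a snappy-compressed remote_write request
// body to Parse. The callback must process tss synchronously and must not
// retain the slice after returning.
func handleRequest(r io.Reader) error {
	return Parse(r, false, func(tss []prompb.TimeSeries) error {
		// Consume the parsed time series here, e.g. push them further
		// down the ingestion pipeline.
		for i := range tss {
			_ = tss[i].Samples
		}
		return nil
	})
}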

// bodyBufferPool reduces memory allocations by reusing buffers
// for decompressed request bodies across requests.
var bodyBufferPool bytesutil.ByteBufferPool

// pushCtx is a reusable context for reading a single compressed request body.
type pushCtx struct {
	br     *bufio.Reader
	reqBuf bytesutil.ByteBuffer
}

func (ctx *pushCtx) reset() {
	ctx.br.Reset(nil)
	ctx.reqBuf.Reset()
}
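
// Read reads the compressed request body from the underlying reader into ctx.reqBuf.
//
// It returns an error if the body size exceeds -maxInsertRequestSize.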
func (ctx *pushCtx) Read() error {
	readCalls.Inc()
	// Read at most maxInsertRequestSize+1 bytes, so requests exceeding
	// the limit can be detected and rejected below.
	lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
	startTime := fasttime.UnixTimestamp()
	reqLen, err := ctx.reqBuf.ReadFrom(lr)
	if err != nil {
		readErrors.Inc()
		return fmt.Errorf("cannot read compressed request in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
	}
	if reqLen > int64(maxInsertRequestSize.N) {
		readErrors.Inc()
		return fmt.Errorf("too big packed request; mustn't exceed -maxInsertRequestSize=%d bytes", maxInsertRequestSize.N)
	}
	return nil
}

var (
	readCalls       = metrics.NewCounter(`vm_protoparser_read_calls_total{type="promremotewrite"}`)
	readErrors      = metrics.NewCounter(`vm_protoparser_read_errors_total{type="promremotewrite"}`)
	rowsRead        = metrics.NewCounter(`vm_protoparser_rows_read_total{type="promremotewrite"}`)
	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="promremotewrite"}`)
)
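
// getPushCtx returns a pushCtx for reading a request from r.
//
// pushCtx objects are pooled in two tiers: the fixed-size pushCtxPoolCh
// channel keeps up to cgroup.AvailableCPUs() hot objects, while pushCtxPool
// (a sync.Pool) absorbs the overflow, so idle objects can be reclaimed by
// the garbage collector under low load.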
func getPushCtx(r io.Reader) *pushCtx {
	select {
	case ctx := <-pushCtxPoolCh:
		ctx.br.Reset(r)
		return ctx
	default:
		if v := pushCtxPool.Get(); v != nil {
			ctx := v.(*pushCtx)
			ctx.br.Reset(r)
			return ctx
		}
		return &pushCtx{
			br: bufio.NewReaderSize(r, 64*1024),
		}
	}
}

func putPushCtx(ctx *pushCtx) {
	ctx.reset()
	select {
	case pushCtxPoolCh <- ctx:
	default:
		pushCtxPool.Put(ctx)
	}
}

var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
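
// getWriteRequest returns a prompb.WriteRequest from the pool in order to
// reduce memory allocations when unmarshaling many requests.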
func getWriteRequest() *prompb.WriteRequest {
	v := writeRequestPool.Get()
	if v == nil {
		return &prompb.WriteRequest{}
	}
	return v.(*prompb.WriteRequest)
}

func putWriteRequest(wr *prompb.WriteRequest) {
	// Reset the request before returning it to the pool, so pooled objects
	// don't hold references to the unmarshaled data.
	wr.Reset()
	writeRequestPool.Put(wr)
}

var writeRequestPool sync.Pool