From 00ec2b71892e7ce6a0598f7c5b7d98ebd1cf05b6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 26 Sep 2020 04:20:47 +0300 Subject: [PATCH] lib/protoparser: use all the available CPU cores for processing ingested data from a single /api/v1/import stream Previously a single data ingestion stream to /api/v1/import could load only a single CPU core. --- docs/Single-server-VictoriaMetrics.md | 3 - lib/protoparser/vmimport/streamparser.go | 75 +++++++++++++++++++++--- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 2becf1587..f46d550d8 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -822,9 +822,6 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. -Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts -and importing them concurrently. Note that the original file must be split on newlines. - ### Relabeling diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 9cfc0be24..781c13238 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -1,6 +1,7 @@ package vmimport import ( + "bufio" "fmt" "io" "net/http" @@ -9,17 +10,20 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) -var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import") +var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import; "+ + "the line length can be limited with `max_rows_per_line` query arg passed to /api/v1/export") // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // // The callback can be called multiple times for streamed data from req. // // callback shouldn't hold rows after returning. +// callback is called from multiple concurrent goroutines. func ParseStream(req *http.Request, callback func(rows []Row) error) error { r := req.Body if req.Header.Get("Content-Encoding") == "gzip" { @@ -30,13 +34,45 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer common.PutGzipReader(zr) r = zr } + // By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import, + // so use slightly bigger buffer in order to reduce read syscall overhead. + br := bufio.NewReaderSize(r, 1024*1024) + + // Start gomaxprocs workers for processing the parsed data in parallel. + gomaxprocs := runtime.GOMAXPROCS(-1) + workCh := make(chan *unmarshalWork, 8*gomaxprocs) + var wg sync.WaitGroup + defer func() { + close(workCh) + wg.Wait() + }() + wg.Add(gomaxprocs) + for i := 0; i < gomaxprocs; i++ { + go func() { + defer wg.Done() + for uw := range workCh { + uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) + rows := uw.rows.Rows + for i := range rows { + row := &rows[i] + rowsRead.Add(len(row.Timestamps)) + } + if err := callback(rows); err != nil { + logger.Errorf("error when processing imported data: %s", err) + putUnmarshalWork(uw) + continue + } + putUnmarshalWork(uw) + } + }() + } ctx := getStreamContext() defer putStreamContext(ctx) - for ctx.Read(r) { - if err := callback(ctx.Rows.Rows); err != nil { - return err - } + for ctx.Read(br) { + uw := getUnmarshalWork() + uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) + workCh <- uw } return ctx.Error() } @@ -54,8 +90,6 @@ func (ctx *streamContext) Read(r io.Reader) bool { } return false } - ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) - rowsRead.Add(len(ctx.Rows.Rows)) return true } @@ -66,7 +100,6 @@ var ( ) type streamContext struct { - Rows Rows reqBuf []byte tailBuf []byte err error @@ -80,7 +113,6 @@ func (ctx *streamContext) Error() error { } func (ctx *streamContext) reset() { - ctx.Rows.Reset() ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil @@ -109,3 +141,28 @@ func putStreamContext(ctx *streamContext) { var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) + +type unmarshalWork struct { + rows Rows + reqBuf []byte +} + +func (uw *unmarshalWork) reset() { + uw.rows.Reset() + uw.reqBuf = uw.reqBuf[:0] +} + +func getUnmarshalWork() *unmarshalWork { + v := unmarshalWorkPool.Get() + if v == nil { + return &unmarshalWork{} + } + return v.(*unmarshalWork) +} + +func putUnmarshalWork(uw *unmarshalWork) { + uw.reset() + unmarshalWorkPool.Put(uw) +} + +var unmarshalWorkPool sync.Pool