lib/protoparser: use all the available CPU cores for processing ingested data from a single /api/v1/import stream

Previously a single data ingestion stream to /api/v1/import could load only a single CPU core.
Aliaksandr Valialkin 2020-09-26 04:20:47 +03:00
parent c00627c103
commit b4bf722d8f
3 changed files with 66 additions and 15 deletions
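
The fix fans the CPU-heavy row unmarshaling out to a pool of worker goroutines fed by the single goroutine that reads the request body. Below is a minimal standalone sketch of that pattern; processStream, parseBlock and the line-by-line chunking are illustrative simplifications, not the commit's actual code.

package main

import (
    "bufio"
    "fmt"
    "io"
    "runtime"
    "strings"
    "sync"
)

// processStream reads newline-delimited input on a single goroutine and hands
// the raw bytes to GOMAXPROCS workers, so one ingestion stream can keep every
// CPU core busy with parsing.
func processStream(r io.Reader, parseBlock func(block []byte)) error {
    workers := runtime.GOMAXPROCS(-1)
    workCh := make(chan []byte, 8*workers)

    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for block := range workCh {
                parseBlock(block) // CPU-heavy unmarshaling runs here, in parallel
            }
        }()
    }

    sc := bufio.NewScanner(r)
    for sc.Scan() {
        // Copy the scanner's buffer: it is reused by the next Scan call.
        workCh <- append([]byte(nil), sc.Bytes()...)
    }
    close(workCh)
    wg.Wait()
    return sc.Err()
}

func main() {
    var (
        mu    sync.Mutex
        total int
    )
    _ = processStream(strings.NewReader("a 1\nb 2\nc 3\n"), func(block []byte) {
        mu.Lock()
        total += len(block)
        mu.Unlock()
    })
    fmt.Println("bytes handed to parsers:", total)
}

As in the commit itself, the work channel is buffered at 8x the worker count so the reader can stay ahead of the parsers without unbounded memory growth.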


@@ -822,9 +822,6 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label
Note that it could be required to flush the response cache after importing historical data. See [these docs](#backfilling) for details.
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
and importing them concurrently. Note that the original file must be split on newlines.
### Relabeling
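
The paragraph above about splitting the original file describes the client-side workaround for the old single-core limitation. A hedged Go sketch of that approach, assuming a single-node VictoriaMetrics at localhost:8428 and an 8MB chunk size (both illustrative):

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "net/http"
    "os"
    "sync"
)

// importConcurrently splits an exported file on newline boundaries into
// chunks and POSTs the chunks to /api/v1/import from several goroutines.
func importConcurrently(path, importURL string, workers int) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    var (
        wg       sync.WaitGroup
        mu       sync.Mutex
        firstErr error
    )
    chunks := make(chan []byte, workers)
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for chunk := range chunks {
                resp, err := http.Post(importURL, "text/plain", bytes.NewReader(chunk))
                if err == nil {
                    resp.Body.Close()
                    continue
                }
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            }
        }()
    }

    // Accumulate whole lines so a chunk never cuts a line in half.
    const chunkSize = 8 * 1024 * 1024
    var buf []byte
    sc := bufio.NewScanner(f)
    sc.Buffer(make([]byte, 0, 1024*1024), 100*1024*1024)
    for sc.Scan() {
        buf = append(buf, sc.Bytes()...)
        buf = append(buf, '\n')
        if len(buf) >= chunkSize {
            chunks <- append([]byte(nil), buf...)
            buf = buf[:0]
        }
    }
    if len(buf) > 0 {
        chunks <- append([]byte(nil), buf...)
    }
    close(chunks)
    wg.Wait()
    if firstErr != nil {
        return firstErr
    }
    return sc.Err()
}

func main() {
    if err := importConcurrently("export.jsonl", "http://localhost:8428/api/v1/import", 4); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}

Splitting strictly on newlines matters because each line of the export format is a self-contained JSON document.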


@@ -822,9 +822,6 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label
Note that it could be required to flush the response cache after importing historical data. See [these docs](#backfilling) for details.
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
and importing them concurrently. Note that the original file must be split on newlines.
### Relabeling


@@ -1,6 +1,7 @@
package vmimport
import (
"bufio"
"fmt"
"io"
"net/http"
@@ -9,17 +10,20 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
)
var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import")
var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import; "+
"the line length can be limited with `max_rows_per_line` query arg passed to /api/v1/export")
// ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows.
//
// The callback can be called multiple times for streamed data from req.
//
// callback shouldn't hold rows after returning.
// callback is called from multiple concurrent goroutines.
func ParseStream(req *http.Request, callback func(rows []Row) error) error {
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
@@ -30,13 +34,45 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
defer common.PutGzipReader(zr)
r = zr
}
// By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import,
// so use slightly bigger buffer in order to reduce read syscall overhead.
br := bufio.NewReaderSize(r, 1024*1024)
// Start gomaxprocs workers for processing the parsed data in parallel.
gomaxprocs := runtime.GOMAXPROCS(-1)
workCh := make(chan *unmarshalWork, 8*gomaxprocs)
var wg sync.WaitGroup
defer func() {
close(workCh)
wg.Wait()
}()
wg.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer wg.Done()
for uw := range workCh {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
for i := range rows {
row := &rows[i]
rowsRead.Add(len(row.Timestamps))
}
if err := callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
continue
}
putUnmarshalWork(uw)
}
}()
}
ctx := getStreamContext()
defer putStreamContext(ctx)
for ctx.Read(r) {
if err := callback(ctx.Rows.Rows); err != nil {
return err
}
for ctx.Read(br) {
uw := getUnmarshalWork()
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
workCh <- uw
}
return ctx.Error()
}
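
Because the callback is now invoked from multiple concurrent goroutines and must not hold on to rows after returning (see the updated doc comment above), callers of ParseStream need goroutine-safe callbacks. A hedged sketch of such a caller; the handler, the package name and the import path are assumed from the package name rather than taken from this commit.

package handlers // illustrative caller, not part of this commit

import (
    "net/http"
    "sync"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
)

// importHandler feeds the request body to vmimport.ParseStream. The callback
// runs on several worker goroutines at once, so shared state is guarded by a
// mutex, and nothing from rows is kept after the callback returns.
func importHandler(w http.ResponseWriter, req *http.Request) {
    var (
        mu        sync.Mutex
        totalRows int
    )
    err := vmimport.ParseStream(req, func(rows []vmimport.Row) error {
        mu.Lock()
        totalRows += len(rows) // copy out what is needed; do not retain rows
        mu.Unlock()
        return nil
    })
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    w.WriteHeader(http.StatusNoContent)
}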
@@ -54,8 +90,6 @@ func (ctx *streamContext) Read(r io.Reader) bool {
}
return false
}
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows))
return true
}
@@ -66,7 +100,6 @@
)
type streamContext struct {
Rows Rows
reqBuf []byte
tailBuf []byte
err error
@@ -80,7 +113,6 @@ func (ctx *streamContext) Error() error {
}
func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0]
ctx.err = nil
@@ -109,3 +141,28 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.reqBuf = uw.reqBuf[:0]
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool
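
The bodies of getStreamContext and putStreamContext fall outside these hunks. A plausible sketch of how the bounded streamContextPoolCh and the sync.Pool above typically cooperate (not necessarily the file's exact code): the channel keeps up to GOMAXPROCS hot contexts, and the pool absorbs any overflow so the GC can reclaim it under memory pressure.

// getStreamContext prefers a context from the bounded channel, falls back to
// the sync.Pool, and allocates a fresh one only when both are empty.
func getStreamContext() *streamContext {
    select {
    case ctx := <-streamContextPoolCh:
        return ctx
    default:
        if v := streamContextPool.Get(); v != nil {
            return v.(*streamContext)
        }
        return &streamContext{}
    }
}

// putStreamContext resets the context and parks it in the channel if there is
// room; otherwise it goes back to the sync.Pool.
func putStreamContext(ctx *streamContext) {
    ctx.reset()
    select {
    case streamContextPoolCh <- ctx:
    default:
        streamContextPool.Put(ctx)
    }
}

By contrast, the new unmarshalWork objects go through a plain sync.Pool, since their number is effectively bounded by the workCh buffer and the worker count.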