app/{vminsert,vmagent}: improve data ingestion speed over a single connection

Process data obtianed from a single connection on all the available CPU cores.
This commit is contained in:
Aliaksandr Valialkin 2020-09-28 04:11:55 +03:00
parent 7072db75cb
commit 8df33bd5c1
15 changed files with 646 additions and 329 deletions

View file

@ -29,6 +29,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -81,6 +82,7 @@ func main() {
logger.Infof("starting vmagent at %q...", *httpListenAddr) logger.Infof("starting vmagent at %q...", *httpListenAddr)
startTime := time.Now() startTime := time.Now()
remotewrite.Init() remotewrite.Init()
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init() writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 { if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
@ -128,6 +130,7 @@ func main() {
if len(*opentsdbHTTPListenAddr) > 0 { if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer.MustStop() opentsdbhttpServer.MustStop()
} }
common.StopUnmarshalWorkers()
remotewrite.Stop() remotewrite.Stop()
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())

View file

@ -33,6 +33,7 @@ import (
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -76,7 +77,7 @@ func main() {
relabel.Init() relabel.Init()
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init() writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 { if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
@ -126,6 +127,7 @@ func main() {
if len(*opentsdbHTTPListenAddr) > 0 { if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer.MustStop() opentsdbhttpServer.MustStop()
} }
common.StopUnmarshalWorkers()
logger.Infof("shutting down neststorage...") logger.Infof("shutting down neststorage...")
startTime = time.Now() startTime = time.Now()

View file

@ -0,0 +1,53 @@
package common
import (
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// ScheduleUnmarshalWork schedules uw to run in the worker pool.
//
// It is expected that StartUnmarshalWorkers is already called.
func ScheduleUnmarshalWork(uw UnmarshalWork) {
unmarshalWorkCh <- uw
}
// UnmarshalWork is a unit of unmarshal work.
type UnmarshalWork interface {
// Unmarshal must implement CPU-bound unmarshal work.
Unmarshal()
}
// StartUnmarshalWorkers starts unmarshal workers.
func StartUnmarshalWorkers() {
if unmarshalWorkCh != nil {
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
}
gomaxprocs := runtime.GOMAXPROCS(-1)
unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer unmarshalWorkersWG.Done()
for uw := range unmarshalWorkCh {
uw.Unmarshal()
}
}()
}
}
// StopUnmarshalWorkers stops unmarshal workers.
//
// No more calles to ScheduleUnmarshalWork are allowed after callsing stopUnmarshalWorkers
func StopUnmarshalWorkers() {
close(unmarshalWorkCh)
unmarshalWorkersWG.Wait()
unmarshalWorkCh = nil
}
var (
unmarshalWorkCh chan UnmarshalWork
unmarshalWorkersWG sync.WaitGroup
)

View file

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -43,15 +44,17 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
} }
ctx := getStreamContext(r) ctx := getStreamContext(r)
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read(cds) { for ctx.Read() {
if err := callback(ctx.Rows.Rows); err != nil { uw := getUnmarshalWork()
return err uw.callback = callback
} uw.cds = cds
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
common.ScheduleUnmarshalWork(uw)
} }
return ctx.Error() return ctx.Error()
} }
func (ctx *streamContext) Read(cds []ColumnDescriptor) bool { func (ctx *streamContext) Read() bool {
readCalls.Inc() readCalls.Inc()
if ctx.err != nil { if ctx.err != nil {
return false return false
@ -64,28 +67,6 @@ func (ctx *streamContext) Read(cds []ColumnDescriptor) bool {
} }
return false return false
} }
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds)
rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Set missing timestamps
currentTs := time.Now().UnixNano() / 1e6
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }
@ -96,7 +77,6 @@ var (
) )
type streamContext struct { type streamContext struct {
Rows Rows
br *bufio.Reader br *bufio.Reader
reqBuf []byte reqBuf []byte
tailBuf []byte tailBuf []byte
@ -111,7 +91,6 @@ func (ctx *streamContext) Error() error {
} }
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0] ctx.tailBuf = ctx.tailBuf[:0]
@ -146,3 +125,63 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
callback func(rows []Row) error
cds []ColumnDescriptor
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.cds = nil
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds)
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Set missing timestamps
currentTs := time.Now().UnixNano() / 1e6
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
if err := uw.callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -185,12 +185,25 @@ func Test_streamContext_Read(t *testing.T) {
f := func(s string, rowsExpected *Rows) { f := func(s string, rowsExpected *Rows) {
t.Helper() t.Helper()
ctx := getStreamContext(strings.NewReader(s)) ctx := getStreamContext(strings.NewReader(s))
ctx.Read() if !ctx.Read() {
if len(ctx.Rows.Rows) != len(rowsExpected.Rows) { t.Fatalf("expecting successful read")
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", ctx.Rows, rowsExpected.Rows)
} }
if !reflect.DeepEqual(ctx.Rows.Rows, rowsExpected.Rows) { uw := getUnmarshalWork()
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", ctx.Rows.Rows, rowsExpected.Rows) callbackCalls := 0
uw.callback = func(rows []Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
} }
} }

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -30,9 +31,10 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read() { for ctx.Read() {
if err := callback(ctx.Rows.Rows); err != nil { uw := getUnmarshalWork()
return err uw.callback = callback
} uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
common.ScheduleUnmarshalWork(uw)
} }
return ctx.Error() return ctx.Error()
} }
@ -50,38 +52,10 @@ func (ctx *streamContext) Read() bool {
} }
return false return false
} }
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill missing timestamps with the current timestamp rounded to seconds.
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 || r.Timestamp == -1 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps from seconds to milliseconds.
for i := range rows {
rows[i].Timestamp *= 1e3
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }
type streamContext struct { type streamContext struct {
Rows Rows
br *bufio.Reader br *bufio.Reader
reqBuf []byte reqBuf []byte
tailBuf []byte tailBuf []byte
@ -96,7 +70,6 @@ func (ctx *streamContext) Error() error {
} }
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0] ctx.tailBuf = ctx.tailBuf[:0]
@ -137,3 +110,66 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Fill missing timestamps with the current timestamp rounded to seconds.
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 || r.Timestamp == -1 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps from seconds to milliseconds.
for i := range rows {
rows[i].Timestamp *= 1e3
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
if err := uw.callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -53,15 +54,18 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
ctx := getStreamContext(r) ctx := getStreamContext(r)
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read(tsMultiplier) { for ctx.Read() {
if err := callback(db, ctx.Rows.Rows); err != nil { uw := getUnmarshalWork()
return err uw.callback = callback
} uw.db = db
uw.tsMultiplier = tsMultiplier
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
common.ScheduleUnmarshalWork(uw)
} }
return ctx.Error() return ctx.Error()
} }
func (ctx *streamContext) Read(tsMultiplier int64) bool { func (ctx *streamContext) Read() bool {
readCalls.Inc() readCalls.Inc()
if ctx.err != nil { if ctx.err != nil {
return false return false
@ -74,43 +78,6 @@ func (ctx *streamContext) Read(tsMultiplier int64) bool {
} }
return false return false
} }
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Adjust timestamps according to tsMultiplier
currentTs := time.Now().UnixNano() / 1e6
if tsMultiplier >= 1 {
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
} else {
row.Timestamp /= tsMultiplier
}
}
} else if tsMultiplier < 0 {
tsMultiplier = -tsMultiplier
currentTs -= currentTs % tsMultiplier
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
} else {
row.Timestamp *= tsMultiplier
}
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }
@ -121,7 +88,6 @@ var (
) )
type streamContext struct { type streamContext struct {
Rows Rows
br *bufio.Reader br *bufio.Reader
reqBuf []byte reqBuf []byte
tailBuf []byte tailBuf []byte
@ -136,7 +102,6 @@ func (ctx *streamContext) Error() error {
} }
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0] ctx.tailBuf = ctx.tailBuf[:0]
@ -171,3 +136,81 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
callback func(db string, rows []Row) error
db string
tsMultiplier int64
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.db = ""
uw.tsMultiplier = 0
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Adjust timestamps according to uw.tsMultiplier
currentTs := time.Now().UnixNano() / 1e6
tsMultiplier := uw.tsMultiplier
if tsMultiplier >= 1 {
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
} else {
row.Timestamp /= tsMultiplier
}
}
} else if tsMultiplier < 0 {
tsMultiplier = -tsMultiplier
currentTs -= currentTs % tsMultiplier
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
row.Timestamp = currentTs
} else {
row.Timestamp *= tsMultiplier
}
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
if err := uw.callback(uw.db, rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -45,46 +44,18 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
tr.MinTimestamp = encoding.UnmarshalInt64(trBuf) tr.MinTimestamp = encoding.UnmarshalInt64(trBuf)
tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:]) tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:])
// Start GOMAXPROC workers in order to process ingested data in parallel.
gomaxprocs := runtime.GOMAXPROCS(-1)
workCh := make(chan *unmarshalWork, 8*gomaxprocs)
var wg sync.WaitGroup
defer func() {
close(workCh)
wg.Wait()
}()
wg.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer wg.Done()
var tmpBlock storage.Block
for uw := range workCh {
if err := uw.unmarshal(&tmpBlock, tr); err != nil {
parseErrors.Inc()
logger.Errorf("error when unmarshaling native block: %s", err)
putUnmarshalWork(uw)
continue
}
if err := callback(&uw.block); err != nil {
processErrors.Inc()
logger.Errorf("error when processing native block: %s", err)
putUnmarshalWork(uw)
continue
}
putUnmarshalWork(uw)
}
}()
}
// Read native blocks and feed workers with work. // Read native blocks and feed workers with work.
sizeBuf := make([]byte, 4) sizeBuf := make([]byte, 4)
for { for {
uw := getUnmarshalWork() uw := getUnmarshalWork()
uw.tr = tr
uw.callback = callback
// Read uw.metricNameBuf // Read uw.metricNameBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil { if _, err := io.ReadFull(br, sizeBuf); err != nil {
if err == io.EOF { if err == io.EOF {
// End of stream // End of stream
putUnmarshalWork(uw)
return nil return nil
} }
readErrors.Inc() readErrors.Inc()
@ -122,8 +93,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
readCalls.Inc() readCalls.Inc()
blocksRead.Inc() blocksRead.Inc()
// Feed workers with work. common.ScheduleUnmarshalWork(uw)
workCh <- uw
} }
} }
@ -152,22 +122,43 @@ var (
type unmarshalWork struct { type unmarshalWork struct {
tr storage.TimeRange tr storage.TimeRange
callback func(block *Block) error
metricNameBuf []byte metricNameBuf []byte
blockBuf []byte blockBuf []byte
block Block block Block
} }
func (uw *unmarshalWork) reset() { func (uw *unmarshalWork) reset() {
uw.callback = nil
uw.metricNameBuf = uw.metricNameBuf[:0] uw.metricNameBuf = uw.metricNameBuf[:0]
uw.blockBuf = uw.blockBuf[:0] uw.blockBuf = uw.blockBuf[:0]
uw.block.reset() uw.block.reset()
} }
func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange) error { // Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
if err := uw.unmarshal(); err != nil {
parseErrors.Inc()
logger.Errorf("error when unmarshaling native block: %s", err)
putUnmarshalWork(uw)
return
}
if err := uw.callback(&uw.block); err != nil {
processErrors.Inc()
logger.Errorf("error when processing native block: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func (uw *unmarshalWork) unmarshal() error {
block := &uw.block block := &uw.block
if err := block.MetricName.UnmarshalNoAccountIDProjectID(uw.metricNameBuf); err != nil { if err := block.MetricName.UnmarshalNoAccountIDProjectID(uw.metricNameBuf); err != nil {
return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err) return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err)
} }
tmpBlock := blockPool.Get().(*storage.Block)
defer blockPool.Put(tmpBlock)
tail, err := tmpBlock.UnmarshalPortable(uw.blockBuf) tail, err := tmpBlock.UnmarshalPortable(uw.blockBuf)
if err != nil { if err != nil {
return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err) return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err)
@ -175,11 +166,17 @@ func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange
if len(tail) > 0 { if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail)) return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail))
} }
block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], tr) block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], uw.tr)
rowsRead.Add(len(block.Timestamps)) rowsRead.Add(len(block.Timestamps))
return nil return nil
} }
var blockPool = &sync.Pool{
New: func() interface{} {
return &storage.Block{}
},
}
func getUnmarshalWork() *unmarshalWork { func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get() v := unmarshalWorkPool.Get()
if v == nil { if v == nil {

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -29,9 +30,10 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
ctx := getStreamContext(r) ctx := getStreamContext(r)
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read() { for ctx.Read() {
if err := callback(ctx.Rows.Rows); err != nil { uw := getUnmarshalWork()
return err uw.callback = callback
} uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
common.ScheduleUnmarshalWork(uw)
} }
return ctx.Error() return ctx.Error()
} }
@ -49,38 +51,10 @@ func (ctx *streamContext) Read() bool {
} }
return false return false
} }
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill in missing timestamps
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps from seconds to milliseconds
for i := range rows {
rows[i].Timestamp *= 1e3
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }
type streamContext struct { type streamContext struct {
Rows Rows
br *bufio.Reader br *bufio.Reader
reqBuf []byte reqBuf []byte
tailBuf []byte tailBuf []byte
@ -95,7 +69,6 @@ func (ctx *streamContext) Error() error {
} }
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0] ctx.tailBuf = ctx.tailBuf[:0]
@ -136,3 +109,66 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Fill in missing timestamps
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps from seconds to milliseconds
for i := range rows {
rows[i].Timestamp *= 1e3
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
if err := uw.callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -4,17 +4,17 @@ import (
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
) )
// GetParser returns JSON parser. // getJSONParser returns JSON parser.
// //
// The parser must be returned to the pool via PutParser when no longer needed. // The parser must be returned to the pool via putJSONParser when no longer needed.
func GetParser() *fastjson.Parser { func getJSONParser() *fastjson.Parser {
return parserPool.Get() return parserPool.Get()
} }
// PutParser returns p to the pool. // putJSONParser returns p to the pool.
// //
// p cannot be used after returning to the pool. // p cannot be used after returning to the pool.
func PutParser(p *fastjson.Parser) { func putJSONParser(p *fastjson.Parser) {
parserPool.Put(p) parserPool.Put(p)
} }

View file

@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
var rows Rows var rows Rows
p := GetParser() p := getJSONParser()
defer PutParser(p) defer putJSONParser(p)
v, err := p.Parse(s) v, err := p.Parse(s)
if err != nil { if err != nil {
// Expected JSON parser error // Expected JSON parser error
@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
t.Helper() t.Helper()
var rows Rows var rows Rows
p := GetParser() p := getJSONParser()
defer PutParser(p) defer putJSONParser(p)
v, err := p.Parse(s) v, err := p.Parse(s)
if err != nil { if err != nil {
t.Fatalf("cannot parse json %s: %s", s, err) t.Fatalf("cannot parse json %s: %s", s, err)

View file

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -56,59 +57,21 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
} }
// Unmarshal the request to ctx.Rows uw := getUnmarshalWork()
p := GetParser() uw.callback = callback
defer PutParser(p) uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf.B...)
v, err := p.ParseBytes(ctx.reqBuf.B) common.ScheduleUnmarshalWork(uw)
if err != nil { return nil
unmarshalErrors.Inc()
return fmt.Errorf("cannot parse HTTP OpenTSDB json: %w", err)
}
ctx.Rows.Unmarshal(v)
rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill in missing timestamps
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps in seconds to milliseconds if needed.
// See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK
for i := range rows {
r := &rows[i]
if r.Timestamp&secondMask == 0 {
r.Timestamp *= 1e3
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
// Insert ctx.Rows to db.
return callback(rows)
} }
const secondMask int64 = 0x7FFFFFFF00000000 const secondMask int64 = 0x7FFFFFFF00000000
type streamContext struct { type streamContext struct {
Rows Rows
br *bufio.Reader br *bufio.Reader
reqBuf bytesutil.ByteBuffer reqBuf bytesutil.ByteBuffer
} }
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf.Reset() ctx.reqBuf.Reset()
} }
@ -148,3 +111,78 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
p := getJSONParser()
defer putJSONParser(p)
v, err := p.ParseBytes(uw.reqBuf)
if err != nil {
unmarshalErrors.Inc()
logger.Errorf("cannot parse HTTP OpenTSDB json: %s", err)
return
}
uw.rows.Unmarshal(v)
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Fill in missing timestamps
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps in seconds to milliseconds if needed.
// See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK
for i := range rows {
r := &rows[i]
if r.Timestamp&secondMask == 0 {
r.Timestamp *= 1e3
}
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
if err := uw.callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -29,15 +30,17 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
} }
ctx := getStreamContext(r) ctx := getStreamContext(r)
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read(defaultTimestamp) { for ctx.Read() {
if err := callback(ctx.Rows.Rows); err != nil { uw := getUnmarshalWork()
return err uw.callback = callback
} uw.defaultTimestamp = defaultTimestamp
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
common.ScheduleUnmarshalWork(uw)
} }
return ctx.Error() return ctx.Error()
} }
func (ctx *streamContext) Read(defaultTimestamp int64) bool { func (ctx *streamContext) Read() bool {
readCalls.Inc() readCalls.Inc()
if ctx.err != nil { if ctx.err != nil {
return false return false
@ -50,26 +53,10 @@ func (ctx *streamContext) Read(defaultTimestamp int64) bool {
} }
return false return false
} }
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill missing timestamps with the current timestamp.
if defaultTimestamp <= 0 {
defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
}
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 {
r.Timestamp = defaultTimestamp
}
}
return true return true
} }
type streamContext struct { type streamContext struct {
Rows Rows
br *bufio.Reader br *bufio.Reader
reqBuf []byte reqBuf []byte
tailBuf []byte tailBuf []byte
@ -84,7 +71,6 @@ func (ctx *streamContext) Error() error {
} }
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.Rows.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0] ctx.tailBuf = ctx.tailBuf[:0]
@ -125,3 +111,58 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct {
rows Rows
callback func(rows []Row) error
defaultTimestamp int64
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.defaultTimestamp = 0
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Fill missing timestamps with the current timestamp.
defaultTimestamp := uw.defaultTimestamp
if defaultTimestamp <= 0 {
defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
}
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 {
r.Timestamp = defaultTimestamp
}
}
if err := uw.callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -10,7 +10,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy" "github.com/golang/snappy"
) )
@ -19,49 +21,42 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
// ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries. // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
// //
// callback shouldn't hold timeseries after returning. // callback shouldn't hold tss after returning.
func ParseStream(req *http.Request, callback func(timeseries []prompb.TimeSeries) error) error { func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error {
ctx := getPushCtx(req.Body) ctx := getPushCtx(req.Body)
defer putPushCtx(ctx) defer putPushCtx(ctx)
if err := ctx.Read(); err != nil { if err := ctx.Read(); err != nil {
return err return err
} }
return callback(ctx.wr.Timeseries) uw := getUnmarshalWork()
uw.callback = callback
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf.B...)
common.ScheduleUnmarshalWork(uw)
return nil
} }
type pushCtx struct { type pushCtx struct {
wr prompb.WriteRequest
br *bufio.Reader br *bufio.Reader
reqBuf []byte reqBuf bytesutil.ByteBuffer
} }
func (ctx *pushCtx) reset() { func (ctx *pushCtx) reset() {
ctx.wr.Reset()
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf.Reset()
} }
func (ctx *pushCtx) Read() error { func (ctx *pushCtx) Read() error {
readCalls.Inc() readCalls.Inc()
var err error lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
reqLen, err := ctx.reqBuf.ReadFrom(lr)
ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], ctx.br)
if err != nil { if err != nil {
readErrors.Inc() readErrors.Inc()
return fmt.Errorf("cannot read prompb.WriteRequest: %w", err) return fmt.Errorf("cannot read compressed request: %w", err)
} }
if err = ctx.wr.Unmarshal(ctx.reqBuf); err != nil { if reqLen > int64(maxInsertRequestSize.N) {
unmarshalErrors.Inc() readErrors.Inc()
return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(ctx.reqBuf), err) return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
} }
rows := 0
tss := ctx.wr.Timeseries
for i := range tss {
rows += len(tss[i].Samples)
}
rowsRead.Add(rows)
return nil return nil
} }
@ -101,34 +96,66 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
func readSnappy(dst []byte, r io.Reader) ([]byte, error) { type unmarshalWork struct {
lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1) wr prompb.WriteRequest
callback func(tss []prompb.TimeSeries) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.wr.Reset()
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
bb := bodyBufferPool.Get() bb := bodyBufferPool.Get()
reqLen, err := bb.ReadFrom(lr) defer bodyBufferPool.Put(bb)
var err error
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], uw.reqBuf)
if err != nil { if err != nil {
bodyBufferPool.Put(bb) logger.Errorf("cannot decompress request with length %d: %s", len(uw.reqBuf), err)
return dst, fmt.Errorf("cannot read compressed request: %w", err) return
} }
if reqLen > int64(maxInsertRequestSize.N) { if len(bb.B) > maxInsertRequestSize.N {
return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) logger.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
return
}
if err := uw.wr.Unmarshal(bb.B); err != nil {
unmarshalErrors.Inc()
logger.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %s", len(bb.B), err)
return
} }
buf := dst[len(dst):cap(dst)] rows := 0
buf, err = snappy.Decode(buf, bb.B) tss := uw.wr.Timeseries
bodyBufferPool.Put(bb) for i := range tss {
if err != nil { rows += len(tss[i].Samples)
err = fmt.Errorf("cannot decompress request with length %d: %w", reqLen, err)
return dst, err
} }
if len(buf) > maxInsertRequestSize.N { rowsRead.Add(rows)
return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(buf))
if err := uw.callback(tss); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
} }
if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] { putUnmarshalWork(uw)
dst = dst[:len(dst)+len(buf)]
} else {
dst = append(dst, buf...)
}
return dst, nil
} }
var bodyBufferPool bytesutil.ByteBufferPool var bodyBufferPool bytesutil.ByteBufferPool
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -34,42 +34,13 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
defer common.PutGzipReader(zr) defer common.PutGzipReader(zr)
r = zr r = zr
} }
// Start gomaxprocs workers for processing the parsed data in parallel.
gomaxprocs := runtime.GOMAXPROCS(-1)
workCh := make(chan *unmarshalWork, 8*gomaxprocs)
var wg sync.WaitGroup
defer func() {
close(workCh)
wg.Wait()
}()
wg.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer wg.Done()
for uw := range workCh {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
for i := range rows {
row := &rows[i]
rowsRead.Add(len(row.Timestamps))
}
if err := callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
continue
}
putUnmarshalWork(uw)
}
}()
}
ctx := getStreamContext(r) ctx := getStreamContext(r)
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read() { for ctx.Read() {
uw := getUnmarshalWork() uw := getUnmarshalWork()
uw.callback = callback
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
workCh <- uw common.ScheduleUnmarshalWork(uw)
} }
return ctx.Error() return ctx.Error()
} }
@ -148,14 +119,32 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows Rows
callback func(rows []Row) error
reqBuf []byte reqBuf []byte
} }
func (uw *unmarshalWork) reset() { func (uw *unmarshalWork) reset() {
uw.rows.Reset() uw.rows.Reset()
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
for i := range rows {
row := &rows[i]
rowsRead.Add(len(row.Timestamps))
}
if err := uw.callback(rows); err != nil {
logger.Errorf("error when processing imported data: %s", err)
putUnmarshalWork(uw)
return
}
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork { func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get() v := unmarshalWorkPool.Get()
if v == nil { if v == nil {