diff --git a/app/vmagent/main.go b/app/vmagent/main.go
index 473a5fc89..291eea7f1 100644
--- a/app/vmagent/main.go
+++ b/app/vmagent/main.go
@@ -29,6 +29,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -81,6 +82,7 @@ func main() {
 	logger.Infof("starting vmagent at %q...", *httpListenAddr)
 	startTime := time.Now()
 	remotewrite.Init()
+	common.StartUnmarshalWorkers()
 	writeconcurrencylimiter.Init()
 	if len(*influxListenAddr) > 0 {
 		influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
@@ -128,6 +130,7 @@ func main() {
 	if len(*opentsdbHTTPListenAddr) > 0 {
 		opentsdbhttpServer.MustStop()
 	}
+	common.StopUnmarshalWorkers()
 	remotewrite.Stop()
 
 	logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
diff --git a/app/vminsert/main.go b/app/vminsert/main.go
index 6f9e2246b..8c831588a 100644
--- a/app/vminsert/main.go
+++ b/app/vminsert/main.go
@@ -33,6 +33,7 @@ import (
 	opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
 	"github.com/VictoriaMetrics/metrics"
@@ -76,7 +77,7 @@ func main() {
 	relabel.Init()
 	storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
-
+	common.StartUnmarshalWorkers()
 	writeconcurrencylimiter.Init()
 	if len(*influxListenAddr) > 0 {
 		influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
@@ -126,6 +127,7 @@ func main() {
 	if len(*opentsdbHTTPListenAddr) > 0 {
 		opentsdbhttpServer.MustStop()
 	}
+	common.StopUnmarshalWorkers()
 
 	logger.Infof("shutting down neststorage...")
 	startTime = time.Now()
diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/common/unmarshal_work.go
new file mode 100644
index 000000000..ac16e289d
--- /dev/null
+++ b/lib/protoparser/common/unmarshal_work.go
@@ -0,0 +1,53 @@
+package common
+
+import (
+	"runtime"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+)
+
+// ScheduleUnmarshalWork schedules uw to run in the worker pool.
+//
+// It is expected that StartUnmarshalWorkers is already called.
+func ScheduleUnmarshalWork(uw UnmarshalWork) {
+	unmarshalWorkCh <- uw
+}
+
+// UnmarshalWork is a unit of unmarshal work.
+type UnmarshalWork interface {
+	// Unmarshal must implement CPU-bound unmarshal work.
+	Unmarshal()
+}
+
+// StartUnmarshalWorkers starts unmarshal workers.
+func StartUnmarshalWorkers() {
+	if unmarshalWorkCh != nil {
+		logger.Panicf("BUG: it looks like StartUnmarshalWorkers() has been already called without StopUnmarshalWorkers()")
+	}
+	gomaxprocs := runtime.GOMAXPROCS(-1)
+	unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
+	unmarshalWorkersWG.Add(gomaxprocs)
+	for i := 0; i < gomaxprocs; i++ {
+		go func() {
+			defer unmarshalWorkersWG.Done()
+			for uw := range unmarshalWorkCh {
+				uw.Unmarshal()
+			}
+		}()
+	}
+}
+
+// StopUnmarshalWorkers stops unmarshal workers.
+//
+// No more calls to ScheduleUnmarshalWork are allowed after calling StopUnmarshalWorkers.
+func StopUnmarshalWorkers() {
+	close(unmarshalWorkCh)
+	unmarshalWorkersWG.Wait()
+	unmarshalWorkCh = nil
+}
+
+var (
+	unmarshalWorkCh    chan UnmarshalWork
+	unmarshalWorkersWG sync.WaitGroup
+)
diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go
index 5828ea7ff..cd0bd0314 100644
--- a/lib/protoparser/csvimport/streamparser.go
+++ b/lib/protoparser/csvimport/streamparser.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -43,15 +44,17 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 	}
 	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(cds) {
-		if err := callback(ctx.Rows.Rows); err != nil {
-			return err
-		}
+	for ctx.Read() {
+		uw := getUnmarshalWork()
+		uw.callback = callback
+		uw.cds = cds
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
+		common.ScheduleUnmarshalWork(uw)
 	}
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(cds []ColumnDescriptor) bool {
+func (ctx *streamContext) Read() bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
@@ -64,28 +67,6 @@ func (ctx *streamContext) Read(cds []ColumnDescriptor) bool {
 		}
 		return false
 	}
-	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds)
-	rowsRead.Add(len(ctx.Rows.Rows))
-
-	rows := ctx.Rows.Rows
-
-	// Set missing timestamps
-	currentTs := time.Now().UnixNano() / 1e6
-	for i := range rows {
-		row := &rows[i]
-		if row.Timestamp == 0 {
-			row.Timestamp = currentTs
-		}
-	}
-
-	// Trim timestamps if required.
-	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
-		for i := range rows {
-			row := &rows[i]
-			row.Timestamp -= row.Timestamp % tsTrim
-		}
-	}
-
 	return true
 }
 
@@ -96,7 +77,6 @@ var (
 )
 
 type streamContext struct {
-	Rows    Rows
 	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
@@ -111,7 +91,6 @@ func (ctx *streamContext) Error() error {
 }
 
 func (ctx *streamContext) reset() {
-	ctx.Rows.Reset()
 	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
@@ -146,3 +125,63 @@ func putStreamContext(ctx *streamContext) {
 
 var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
+
+type unmarshalWork struct {
+	rows     Rows
+	callback func(rows []Row) error
+	cds      []ColumnDescriptor
+	reqBuf   []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.callback = nil
+	uw.cds = nil
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds)
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Set missing timestamps
+	currentTs := time.Now().UnixNano() / 1e6
+	for i := range rows {
+		row := &rows[i]
+		if row.Timestamp == 0 {
+			row.Timestamp = currentTs
+		}
+	}
+
+	// Trim timestamps if required.
+	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
+		for i := range rows {
+			row := &rows[i]
+			row.Timestamp -= row.Timestamp % tsTrim
+		}
+	}
+
+	if err := uw.callback(rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go
index 1ad5a5dfd..86510f213 100644
--- a/lib/protoparser/graphite/parser_test.go
+++ b/lib/protoparser/graphite/parser_test.go
@@ -185,12 +185,25 @@ func Test_streamContext_Read(t *testing.T) {
 	f := func(s string, rowsExpected *Rows) {
 		t.Helper()
 		ctx := getStreamContext(strings.NewReader(s))
-		ctx.Read()
-		if len(ctx.Rows.Rows) != len(rowsExpected.Rows) {
-			t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", ctx.Rows, rowsExpected.Rows)
+		if !ctx.Read() {
+			t.Fatalf("expecting successful read")
 		}
-		if !reflect.DeepEqual(ctx.Rows.Rows, rowsExpected.Rows) {
-			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", ctx.Rows.Rows, rowsExpected.Rows)
+		uw := getUnmarshalWork()
+		callbackCalls := 0
+		uw.callback = func(rows []Row) error {
+			callbackCalls++
+			if len(rows) != len(rowsExpected.Rows) {
+				t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
+			}
+			if !reflect.DeepEqual(rows, rowsExpected.Rows) {
+				t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
+			}
+			return nil
+		}
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
+		uw.Unmarshal()
+		if callbackCalls != 1 {
+			t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
 		}
 	}
 
diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go
index 08942bf3e..f553d529a 100644
--- a/lib/protoparser/graphite/streamparser.go
+++ b/lib/protoparser/graphite/streamparser.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -30,9 +31,10 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
 	defer putStreamContext(ctx)
 
 	for ctx.Read() {
-		if err := callback(ctx.Rows.Rows); err != nil {
-			return err
-		}
+		uw := getUnmarshalWork()
+		uw.callback = callback
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
+		common.ScheduleUnmarshalWork(uw)
 	}
 	return ctx.Error()
 }
@@ -50,38 +52,10 @@ func (ctx *streamContext) Read() bool {
 		}
 		return false
 	}
-	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
-	rowsRead.Add(len(ctx.Rows.Rows))
-
-	rows := ctx.Rows.Rows
-
-	// Fill missing timestamps with the current timestamp rounded to seconds.
-	currentTimestamp := int64(fasttime.UnixTimestamp())
-	for i := range rows {
-		r := &rows[i]
-		if r.Timestamp == 0 || r.Timestamp == -1 {
-			r.Timestamp = currentTimestamp
-		}
-	}
-
-	// Convert timestamps from seconds to milliseconds.
-	for i := range rows {
-		rows[i].Timestamp *= 1e3
-	}
-
-	// Trim timestamps if required.
-	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
-		for i := range rows {
-			row := &rows[i]
-			row.Timestamp -= row.Timestamp % tsTrim
-		}
-	}
-
 	return true
 }
 
 type streamContext struct {
-	Rows    Rows
 	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
@@ -96,7 +70,6 @@ func (ctx *streamContext) Error() error {
 }
 
 func (ctx *streamContext) reset() {
-	ctx.Rows.Reset()
 	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
@@ -137,3 +110,66 @@ func putStreamContext(ctx *streamContext) {
 
 var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
+
+type unmarshalWork struct {
+	rows     Rows
+	callback func(rows []Row) error
+	reqBuf   []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.callback = nil
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Fill missing timestamps with the current timestamp rounded to seconds.
+	currentTimestamp := int64(fasttime.UnixTimestamp())
+	for i := range rows {
+		r := &rows[i]
+		if r.Timestamp == 0 || r.Timestamp == -1 {
+			r.Timestamp = currentTimestamp
+		}
+	}
+
+	// Convert timestamps from seconds to milliseconds.
+	for i := range rows {
+		rows[i].Timestamp *= 1e3
+	}
+
+	// Trim timestamps if required.
+	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
+		for i := range rows {
+			row := &rows[i]
+			row.Timestamp -= row.Timestamp % tsTrim
+		}
+	}
+
+	if err := uw.callback(rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go
index 6a6a6c381..bf4d6f552 100644
--- a/lib/protoparser/influx/streamparser.go
+++ b/lib/protoparser/influx/streamparser.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -53,15 +54,18 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
 
 	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(tsMultiplier) {
-		if err := callback(db, ctx.Rows.Rows); err != nil {
-			return err
-		}
+	for ctx.Read() {
+		uw := getUnmarshalWork()
+		uw.callback = callback
+		uw.db = db
+		uw.tsMultiplier = tsMultiplier
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
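+		// Hand the copied buffer off to the shared unmarshal workers;
+		// ctx.reqBuf may be overwritten by the next Read before uw runs.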
+		common.ScheduleUnmarshalWork(uw)
 	}
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(tsMultiplier int64) bool {
+func (ctx *streamContext) Read() bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
@@ -74,43 +78,6 @@ func (ctx *streamContext) Read(tsMultiplier int64) bool {
 		}
 		return false
 	}
-	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
-	rowsRead.Add(len(ctx.Rows.Rows))
-
-	rows := ctx.Rows.Rows
-
-	// Adjust timestamps according to tsMultiplier
-	currentTs := time.Now().UnixNano() / 1e6
-	if tsMultiplier >= 1 {
-		for i := range rows {
-			row := &rows[i]
-			if row.Timestamp == 0 {
-				row.Timestamp = currentTs
-			} else {
-				row.Timestamp /= tsMultiplier
-			}
-		}
-	} else if tsMultiplier < 0 {
-		tsMultiplier = -tsMultiplier
-		currentTs -= currentTs % tsMultiplier
-		for i := range rows {
-			row := &rows[i]
-			if row.Timestamp == 0 {
-				row.Timestamp = currentTs
-			} else {
-				row.Timestamp *= tsMultiplier
-			}
-		}
-	}
-
-	// Trim timestamps if required.
-	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
-		for i := range rows {
-			row := &rows[i]
-			row.Timestamp -= row.Timestamp % tsTrim
-		}
-	}
-
 	return true
 }
 
@@ -121,7 +88,6 @@ var (
 )
 
 type streamContext struct {
-	Rows    Rows
 	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
@@ -136,7 +102,6 @@ func (ctx *streamContext) Error() error {
 }
 
 func (ctx *streamContext) reset() {
-	ctx.Rows.Reset()
 	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
@@ -171,3 +136,81 @@ func putStreamContext(ctx *streamContext) {
 
 var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
+
+type unmarshalWork struct {
+	rows         Rows
+	callback     func(db string, rows []Row) error
+	db           string
+	tsMultiplier int64
+	reqBuf       []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.callback = nil
+	uw.db = ""
+	uw.tsMultiplier = 0
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Adjust timestamps according to uw.tsMultiplier
+	currentTs := time.Now().UnixNano() / 1e6
+	tsMultiplier := uw.tsMultiplier
+	if tsMultiplier >= 1 {
+		for i := range rows {
+			row := &rows[i]
+			if row.Timestamp == 0 {
+				row.Timestamp = currentTs
+			} else {
+				row.Timestamp /= tsMultiplier
+			}
+		}
+	} else if tsMultiplier < 0 {
+		tsMultiplier = -tsMultiplier
+		currentTs -= currentTs % tsMultiplier
+		for i := range rows {
+			row := &rows[i]
+			if row.Timestamp == 0 {
+				row.Timestamp = currentTs
+			} else {
+				row.Timestamp *= tsMultiplier
+			}
+		}
+	}
+
+	// Trim timestamps if required.
+	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
+		for i := range rows {
+			row := &rows[i]
+			row.Timestamp -= row.Timestamp % tsTrim
+		}
+	}
+
+	if err := uw.callback(uw.db, rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go
index ad9cb6bac..2f362183f 100644
--- a/lib/protoparser/native/streamparser.go
+++ b/lib/protoparser/native/streamparser.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-	"runtime"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -45,46 +44,18 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
 	tr.MinTimestamp = encoding.UnmarshalInt64(trBuf)
 	tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:])
 
-	// Start GOMAXPROC workers in order to process ingested data in parallel.
-	gomaxprocs := runtime.GOMAXPROCS(-1)
-	workCh := make(chan *unmarshalWork, 8*gomaxprocs)
-	var wg sync.WaitGroup
-	defer func() {
-		close(workCh)
-		wg.Wait()
-	}()
-	wg.Add(gomaxprocs)
-	for i := 0; i < gomaxprocs; i++ {
-		go func() {
-			defer wg.Done()
-			var tmpBlock storage.Block
-			for uw := range workCh {
-				if err := uw.unmarshal(&tmpBlock, tr); err != nil {
-					parseErrors.Inc()
-					logger.Errorf("error when unmarshaling native block: %s", err)
-					putUnmarshalWork(uw)
-					continue
-				}
-				if err := callback(&uw.block); err != nil {
-					processErrors.Inc()
-					logger.Errorf("error when processing native block: %s", err)
-					putUnmarshalWork(uw)
-					continue
-				}
-				putUnmarshalWork(uw)
-			}
-		}()
-	}
-
 	// Read native blocks and feed workers with work.
 	sizeBuf := make([]byte, 4)
 	for {
 		uw := getUnmarshalWork()
+		uw.tr = tr
+		uw.callback = callback
 
 		// Read uw.metricNameBuf
 		if _, err := io.ReadFull(br, sizeBuf); err != nil {
 			if err == io.EOF {
 				// End of stream
+				putUnmarshalWork(uw)
 				return nil
 			}
 			readErrors.Inc()
@@ -122,8 +93,7 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
 		readCalls.Inc()
 		blocksRead.Inc()
 
-		// Feed workers with work.
-		workCh <- uw
+		common.ScheduleUnmarshalWork(uw)
 	}
 }
 
@@ -152,22 +122,43 @@
 type unmarshalWork struct {
 	tr            storage.TimeRange
+	callback      func(block *Block) error
 	metricNameBuf []byte
 	blockBuf      []byte
 	block         Block
 }
 
 func (uw *unmarshalWork) reset() {
+	uw.callback = nil
 	uw.metricNameBuf = uw.metricNameBuf[:0]
 	uw.blockBuf = uw.blockBuf[:0]
 	uw.block.reset()
 }
 
-func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange) error {
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	if err := uw.unmarshal(); err != nil {
+		parseErrors.Inc()
+		logger.Errorf("error when unmarshaling native block: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	if err := uw.callback(&uw.block); err != nil {
+		processErrors.Inc()
+		logger.Errorf("error when processing native block: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func (uw *unmarshalWork) unmarshal() error {
 	block := &uw.block
 	if err := block.MetricName.UnmarshalNoAccountIDProjectID(uw.metricNameBuf); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err)
 	}
+	tmpBlock := blockPool.Get().(*storage.Block)
+	defer blockPool.Put(tmpBlock)
 	tail, err := tmpBlock.UnmarshalPortable(uw.blockBuf)
 	if err != nil {
 		return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err)
@@ -175,11 +166,17 @@ func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange
 	if len(tail) > 0 {
 		return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail))
 	}
-	block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], tr)
+	block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], uw.tr)
 	rowsRead.Add(len(block.Timestamps))
 	return nil
 }
 
+var blockPool = &sync.Pool{
+	New: func() interface{} {
+		return &storage.Block{}
+	},
+}
+
 func getUnmarshalWork() *unmarshalWork {
 	v := unmarshalWorkPool.Get()
 	if v == nil {
diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go
index 7f1762f06..bc9507f74 100644
--- a/lib/protoparser/opentsdb/streamparser.go
+++ b/lib/protoparser/opentsdb/streamparser.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -29,9 +30,10 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
 	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
 	for ctx.Read() {
-		if err := callback(ctx.Rows.Rows); err != nil {
-			return err
-		}
+		uw := getUnmarshalWork()
+		uw.callback = callback
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
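+		// uw owns a private copy of ctx.reqBuf, since ctx is reused for
+		// the next Read while uw waits in the shared worker queue.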
+		common.ScheduleUnmarshalWork(uw)
 	}
 	return ctx.Error()
 }
@@ -49,38 +51,10 @@ func (ctx *streamContext) Read() bool {
 		}
 		return false
 	}
-	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
-	rowsRead.Add(len(ctx.Rows.Rows))
-
-	rows := ctx.Rows.Rows
-
-	// Fill in missing timestamps
-	currentTimestamp := int64(fasttime.UnixTimestamp())
-	for i := range rows {
-		r := &rows[i]
-		if r.Timestamp == 0 {
-			r.Timestamp = currentTimestamp
-		}
-	}
-
-	// Convert timestamps from seconds to milliseconds
-	for i := range rows {
-		rows[i].Timestamp *= 1e3
-	}
-
-	// Trim timestamps if required.
-	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
-		for i := range rows {
-			row := &rows[i]
-			row.Timestamp -= row.Timestamp % tsTrim
-		}
-	}
-
 	return true
 }
 
 type streamContext struct {
-	Rows    Rows
 	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
@@ -95,7 +69,6 @@ func (ctx *streamContext) Error() error {
 }
 
 func (ctx *streamContext) reset() {
-	ctx.Rows.Reset()
 	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
@@ -136,3 +109,66 @@ func putStreamContext(ctx *streamContext) {
 
 var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
+
+type unmarshalWork struct {
+	rows     Rows
+	callback func(rows []Row) error
+	reqBuf   []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.callback = nil
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Fill in missing timestamps
+	currentTimestamp := int64(fasttime.UnixTimestamp())
+	for i := range rows {
+		r := &rows[i]
+		if r.Timestamp == 0 {
+			r.Timestamp = currentTimestamp
+		}
+	}
+
+	// Convert timestamps from seconds to milliseconds
+	for i := range rows {
+		rows[i].Timestamp *= 1e3
+	}
+
+	// Trim timestamps if required.
+	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
+		for i := range rows {
+			row := &rows[i]
+			row.Timestamp -= row.Timestamp % tsTrim
+		}
+	}
+
+	if err := uw.callback(rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/opentsdbhttp/parser_pool.go b/lib/protoparser/opentsdbhttp/parser_pool.go
index f64763107..3863528a1 100644
--- a/lib/protoparser/opentsdbhttp/parser_pool.go
+++ b/lib/protoparser/opentsdbhttp/parser_pool.go
@@ -4,17 +4,17 @@ import (
 	"github.com/valyala/fastjson"
 )
 
-// GetParser returns JSON parser.
+// getJSONParser returns JSON parser.
 //
-// The parser must be returned to the pool via PutParser when no longer needed.
-func GetParser() *fastjson.Parser {
+// The parser must be returned to the pool via putJSONParser when no longer needed.
+func getJSONParser() *fastjson.Parser {
 	return parserPool.Get()
 }
 
-// PutParser returns p to the pool.
+// putJSONParser returns p to the pool.
 //
 // p cannot be used after returning to the pool.
-func PutParser(p *fastjson.Parser) {
+func putJSONParser(p *fastjson.Parser) {
 	parserPool.Put(p)
 }
 
diff --git a/lib/protoparser/opentsdbhttp/parser_test.go b/lib/protoparser/opentsdbhttp/parser_test.go
index fc43804e9..7b75bbaa5 100644
--- a/lib/protoparser/opentsdbhttp/parser_test.go
+++ b/lib/protoparser/opentsdbhttp/parser_test.go
@@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
 	f := func(s string) {
 		t.Helper()
 		var rows Rows
-		p := GetParser()
-		defer PutParser(p)
+		p := getJSONParser()
+		defer putJSONParser(p)
 		v, err := p.Parse(s)
 		if err != nil {
 			// Expected JSON parser error
@@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
 		t.Helper()
 		var rows Rows
 
-		p := GetParser()
-		defer PutParser(p)
+		p := getJSONParser()
+		defer putJSONParser(p)
 		v, err := p.Parse(s)
 		if err != nil {
 			t.Fatalf("cannot parse json %s: %s", s, err)
diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go
index eaa1df9cd..bc8d9ca77 100644
--- a/lib/protoparser/opentsdbhttp/streamparser.go
+++ b/lib/protoparser/opentsdbhttp/streamparser.go
@@ -13,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -56,59 +57,21 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 		return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
 	}
 
-	// Unmarshal the request to ctx.Rows
-	p := GetParser()
-	defer PutParser(p)
-	v, err := p.ParseBytes(ctx.reqBuf.B)
-	if err != nil {
-		unmarshalErrors.Inc()
-		return fmt.Errorf("cannot parse HTTP OpenTSDB json: %w", err)
-	}
-	ctx.Rows.Unmarshal(v)
-	rowsRead.Add(len(ctx.Rows.Rows))
-
-	rows := ctx.Rows.Rows
-
-	// Fill in missing timestamps
-	currentTimestamp := int64(fasttime.UnixTimestamp())
-	for i := range rows {
-		r := &rows[i]
-		if r.Timestamp == 0 {
-			r.Timestamp = currentTimestamp
-		}
-	}
-
-	// Convert timestamps in seconds to milliseconds if needed.
-	// See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK
-	for i := range rows {
-		r := &rows[i]
-		if r.Timestamp&secondMask == 0 {
-			r.Timestamp *= 1e3
-		}
-	}
-
-	// Trim timestamps if required.
-	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
-		for i := range rows {
-			row := &rows[i]
-			row.Timestamp -= row.Timestamp % tsTrim
-		}
-	}
-
-	// Insert ctx.Rows to db.
-	return callback(rows)
+	uw := getUnmarshalWork()
+	uw.callback = callback
+	uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf.B...)
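+	// uw gets a private copy of the request body: ParseStream returns
+	// (and ctx goes back to its pool) before the workers parse the JSON.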
+	common.ScheduleUnmarshalWork(uw)
+	return nil
 }
 
 const secondMask int64 = 0x7FFFFFFF00000000
 
 type streamContext struct {
-	Rows   Rows
 	br     *bufio.Reader
 	reqBuf bytesutil.ByteBuffer
 }
 
 func (ctx *streamContext) reset() {
-	ctx.Rows.Reset()
 	ctx.br.Reset(nil)
 	ctx.reqBuf.Reset()
 }
@@ -148,3 +111,78 @@ func putStreamContext(ctx *streamContext) {
 
 var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
+
+type unmarshalWork struct {
+	rows     Rows
+	callback func(rows []Row) error
+	reqBuf   []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.callback = nil
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	p := getJSONParser()
+	defer putJSONParser(p)
+	v, err := p.ParseBytes(uw.reqBuf)
+	if err != nil {
+		unmarshalErrors.Inc()
+		logger.Errorf("cannot parse HTTP OpenTSDB json: %s", err)
+		return
+	}
+	uw.rows.Unmarshal(v)
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Fill in missing timestamps
+	currentTimestamp := int64(fasttime.UnixTimestamp())
+	for i := range rows {
+		r := &rows[i]
+		if r.Timestamp == 0 {
+			r.Timestamp = currentTimestamp
+		}
+	}
+
+	// Convert timestamps in seconds to milliseconds if needed.
+	// See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK
+	for i := range rows {
+		r := &rows[i]
+		if r.Timestamp&secondMask == 0 {
+			r.Timestamp *= 1e3
+		}
+	}
+
+	// Trim timestamps if required.
+	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
+		for i := range rows {
+			row := &rows[i]
+			row.Timestamp -= row.Timestamp % tsTrim
+		}
+	}
+
+	if err := uw.callback(rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go
index b4d317696..061a7dc36 100644
--- a/lib/protoparser/prometheus/streamparser.go
+++ b/lib/protoparser/prometheus/streamparser.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -29,15 +30,17 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
 	}
 	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(defaultTimestamp) {
-		if err := callback(ctx.Rows.Rows); err != nil {
-			return err
-		}
+	for ctx.Read() {
+		uw := getUnmarshalWork()
+		uw.callback = callback
+		uw.defaultTimestamp = defaultTimestamp
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
+		common.ScheduleUnmarshalWork(uw)
 	}
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(defaultTimestamp int64) bool {
+func (ctx *streamContext) Read() bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
@@ -50,26 +53,10 @@ func (ctx *streamContext) Read(defaultTimestamp int64) bool {
 		}
 		return false
 	}
-	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
-	rowsRead.Add(len(ctx.Rows.Rows))
-
-	rows := ctx.Rows.Rows
-
-	// Fill missing timestamps with the current timestamp.
-	if defaultTimestamp <= 0 {
-		defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
-	}
-	for i := range rows {
-		r := &rows[i]
-		if r.Timestamp == 0 {
-			r.Timestamp = defaultTimestamp
-		}
-	}
 	return true
 }
 
 type streamContext struct {
-	Rows    Rows
 	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
@@ -84,7 +71,6 @@ func (ctx *streamContext) Error() error {
 }
 
 func (ctx *streamContext) reset() {
-	ctx.Rows.Reset()
 	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
@@ -125,3 +111,58 @@ func putStreamContext(ctx *streamContext) {
 
 var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
+
+type unmarshalWork struct {
+	rows             Rows
+	callback         func(rows []Row) error
+	defaultTimestamp int64
+	reqBuf           []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.callback = nil
+	uw.defaultTimestamp = 0
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Fill missing timestamps with the current timestamp.
+	defaultTimestamp := uw.defaultTimestamp
+	if defaultTimestamp <= 0 {
+		defaultTimestamp = int64(time.Now().UnixNano() / 1e6)
+	}
+	for i := range rows {
+		r := &rows[i]
+		if r.Timestamp == 0 {
+			r.Timestamp = defaultTimestamp
+		}
+	}
+
+	if err := uw.callback(rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go
index a4ef73846..a6c88b58a 100644
--- a/lib/protoparser/promremotewrite/streamparser.go
+++ b/lib/protoparser/promremotewrite/streamparser.go
@@ -10,7 +10,9 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 	"github.com/golang/snappy"
 )
@@ -19,49 +21,42 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
 
 // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
 //
-// callback shouldn't hold timeseries after returning.
-func ParseStream(req *http.Request, callback func(timeseries []prompb.TimeSeries) error) error {
+// callback shouldn't hold tss after returning.
+func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error {
 	ctx := getPushCtx(req.Body)
 	defer putPushCtx(ctx)
 	if err := ctx.Read(); err != nil {
 		return err
 	}
-	return callback(ctx.wr.Timeseries)
+	uw := getUnmarshalWork()
+	uw.callback = callback
+	uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf.B...)
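+	// The compressed body is copied, since ctx is returned to its pool on
+	// exit while snappy decoding and unmarshaling happen on the workers.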
+	common.ScheduleUnmarshalWork(uw)
+	return nil
 }
 
 type pushCtx struct {
-	wr     prompb.WriteRequest
 	br     *bufio.Reader
-	reqBuf []byte
+	reqBuf bytesutil.ByteBuffer
 }
 
 func (ctx *pushCtx) reset() {
-	ctx.wr.Reset()
 	ctx.br.Reset(nil)
-	ctx.reqBuf = ctx.reqBuf[:0]
+	ctx.reqBuf.Reset()
 }
 
 func (ctx *pushCtx) Read() error {
 	readCalls.Inc()
-	var err error
-
-	ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], ctx.br)
+	lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
+	reqLen, err := ctx.reqBuf.ReadFrom(lr)
 	if err != nil {
 		readErrors.Inc()
-		return fmt.Errorf("cannot read prompb.WriteRequest: %w", err)
+		return fmt.Errorf("cannot read compressed request: %w", err)
 	}
-	if err = ctx.wr.Unmarshal(ctx.reqBuf); err != nil {
-		unmarshalErrors.Inc()
-		return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(ctx.reqBuf), err)
+	if reqLen > int64(maxInsertRequestSize.N) {
+		readErrors.Inc()
+		return fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
 	}
-
-	rows := 0
-	tss := ctx.wr.Timeseries
-	for i := range tss {
-		rows += len(tss[i].Samples)
-	}
-	rowsRead.Add(rows)
-
 	return nil
 }
@@ -101,34 +96,66 @@ func putPushCtx(ctx *pushCtx) {
 
 var pushCtxPool sync.Pool
 var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
 
-func readSnappy(dst []byte, r io.Reader) ([]byte, error) {
-	lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1)
+type unmarshalWork struct {
+	wr       prompb.WriteRequest
+	callback func(tss []prompb.TimeSeries) error
+	reqBuf   []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.wr.Reset()
+	uw.callback = nil
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
 	bb := bodyBufferPool.Get()
-	reqLen, err := bb.ReadFrom(lr)
+	defer bodyBufferPool.Put(bb)
+	var err error
+	bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], uw.reqBuf)
 	if err != nil {
-		bodyBufferPool.Put(bb)
-		return dst, fmt.Errorf("cannot read compressed request: %w", err)
+		logger.Errorf("cannot decompress request with length %d: %s", len(uw.reqBuf), err)
+		return
 	}
-	if reqLen > int64(maxInsertRequestSize.N) {
-		return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N)
+	if len(bb.B) > maxInsertRequestSize.N {
+		logger.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
+		return
+	}
+	if err := uw.wr.Unmarshal(bb.B); err != nil {
+		unmarshalErrors.Inc()
+		logger.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %s", len(bb.B), err)
+		return
 	}
-	buf := dst[len(dst):cap(dst)]
-	buf, err = snappy.Decode(buf, bb.B)
-	bodyBufferPool.Put(bb)
-	if err != nil {
-		err = fmt.Errorf("cannot decompress request with length %d: %w", reqLen, err)
-		return dst, err
+	rows := 0
+	tss := uw.wr.Timeseries
+	for i := range tss {
+		rows += len(tss[i].Samples)
 	}
-	if len(buf) > maxInsertRequestSize.N {
-		return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(buf))
+	rowsRead.Add(rows)
+
+	if err := uw.callback(tss); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
 	}
-	if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] {
-		dst = dst[:len(dst)+len(buf)]
-	} else {
-		dst = append(dst, buf...)
-	}
-	return dst, nil
+	putUnmarshalWork(uw)
 }
 
 var bodyBufferPool bytesutil.ByteBufferPool
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go
index 38040ddac..f0eefdfe4 100644
--- a/lib/protoparser/vmimport/streamparser.go
+++ b/lib/protoparser/vmimport/streamparser.go
@@ -34,42 +34,13 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 		defer common.PutGzipReader(zr)
 		r = zr
 	}
-
-	// Start gomaxprocs workers for processing the parsed data in parallel.
-	gomaxprocs := runtime.GOMAXPROCS(-1)
-	workCh := make(chan *unmarshalWork, 8*gomaxprocs)
-	var wg sync.WaitGroup
-	defer func() {
-		close(workCh)
-		wg.Wait()
-	}()
-	wg.Add(gomaxprocs)
-	for i := 0; i < gomaxprocs; i++ {
-		go func() {
-			defer wg.Done()
-			for uw := range workCh {
-				uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
-				rows := uw.rows.Rows
-				for i := range rows {
-					row := &rows[i]
-					rowsRead.Add(len(row.Timestamps))
-				}
-				if err := callback(rows); err != nil {
-					logger.Errorf("error when processing imported data: %s", err)
-					putUnmarshalWork(uw)
-					continue
-				}
-				putUnmarshalWork(uw)
-			}
-		}()
-	}
-
 	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
 	for ctx.Read() {
 		uw := getUnmarshalWork()
+		uw.callback = callback
 		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
-		workCh <- uw
+		common.ScheduleUnmarshalWork(uw)
 	}
 	return ctx.Error()
 }
@@ -147,15 +118,33 @@ var streamContextPool sync.Pool
 var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
 
 type unmarshalWork struct {
-	rows   Rows
-	reqBuf []byte
+	rows     Rows
+	callback func(rows []Row) error
+	reqBuf   []byte
 }
 
 func (uw *unmarshalWork) reset() {
 	uw.rows.Reset()
+	uw.callback = nil
 	uw.reqBuf = uw.reqBuf[:0]
}
 
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
+	rows := uw.rows.Rows
+	for i := range rows {
+		row := &rows[i]
+		rowsRead.Add(len(row.Timestamps))
+	}
+	if err := uw.callback(rows); err != nil {
+		logger.Errorf("error when processing imported data: %s", err)
+		putUnmarshalWork(uw)
+		return
+	}
+	putUnmarshalWork(uw)
+}
+
 func getUnmarshalWork() *unmarshalWork {
 	v := unmarshalWorkPool.Get()
 	if v == nil {
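
Note (not part of the patch): this change replaces the two ad-hoc worker pools that previously lived in lib/protoparser/native and lib/protoparser/vmimport with a single pool in lib/protoparser/common, shared by every ingestion protocol. Each ParseStream now copies the raw request buffer into a pooled unmarshalWork and schedules it, so CPU-bound parsing and the insert callback run on the shared workers instead of on the reading goroutine; callback errors are logged rather than returned, since ParseStream may return before the scheduled work completes. The sketch below is a minimal, self-contained restatement of that pattern, assuming only the standard library; the pool functions mirror lib/protoparser/common, while the lineWork type and main() are hypothetical, for illustration only.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// UnmarshalWork is a unit of CPU-bound unmarshal work.
type UnmarshalWork interface {
	Unmarshal()
}

var (
	unmarshalWorkCh    chan UnmarshalWork
	unmarshalWorkersWG sync.WaitGroup
)

// StartUnmarshalWorkers starts GOMAXPROCS workers draining unmarshalWorkCh.
func StartUnmarshalWorkers() {
	gomaxprocs := runtime.GOMAXPROCS(-1)
	unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
	unmarshalWorkersWG.Add(gomaxprocs)
	for i := 0; i < gomaxprocs; i++ {
		go func() {
			defer unmarshalWorkersWG.Done()
			for uw := range unmarshalWorkCh {
				uw.Unmarshal()
			}
		}()
	}
}

// ScheduleUnmarshalWork queues uw. The bounded channel applies natural
// backpressure: producers block once all workers are busy and the small
// buffer is full.
func ScheduleUnmarshalWork(uw UnmarshalWork) {
	unmarshalWorkCh <- uw
}

// StopUnmarshalWorkers closes the channel and waits until all queued work
// is processed, which is why main() in vmagent/vminsert calls it before
// stopping the components that consume the parsed data.
func StopUnmarshalWorkers() {
	close(unmarshalWorkCh)
	unmarshalWorkersWG.Wait()
	unmarshalWorkCh = nil
}

// lineWork stands in for the per-parser unmarshalWork types in the patch:
// it owns a private copy of the request buffer and the callback to invoke.
type lineWork struct {
	reqBuf   []byte
	callback func(s string) error
}

// Unmarshal runs on a worker goroutine, not on the reader goroutine.
func (uw *lineWork) Unmarshal() {
	if err := uw.callback(string(uw.reqBuf)); err != nil {
		fmt.Println("callback error:", err)
	}
}

func main() {
	StartUnmarshalWorkers()
	ScheduleUnmarshalWork(&lineWork{
		reqBuf: []byte("metric 42 1600000000"),
		callback: func(s string) error {
			fmt.Println("processed:", s)
			return nil
		},
	})
	// Stop drains pending work, so "processed: ..." is printed before exit.
	StopUnmarshalWorkers()
}

The 2*GOMAXPROCS channel capacity keeps the queue deliberately short: once the workers fall behind, ScheduleUnmarshalWork blocks the network readers instead of buffering an unbounded backlog.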