Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/protoparser: use a 64KB read buffer instead of the default 4KB buffer provided by net/http.Server

This should reduce syscall overhead when reading large amounts of data.
Parent commit: 6d8c23fdbd
Commit: 7072db75cb
10 changed files with 139 additions and 63 deletions
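Each parser below applies the same pattern: the per-request context keeps a pooled *bufio.Reader, resets it onto the request body when the context is taken from the pool, and reads through that 64KB buffer instead of the 4KB buffer net/http provides. The following standalone sketch only illustrates the idea; the pool and helper names (readerPool, getBufferedReader, putBufferedReader) mirror the diff, but the example is illustrative and is not code from the repository.

// Illustrative sketch of the pooled 64KB buffered-reader pattern used in this commit.
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// readerPool recycles 64KB buffered readers across requests, so a busy server
// avoids both tiny 4KB reads and a fresh 64KB allocation per request.
var readerPool sync.Pool

// getBufferedReader returns a pooled *bufio.Reader that wraps r with a 64KB buffer.
func getBufferedReader(r io.Reader) *bufio.Reader {
	v := readerPool.Get()
	if v == nil {
		return bufio.NewReaderSize(r, 64*1024)
	}
	br := v.(*bufio.Reader)
	br.Reset(r)
	return br
}

// putBufferedReader detaches the reader from its source and returns it to the pool.
func putBufferedReader(br *bufio.Reader) {
	br.Reset(nil)
	readerPool.Put(br)
}

func main() {
	// Stand-in for req.Body; a real handler would pass the HTTP request body here.
	body := strings.NewReader("metric_a 1\nmetric_b 2\n")

	br := getBufferedReader(body)
	defer putBufferedReader(br)

	// Reads are served from the 64KB buffer, so the underlying reader is hit
	// far less often than with an unbuffered body.
	for {
		line, err := br.ReadString('\n')
		if line != "" {
			fmt.Print(line)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
	}
}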
@@ -1,6 +1,7 @@
 package csvimport
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"io"
@@ -40,10 +41,9 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 		defer common.PutGzipReader(zr)
 		r = zr
 	}
-
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(r, cds) {
+	for ctx.Read(cds) {
 		if err := callback(ctx.Rows.Rows); err != nil {
 			return err
 		}
@@ -51,12 +51,12 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool {
+func (ctx *streamContext) Read(cds []ColumnDescriptor) bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()
@@ -97,6 +97,7 @@ var (
 
 type streamContext struct {
 	Rows    Rows
+	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
 	err     error
@@ -111,20 +112,26 @@ func (ctx *streamContext) Error() error {
 
 func (ctx *streamContext) reset() {
 	ctx.Rows.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.err = nil
 }
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }

@@ -184,8 +184,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
 func Test_streamContext_Read(t *testing.T) {
 	f := func(s string, rowsExpected *Rows) {
 		t.Helper()
-		ctx := &streamContext{}
-		ctx.Read(strings.NewReader(s))
+		ctx := getStreamContext(strings.NewReader(s))
+		ctx.Read()
 		if len(ctx.Rows.Rows) != len(rowsExpected.Rows) {
 			t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", ctx.Rows, rowsExpected.Rows)
 		}

@@ -1,6 +1,7 @@
 package graphite
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"io"
@@ -25,10 +26,10 @@ var (
 //
 // callback shouldn't hold rows after returning.
 func ParseStream(r io.Reader, callback func(rows []Row) error) error {
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
 
-	for ctx.Read(r) {
+	for ctx.Read() {
 		if err := callback(ctx.Rows.Rows); err != nil {
 			return err
 		}
@@ -36,12 +37,12 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(r io.Reader) bool {
+func (ctx *streamContext) Read() bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()
@@ -81,6 +82,7 @@ func (ctx *streamContext) Read(r io.Reader) bool {
 
 type streamContext struct {
 	Rows    Rows
+	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
 	err     error
@@ -95,6 +97,7 @@ func (ctx *streamContext) Error() error {
 
 func (ctx *streamContext) reset() {
 	ctx.Rows.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.err = nil
@@ -106,15 +109,20 @@ var (
 	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="graphite"}`)
 )
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }

@@ -1,6 +1,7 @@
 package influx
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"io"
@@ -50,9 +51,9 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
 		tsMultiplier = -1e3 * 3600
 	}
 
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(r, tsMultiplier) {
+	for ctx.Read(tsMultiplier) {
 		if err := callback(db, ctx.Rows.Rows); err != nil {
 			return err
 		}
@@ -60,12 +61,12 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool {
+func (ctx *streamContext) Read(tsMultiplier int64) bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()
@@ -121,6 +122,7 @@ var (
 
 type streamContext struct {
 	Rows    Rows
+	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
 	err     error
@@ -135,20 +137,26 @@ func (ctx *streamContext) Error() error {
 
 func (ctx *streamContext) reset() {
 	ctx.Rows.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.err = nil
 }
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }

@@ -32,9 +32,8 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error {
 		defer common.PutGzipReader(zr)
 		r = zr
 	}
-	// By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import/native,
-	// so use slightly bigger buffer in order to reduce read syscall overhead.
-	br := bufio.NewReaderSize(r, 1024*1024)
+	br := getBufferedReader(r)
+	defer putBufferedReader(br)
 
 	// Read time range (tr)
 	trBuf := make([]byte, 16)
@@ -195,3 +194,20 @@ func putUnmarshalWork(uw *unmarshalWork) {
 }
 
 var unmarshalWorkPool sync.Pool
+
+func getBufferedReader(r io.Reader) *bufio.Reader {
+	v := bufferedReaderPool.Get()
+	if v == nil {
+		return bufio.NewReaderSize(r, 64*1024)
+	}
+	br := v.(*bufio.Reader)
+	br.Reset(r)
+	return br
+}
+
+func putBufferedReader(br *bufio.Reader) {
+	br.Reset(nil)
+	bufferedReaderPool.Put(br)
+}
+
+var bufferedReaderPool sync.Pool

@@ -1,6 +1,7 @@
 package opentsdb
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"io"
@@ -25,9 +26,9 @@ var (
 //
 // callback shouldn't hold rows after returning.
 func ParseStream(r io.Reader, callback func(rows []Row) error) error {
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(r) {
+	for ctx.Read() {
 		if err := callback(ctx.Rows.Rows); err != nil {
 			return err
 		}
@@ -35,12 +36,12 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(r io.Reader) bool {
+func (ctx *streamContext) Read() bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()
@@ -80,6 +81,7 @@ func (ctx *streamContext) Read(r io.Reader) bool {
 
 type streamContext struct {
 	Rows    Rows
+	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
 	err     error
@@ -94,6 +96,7 @@ func (ctx *streamContext) Error() error {
 
 func (ctx *streamContext) reset() {
 	ctx.Rows.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.err = nil
@@ -105,15 +108,20 @@ var (
 	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentsdb"}`)
 )
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }

@@ -1,6 +1,7 @@
 package opentsdbhttp
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"io"
@@ -40,11 +41,11 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 		r = zr
 	}
 
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
 
 	// Read the request in ctx.reqBuf
-	lr := io.LimitReader(r, int64(maxInsertRequestSize.N)+1)
+	lr := io.LimitReader(ctx.br, int64(maxInsertRequestSize.N)+1)
 	reqLen, err := ctx.reqBuf.ReadFrom(lr)
 	if err != nil {
 		readErrors.Inc()
@@ -102,11 +103,13 @@ const secondMask int64 = 0x7FFFFFFF00000000
 
 type streamContext struct {
 	Rows   Rows
+	br     *bufio.Reader
 	reqBuf bytesutil.ByteBuffer
 }
 
 func (ctx *streamContext) reset() {
 	ctx.Rows.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf.Reset()
 }
 
@@ -117,15 +120,20 @@ var (
 	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="opentsdbhttp"}`)
 )
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }

@@ -1,6 +1,7 @@
 package prometheus
 
 import (
+	"bufio"
 	"fmt"
 	"io"
 	"runtime"
@@ -26,9 +27,9 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
 		defer common.PutGzipReader(zr)
 		r = zr
 	}
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(r, defaultTimestamp) {
+	for ctx.Read(defaultTimestamp) {
 		if err := callback(ctx.Rows.Rows); err != nil {
 			return err
 		}
@@ -36,12 +37,12 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool {
+func (ctx *streamContext) Read(defaultTimestamp int64) bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()
@@ -69,6 +70,7 @@ func (ctx *streamContext) Read(r io.Reader, defaultTimestamp int64) bool {
 
 type streamContext struct {
 	Rows    Rows
+	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
 	err     error
@@ -83,6 +85,7 @@ func (ctx *streamContext) Error() error {
 
 func (ctx *streamContext) reset() {
 	ctx.Rows.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.err = nil
@@ -94,15 +97,20 @@ var (
 	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="prometheus"}`)
 )
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }

@@ -1,6 +1,7 @@
 package promremotewrite
 
 import (
+	"bufio"
 	"fmt"
 	"io"
 	"net/http"
@@ -20,9 +21,9 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
 //
 // callback shouldn't hold timeseries after returning.
 func ParseStream(req *http.Request, callback func(timeseries []prompb.TimeSeries) error) error {
-	ctx := getPushCtx()
+	ctx := getPushCtx(req.Body)
 	defer putPushCtx(ctx)
-	if err := ctx.Read(req); err != nil {
+	if err := ctx.Read(); err != nil {
 		return err
 	}
 	return callback(ctx.wr.Timeseries)
@@ -30,18 +31,21 @@ func ParseStream(req *http.Request, callback func(timeseries []prompb.TimeSeries
 
 type pushCtx struct {
 	wr     prompb.WriteRequest
+	br     *bufio.Reader
 	reqBuf []byte
 }
 
 func (ctx *pushCtx) reset() {
 	ctx.wr.Reset()
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 }
 
-func (ctx *pushCtx) Read(r *http.Request) error {
+func (ctx *pushCtx) Read() error {
 	readCalls.Inc()
 	var err error
-	ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], r.Body)
+	ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], ctx.br)
 	if err != nil {
 		readErrors.Inc()
 		return fmt.Errorf("cannot read prompb.WriteRequest: %w", err)
@@ -68,15 +72,20 @@ var (
 	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="promremotewrite"}`)
 )
 
-func getPushCtx() *pushCtx {
+func getPushCtx(r io.Reader) *pushCtx {
 	select {
 	case ctx := <-pushCtxPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := pushCtxPool.Get(); v != nil {
-			return v.(*pushCtx)
+			ctx := v.(*pushCtx)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &pushCtx{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &pushCtx{}
 	}
 }

@@ -34,9 +34,6 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 		defer common.PutGzipReader(zr)
 		r = zr
 	}
-	// By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import,
-	// so use slightly bigger buffer in order to reduce read syscall overhead.
-	br := bufio.NewReaderSize(r, 1024*1024)
 
 	// Start gomaxprocs workers for processing the parsed data in parallel.
 	gomaxprocs := runtime.GOMAXPROCS(-1)
@@ -67,9 +64,9 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 		}()
 	}
 
-	ctx := getStreamContext()
+	ctx := getStreamContext(r)
 	defer putStreamContext(ctx)
-	for ctx.Read(br) {
+	for ctx.Read() {
 		uw := getUnmarshalWork()
 		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
 		workCh <- uw
@@ -77,12 +74,12 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
 	return ctx.Error()
 }
 
-func (ctx *streamContext) Read(r io.Reader) bool {
+func (ctx *streamContext) Read() bool {
 	readCalls.Inc()
 	if ctx.err != nil {
 		return false
 	}
-	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, maxLineLen.N)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.N)
 	if ctx.err != nil {
 		if ctx.err != io.EOF {
 			readErrors.Inc()
@@ -100,6 +97,7 @@ var (
 )
 
 type streamContext struct {
+	br      *bufio.Reader
 	reqBuf  []byte
 	tailBuf []byte
 	err     error
@@ -113,20 +111,26 @@ func (ctx *streamContext) Error() error {
 }
 
 func (ctx *streamContext) reset() {
+	ctx.br.Reset(nil)
 	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.err = nil
 }
 
-func getStreamContext() *streamContext {
+func getStreamContext(r io.Reader) *streamContext {
 	select {
 	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
 		return ctx
 	default:
 		if v := streamContextPool.Get(); v != nil {
-			return v.(*streamContext)
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
 		}
-		return &streamContext{}
 	}
 }