changes protoparser apis for accepting reading from io.Reader (#1624)

adds InsertHandlerForReader apis to vmagent
This commit is contained in:
Nikolay 2021-09-20 14:49:28 +03:00 committed by Aliaksandr Valialkin
parent 3b9fd99735
commit dd53abf36d
9 changed files with 48 additions and 16 deletions

View file

@@ -35,9 +35,9 @@ var (
// InsertHandlerForReader processes remote write for influx line protocol. // InsertHandlerForReader processes remote write for influx line protocol.
// //
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error { func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil) return insertRows(nil, db, rows, nil)
}) })
}) })

View file

@@ -3,6 +3,7 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"io"
"net/http" "net/http"
"os" "os"
"strings" "strings"
@@ -93,7 +94,9 @@ func main() {
common.StartUnmarshalWorkers() common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init() writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 { if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)
})
} }
if len(*graphiteListenAddr) > 0 { if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)

View file

@@ -1,6 +1,7 @@
package prometheusimport package prometheusimport
import ( import (
"io"
"net/http" "net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
@@ -38,6 +39,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
}) })
} }
// InsertHandlerForReader processes metrics from given reader with optional gzip format
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
}, nil)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx() ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx) defer common.PutPushCtx(ctx)

View file

@@ -1,6 +1,7 @@
package promremotewrite package promremotewrite
import ( import (
"io"
"net/http" "net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
@@ -29,12 +30,21 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels) return insertRows(at, tss, extraLabels)
}) })
}) })
} }
// InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, func(tss []prompb.TimeSeries) error {
return insertRows(nil, tss, nil)
})
})
}
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx() ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx) defer common.PutPushCtx(ctx)

View file

@@ -1,6 +1,7 @@
package vmimport package vmimport
import ( import (
"io"
"net/http" "net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
@@ -31,12 +32,22 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error { isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
}) })
} }
// InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
})
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx() ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx) defer common.PutPushCtx(ctx)

View file

@@ -29,7 +29,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels) return insertRows(at, tss, extraLabels)
}) })
}) })

View file

@@ -31,7 +31,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error { isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
}) })

View file

@@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"io" "io"
"net/http"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -18,11 +17,11 @@ import (
var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
// ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries. // ParseStream parses Prometheus remote_write message from reader and calls callback for the parsed timeseries.
// //
// callback shouldn't hold tss after returning. // callback shouldn't hold tss after returning.
func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error { func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) error {
ctx := getPushCtx(req.Body) ctx := getPushCtx(r)
defer putPushCtx(ctx) defer putPushCtx(ctx)
if err := ctx.Read(); err != nil { if err := ctx.Read(); err != nil {
return err return err

View file

@@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"io" "io"
"net/http"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -19,12 +18,11 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi
// ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from reader.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error {
r := req.Body if isGzipped {
if req.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(r) zr, err := common.GetGzipReader(r)
if err != nil { if err != nil {
return fmt.Errorf("cannot read gzipped vmimport data: %w", err) return fmt.Errorf("cannot read gzipped vmimport data: %w", err)