changes protoparser apis for accepting reading from io.Reader (#1624)

adds InsertHandlerForReader apis to vmagent
This commit is contained in:
Nikolay 2021-09-20 14:49:28 +03:00 committed by GitHub
parent 15ea4c6dae
commit ad08d9dfc0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 48 additions and 16 deletions

View file

@ -35,9 +35,9 @@ var (
// InsertHandlerForReader processes remote write for influx line protocol.
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error {
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil)
})
})

View file

@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"io"
"net/http"
"os"
"strings"
@ -93,7 +94,9 @@ func main() {
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)
})
}
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)

View file

@ -1,6 +1,7 @@
package prometheusimport
import (
"io"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
@ -38,6 +39,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
})
}
// InsertHandlerForReader processes metrics from given reader with optional gzip format
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
}, nil)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View file

@ -1,6 +1,7 @@
package promremotewrite
import (
"io"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
@ -29,12 +30,21 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
})
}
// InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, func(tss []prompb.TimeSeries) error {
return insertRows(nil, tss, nil)
})
})
}
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View file

@ -1,6 +1,7 @@
package vmimport
import (
"io"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
@ -31,12 +32,22 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
})
}
// InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
})
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View file

@ -25,7 +25,7 @@ func InsertHandler(req *http.Request) error {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
})
})

View file

@ -30,7 +30,8 @@ func InsertHandler(req *http.Request) error {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})

View file

@ -4,7 +4,6 @@ import (
"bufio"
"fmt"
"io"
"net/http"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -18,11 +17,11 @@ import (
var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
// ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
// ParseStream parses Prometheus remote_write message from reader and calls callback for the parsed timeseries.
//
// callback shouldn't hold tss after returning.
func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error {
ctx := getPushCtx(req.Body)
func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) error {
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err

View file

@ -4,7 +4,6 @@ import (
"bufio"
"fmt"
"io"
"net/http"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -19,12 +18,11 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi
// ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows.
//
// The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called concurrently multiple times for streamed data from reader.
//
// callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error {
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error {
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped vmimport data: %w", err)