From 6386aeb1e004b896e2c1de134ee63b04a0edb66e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 25 Feb 2020 19:09:46 +0200 Subject: [PATCH] app/vmagent: add ability to accept Influx line protocol data via TCP and UDP Just set `-influxListenAddr` command-line flag Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/333 --- README.md | 4 +- app/vmagent/influx/request_handler.go | 21 +++- app/vmagent/main.go | 11 +- app/vminsert/influx/request_handler.go | 21 +++- app/vminsert/main.go | 11 +- lib/ingestserver/graphite/server.go | 8 +- lib/ingestserver/influx/server.go | 152 ++++++++++++++++++++++++ lib/ingestserver/opentsdb/server.go | 8 +- lib/ingestserver/opentsdbhttp/server.go | 4 +- lib/protoparser/influx/streamparser.go | 17 +-- 10 files changed, 227 insertions(+), 30 deletions(-) create mode 100644 lib/ingestserver/influx/server.go diff --git a/README.md b/README.md index dc63a5597..66dbd7430 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM * [Metrics from Prometheus exporters](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) such as [node_exporter](https://github.com/prometheus/node_exporter). See [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter) for details. * [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) - * [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) + * [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP. * [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon) if `-graphiteListenAddr` is set. * [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set. @@ -268,6 +268,8 @@ For instance, put the following lines into `Telegraf` config, so it sends data t Do not forget substituting `` with the real address where VictoriaMetrics runs. +Another option is to enable TCP and UDP receiver for Influx line protocol via `-influxListenAddr` command-line flag. + VictoriaMetrics maps Influx data using the following rules: * [`db` query arg](https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint) is mapped into `db` label value unless `db` tag exists in the Influx line. diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index e0ed2ad92..642bd3d87 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -2,6 +2,7 @@ package influx import ( "flag" + "io" "net/http" "runtime" "sync" @@ -25,12 +26,26 @@ var ( rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`) ) -// InsertHandler processes remote write for influx line protocol. +// InsertHandlerForReader processes remote write for influx line protocol. +// +// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ +func InsertHandlerForReader(r io.Reader) error { + return writeconcurrencylimiter.Do(func() error { + return parser.ParseStream(r, false, "", "", insertRows) + }) +} + +// InsertHandlerForHTTP processes remote write for influx line protocol. // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md -func InsertHandler(req *http.Request) error { +func InsertHandlerForHTTP(req *http.Request) error { return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + q := req.URL.Query() + precision := q.Get("precision") + // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint + db := q.Get("db") + return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows) }) } diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 7339e6c16..79a32e048 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite" + influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx" opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb" opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -29,6 +30,7 @@ import ( var ( httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections") + influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for Influx line protocol data. Usually :8189 must be set. Doesn't work if empty") graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ @@ -37,6 +39,7 @@ var ( ) var ( + influxServer *influxserver.Server graphiteServer *graphiteserver.Server opentsdbServer *opentsdbserver.Server opentsdbhttpServer *opentsdbhttpserver.Server @@ -50,6 +53,9 @@ func main() { startTime := time.Now() remotewrite.Init() writeconcurrencylimiter.Init() + if len(*influxListenAddr) > 0 { + influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) + } if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) } @@ -77,6 +83,9 @@ func main() { promscrape.Stop() + if len(*influxListenAddr) > 0 { + influxServer.MustStop() + } if len(*graphiteListenAddr) > 0 { graphiteServer.MustStop() } @@ -114,7 +123,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/write", "/api/v2/write": influxWriteRequests.Inc() - if err := influx.InsertHandler(r); err != nil { + if err := influx.InsertHandlerForHTTP(r); err != nil { influxWriteErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return true diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 43d03457e..cb25d1205 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -2,6 +2,7 @@ package influx import ( "flag" + "io" "net/http" "runtime" "sync" @@ -24,12 +25,26 @@ var ( rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="influx"}`) ) -// InsertHandler processes remote write for influx line protocol. +// InsertHandlerForReader processes remote write for influx line protocol. +// +// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ +func InsertHandlerForReader(r io.Reader) error { + return writeconcurrencylimiter.Do(func() error { + return parser.ParseStream(r, false, "", "", insertRows) + }) +} + +// InsertHandlerForHTTP processes remote write for influx line protocol. // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md -func InsertHandler(req *http.Request) error { +func InsertHandlerForHTTP(req *http.Request) error { return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + q := req.URL.Query() + precision := q.Get("precision") + // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint + db := q.Get("db") + return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows) }) } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 20951dd35..80c4359f2 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite" + influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx" opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb" opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" @@ -25,6 +26,7 @@ import ( var ( graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") + influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for Influx line protocol data. Usually :8189 must be set. Doesn't work if empty") opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ "Usually :4242 must be set. Doesn't work if empty") @@ -33,6 +35,7 @@ var ( ) var ( + influxServer *influxserver.Server graphiteServer *graphiteserver.Server opentsdbServer *opentsdbserver.Server opentsdbhttpServer *opentsdbhttpserver.Server @@ -43,6 +46,9 @@ func Init() { storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) writeconcurrencylimiter.Init() + if len(*influxListenAddr) > 0 { + influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) + } if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) } @@ -58,6 +64,9 @@ func Init() { // Stop stops vminsert. func Stop() { promscrape.Stop() + if len(*influxListenAddr) > 0 { + influxServer.MustStop() + } if len(*graphiteListenAddr) > 0 { graphiteServer.MustStop() } @@ -93,7 +102,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/write", "/api/v2/write": influxWriteRequests.Inc() - if err := influx.InsertHandler(r); err != nil { + if err := influx.InsertHandlerForHTTP(r); err != nil { influxWriteErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return true diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index 41955a4c0..02be8da7b 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -15,11 +15,11 @@ import ( ) var ( - writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_graphite_requests_total{name="write", net="tcp"}`) - writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_graphite_request_errors_total{name="write", net="tcp"}`) + writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="graphite", name="write", net="tcp"}`) + writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="graphite", name="write", net="tcp"}`) - writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_graphite_requests_total{name="write", net="udp"}`) - writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_graphite_request_errors_total{name="write", net="udp"}`) + writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="graphite", name="write", net="udp"}`) + writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="graphite", name="write", net="udp"}`) ) // Server accepts Graphite plaintext lines over TCP and UDP. diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go new file mode 100644 index 000000000..f429d9bfa --- /dev/null +++ b/lib/ingestserver/influx/server.go @@ -0,0 +1,152 @@ +package influx + +import ( + "io" + "net" + "runtime" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/metrics" +) + +var ( + writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="influx", name="write", net="tcp"}`) + writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="influx", name="write", net="tcp"}`) + + writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="influx", name="write", net="udp"}`) + writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="influx", name="write", net="udp"}`) +) + +// Server accepts Influx line protocol over TCP and UDP. +type Server struct { + addr string + lnTCP net.Listener + lnUDP net.PacketConn + wg sync.WaitGroup +} + +// MustStart starts Influx server on the given addr. +// +// The incoming connections are processed with insertHandler. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { + logger.Infof("starting TCP Influx server at %q", addr) + lnTCP, err := netutil.NewTCPListener("influx", addr) + if err != nil { + logger.Fatalf("cannot start TCP Influx server at %q: %s", addr, err) + } + + logger.Infof("starting UDP Influx server at %q", addr) + lnUDP, err := net.ListenPacket("udp4", addr) + if err != nil { + logger.Fatalf("cannot start UDP Influx server at %q: %s", addr, err) + } + + s := &Server{ + addr: addr, + lnTCP: lnTCP, + lnUDP: lnUDP, + } + s.wg.Add(1) + go func() { + defer s.wg.Done() + serveTCP(lnTCP, insertHandler) + logger.Infof("stopped TCP Influx server at %q", addr) + }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + serveUDP(lnUDP, insertHandler) + logger.Infof("stopped UDP Influx server at %q", addr) + }() + return s +} + +// MustStop stops the server. +func (s *Server) MustStop() { + logger.Infof("stopping TCP Influx server at %q...", s.addr) + if err := s.lnTCP.Close(); err != nil { + logger.Errorf("cannot close TCP Influx server: %s", err) + } + logger.Infof("stopping UDP Influx server at %q...", s.addr) + if err := s.lnUDP.Close(); err != nil { + logger.Errorf("cannot close UDP Influx server: %s", err) + } + s.wg.Wait() + logger.Infof("TCP and UDP Influx servers at %q have been stopped", s.addr) +} + +func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { + for { + c, err := ln.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok { + if ne.Temporary() { + logger.Errorf("influx: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + time.Sleep(time.Second) + continue + } + if strings.Contains(err.Error(), "use of closed network connection") { + break + } + logger.Fatalf("unrecoverable error when accepting TCP Influx connections: %s", err) + } + logger.Fatalf("unexpected error when accepting TCP Influx connections: %s", err) + } + go func() { + writeRequestsTCP.Inc() + if err := insertHandler(c); err != nil { + writeErrorsTCP.Inc() + logger.Errorf("error in TCP Influx conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) + } + _ = c.Close() + }() + } +} + +func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { + gomaxprocs := runtime.GOMAXPROCS(-1) + var wg sync.WaitGroup + for i := 0; i < gomaxprocs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + var bb bytesutil.ByteBuffer + bb.B = bytesutil.Resize(bb.B, 64*1024) + for { + bb.Reset() + bb.B = bb.B[:cap(bb.B)] + n, addr, err := ln.ReadFrom(bb.B) + if err != nil { + writeErrorsUDP.Inc() + if ne, ok := err.(net.Error); ok { + if ne.Temporary() { + logger.Errorf("influx: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + time.Sleep(time.Second) + continue + } + if strings.Contains(err.Error(), "use of closed network connection") { + break + } + } + logger.Errorf("cannot read Influx UDP data: %s", err) + continue + } + bb.B = bb.B[:n] + writeRequestsUDP.Inc() + if err := insertHandler(bb.NewReader()); err != nil { + writeErrorsUDP.Inc() + logger.Errorf("error in UDP Influx conn %q<->%q: %s", ln.LocalAddr(), addr, err) + continue + } + } + }() + } + wg.Wait() +} diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index b38e217fe..af83bc923 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -17,11 +17,11 @@ import ( ) var ( - writeRequestsTCP = metrics.NewCounter(`vm_opentsdb_requests_total{name="write", net="tcp"}`) - writeErrorsTCP = metrics.NewCounter(`vm_opentsdb_request_errors_total{name="write", net="tcp"}`) + writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="opentsdb", name="write", net="tcp"}`) + writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="opentsdb", name="write", net="tcp"}`) - writeRequestsUDP = metrics.NewCounter(`vm_opentsdb_requests_total{name="write", net="udp"}`) - writeErrorsUDP = metrics.NewCounter(`vm_opentsdb_request_errors_total{name="write", net="udp"}`) + writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="opentsdb", name="write", net="udp"}`) + writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="opentsdb", name="write", net="udp"}`) ) // Server is a server for collecting OpenTSDB TCP and UDP metrics. diff --git a/lib/ingestserver/opentsdbhttp/server.go b/lib/ingestserver/opentsdbhttp/server.go index 65b06af55..eeb83d9c3 100644 --- a/lib/ingestserver/opentsdbhttp/server.go +++ b/lib/ingestserver/opentsdbhttp/server.go @@ -14,8 +14,8 @@ import ( ) var ( - writeRequests = metrics.NewCounter(`vm_opentsdbhttp_requests_total{name="write", net="tcp"}`) - writeErrors = metrics.NewCounter(`vm_opentsdbhttp_request_errors_total{name="write", net="tcp"}`) + writeRequests = metrics.NewCounter(`vm_ingestserver_requests_total{type="opentsdbhttp", name="write", net="tcp"}`) + writeErrors = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="opentsdbhttp", name="write", net="tcp"}`) ) // Server represents HTTP OpenTSDB server. diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index eade73eac..c09976d17 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -3,7 +3,6 @@ package influx import ( "fmt" "io" - "net/http" "runtime" "sync" "time" @@ -13,15 +12,14 @@ import ( "github.com/VictoriaMetrics/metrics" ) -// ParseStream parses req and calls callback for the parsed rows. +// ParseStream parses r with the given args and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(req *http.Request, callback func(db string, rows []Row) error) error { +func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { readCalls.Inc() - r := req.Body - if req.Header.Get("Content-Encoding") == "gzip" { + if isGzipped { zr, err := common.GetGzipReader(r) if err != nil { return fmt.Errorf("cannot read gzipped influx line protocol data: %s", err) @@ -30,9 +28,9 @@ func ParseStream(req *http.Request, callback func(db string, rows []Row) error) r = zr } - q := req.URL.Query() + // Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp tsMultiplier := int64(1e6) - switch q.Get("precision") { + switch precision { case "ns": tsMultiplier = 1e6 case "u": @@ -47,9 +45,6 @@ func ParseStream(req *http.Request, callback func(db string, rows []Row) error) tsMultiplier = -1e3 * 3600 } - // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint - db := q.Get("db") - ctx := getStreamContext() defer putStreamContext(ctx) for ctx.Read(r, tsMultiplier) {