diff --git a/README.md b/README.md index cf1a58b47..c5958d054 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,7 @@ with [the official Grafana dashboard for VictoriaMetrics cluster](https://grafan - `` may have the following values: - `prometheus` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) - `influx/write` or `influx/api/v2/write` - for inserting data with [Influx line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/) + - `opentsdb/api/put` - for accepting [OpenTSDB HTTP /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html). - `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below). * URLs for querying: `http://:8481/select//prometheus/`, where: diff --git a/app/vminsert/graphite/server.go b/app/vminsert/graphite/server.go index f93404dd2..222f72651 100644 --- a/app/vminsert/graphite/server.go +++ b/app/vminsert/graphite/server.go @@ -22,36 +22,62 @@ var ( writeErrorsUDP = metrics.NewCounter(`vm_graphite_request_errors_total{name="write", net="udp"}`) ) -// Serve starts graphite server on the given addr. -func Serve(addr string) { +// Server accepts Graphite plaintext lines over TCP and UDP. +type Server struct { + addr string + lnTCP net.Listener + lnUDP net.PacketConn + wg sync.WaitGroup +} + +// MustStart starts graphite server on the given addr. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustStart(addr string) *Server { logger.Infof("starting TCP Graphite server at %q", addr) lnTCP, err := netutil.NewTCPListener("graphite", addr) if err != nil { logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err) } - listenerTCP = lnTCP logger.Infof("starting UDP Graphite server at %q", addr) lnUDP, err := net.ListenPacket("udp4", addr) if err != nil { logger.Fatalf("cannot start UDP Graphite server at %q: %s", addr, err) } - listenerUDP = lnUDP - var wg sync.WaitGroup - wg.Add(1) + s := &Server{ + addr: addr, + lnTCP: lnTCP, + lnUDP: lnUDP, + } + s.wg.Add(1) go func() { - defer wg.Done() - serveTCP(listenerTCP) + defer s.wg.Done() + serveTCP(lnTCP) logger.Infof("stopped TCP Graphite server at %q", addr) }() - wg.Add(1) + s.wg.Add(1) go func() { - defer wg.Done() - serveUDP(listenerUDP) + defer s.wg.Done() + serveUDP(lnUDP) logger.Infof("stopped UDP Graphite server at %q", addr) }() - wg.Wait() + return s +} + +// MustStop stops the server. +func (s *Server) MustStop() { + logger.Infof("stopping TCP Graphite server at %q...", s.addr) + if err := s.lnTCP.Close(); err != nil { + logger.Errorf("cannot close TCP Graphite server: %s", err) + } + logger.Infof("stopping UDP Graphite server at %q...", s.addr) + if err := s.lnUDP.Close(); err != nil { + logger.Errorf("cannot close UDP Graphite server: %s", err) + } + s.wg.Wait() + logger.Infof("TCP and UDP Graphite servers at %q have been stopped", s.addr) } func serveTCP(ln net.Listener) { @@ -60,6 +86,7 @@ func serveTCP(ln net.Listener) { if err != nil { if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) time.Sleep(time.Second) continue } @@ -100,6 +127,7 @@ func serveUDP(ln net.PacketConn) { writeErrorsUDP.Inc() if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -122,20 +150,3 @@ func serveUDP(ln net.PacketConn) { } wg.Wait() } - -var ( - listenerTCP net.Listener - listenerUDP net.PacketConn -) - -// Stop stops the server. -func Stop() { - logger.Infof("stopping TCP Graphite server at %q...", listenerTCP.Addr()) - if err := listenerTCP.Close(); err != nil { - logger.Errorf("cannot close TCP Graphite server: %s", err) - } - logger.Infof("stopping UDP Graphite server at %q...", listenerUDP.LocalAddr()) - if err := listenerUDP.Close(); err != nil { - logger.Errorf("cannot close UDP Graphite server: %s", err) - } -} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 22998068b..8cc558a8a 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -26,8 +26,10 @@ import ( ) var ( - graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") - opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB put messages. Usually :4242 must be set. Doesn't work if empty") + graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") + opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ + "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ + "Usually :4242 must be set. Doesn't work if empty") opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") httpListenAddr = flag.String("httpListenAddr", ":8480", "Address to listen for http connections") maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size of a single insert request in bytes") @@ -35,6 +37,12 @@ var ( storageNodes = flagutil.NewArray("storageNode", "Address of vmstorage nodes; usage: -storageNode=vmstorage-host1:8400 -storageNode=vmstorage-host2:8400") ) +var ( + graphiteServer *graphite.Server + opentsdbServer *opentsdb.Server + opentsdbhttpServer *opentsdbhttp.Server +) + func main() { flag.Parse() buildinfo.Init() @@ -52,13 +60,13 @@ func main() { concurrencylimiter.Init() if len(*graphiteListenAddr) > 0 { - go graphite.Serve(*graphiteListenAddr) + graphiteServer = graphite.MustStart(*graphiteListenAddr) } if len(*opentsdbListenAddr) > 0 { - go opentsdb.Serve(*opentsdbListenAddr) + opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr, int64(*maxInsertRequestSize)) } if len(*opentsdbHTTPListenAddr) > 0 { - go opentsdbhttp.Serve(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize)) + opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize)) } go func() { @@ -76,13 +84,13 @@ func main() { logger.Infof("successfully shut down the service in %s", time.Since(startTime)) if len(*graphiteListenAddr) > 0 { - graphite.Stop() + graphiteServer.MustStop() } if len(*opentsdbListenAddr) > 0 { - opentsdb.Stop() + opentsdbServer.MustStop() } if len(*opentsdbHTTPListenAddr) > 0 { - opentsdbhttp.Stop() + opentsdbhttpServer.MustStop() } logger.Infof("shutting down neststorage...") diff --git a/app/vminsert/opentsdb/listener_switch.go b/app/vminsert/opentsdb/listener_switch.go new file mode 100644 index 000000000..bd4689587 --- /dev/null +++ b/app/vminsert/opentsdb/listener_switch.go @@ -0,0 +1,159 @@ +package opentsdb + +import ( + "io" + "net" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// listenerSwitch listens for incoming connections and multiplexes them to OpenTSDB http or telnet listeners +// depending on the first byte in the accepted connection. +// +// It is expected that both listeners - http and telnet consume incoming connections as soon as possible. +type listenerSwitch struct { + ln net.Listener + wg sync.WaitGroup + + telnetConnsCh chan net.Conn + httpConnsCh chan net.Conn + + closeLock sync.Mutex + closed bool + acceptErr error + closeErr error +} + +func newListenerSwitch(ln net.Listener) *listenerSwitch { + ls := &listenerSwitch{ + ln: ln, + } + ls.telnetConnsCh = make(chan net.Conn) + ls.httpConnsCh = make(chan net.Conn) + ls.wg.Add(1) + go func() { + ls.worker() + close(ls.telnetConnsCh) + close(ls.httpConnsCh) + ls.wg.Done() + }() + return ls +} + +func (ls *listenerSwitch) stop() error { + var err error + ls.closeLock.Lock() + if !ls.closed { + err = ls.ln.Close() + ls.closeErr = err + ls.closed = true + } + ls.closeLock.Unlock() + + if err == nil { + // Wait until worker detects the closed ls.ln and exits. + ls.wg.Wait() + } + return err +} + +func (ls *listenerSwitch) worker() { + var buf [1]byte + for { + c, err := ls.ln.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + logger.Infof("listenerSwitch: temporary error at %q: %s; sleeping for a second...", ls.ln.Addr(), err) + time.Sleep(time.Second) + continue + } + ls.closeLock.Lock() + ls.acceptErr = err + ls.closeLock.Unlock() + return + } + if _, err := io.ReadFull(c, buf[:]); err != nil { + logger.Errorf("listenerSwitch: cannot read one byte from the underlying connection for %q: %s", ls.ln.Addr(), err) + _ = c.Close() + continue + } + + // It is expected that both listeners - http and telnet consume incoming connections as soon as possible, + // so the below code shouldn't block for extended periods of time. + pc := &peekedConn{ + Conn: c, + firstChar: buf[0], + } + if buf[0] == 'p' { + // Assume the request starts with `put`. + ls.telnetConnsCh <- pc + } else { + // Assume the request starts with `POST`. + ls.httpConnsCh <- pc + } + } +} + +type peekedConn struct { + net.Conn + firstChar byte + firstCharRead bool +} + +func (pc *peekedConn) Read(p []byte) (int, error) { + // It is assumed that the pc cannot be read from concurrent goroutines. + if pc.firstCharRead { + // Fast path - first char already read. + return pc.Conn.Read(p) + } + + // Slow path - read the first char. + if len(p) == 0 { + return 0, nil + } + p[0] = pc.firstChar + pc.firstCharRead = true + n, err := pc.Conn.Read(p[1:]) + return n + 1, err +} + +func (ls *listenerSwitch) newTelnetListener() *chanListener { + return &chanListener{ + ls: ls, + ch: ls.telnetConnsCh, + } +} + +func (ls *listenerSwitch) newHTTPListener() *chanListener { + return &chanListener{ + ls: ls, + ch: ls.httpConnsCh, + } +} + +type chanListener struct { + ls *listenerSwitch + ch chan net.Conn +} + +func (cl *chanListener) Accept() (net.Conn, error) { + c, ok := <-cl.ch + if ok { + return c, nil + } + + cl.ls.closeLock.Lock() + err := cl.ls.acceptErr + cl.ls.closeLock.Unlock() + return nil, err +} + +func (cl *chanListener) Close() error { + return cl.ls.stop() +} + +func (cl *chanListener) Addr() net.Addr { + return cl.ls.ln.Addr() +} diff --git a/app/vminsert/opentsdb/server.go b/app/vminsert/opentsdb/server.go index 302c1fde2..252ef81ae 100644 --- a/app/vminsert/opentsdb/server.go +++ b/app/vminsert/opentsdb/server.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -22,44 +23,91 @@ var ( writeErrorsUDP = metrics.NewCounter(`vm_opentsdb_request_errors_total{name="write", net="udp"}`) ) -// Serve starts OpenTSDB collector on the given addr. -func Serve(addr string) { +// Server is a server for collecting OpenTSDB TCP and UDP metrics. +// +// It accepts simultaneously Telnet put requests and HTTP put requests over TCP. +type Server struct { + addr string + ls *listenerSwitch + httpServer *opentsdbhttp.Server + lnUDP net.PacketConn + wg sync.WaitGroup +} + +// MustStart starts OpenTSDB collector on the given addr. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustStart(addr string, maxRequestSize int64) *Server { logger.Infof("starting TCP OpenTSDB collector at %q", addr) lnTCP, err := netutil.NewTCPListener("opentsdb", addr) if err != nil { logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err) } - listenerTCP = lnTCP + ls := newListenerSwitch(lnTCP) + lnHTTP := ls.newHTTPListener() + lnTelnet := ls.newTelnetListener() + httpServer := opentsdbhttp.MustServe(lnHTTP, maxRequestSize) logger.Infof("starting UDP OpenTSDB collector at %q", addr) lnUDP, err := net.ListenPacket("udp4", addr) if err != nil { logger.Fatalf("cannot start UDP OpenTSDB collector at %q: %s", addr, err) } - listenerUDP = lnUDP - var wg sync.WaitGroup - wg.Add(1) + s := &Server{ + addr: addr, + ls: ls, + httpServer: httpServer, + lnUDP: lnUDP, + } + s.wg.Add(1) go func() { - defer wg.Done() - serveTCP(listenerTCP) - logger.Infof("stopped TCP OpenTSDB collector at %q", addr) + defer s.wg.Done() + serveTelnet(lnTelnet) + logger.Infof("stopped TCP telnet OpenTSDB server at %q", addr) }() - wg.Add(1) + s.wg.Add(1) go func() { - defer wg.Done() - serveUDP(listenerUDP) - logger.Infof("stopped UDP OpenTSDB collector at %q", addr) + defer s.wg.Done() + httpServer.Wait() + // Do not log when httpServer is stopped, since this is logged by the server itself. }() - wg.Wait() + s.wg.Add(1) + go func() { + defer s.wg.Done() + serveUDP(lnUDP) + logger.Infof("stopped UDP OpenTSDB server at %q", addr) + }() + return s } -func serveTCP(ln net.Listener) { +// MustStop stops the server. +func (s *Server) MustStop() { + // Stop HTTP server. Do not emit log message, since it is emitted by the httpServer. + s.httpServer.MustStop() + + logger.Infof("stopping TCP telnet OpenTSDB server at %q...", s.addr) + if err := s.ls.stop(); err != nil { + logger.Errorf("cannot stop TCP telnet OpenTSDB server: %s", err) + } + + logger.Infof("stopping UDP OpenTSDB server at %q...", s.addr) + if err := s.lnUDP.Close(); err != nil { + logger.Errorf("cannot stop UDP OpenTSDB server: %s", err) + } + + // Wait until all the servers are stopped. + s.wg.Wait() + logger.Infof("TCP and UDP OpenTSDB servers at %q have been stopped", s.addr) +} + +func serveTelnet(ln net.Listener) { for { c, err := ln.Accept() if err != nil { if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("opentsdb: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) time.Sleep(time.Second) continue } @@ -100,6 +148,7 @@ func serveUDP(ln net.PacketConn) { writeErrorsUDP.Inc() if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -122,20 +171,3 @@ func serveUDP(ln net.PacketConn) { } wg.Wait() } - -var ( - listenerTCP net.Listener - listenerUDP net.PacketConn -) - -// Stop stops the server. -func Stop() { - logger.Infof("stopping TCP OpenTSDB server at %q...", listenerTCP.Addr()) - if err := listenerTCP.Close(); err != nil { - logger.Errorf("cannot close TCP OpenTSDB server: %s", err) - } - logger.Infof("stopping UDP OpenTSDB server at %q...", listenerUDP.LocalAddr()) - if err := listenerUDP.Close(); err != nil { - logger.Errorf("cannot close UDP OpenTSDB server: %s", err) - } -} diff --git a/app/vminsert/opentsdbhttp/server.go b/app/vminsert/opentsdbhttp/server.go index eed28a8ce..7828cd6aa 100644 --- a/app/vminsert/opentsdbhttp/server.go +++ b/app/vminsert/opentsdbhttp/server.go @@ -2,12 +2,15 @@ package opentsdbhttp import ( "context" + "net" "net/http" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" ) @@ -16,72 +19,99 @@ var ( writeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/api/put", protocol="opentsdb-http"}`) ) -var ( - httpServer *http.Server - httpAddr string - maxRequestSize int64 -) +// Server represents HTTP OpenTSDB server. +type Server struct { + s *http.Server + ln net.Listener + wg sync.WaitGroup +} -// Serve starts HTTP OpenTSDB server on the given addr. -func Serve(addr string, maxReqSize int64) { +// MustStart starts HTTP OpenTSDB server on the given addr. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustStart(addr string, maxRequestSize int64) *Server { logger.Infof("starting HTTP OpenTSDB server at %q", addr) - httpAddr = addr - maxRequestSize = maxReqSize - httpServer = &http.Server{ - Addr: addr, - Handler: http.HandlerFunc(requestHandler), + lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr) + if err != nil { + logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err) + } + return MustServe(lnTCP, maxRequestSize) +} + +// MustServe serves OpenTSDB HTTP put requests from ln with up to maxRequestSize size. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustServe(ln net.Listener, maxRequestSize int64) *Server { + h := newRequestHandler(maxRequestSize) + hs := &http.Server{ + Handler: h, ReadTimeout: 30 * time.Second, WriteTimeout: 10 * time.Second, } + s := &Server{ + s: hs, + ln: ln, + } + s.wg.Add(1) go func() { - err := httpServer.ListenAndServe() + defer s.wg.Done() + err := s.s.Serve(s.ln) if err == http.ErrServerClosed { return } if err != nil { - logger.Fatalf("error serving HTTP OpenTSDB: %s", err) + logger.Fatalf("error serving HTTP OpenTSDB at %q: %s", s.ln.Addr(), err) } }() + return s } -// requestHandler handles HTTP OpenTSDB insert request. -func requestHandler(w http.ResponseWriter, r *http.Request) { - p, err := httpserver.ParsePath(r.URL.Path) - if err != nil { - httpserver.Errorf(w, "cannot parse path %q: %s", r.URL.Path, err) - return - } - if p.Prefix != "insert" { - // This is not our link. - httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path) - return - } - at, err := auth.NewToken(p.AuthToken) - if err != nil { - httpserver.Errorf(w, "auth error: %s", err) - return - } - - switch p.Suffix { - case "api/put": - writeRequests.Inc() - if err := insertHandler(at, r, maxRequestSize); err != nil { - writeErrors.Inc() - httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) - return - } - w.WriteHeader(http.StatusNoContent) - default: - httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path) - } +// Wait waits until the server is stopped with MustStop. +func (s *Server) Wait() { + s.wg.Wait() } -// Stop stops HTTP OpenTSDB server. -func Stop() { - logger.Infof("stopping HTTP OpenTSDB server at %q...", httpAddr) +// MustStop stops HTTP OpenTSDB server. +func (s *Server) MustStop() { + logger.Infof("stopping HTTP OpenTSDB server at %q...", s.ln.Addr()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := httpServer.Shutdown(ctx); err != nil { - logger.Fatalf("cannot close HTTP OpenTSDB server: %s", err) + if err := s.s.Shutdown(ctx); err != nil { + logger.Fatalf("cannot close HTTP OpenTSDB server at %q: %s", s.ln.Addr(), err) } + s.wg.Wait() + logger.Infof("OpenTSDB HTTP server at %q has been stopped", s.ln.Addr()) +} + +func newRequestHandler(maxRequestSize int64) http.Handler { + rh := func(w http.ResponseWriter, r *http.Request) { + p, err := httpserver.ParsePath(r.URL.Path) + if err != nil { + httpserver.Errorf(w, "cannot parse path %q: %s", r.URL.Path, err) + return + } + if p.Prefix != "insert" { + // This is not our link. + httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path) + return + } + at, err := auth.NewToken(p.AuthToken) + if err != nil { + httpserver.Errorf(w, "auth error: %s", err) + return + } + switch p.Suffix { + case "api/put", "opentsdb/api/put": + writeRequests.Inc() + if err := insertHandler(at, r, maxRequestSize); err != nil { + writeErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return + } + w.WriteHeader(http.StatusNoContent) + default: + httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path) + } + } + return http.HandlerFunc(rh) } diff --git a/lib/netutil/tcplistener.go b/lib/netutil/tcplistener.go index 31d3ae40e..02f8dda9f 100644 --- a/lib/netutil/tcplistener.go +++ b/lib/netutil/tcplistener.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" ) @@ -72,6 +73,8 @@ func (ln *TCPListener) Accept() (net.Conn, error) { ln.accepts.Inc() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { + logger.Errorf("temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + time.Sleep(time.Second) continue } ln.acceptErrors.Inc()