diff --git a/app/vminsert/graphite/server.go b/app/vminsert/graphite/server.go index 401d516d2..621e3c6e0 100644 --- a/app/vminsert/graphite/server.go +++ b/app/vminsert/graphite/server.go @@ -21,36 +21,62 @@ var ( writeErrorsUDP = metrics.NewCounter(`vm_graphite_request_errors_total{name="write", net="udp"}`) ) -// Serve starts graphite server on the given addr. -func Serve(addr string) { +// Server accepts Graphite plaintext lines over TCP and UDP. +type Server struct { + addr string + lnTCP net.Listener + lnUDP net.PacketConn + wg sync.WaitGroup +} + +// MustStart starts graphite server on the given addr. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustStart(addr string) *Server { logger.Infof("starting TCP Graphite server at %q", addr) lnTCP, err := netutil.NewTCPListener("graphite", addr) if err != nil { logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err) } - listenerTCP = lnTCP logger.Infof("starting UDP Graphite server at %q", addr) lnUDP, err := net.ListenPacket("udp4", addr) if err != nil { logger.Fatalf("cannot start UDP Graphite server at %q: %s", addr, err) } - listenerUDP = lnUDP - var wg sync.WaitGroup - wg.Add(1) + s := &Server{ + addr: addr, + lnTCP: lnTCP, + lnUDP: lnUDP, + } + s.wg.Add(1) go func() { - defer wg.Done() - serveTCP(listenerTCP) + defer s.wg.Done() + serveTCP(lnTCP) logger.Infof("stopped TCP Graphite server at %q", addr) }() - wg.Add(1) + s.wg.Add(1) go func() { - defer wg.Done() - serveUDP(listenerUDP) + defer s.wg.Done() + serveUDP(lnUDP) logger.Infof("stopped UDP Graphite server at %q", addr) }() - wg.Wait() + return s +} + +// MustStop stops the server. 
+func (s *Server) MustStop() { + logger.Infof("stopping TCP Graphite server at %q...", s.addr) + if err := s.lnTCP.Close(); err != nil { + logger.Errorf("cannot close TCP Graphite server: %s", err) + } + logger.Infof("stopping UDP Graphite server at %q...", s.addr) + if err := s.lnUDP.Close(); err != nil { + logger.Errorf("cannot close UDP Graphite server: %s", err) + } + s.wg.Wait() + logger.Infof("TCP and UDP Graphite servers at %q have been stopped", s.addr) } func serveTCP(ln net.Listener) { @@ -59,6 +85,7 @@ func serveTCP(ln net.Listener) { if err != nil { if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) time.Sleep(time.Second) continue } @@ -97,6 +124,7 @@ func serveUDP(ln net.PacketConn) { writeErrorsUDP.Inc() if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -119,20 +147,3 @@ func serveUDP(ln net.PacketConn) { } wg.Wait() } - -var ( - listenerTCP net.Listener - listenerUDP net.PacketConn -) - -// Stop stops the server. -func Stop() { - logger.Infof("stopping TCP Graphite server at %q...", listenerTCP.Addr()) - if err := listenerTCP.Close(); err != nil { - logger.Errorf("cannot close TCP Graphite server: %s", err) - } - logger.Infof("stopping UDP Graphite server at %q...", listenerUDP.LocalAddr()) - if err := listenerUDP.Close(); err != nil { - logger.Errorf("cannot close UDP Graphite server: %s", err) - } -} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index e16813df4..a724f7a0a 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -19,39 +19,47 @@ import ( ) var ( - graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. 
Doesn't work if empty") - opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB put messages. Usually :4242 must be set. Doesn't work if empty") + graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") + opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ + "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ + "Usually :4242 must be set. Doesn't work if empty") opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size of a single insert request in bytes") maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superflouos labels are dropped") ) +var ( + graphiteServer *graphite.Server + opentsdbServer *opentsdb.Server + opentsdbhttpServer *opentsdbhttp.Server +) + // Init initializes vminsert. func Init() { storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) concurrencylimiter.Init() if len(*graphiteListenAddr) > 0 { - go graphite.Serve(*graphiteListenAddr) + graphiteServer = graphite.MustStart(*graphiteListenAddr) } if len(*opentsdbListenAddr) > 0 { - go opentsdb.Serve(*opentsdbListenAddr) + opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr, int64(*maxInsertRequestSize)) } if len(*opentsdbHTTPListenAddr) > 0 { - go opentsdbhttp.Serve(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize)) + opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize)) } } // Stop stops vminsert. 
func Stop() { if len(*graphiteListenAddr) > 0 { - graphite.Stop() + graphiteServer.MustStop() } if len(*opentsdbListenAddr) > 0 { - opentsdb.Stop() + opentsdbServer.MustStop() } if len(*opentsdbHTTPListenAddr) > 0 { - opentsdbhttp.Stop() + opentsdbhttpServer.MustStop() } } diff --git a/app/vminsert/opentsdb/listener_switch.go b/app/vminsert/opentsdb/listener_switch.go new file mode 100644 index 000000000..bd4689587 --- /dev/null +++ b/app/vminsert/opentsdb/listener_switch.go @@ -0,0 +1,159 @@ +package opentsdb + +import ( + "io" + "net" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// listenerSwitch listens for incoming connections and multiplexes them to OpenTSDB http or telnet listeners +// depending on the first byte in the accepted connection. +// +// It is expected that both listeners - http and telnet consume incoming connections as soon as possible. +type listenerSwitch struct { + ln net.Listener + wg sync.WaitGroup + + telnetConnsCh chan net.Conn + httpConnsCh chan net.Conn + + closeLock sync.Mutex + closed bool + acceptErr error + closeErr error +} + +func newListenerSwitch(ln net.Listener) *listenerSwitch { + ls := &listenerSwitch{ + ln: ln, + } + ls.telnetConnsCh = make(chan net.Conn) + ls.httpConnsCh = make(chan net.Conn) + ls.wg.Add(1) + go func() { + ls.worker() + close(ls.telnetConnsCh) + close(ls.httpConnsCh) + ls.wg.Done() + }() + return ls +} + +func (ls *listenerSwitch) stop() error { + var err error + ls.closeLock.Lock() + if !ls.closed { + err = ls.ln.Close() + ls.closeErr = err + ls.closed = true + } + ls.closeLock.Unlock() + + if err == nil { + // Wait until worker detects the closed ls.ln and exits. 
// peekedConn is a net.Conn whose first byte has already been consumed
// from the underlying connection and is replayed by the first non-empty Read.
type peekedConn struct {
	net.Conn

	// firstChar is the byte already read from Conn.
	firstChar byte

	// firstCharRead is set once firstChar has been returned from Read.
	firstCharRead bool
}

// Read implements io.Reader.
//
// The first call with a non-empty p returns just the peeked byte. It does not
// touch the underlying connection, because io.Reader is expected to return
// the data it already has instead of blocking for more.
// All the subsequent calls are forwarded to the underlying connection.
func (pc *peekedConn) Read(p []byte) (int, error) {
	// It is assumed that the pc cannot be read from concurrent goroutines.
	if pc.firstCharRead {
		// Fast path - first char already read.
		return pc.Conn.Read(p)
	}

	// Slow path - read the first char.
	if len(p) == 0 {
		// Nothing can be stored, so keep the peeked byte for the next call.
		return 0, nil
	}
	p[0] = pc.firstChar
	pc.firstCharRead = true
	return 1, nil
}
+ if len(p) == 0 { + return 0, nil + } + p[0] = pc.firstChar + pc.firstCharRead = true + n, err := pc.Conn.Read(p[1:]) + return n + 1, err +} + +func (ls *listenerSwitch) newTelnetListener() *chanListener { + return &chanListener{ + ls: ls, + ch: ls.telnetConnsCh, + } +} + +func (ls *listenerSwitch) newHTTPListener() *chanListener { + return &chanListener{ + ls: ls, + ch: ls.httpConnsCh, + } +} + +type chanListener struct { + ls *listenerSwitch + ch chan net.Conn +} + +func (cl *chanListener) Accept() (net.Conn, error) { + c, ok := <-cl.ch + if ok { + return c, nil + } + + cl.ls.closeLock.Lock() + err := cl.ls.acceptErr + cl.ls.closeLock.Unlock() + return nil, err +} + +func (cl *chanListener) Close() error { + return cl.ls.stop() +} + +func (cl *chanListener) Addr() net.Addr { + return cl.ls.ln.Addr() +} diff --git a/app/vminsert/opentsdb/server.go b/app/vminsert/opentsdb/server.go index 229e09a35..ce6d14367 100644 --- a/app/vminsert/opentsdb/server.go +++ b/app/vminsert/opentsdb/server.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -21,44 +22,91 @@ var ( writeErrorsUDP = metrics.NewCounter(`vm_opentsdb_request_errors_total{name="write", net="udp"}`) ) -// Serve starts OpenTSDB collector on the given addr. -func Serve(addr string) { +// Server is a server for collecting OpenTSDB TCP and UDP metrics. +// +// It accepts simultaneously Telnet put requests and HTTP put requests over TCP. +type Server struct { + addr string + ls *listenerSwitch + httpServer *opentsdbhttp.Server + lnUDP net.PacketConn + wg sync.WaitGroup +} + +// MustStart starts OpenTSDB collector on the given addr. +// +// MustStop must be called on the returned server when it is no longer needed. 
+func MustStart(addr string, maxRequestSize int64) *Server { logger.Infof("starting TCP OpenTSDB collector at %q", addr) lnTCP, err := netutil.NewTCPListener("opentsdb", addr) if err != nil { logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err) } - listenerTCP = lnTCP + ls := newListenerSwitch(lnTCP) + lnHTTP := ls.newHTTPListener() + lnTelnet := ls.newTelnetListener() + httpServer := opentsdbhttp.MustServe(lnHTTP, maxRequestSize) logger.Infof("starting UDP OpenTSDB collector at %q", addr) lnUDP, err := net.ListenPacket("udp4", addr) if err != nil { logger.Fatalf("cannot start UDP OpenTSDB collector at %q: %s", addr, err) } - listenerUDP = lnUDP - var wg sync.WaitGroup - wg.Add(1) + s := &Server{ + addr: addr, + ls: ls, + httpServer: httpServer, + lnUDP: lnUDP, + } + s.wg.Add(1) go func() { - defer wg.Done() - serveTCP(listenerTCP) - logger.Infof("stopped TCP OpenTSDB collector at %q", addr) + defer s.wg.Done() + serveTelnet(lnTelnet) + logger.Infof("stopped TCP telnet OpenTSDB server at %q", addr) }() - wg.Add(1) + s.wg.Add(1) go func() { - defer wg.Done() - serveUDP(listenerUDP) - logger.Infof("stopped UDP OpenTSDB collector at %q", addr) + defer s.wg.Done() + httpServer.Wait() + // Do not log when httpServer is stopped, since this is logged by the server itself. }() - wg.Wait() + s.wg.Add(1) + go func() { + defer s.wg.Done() + serveUDP(lnUDP) + logger.Infof("stopped UDP OpenTSDB server at %q", addr) + }() + return s } -func serveTCP(ln net.Listener) { +// MustStop stops the server. +func (s *Server) MustStop() { + // Stop HTTP server. Do not emit log message, since it is emitted by the httpServer. 
+ s.httpServer.MustStop() + + logger.Infof("stopping TCP telnet OpenTSDB server at %q...", s.addr) + if err := s.ls.stop(); err != nil { + logger.Errorf("cannot stop TCP telnet OpenTSDB server: %s", err) + } + + logger.Infof("stopping UDP OpenTSDB server at %q...", s.addr) + if err := s.lnUDP.Close(); err != nil { + logger.Errorf("cannot stop UDP OpenTSDB server: %s", err) + } + + // Wait until all the servers are stopped. + s.wg.Wait() + logger.Infof("TCP and UDP OpenTSDB servers at %q have been stopped", s.addr) +} + +func serveTelnet(ln net.Listener) { for { c, err := ln.Accept() if err != nil { if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("opentsdb: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) time.Sleep(time.Second) continue } @@ -97,6 +145,7 @@ func serveUDP(ln net.PacketConn) { writeErrorsUDP.Inc() if ne, ok := err.(net.Error); ok { if ne.Temporary() { + logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -119,20 +168,3 @@ func serveUDP(ln net.PacketConn) { } wg.Wait() } - -var ( - listenerTCP net.Listener - listenerUDP net.PacketConn -) - -// Stop stops the server. 
-func Stop() { - logger.Infof("stopping TCP OpenTSDB server at %q...", listenerTCP.Addr()) - if err := listenerTCP.Close(); err != nil { - logger.Errorf("cannot close TCP OpenTSDB server: %s", err) - } - logger.Infof("stopping UDP OpenTSDB server at %q...", listenerUDP.LocalAddr()) - if err := listenerUDP.Close(); err != nil { - logger.Errorf("cannot close UDP OpenTSDB server: %s", err) - } -} diff --git a/app/vminsert/opentsdbhttp/server.go b/app/vminsert/opentsdbhttp/server.go index 6652d21fe..cb882838a 100644 --- a/app/vminsert/opentsdbhttp/server.go +++ b/app/vminsert/opentsdbhttp/server.go @@ -2,11 +2,14 @@ package opentsdbhttp import ( "context" + "net" "net/http" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" ) @@ -15,56 +18,84 @@ var ( writeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/put", protocol="opentsdb-http"}`) ) -var ( - httpServer *http.Server - httpAddr string - maxRequestSize int64 -) +// Server represents HTTP OpenTSDB server. +type Server struct { + s *http.Server + ln net.Listener + wg sync.WaitGroup +} -// Serve starts HTTP OpenTSDB server on the given addr. -func Serve(addr string, maxReqSize int64) { +// MustStart starts HTTP OpenTSDB server on the given addr. +// +// MustStop must be called on the returned server when it is no longer needed. 
+func MustStart(addr string, maxRequestSize int64) *Server { logger.Infof("starting HTTP OpenTSDB server at %q", addr) - httpAddr = addr - maxRequestSize = maxReqSize - httpServer = &http.Server{ - Addr: addr, - Handler: http.HandlerFunc(requestHandler), + lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr) + if err != nil { + logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err) + } + return MustServe(lnTCP, maxRequestSize) +} + +// MustServe serves OpenTSDB HTTP put requests from ln with up to maxRequestSize size. +// +// MustStop must be called on the returned server when it is no longer needed. +func MustServe(ln net.Listener, maxRequestSize int64) *Server { + h := newRequestHandler(maxRequestSize) + hs := &http.Server{ + Handler: h, ReadTimeout: 30 * time.Second, WriteTimeout: 10 * time.Second, } + s := &Server{ + s: hs, + ln: ln, + } + s.wg.Add(1) go func() { - err := httpServer.ListenAndServe() + defer s.wg.Done() + err := s.s.Serve(s.ln) if err == http.ErrServerClosed { return } if err != nil { - logger.Fatalf("error serving HTTP OpenTSDB: %s", err) + logger.Fatalf("error serving HTTP OpenTSDB at %q: %s", s.ln.Addr(), err) } }() + return s } -// requestHandler handles HTTP OpenTSDB insert request. -func requestHandler(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/api/put": - writeRequests.Inc() - if err := insertHandler(r, maxRequestSize); err != nil { - writeErrors.Inc() - httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) - return - } - w.WriteHeader(http.StatusNoContent) - default: - httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path) - } +// Wait waits until the server is stopped with MustStop. +func (s *Server) Wait() { + s.wg.Wait() } -// Stop stops HTTP OpenTSDB server. -func Stop() { - logger.Infof("stopping HTTP OpenTSDB server at %q...", httpAddr) +// MustStop stops HTTP OpenTSDB server. 
+func (s *Server) MustStop() { + logger.Infof("stopping HTTP OpenTSDB server at %q...", s.ln.Addr()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := httpServer.Shutdown(ctx); err != nil { - logger.Fatalf("cannot close HTTP OpenTSDB server: %s", err) + if err := s.s.Shutdown(ctx); err != nil { + logger.Fatalf("cannot close HTTP OpenTSDB server at %q: %s", s.ln.Addr(), err) } + s.wg.Wait() + logger.Infof("OpenTSDB HTTP server at %q has been stopped", s.ln.Addr()) +} + +func newRequestHandler(maxRequestSize int64) http.Handler { + rh := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/put": + writeRequests.Inc() + if err := insertHandler(r, maxRequestSize); err != nil { + writeErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return + } + w.WriteHeader(http.StatusNoContent) + default: + httpserver.Errorf(w, "unexpected path requested on HTTP OpenTSDB server: %q", r.URL.Path) + } + } + return http.HandlerFunc(rh) } diff --git a/lib/netutil/tcplistener.go b/lib/netutil/tcplistener.go index 31d3ae40e..02f8dda9f 100644 --- a/lib/netutil/tcplistener.go +++ b/lib/netutil/tcplistener.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" ) @@ -72,6 +73,8 @@ func (ln *TCPListener) Accept() (net.Conn, error) { ln.accepts.Inc() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { + logger.Errorf("temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + time.Sleep(time.Second) continue } ln.acceptErrors.Inc()