From 7aea5f58c42815dd555ffb54f21b696b6b69b983 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 8 May 2021 19:36:00 +0300 Subject: [PATCH] lib/ingestserver: properly close incoming connections during graceful shutdown --- docs/CHANGELOG.md | 1 + lib/ingestserver/conns_map.go | 47 ++++++++++++++++++++++++ lib/ingestserver/graphite/server.go | 35 +++++++++++++----- lib/ingestserver/influx/server.go | 35 +++++++++++++----- lib/ingestserver/opentsdb/server.go | 33 ++++++++++++----- lib/protoparser/common/unmarshal_work.go | 2 +- 6 files changed, 122 insertions(+), 31 deletions(-) create mode 100644 lib/ingestserver/conns_map.go diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a1ca57c28..e679800a2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -10,6 +10,7 @@ sort: 15 * BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Prevoiusly `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: properly remove stale parts outside the configured retention if `-retentionPeriod` is smaller than one month. Previously stale parts could remain active for up to a month after they go outside the retention. * BUGFIX: stop the process on panic errors, since such errors may leave the process in inconsistent state. Previously panics could be recovered, which could result in unexpected hard-to-debug further behavior of running process. +* BUGFIX: vminsert, vmagent: make sure data ingestion connections are closed before completing graceful shutdown. Previously the connection may remain open, which could result in trailing samples loss. 
## tip diff --git a/lib/ingestserver/conns_map.go b/lib/ingestserver/conns_map.go new file mode 100644 index 000000000..8da7d1239 --- /dev/null +++ b/lib/ingestserver/conns_map.go @@ -0,0 +1,47 @@ +package ingestserver + +import ( + "net" + "sync" +) + +// ConnsMap is used for tracking active connections. +type ConnsMap struct { + mu sync.Mutex + m map[net.Conn]struct{} + isClosed bool +} + +// Init initializes cm. +func (cm *ConnsMap) Init() { + cm.m = make(map[net.Conn]struct{}) + cm.isClosed = false +} + +// Add adds c to cm. +func (cm *ConnsMap) Add(c net.Conn) bool { + cm.mu.Lock() + ok := !cm.isClosed + if ok { + cm.m[c] = struct{}{} + } + cm.mu.Unlock() + return ok +} + +// Delete deletes c from cm. +func (cm *ConnsMap) Delete(c net.Conn) { + cm.mu.Lock() + delete(cm.m, c) + cm.mu.Unlock() +} + +// CloseAll closes all the added conns. +func (cm *ConnsMap) CloseAll() { + cm.mu.Lock() + for c := range cm.m { + _ = c.Close() + } + cm.isClosed = true + cm.mu.Unlock() +} diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index 90c0a055e..bdc949f0c 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -29,6 +30,7 @@ type Server struct { lnTCP net.Listener lnUDP net.PacketConn wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts graphite server on the given addr. 
@@ -54,16 +56,17 @@ func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { lnTCP: lnTCP, lnUDP: lnUDP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTCP(lnTCP, insertHandler) + s.serveTCP(insertHandler) logger.Infof("stopped TCP Graphite server at %q", addr) }() s.wg.Add(1) go func() { defer s.wg.Done() - serveUDP(lnUDP, insertHandler) + s.serveUDP(insertHandler) logger.Infof("stopped UDP Graphite server at %q", addr) }() return s @@ -79,18 +82,20 @@ func (s *Server) MustStop() { if err := s.lnUDP.Close(); err != nil { logger.Errorf("cannot close UDP Graphite server: %s", err) } + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP and UDP Graphite servers at %q have been stopped", s.addr) } -func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { +func (s *Server) serveTCP(insertHandler func(r io.Reader) error) { + var wg sync.WaitGroup for { - c, err := ln.Accept() + c, err := s.lnTCP.Accept() if err != nil { var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err) time.Sleep(time.Second) continue } @@ -101,18 +106,28 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } logger.Fatalf("unexpected error when accepting TCP Graphite connections: %s", err) } + if !s.cm.Add(c) { + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP Graphite conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } -func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { +func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { gomaxprocs := cgroup.AvailableCPUs() var wg 
sync.WaitGroup for i := 0; i < gomaxprocs; i++ { @@ -124,13 +139,13 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { for { bb.Reset() bb.B = bb.B[:cap(bb.B)] - n, addr, err := ln.ReadFrom(bb.B) + n, addr, err := s.lnUDP.ReadFrom(bb.B) if err != nil { writeErrorsUDP.Inc() var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -145,7 +160,7 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { writeRequestsUDP.Inc() if err := insertHandler(bb.NewReader()); err != nil { writeErrorsUDP.Inc() - logger.Errorf("error in UDP Graphite conn %q<->%q: %s", ln.LocalAddr(), addr, err) + logger.Errorf("error in UDP Graphite conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) continue } } diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go index 48ead76a9..8a14f2b90 100644 --- a/lib/ingestserver/influx/server.go +++ b/lib/ingestserver/influx/server.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -29,6 +30,7 @@ type Server struct { lnTCP net.Listener lnUDP net.PacketConn wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts Influx server on the given addr. 
@@ -54,16 +56,17 @@ func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { lnTCP: lnTCP, lnUDP: lnUDP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTCP(lnTCP, insertHandler) + s.serveTCP(insertHandler) logger.Infof("stopped TCP Influx server at %q", addr) }() s.wg.Add(1) go func() { defer s.wg.Done() - serveUDP(lnUDP, insertHandler) + s.serveUDP(insertHandler) logger.Infof("stopped UDP Influx server at %q", addr) }() return s @@ -79,18 +82,20 @@ func (s *Server) MustStop() { if err := s.lnUDP.Close(); err != nil { logger.Errorf("cannot close UDP Influx server: %s", err) } + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP and UDP Influx servers at %q have been stopped", s.addr) } -func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { +func (s *Server) serveTCP(insertHandler func(r io.Reader) error) { + var wg sync.WaitGroup for { - c, err := ln.Accept() + c, err := s.lnTCP.Accept() if err != nil { var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("influx: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + logger.Errorf("influx: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err) time.Sleep(time.Second) continue } @@ -101,18 +106,28 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } logger.Fatalf("unexpected error when accepting TCP Influx connections: %s", err) } + if !s.cm.Add(c) { + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP Influx conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } -func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { +func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i 
:= 0; i < gomaxprocs; i++ { @@ -124,13 +139,13 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { for { bb.Reset() bb.B = bb.B[:cap(bb.B)] - n, addr, err := ln.ReadFrom(bb.B) + n, addr, err := s.lnUDP.ReadFrom(bb.B) if err != nil { writeErrorsUDP.Inc() var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("influx: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + logger.Errorf("influx: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -145,7 +160,7 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { writeRequestsUDP.Inc() if err := insertHandler(bb.NewReader()); err != nil { writeErrorsUDP.Inc() - logger.Errorf("error in UDP Influx conn %q<->%q: %s", ln.LocalAddr(), addr, err) + logger.Errorf("error in UDP Influx conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) continue } } diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index 178f3ddb6..a16267a66 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -34,6 +35,7 @@ type Server struct { httpServer *opentsdbhttp.Server lnUDP net.PacketConn wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts OpenTSDB collector on the given addr. 
@@ -62,10 +64,11 @@ func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpIns httpServer: httpServer, lnUDP: lnUDP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTelnet(lnTelnet, telnetInsertHandler) + s.serveTelnet(lnTelnet, telnetInsertHandler) logger.Infof("stopped TCP telnet OpenTSDB server at %q", addr) }() s.wg.Add(1) @@ -77,7 +80,7 @@ func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpIns s.wg.Add(1) go func() { defer s.wg.Done() - serveUDP(lnUDP, telnetInsertHandler) + s.serveUDP(telnetInsertHandler) logger.Infof("stopped UDP OpenTSDB server at %q", addr) }() return s @@ -97,13 +100,13 @@ func (s *Server) MustStop() { if err := s.lnUDP.Close(); err != nil { logger.Errorf("cannot stop UDP OpenTSDB server: %s", err) } - - // Wait until all the servers are stopped. + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP and UDP OpenTSDB servers at %q have been stopped", s.addr) } -func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { +func (s *Server) serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { + var wg sync.WaitGroup for { c, err := ln.Accept() if err != nil { @@ -121,18 +124,28 @@ func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { } logger.Fatalf("unexpected error when accepting TCP OpenTSDB connections: %s", err) } + if !s.cm.Add(c) { + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP OpenTSDB conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } -func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { +func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { @@ -144,13 +157,13 @@ func 
serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { for { bb.Reset() bb.B = bb.B[:cap(bb.B)] - n, addr, err := ln.ReadFrom(bb.B) + n, addr, err := s.lnUDP.ReadFrom(bb.B) if err != nil { writeErrorsUDP.Inc() var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -165,7 +178,7 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { writeRequestsUDP.Inc() if err := insertHandler(bb.NewReader()); err != nil { writeErrorsUDP.Inc() - logger.Errorf("error in UDP OpenTSDB conn %q<->%q: %s", ln.LocalAddr(), addr, err) + logger.Errorf("error in UDP OpenTSDB conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) continue } } diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/common/unmarshal_work.go index 55758e933..773489a41 100644 --- a/lib/protoparser/common/unmarshal_work.go +++ b/lib/protoparser/common/unmarshal_work.go @@ -40,7 +40,7 @@ func StartUnmarshalWorkers() { // StopUnmarshalWorkers stops unmarshal workers. // -// No more calles to ScheduleUnmarshalWork are allowed after callsing stopUnmarshalWorkers +// No more calls to ScheduleUnmarshalWork are allowed after calling StopUnmarshalWorkers func StopUnmarshalWorkers() { close(unmarshalWorkCh) unmarshalWorkersWG.Wait()