diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 5a39f04dd2..3b9d5cdeca 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative" @@ -51,44 +52,8 @@ type Server struct { vminsertWG sync.WaitGroup vmselectWG sync.WaitGroup - vminsertConnsMap connsMap - vmselectConnsMap connsMap -} - -type connsMap struct { - mu sync.Mutex - m map[net.Conn]struct{} - isClosed bool -} - -func (cm *connsMap) Init() { - cm.m = make(map[net.Conn]struct{}) - cm.isClosed = false -} - -func (cm *connsMap) Add(c net.Conn) bool { - cm.mu.Lock() - ok := !cm.isClosed - if ok { - cm.m[c] = struct{}{} - } - cm.mu.Unlock() - return ok -} - -func (cm *connsMap) Delete(c net.Conn) { - cm.mu.Lock() - delete(cm.m, c) - cm.mu.Unlock() -} - -func (cm *connsMap) CloseAll() { - cm.mu.Lock() - for c := range cm.m { - _ = c.Close() - } - cm.isClosed = true - cm.mu.Unlock() + vminsertConnsMap ingestserver.ConnsMap + vmselectConnsMap ingestserver.ConnsMap } // NewServer returns new Server. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a1ca57c280..e679800a2f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -10,6 +10,7 @@ sort: 15 * BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Prevoiusly `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). 
* BUGFIX: properly remove stale parts outside the configured retention if `-retentionPeriod` is smaller than one month. Previously stale parts could remain active for up to a month after they go outside the retention. * BUGFIX: stop the process on panic errors, since such errors may leave the process in inconsistent state. Previously panics could be recovered, which could result in unexpected hard-to-debug further behavior of running process. +* BUGFIX: vminsert, vmagent: make sure data ingestion connections are closed before completing graceful shutdown. Previously the connection could remain open, which could result in the loss of trailing samples. ## tip diff --git a/lib/ingestserver/clusternative/server.go b/lib/ingestserver/clusternative/server.go index 9cf0b047a3..db1c5d2638 100644 --- a/lib/ingestserver/clusternative/server.go +++ b/lib/ingestserver/clusternative/server.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -22,6 +23,7 @@ type Server struct { addr string lnTCP net.Listener wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts clusternative server on the given addr. 
@@ -39,10 +41,11 @@ func MustStart(addr string, insertHandler func(c net.Conn) error) *Server { addr: addr, lnTCP: lnTCP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTCP(lnTCP, insertHandler) + s.serveTCP(insertHandler) logger.Infof("stopped TCP clusternative server at %q", addr) }() return s @@ -54,18 +57,20 @@ func (s *Server) MustStop() { if err := s.lnTCP.Close(); err != nil { logger.Errorf("cannot close TCP clusternative server: %s", err) } + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP clusternative server at %q has been stopped", s.addr) } -func serveTCP(ln net.Listener, insertHandler func(c net.Conn) error) { +func (s *Server) serveTCP(insertHandler func(c net.Conn) error) { + var wg sync.WaitGroup for { - c, err := ln.Accept() + c, err := s.lnTCP.Accept() if err != nil { var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("clusternative: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + logger.Errorf("clusternative: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err) time.Sleep(time.Second) continue } @@ -76,13 +81,24 @@ func serveTCP(ln net.Listener, insertHandler func(c net.Conn) error) { } logger.Fatalf("unexpected error when accepting TCP clusternative connections: %s", err) } + if !s.cm.Add(c) { + // The server is already closed. + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP clusternative conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } diff --git a/lib/ingestserver/conns_map.go b/lib/ingestserver/conns_map.go new file mode 100644 index 0000000000..8da7d1239c --- /dev/null +++ b/lib/ingestserver/conns_map.go @@ -0,0 +1,47 @@ +package ingestserver + +import ( + "net" + "sync" +) + +// ConnsMap is used for tracking active connections. 
+type ConnsMap struct { + mu sync.Mutex + m map[net.Conn]struct{} + isClosed bool +} + +// Init initializes cm. +func (cm *ConnsMap) Init() { + cm.m = make(map[net.Conn]struct{}) + cm.isClosed = false +} + +// Add adds c to cm. +func (cm *ConnsMap) Add(c net.Conn) bool { + cm.mu.Lock() + ok := !cm.isClosed + if ok { + cm.m[c] = struct{}{} + } + cm.mu.Unlock() + return ok +} + +// Delete deletes c from cm. +func (cm *ConnsMap) Delete(c net.Conn) { + cm.mu.Lock() + delete(cm.m, c) + cm.mu.Unlock() +} + +// CloseAll closes all the added conns. +func (cm *ConnsMap) CloseAll() { + cm.mu.Lock() + for c := range cm.m { + _ = c.Close() + } + cm.isClosed = true + cm.mu.Unlock() +} diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index 90c0a055ef..bdc949f0c1 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -29,6 +30,7 @@ type Server struct { lnTCP net.Listener lnUDP net.PacketConn wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts graphite server on the given addr. 
@@ -54,16 +56,17 @@ func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { lnTCP: lnTCP, lnUDP: lnUDP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTCP(lnTCP, insertHandler) + s.serveTCP(insertHandler) logger.Infof("stopped TCP Graphite server at %q", addr) }() s.wg.Add(1) go func() { defer s.wg.Done() - serveUDP(lnUDP, insertHandler) + s.serveUDP(insertHandler) logger.Infof("stopped UDP Graphite server at %q", addr) }() return s @@ -79,18 +82,20 @@ func (s *Server) MustStop() { if err := s.lnUDP.Close(); err != nil { logger.Errorf("cannot close UDP Graphite server: %s", err) } + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP and UDP Graphite servers at %q have been stopped", s.addr) } -func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { +func (s *Server) serveTCP(insertHandler func(r io.Reader) error) { + var wg sync.WaitGroup for { - c, err := ln.Accept() + c, err := s.lnTCP.Accept() if err != nil { var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + logger.Errorf("graphite: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err) time.Sleep(time.Second) continue } @@ -101,18 +106,28 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } logger.Fatalf("unexpected error when accepting TCP Graphite connections: %s", err) } + if !s.cm.Add(c) { + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP Graphite conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } -func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { +func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { gomaxprocs := cgroup.AvailableCPUs() var wg 
sync.WaitGroup for i := 0; i < gomaxprocs; i++ { @@ -124,13 +139,13 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { for { bb.Reset() bb.B = bb.B[:cap(bb.B)] - n, addr, err := ln.ReadFrom(bb.B) + n, addr, err := s.lnUDP.ReadFrom(bb.B) if err != nil { writeErrorsUDP.Inc() var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + logger.Errorf("graphite: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -145,7 +160,7 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { writeRequestsUDP.Inc() if err := insertHandler(bb.NewReader()); err != nil { writeErrorsUDP.Inc() - logger.Errorf("error in UDP Graphite conn %q<->%q: %s", ln.LocalAddr(), addr, err) + logger.Errorf("error in UDP Graphite conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) continue } } diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go index 48ead76a97..8a14f2b901 100644 --- a/lib/ingestserver/influx/server.go +++ b/lib/ingestserver/influx/server.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -29,6 +30,7 @@ type Server struct { lnTCP net.Listener lnUDP net.PacketConn wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts Influx server on the given addr. 
@@ -54,16 +56,17 @@ func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { lnTCP: lnTCP, lnUDP: lnUDP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTCP(lnTCP, insertHandler) + s.serveTCP(insertHandler) logger.Infof("stopped TCP Influx server at %q", addr) }() s.wg.Add(1) go func() { defer s.wg.Done() - serveUDP(lnUDP, insertHandler) + s.serveUDP(insertHandler) logger.Infof("stopped UDP Influx server at %q", addr) }() return s @@ -79,18 +82,20 @@ func (s *Server) MustStop() { if err := s.lnUDP.Close(); err != nil { logger.Errorf("cannot close UDP Influx server: %s", err) } + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP and UDP Influx servers at %q have been stopped", s.addr) } -func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { +func (s *Server) serveTCP(insertHandler func(r io.Reader) error) { + var wg sync.WaitGroup for { - c, err := ln.Accept() + c, err := s.lnTCP.Accept() if err != nil { var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("influx: temporary error when listening for TCP addr %q: %s", ln.Addr(), err) + logger.Errorf("influx: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err) time.Sleep(time.Second) continue } @@ -101,18 +106,28 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } logger.Fatalf("unexpected error when accepting TCP Influx connections: %s", err) } + if !s.cm.Add(c) { + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP Influx conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } -func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { +func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i 
:= 0; i < gomaxprocs; i++ { @@ -124,13 +139,13 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { for { bb.Reset() bb.B = bb.B[:cap(bb.B)] - n, addr, err := ln.ReadFrom(bb.B) + n, addr, err := s.lnUDP.ReadFrom(bb.B) if err != nil { writeErrorsUDP.Inc() var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("influx: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + logger.Errorf("influx: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -145,7 +160,7 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { writeRequestsUDP.Inc() if err := insertHandler(bb.NewReader()); err != nil { writeErrorsUDP.Inc() - logger.Errorf("error in UDP Influx conn %q<->%q: %s", ln.LocalAddr(), addr, err) + logger.Errorf("error in UDP Influx conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) continue } } diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index 178f3ddb6d..a16267a66e 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -34,6 +35,7 @@ type Server struct { httpServer *opentsdbhttp.Server lnUDP net.PacketConn wg sync.WaitGroup + cm ingestserver.ConnsMap } // MustStart starts OpenTSDB collector on the given addr. 
@@ -62,10 +64,11 @@ func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpIns httpServer: httpServer, lnUDP: lnUDP, } + s.cm.Init() s.wg.Add(1) go func() { defer s.wg.Done() - serveTelnet(lnTelnet, telnetInsertHandler) + s.serveTelnet(lnTelnet, telnetInsertHandler) logger.Infof("stopped TCP telnet OpenTSDB server at %q", addr) }() s.wg.Add(1) @@ -77,7 +80,7 @@ func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpIns s.wg.Add(1) go func() { defer s.wg.Done() - serveUDP(lnUDP, telnetInsertHandler) + s.serveUDP(telnetInsertHandler) logger.Infof("stopped UDP OpenTSDB server at %q", addr) }() return s @@ -97,13 +100,13 @@ func (s *Server) MustStop() { if err := s.lnUDP.Close(); err != nil { logger.Errorf("cannot stop UDP OpenTSDB server: %s", err) } - - // Wait until all the servers are stopped. + s.cm.CloseAll() s.wg.Wait() logger.Infof("TCP and UDP OpenTSDB servers at %q have been stopped", s.addr) } -func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { +func (s *Server) serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { + var wg sync.WaitGroup for { c, err := ln.Accept() if err != nil { @@ -121,18 +124,28 @@ func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { } logger.Fatalf("unexpected error when accepting TCP OpenTSDB connections: %s", err) } + if !s.cm.Add(c) { + _ = c.Close() + break + } + wg.Add(1) go func() { + defer func() { + s.cm.Delete(c) + _ = c.Close() + wg.Done() + }() writeRequestsTCP.Inc() if err := insertHandler(c); err != nil { writeErrorsTCP.Inc() logger.Errorf("error in TCP OpenTSDB conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) } - _ = c.Close() }() } + wg.Wait() } -func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { +func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { @@ -144,13 +157,13 @@ func 
serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { for { bb.Reset() bb.B = bb.B[:cap(bb.B)] - n, addr, err := ln.ReadFrom(bb.B) + n, addr, err := s.lnUDP.ReadFrom(bb.B) if err != nil { writeErrorsUDP.Inc() var ne net.Error if errors.As(err, &ne) { if ne.Temporary() { - logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", ln.LocalAddr(), err) + logger.Errorf("opentsdb: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) time.Sleep(time.Second) continue } @@ -165,7 +178,7 @@ func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { writeRequestsUDP.Inc() if err := insertHandler(bb.NewReader()); err != nil { writeErrorsUDP.Inc() - logger.Errorf("error in UDP OpenTSDB conn %q<->%q: %s", ln.LocalAddr(), addr, err) + logger.Errorf("error in UDP OpenTSDB conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) continue } } diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/common/unmarshal_work.go index 55758e933e..773489a412 100644 --- a/lib/protoparser/common/unmarshal_work.go +++ b/lib/protoparser/common/unmarshal_work.go @@ -40,7 +40,7 @@ func StartUnmarshalWorkers() { // StopUnmarshalWorkers stops unmarshal workers. // -// No more calles to ScheduleUnmarshalWork are allowed after callsing stopUnmarshalWorkers +// No more calls to ScheduleUnmarshalWork are allowed after calling StopUnmarshalWorkers func StopUnmarshalWorkers() { close(unmarshalWorkCh) unmarshalWorkersWG.Wait()