VictoriaMetrics/lib/ingestserver/influx/server.go
Aliaksandr Valialkin f4989edd96
lib/bytesutil: split Resize() into ResizeNoCopy() and ResizeWithCopy() functions
Previously bytesutil.Resize() was copying the original byte slice contents to a newly allocated slice.
This wasted CPU cycles and memory bandwidth in some places, where the original slice contents weren't needed
after slice resizing. Switch such places to bytesutil.ResizeNoCopy().

Rename the original bytesutil.Resize() function to bytesutil.ResizeWithCopy() for the sake of improved readability.

Additionally, allocate new slice with `make()` instead of `append()`. This guarantees that the capacity of the allocated slice
exactly matches the requested size. The `append()` could return a slice with bigger capacity as an optimization for further `append()` calls.
This could result in excess memory usage when the returned byte slice was cached (for instance, in lib/blockcache).

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
2022-01-25 15:24:44 +02:00

170 lines
4.7 KiB
Go

package influx
import (
"errors"
"io"
"net"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
// Counters for InfluxDB line protocol ingestion, split by transport.
var (
// writeRequestsTCP is incremented once per accepted TCP connection handed to the insert handler.
writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="influx", name="write", net="tcp"}`)
// writeErrorsTCP is incremented when the insert handler returns an error for a TCP connection.
writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="influx", name="write", net="tcp"}`)
// writeRequestsUDP is incremented once per successfully read UDP packet.
writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="influx", name="write", net="udp"}`)
// writeErrorsUDP is incremented on every UDP read error and on every insert handler error for a UDP packet.
writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="influx", name="write", net="udp"}`)
)
// Server accepts InfluxDB line protocol over TCP and UDP.
//
// It must be created with MustStart and released with MustStop.
type Server struct {
// addr is the address passed to MustStart; it is used in log messages.
addr string
// lnTCP is the TCP listener served by serveTCP and closed by MustStop.
lnTCP net.Listener
// lnUDP is the UDP packet connection served by serveUDP and closed by MustStop.
lnUDP net.PacketConn
// wg tracks the serveTCP and serveUDP goroutines started by MustStart.
wg sync.WaitGroup
// cm tracks the accepted TCP connections, so MustStop can close them all.
cm ingestserver.ConnsMap
}
// MustStart starts InfluxDB server on the given addr.
//
// A TCP listener and a UDP packet connection are both opened on addr;
// a failure to open either of them is reported via logger.Fatalf.
// Every accepted TCP connection and every received UDP packet
// is passed to insertHandler as an io.Reader.
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server {
	logger.Infof("starting TCP InfluxDB server at %q", addr)
	tcpListener, err := netutil.NewTCPListener("influx", addr)
	if err != nil {
		logger.Fatalf("cannot start TCP InfluxDB server at %q: %s", addr, err)
	}

	logger.Infof("starting UDP InfluxDB server at %q", addr)
	udpConn, err := net.ListenPacket(netutil.GetUDPNetwork(), addr)
	if err != nil {
		logger.Fatalf("cannot start UDP InfluxDB server at %q: %s", addr, err)
	}

	srv := &Server{addr: addr, lnTCP: tcpListener, lnUDP: udpConn}
	srv.cm.Init()

	// The TCP accept loop runs in its own goroutine tracked by srv.wg.
	srv.wg.Add(1)
	go func() {
		defer srv.wg.Done()
		srv.serveTCP(insertHandler)
		logger.Infof("stopped TCP InfluxDB server at %q", addr)
	}()

	// The UDP read loop runs in its own goroutine tracked by srv.wg.
	srv.wg.Add(1)
	go func() {
		defer srv.wg.Done()
		srv.serveUDP(insertHandler)
		logger.Infof("stopped UDP InfluxDB server at %q", addr)
	}()

	return srv
}
// MustStop stops the server.
//
// It closes the TCP listener and the UDP packet connection, closes all the
// tracked TCP connections and then waits until the serving goroutines
// started by MustStart have finished. Errors from closing the listeners
// are logged and do not interrupt the shutdown.
func (s *Server) MustStop() {
	logger.Infof("stopping TCP InfluxDB server at %q...", s.addr)
	tcpErr := s.lnTCP.Close()
	if tcpErr != nil {
		logger.Errorf("cannot close TCP InfluxDB server: %s", tcpErr)
	}

	logger.Infof("stopping UDP InfluxDB server at %q...", s.addr)
	udpErr := s.lnUDP.Close()
	if udpErr != nil {
		logger.Errorf("cannot close UDP InfluxDB server: %s", udpErr)
	}

	// Close the already accepted TCP connections before waiting for the serving goroutines.
	s.cm.CloseAll()
	s.wg.Wait()
	logger.Infof("TCP and UDP InfluxDB servers at %q have been stopped", s.addr)
}
// serveTCP accepts connections from s.lnTCP and passes each of them
// to insertHandler in a dedicated goroutine.
//
// The accept loop stops when the listener reports a closed network connection
// or when s.cm refuses to register a new connection. serveTCP returns after
// all the started connection goroutines have finished.
func (s *Server) serveTCP(insertHandler func(r io.Reader) error) {
	var connsWG sync.WaitGroup

	// handleConn feeds a single registered connection to insertHandler
	// and releases the connection afterwards.
	handleConn := func(conn net.Conn) {
		defer func() {
			s.cm.Delete(conn)
			_ = conn.Close()
			connsWG.Done()
		}()
		writeRequestsTCP.Inc()
		err := insertHandler(conn)
		if err == nil {
			return
		}
		writeErrorsTCP.Inc()
		logger.Errorf("error in TCP InfluxDB conn %q<->%q: %s", conn.LocalAddr(), conn.RemoteAddr(), err)
	}

acceptLoop:
	for {
		conn, err := s.lnTCP.Accept()
		if err != nil {
			var netErr net.Error
			isNetErr := errors.As(err, &netErr)
			switch {
			case isNetErr && netErr.Temporary():
				// Back off for a second before retrying on temporary errors.
				logger.Errorf("influx: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err)
				time.Sleep(time.Second)
				continue acceptLoop
			case isNetErr && strings.Contains(err.Error(), "use of closed network connection"):
				// The listener has been closed, so stop accepting.
				break acceptLoop
			case isNetErr:
				logger.Fatalf("unrecoverable error when accepting TCP InfluxDB connections: %s", err)
			}
			logger.Fatalf("unexpected error when accepting TCP InfluxDB connections: %s", err)
		}
		if !s.cm.Add(conn) {
			_ = conn.Close()
			break
		}
		connsWG.Add(1)
		go handleConn(conn)
	}
	connsWG.Wait()
}
// serveUDP reads packets from s.lnUDP and passes every packet to insertHandler.
//
// It starts cgroup.AvailableCPUs() reader goroutines, each with its own
// 64KiB read buffer, and returns after all of them have finished. A reader
// goroutine finishes when the read fails with a closed network connection error.
func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
	var readersWG sync.WaitGroup

	// readPackets is the body of a single reader goroutine.
	readPackets := func() {
		defer readersWG.Done()
		var buf bytesutil.ByteBuffer
		buf.B = bytesutil.ResizeNoCopy(buf.B, 64*1024)
		for {
			// Expose the whole buffer capacity to the next read.
			buf.Reset()
			buf.B = buf.B[:cap(buf.B)]
			n, remoteAddr, err := s.lnUDP.ReadFrom(buf.B)
			if err != nil {
				writeErrorsUDP.Inc()
				var netErr net.Error
				if errors.As(err, &netErr) {
					if netErr.Temporary() {
						// Back off for a second before retrying on temporary errors.
						logger.Errorf("influx: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err)
						time.Sleep(time.Second)
						continue
					}
					if strings.Contains(err.Error(), "use of closed network connection") {
						// The packet connection has been closed, so stop reading.
						return
					}
				}
				logger.Errorf("cannot read InfluxDB UDP data: %s", err)
				continue
			}
			buf.B = buf.B[:n]
			writeRequestsUDP.Inc()
			handlerErr := insertHandler(buf.NewReader())
			if handlerErr != nil {
				writeErrorsUDP.Inc()
				logger.Errorf("error in UDP InfluxDB conn %q<->%q: %s", s.lnUDP.LocalAddr(), remoteAddr, handlerErr)
			}
		}
	}

	readers := cgroup.AvailableCPUs()
	for started := 0; started < readers; started++ {
		readersWG.Add(1)
		go readPackets()
	}
	readersWG.Wait()
}