app/vmstorage/transport: refactoring: split Server into VMInsertServer and VMSelectServer

This makes the code clearer.
This commit is contained in:
Aliaksandr Valialkin 2022-06-23 19:19:36 +03:00
parent 71b0dfdefa
commit e0ce6c0ff8
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 240 additions and 173 deletions

View file

@ -104,13 +104,14 @@ func main() {
registerStorageMetrics(strg)
common.StartUnmarshalWorkers()
srv, err := transport.NewServer(*vminsertAddr, *vmselectAddr, strg)
vminsertSrv, err := transport.NewVMInsertServer(*vminsertAddr, strg)
if err != nil {
logger.Fatalf("cannot create a server with vminsertAddr=%s, vmselectAddr=%s: %s", *vminsertAddr, *vmselectAddr, err)
logger.Fatalf("cannot create a server with -vminsertAddr=%s: %s", *vminsertAddr, err)
}
vmselectSrv, err := transport.NewVMSelectServer(*vmselectAddr, strg)
if err != nil {
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
}
go srv.RunVMInsert()
go srv.RunVMSelect()
requestHandler := newRequestHandler(strg)
go func() {
@ -130,7 +131,8 @@ func main() {
logger.Infof("gracefully shutting down the service")
startTime = time.Now()
stopStaleSnapshotsRemover()
srv.MustClose()
vmselectSrv.MustClose()
vminsertSrv.MustClose()
common.StopUnmarshalWorkers()
logger.Infof("successfully shut down the service in %.3f seconds", time.Since(startTime).Seconds())

View file

@ -0,0 +1,158 @@
package transport
import (
"flag"
"fmt"
"net"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// precisionBits is the -precisionBits command-line flag.
// Its value is validated in NewVMInsertServer and is passed to storage.AddRows
// for every batch of rows received from vminsert.
var precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
// VMInsertServer processes connections from vminsert.
//
// It is created and started by NewVMInsertServer and is stopped by MustClose.
type VMInsertServer struct {
// storage is a pointer to the underlying storage.
// Rows read from vminsert connections are added to it.
storage *storage.Storage
// ln is the listener for incoming connections to the server.
ln net.Listener
// connsMap is a map of currently established connections to the server.
// It is used for closing the connections when MustClose() is called.
connsMap ingestserver.ConnsMap
// wg is used for waiting for worker goroutines to stop when MustClose() is called.
wg sync.WaitGroup
// stopFlag is set to a non-zero value when the server needs to stop.
// It is accessed atomically via setIsStopping() and isStopping().
stopFlag uint32
}
// NewVMInsertServer starts VMInsertServer at the given addr serving the given storage.
//
// The -precisionBits flag is validated before the listener is opened,
// so an invalid flag value doesn't leave an open listening socket behind.
//
// On success the returned server is already accepting vminsert connections
// in a background goroutine; call MustClose in order to stop it.
// A non-nil error is returned if -precisionBits is invalid or if addr cannot be listened.
func NewVMInsertServer(addr string, storage *storage.Storage) (*VMInsertServer, error) {
	// Reject values that don't fit uint8 explicitly: the uint8() conversion below
	// would silently wrap them (e.g. 320 becomes 64) and let them pass the validation.
	if *precisionBits < 0 || *precisionBits > 255 {
		return nil, fmt.Errorf("invalid -precisionBits: %d is outside the supported range", *precisionBits)
	}
	if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
		return nil, fmt.Errorf("invalid -precisionBits: %w", err)
	}

	ln, err := netutil.NewTCPListener("vminsert", addr, nil)
	if err != nil {
		return nil, fmt.Errorf("unable to listen vminsertAddr %s: %w", addr, err)
	}

	s := &VMInsertServer{
		storage: storage,
		ln:      ln,
	}
	s.connsMap.Init()

	// The accept loop is tracked by s.wg, so MustClose waits for it to exit.
	s.wg.Add(1)
	go func() {
		s.run()
		s.wg.Done()
	}()
	return s, nil
}
// run accepts vminsert connections at s.ln and serves every accepted connection
// in a dedicated goroutine tracked by s.wg.
//
// It returns when Accept fails while the server is stopping
// or when s.connsMap refuses to register a new connection.
func (s *VMInsertServer) run() {
	logger.Infof("accepting vminsert conns at %s", s.ln.Addr())
	for {
		conn, err := s.ln.Accept()
		if err != nil {
			netErr, isNetErr := err.(net.Error)
			switch {
			case isNetErr && netErr.Temporary():
				// Temporary accept errors are retried.
				continue
			case s.isStopping():
				// The listener has been closed by MustClose.
				return
			}
			logger.Panicf("FATAL: cannot process vminsert conns at %s: %s", s.ln.Addr(), err)
		}
		logger.Infof("accepted vminsert conn from %s", conn.RemoteAddr())
		if registered := s.connsMap.Add(conn); !registered {
			// The server is closed.
			_ = conn.Close()
			return
		}
		vminsertConns.Inc()
		s.wg.Add(1)
		go s.handleConn(conn)
	}
}

// handleConn performs the vminsert handshake on c and then feeds the rows
// read from it into s.storage until the stream ends or fails.
//
// It must be called in a goroutine registered in s.wg and for a conn registered
// in s.connsMap, since it unregisters both on return.
func (s *VMInsertServer) handleConn(c net.Conn) {
	defer func() {
		s.connsMap.Delete(c)
		vminsertConns.Dec()
		s.wg.Done()
	}()

	// There is no need in response compression, since
	// vmstorage sends only small packets to vminsert.
	const compressionLevel = 0
	bc, err := handshake.VMInsertServer(c, compressionLevel)
	if err != nil {
		if !s.isStopping() {
			logger.Errorf("cannot perform vminsert handshake with client %q: %s", c.RemoteAddr(), err)
			_ = c.Close()
		}
		// When the server is stopping, c is closed inside VMInsertServer.MustClose.
		return
	}
	defer func() {
		if !s.isStopping() {
			logger.Infof("closing vminsert conn from %s", c.RemoteAddr())
		}
		_ = bc.Close()
	}()

	logger.Infof("processing vminsert conn from %s", c.RemoteAddr())
	addRows := func(rows []storage.MetricRow) error {
		vminsertMetricsRead.Add(len(rows))
		return s.storage.AddRows(rows, uint8(*precisionBits))
	}
	err = clusternative.ParseStream(bc, addRows, s.storage.IsReadOnly)
	if err == nil || s.isStopping() {
		return
	}
	vminsertConnErrors.Inc()
	logger.Errorf("cannot process vminsert conn from %s: %s", c.RemoteAddr(), err)
}
// Metrics for connections from vminsert.
var (
// vminsertConns is incremented when an accepted vminsert connection is registered
// and is decremented when the goroutine serving this connection exits.
vminsertConns = metrics.NewCounter("vm_vminsert_conns")
// vminsertConnErrors counts vminsert connections whose stream processing failed
// while the server wasn't stopping.
vminsertConnErrors = metrics.NewCounter("vm_vminsert_conn_errors_total")
// vminsertMetricsRead counts the rows read from vminsert connections.
vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
)
// MustClose gracefully closes s, so it no longer touches s.storage after returning.
//
// The order of steps matters: the stop flag is raised first, so errors caused by
// closing the listener and the connections are treated as a part of the shutdown.
func (s *VMInsertServer) MustClose() {
	// Mark the server as stopping.
	s.setIsStopping()

	// Close the listener, so no new vminsert connections are accepted.
	closeErr := s.ln.Close()
	if closeErr != nil {
		logger.Panicf("FATAL: cannot close vminsert listener: %s", closeErr)
	}

	// Close the already established vminsert connections in order to unblock
	// the goroutines serving them.
	s.connsMap.CloseAll()

	// Block until the accept loop and all the connection goroutines exit.
	s.wg.Wait()
}
// setIsStopping atomically raises s.stopFlag, which marks the server as stopping.
func (s *VMInsertServer) setIsStopping() {
	const stopping uint32 = 1
	atomic.StoreUint32(&s.stopFlag, stopping)
}
// isStopping reports whether s.stopFlag has been raised by setIsStopping.
func (s *VMInsertServer) isStopping() bool {
	v := atomic.LoadUint32(&s.stopFlag)
	return v != 0
}

View file

@ -19,7 +19,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
@ -31,61 +30,55 @@ var (
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. This allows protecting against heavy queries, which select unexpectedly high number of series. Zero means 'no limit'. See also -search.max* command-line flags at vmselect")
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of the data sent from vmstorage to vmselect. This reduces CPU usage at the cost of higher network bandwidth usage")
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+
"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+
"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee")
)
// Server processes connections from vminsert and vmselect.
type Server struct {
// Move stopFlag to the top of the struct in order to fix atomic access to it on 32-bit arches.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
stopFlag uint64
// VMSelectServer processes connections from vmselect.
type VMSelectServer struct {
// storage is a pointer to the underlying storage.
storage *storage.Storage
vminsertLN net.Listener
vmselectLN net.Listener
// ln is the listener for incoming connections to the server.
ln net.Listener
vminsertWG sync.WaitGroup
vmselectWG sync.WaitGroup
// connsMap is a map of currently established connections to the server.
// It is used for closing the connections when MustClose() is called.
connsMap ingestserver.ConnsMap
vminsertConnsMap ingestserver.ConnsMap
vmselectConnsMap ingestserver.ConnsMap
// wg is used for waiting for worker goroutines to stop when MustClose() is called.
wg sync.WaitGroup
// stopFlag is set to true when the server needs to stop.
stopFlag uint32
}
// NewServer returns new Server.
func NewServer(vminsertAddr, vmselectAddr string, storage *storage.Storage) (*Server, error) {
vminsertLN, err := netutil.NewTCPListener("vminsert", vminsertAddr, nil)
// NewVMSelectServer starts new VMSelectServer at the given addr, which serves the given storage.
func NewVMSelectServer(addr string, storage *storage.Storage) (*VMSelectServer, error) {
ln, err := netutil.NewTCPListener("vmselect", addr, nil)
if err != nil {
return nil, fmt.Errorf("unable to listen vminsertAddr %s: %w", vminsertAddr, err)
return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err)
}
vmselectLN, err := netutil.NewTCPListener("vmselect", vmselectAddr, nil)
if err != nil {
return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", vmselectAddr, err)
}
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
return nil, fmt.Errorf("invalid -precisionBits: %w", err)
}
s := &Server{
s := &VMSelectServer{
storage: storage,
vminsertLN: vminsertLN,
vmselectLN: vmselectLN,
ln: ln,
}
s.vminsertConnsMap.Init()
s.vmselectConnsMap.Init()
s.connsMap.Init()
s.wg.Add(1)
go func() {
s.run()
s.wg.Done()
}()
return s, nil
}
// RunVMInsert runs a server accepting connections from vminsert.
func (s *Server) RunVMInsert() {
logger.Infof("accepting vminsert conns at %s", s.vminsertLN.Addr())
func (s *VMSelectServer) run() {
logger.Infof("accepting vmselect conns at %s", s.ln.Addr())
for {
c, err := s.vminsertLN.Accept()
c, err := s.ln.Accept()
if err != nil {
if pe, ok := err.(net.Error); ok && pe.Temporary() {
continue
@ -93,89 +86,22 @@ func (s *Server) RunVMInsert() {
if s.isStopping() {
return
}
logger.Panicf("FATAL: cannot process vminsert conns at %s: %s", s.vminsertLN.Addr(), err)
}
logger.Infof("accepted vminsert conn from %s", c.RemoteAddr())
if !s.vminsertConnsMap.Add(c) {
// The server is closed.
_ = c.Close()
return
}
vminsertConns.Inc()
s.vminsertWG.Add(1)
go func() {
defer func() {
s.vminsertConnsMap.Delete(c)
vminsertConns.Dec()
s.vminsertWG.Done()
}()
// There is no need in response compression, since
// vmstorage sends only small packets to vminsert.
compressionLevel := 0
bc, err := handshake.VMInsertServer(c, compressionLevel)
if err != nil {
if s.isStopping() {
// c is stopped inside Server.MustClose
return
}
logger.Errorf("cannot perform vminsert handshake with client %q: %s", c.RemoteAddr(), err)
_ = c.Close()
return
}
defer func() {
if !s.isStopping() {
logger.Infof("closing vminsert conn from %s", c.RemoteAddr())
}
_ = bc.Close()
}()
logger.Infof("processing vminsert conn from %s", c.RemoteAddr())
if err := s.processVMInsertConn(bc); err != nil {
if s.isStopping() {
return
}
vminsertConnErrors.Inc()
logger.Errorf("cannot process vminsert conn from %s: %s", c.RemoteAddr(), err)
}
}()
}
}
var (
vminsertConns = metrics.NewCounter("vm_vminsert_conns")
vminsertConnErrors = metrics.NewCounter("vm_vminsert_conn_errors_total")
)
// RunVMSelect runs a server accepting connections from vmselect.
func (s *Server) RunVMSelect() {
logger.Infof("accepting vmselect conns at %s", s.vmselectLN.Addr())
for {
c, err := s.vmselectLN.Accept()
if err != nil {
if pe, ok := err.(net.Error); ok && pe.Temporary() {
continue
}
if s.isStopping() {
return
}
logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.vmselectLN.Addr(), err)
logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.ln.Addr(), err)
}
logger.Infof("accepted vmselect conn from %s", c.RemoteAddr())
if !s.vmselectConnsMap.Add(c) {
if !s.connsMap.Add(c) {
// The server is closed.
_ = c.Close()
return
}
vmselectConns.Inc()
s.vmselectWG.Add(1)
s.wg.Add(1)
go func() {
defer func() {
s.vmselectConnsMap.Delete(c)
s.connsMap.Delete(c)
vmselectConns.Dec()
s.vmselectWG.Done()
s.wg.Done()
}()
// Compress responses to vmselect even if they already contain compressed blocks.
@ -190,7 +116,7 @@ func (s *Server) RunVMSelect() {
bc, err := handshake.VMSelectServer(c, compressionLevel)
if err != nil {
if s.isStopping() {
// c is closed inside Server.MustClose
// c is closed inside VMSelectServer.MustClose
return
}
logger.Errorf("cannot perform vmselect handshake with client %q: %s", c.RemoteAddr(), err)
@ -206,7 +132,7 @@ func (s *Server) RunVMSelect() {
}()
logger.Infof("processing vmselect conn from %s", c.RemoteAddr())
if err := s.processVMSelectConn(bc); err != nil {
if err := s.processConn(bc); err != nil {
if s.isStopping() {
return
}
@ -222,58 +148,39 @@ var (
vmselectConnErrors = metrics.NewCounter("vm_vmselect_conn_errors_total")
)
// MustClose gracefully closes the server,
// so it no longer touches s.storage after returning.
func (s *Server) MustClose() {
// MustClose gracefully closes s, so it no longer touches s.storage after returning.
func (s *VMSelectServer) MustClose() {
// Mark the server as stopping.
s.setIsStopping()
// Stop accepting new connections from vminsert and vmselect.
if err := s.vminsertLN.Close(); err != nil {
logger.Panicf("FATAL: cannot close vminsert listener: %s", err)
}
if err := s.vmselectLN.Close(); err != nil {
// Stop accepting new connections from vmselect.
if err := s.ln.Close(); err != nil {
logger.Panicf("FATAL: cannot close vmselect listener: %s", err)
}
// Close existing connections from vminsert, so the goroutines
// processing these connections are finished.
s.vminsertConnsMap.CloseAll()
// Close existing connections from vmselect, so the goroutines
// processing these connections are finished.
s.vmselectConnsMap.CloseAll()
s.connsMap.CloseAll()
// Wait until all the goroutines processing vminsert and vmselect conns
// are finished.
s.vminsertWG.Wait()
s.vmselectWG.Wait()
// Wait until all the goroutines processing vmselect conns are finished.
s.wg.Wait()
}
func (s *Server) setIsStopping() {
atomic.StoreUint64(&s.stopFlag, 1)
func (s *VMSelectServer) setIsStopping() {
atomic.StoreUint32(&s.stopFlag, 1)
}
func (s *Server) isStopping() bool {
return atomic.LoadUint64(&s.stopFlag) != 0
func (s *VMSelectServer) isStopping() bool {
return atomic.LoadUint32(&s.stopFlag) != 0
}
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
}, s.storage.IsReadOnly)
}
var vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error {
func (s *VMSelectServer) processConn(bc *handshake.BufferedConn) error {
ctx := &vmselectRequestCtx{
bc: bc,
sizeBuf: make([]byte, 8),
}
for {
if err := s.processVMSelectRequest(ctx); err != nil {
if err := s.processRequest(ctx); err != nil {
if err == io.EOF {
// Remote client gracefully closed the connection.
return nil
@ -369,6 +276,9 @@ func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error)
return accountID, projectID, nil
}
// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes.
const maxSearchQuerySize = 1024 * 1024
func (ctx *vmselectRequestCtx) readSearchQuery() error {
if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil {
return fmt.Errorf("cannot read searchQuery: %w", err)
@ -475,7 +385,7 @@ func (ctx *vmselectRequestCtx) writeUint64(n uint64) error {
const maxRPCNameSize = 128
func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processRequest(ctx *vmselectRequestCtx) error {
// Read rpcName
// Do not set deadline on reading rpcName, since it may take a
// lot of time for idle connection.
@ -512,7 +422,7 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout)
// Process the rpcName call.
if err := s.processVMSelectRPC(ctx, rpcName); err != nil {
if err := s.processRPC(ctx, rpcName); err != nil {
return fmt.Errorf("cannot execute %q: %s", rpcName, err)
}
@ -525,26 +435,26 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processVMSelectRPC(ctx *vmselectRequestCtx, rpcName string) error {
func (s *VMSelectServer) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
switch rpcName {
case "search_v6":
return s.processVMSelectSearch(ctx)
return s.processSearch(ctx)
case "searchMetricNames_v3":
return s.processVMSelectSearchMetricNames(ctx)
return s.processSearchMetricNames(ctx)
case "labelValues_v5":
return s.processVMSelectLabelValues(ctx)
return s.processLabelValues(ctx)
case "tagValueSuffixes_v3":
return s.processVMSelectTagValueSuffixes(ctx)
return s.processTagValueSuffixes(ctx)
case "labelNames_v5":
return s.processVMSelectLabelNames(ctx)
return s.processLabelNames(ctx)
case "seriesCount_v4":
return s.processVMSelectSeriesCount(ctx)
return s.processSeriesCount(ctx)
case "tsdbStatus_v5":
return s.processVMSelectTSDBStatus(ctx)
return s.processTSDBStatus(ctx)
case "deleteMetrics_v5":
return s.processVMSelectDeleteMetrics(ctx)
return s.processDeleteMetrics(ctx)
case "registerMetricNames_v3":
return s.processVMSelectRegisterMetricNames(ctx)
return s.processRegisterMetricNames(ctx)
default:
return fmt.Errorf("unsupported rpcName: %q", rpcName)
}
@ -553,7 +463,7 @@ func (s *Server) processVMSelectRPC(ctx *vmselectRequestCtx, rpcName string) err
const maxMetricNameRawSize = 1024 * 1024
const maxMetricNamesPerRequest = 1024 * 1024
func (s *Server) processVMSelectRegisterMetricNames(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processRegisterMetricNames(ctx *vmselectRequestCtx) error {
vmselectRegisterMetricNamesRequests.Inc()
// Read request
@ -590,7 +500,7 @@ func (s *Server) processVMSelectRegisterMetricNames(ctx *vmselectRequestCtx) err
return nil
}
func (s *Server) processVMSelectDeleteMetrics(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processDeleteMetrics(ctx *vmselectRequestCtx) error {
vmselectDeleteMetricsRequests.Inc()
// Read request
@ -624,7 +534,7 @@ func (s *Server) processVMSelectDeleteMetrics(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processVMSelectLabelNames(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processLabelNames(ctx *vmselectRequestCtx) error {
vmselectLabelNamesRequests.Inc()
// Read request
@ -673,7 +583,7 @@ func (s *Server) processVMSelectLabelNames(ctx *vmselectRequestCtx) error {
const maxLabelValueSize = 16 * 1024
func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processLabelValues(ctx *vmselectRequestCtx) error {
vmselectLabelValuesRequests.Inc()
// Read request
@ -729,7 +639,7 @@ func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
vmselectTagValueSuffixesRequests.Inc()
// read request
@ -785,7 +695,7 @@ func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error
return nil
}
func (s *Server) processVMSelectSeriesCount(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processSeriesCount(ctx *vmselectRequestCtx) error {
vmselectSeriesCountRequests.Inc()
// Read request
@ -812,7 +722,7 @@ func (s *Server) processVMSelectSeriesCount(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processTSDBStatus(ctx *vmselectRequestCtx) error {
vmselectTSDBStatusRequests.Inc()
// Read request
@ -892,10 +802,7 @@ func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) erro
return nil
}
// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes.
const maxSearchQuerySize = 1024 * 1024
func (s *Server) processVMSelectSearchMetricNames(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processSearchMetricNames(ctx *vmselectRequestCtx) error {
vmselectSearchMetricNamesRequests.Inc()
// Read request.
@ -937,7 +844,7 @@ func (s *Server) processVMSelectSearchMetricNames(ctx *vmselectRequestCtx) error
return nil
}
func (s *Server) processVMSelectSearch(ctx *vmselectRequestCtx) error {
func (s *VMSelectServer) processSearch(ctx *vmselectRequestCtx) error {
vmselectSearchRequests.Inc()
// Read request.