2022-06-28 11:04:14 +00:00
package servers
2022-06-23 16:19:36 +00:00
import (
2023-05-18 17:37:56 +00:00
"errors"
2022-06-23 16:19:36 +00:00
"flag"
"fmt"
"net"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2023-02-13 18:37:53 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative/stream"
2022-06-23 16:19:36 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
var precisionBits = flag . Int ( "precisionBits" , 64 , "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss" )
// VMInsertServer processes connections from vminsert.
type VMInsertServer struct {
// storage is a pointer to the underlying storage.
storage * storage . Storage
// ln is the listener for incoming connections to the server.
ln net . Listener
// connsMap is a map of currently established connections to the server.
// It is used for closing the connections when MustStop() is called.
connsMap ingestserver . ConnsMap
// wg is used for waiting for worker goroutines to stop when MustStop() is called.
wg sync . WaitGroup
// stopFlag is set to true when the server needs to stop.
stopFlag uint32
}
// NewVMInsertServer starts VMInsertServer at the given addr serving the given storage.
func NewVMInsertServer ( addr string , storage * storage . Storage ) ( * VMInsertServer , error ) {
2023-01-27 07:08:35 +00:00
ln , err := netutil . NewTCPListener ( "vminsert" , addr , false , nil )
2022-06-23 16:19:36 +00:00
if err != nil {
return nil , fmt . Errorf ( "unable to listen vminsertAddr %s: %w" , addr , err )
}
if err := encoding . CheckPrecisionBits ( uint8 ( * precisionBits ) ) ; err != nil {
return nil , fmt . Errorf ( "invalid -precisionBits: %w" , err )
}
s := & VMInsertServer {
storage : storage ,
ln : ln ,
}
s . connsMap . Init ( )
s . wg . Add ( 1 )
go func ( ) {
s . run ( )
s . wg . Done ( )
} ( )
return s , nil
}
func ( s * VMInsertServer ) run ( ) {
logger . Infof ( "accepting vminsert conns at %s" , s . ln . Addr ( ) )
for {
c , err := s . ln . Accept ( )
if err != nil {
if pe , ok := err . ( net . Error ) ; ok && pe . Temporary ( ) {
continue
}
if s . isStopping ( ) {
return
}
logger . Panicf ( "FATAL: cannot process vminsert conns at %s: %s" , s . ln . Addr ( ) , err )
}
if ! s . connsMap . Add ( c ) {
// The server is closed.
_ = c . Close ( )
return
}
vminsertConns . Inc ( )
s . wg . Add ( 1 )
go func ( ) {
defer func ( ) {
s . connsMap . Delete ( c )
vminsertConns . Dec ( )
s . wg . Done ( )
} ( )
// There is no need in response compression, since
// vmstorage sends only small packets to vminsert.
compressionLevel := 0
bc , err := handshake . VMInsertServer ( c , compressionLevel )
if err != nil {
if s . isStopping ( ) {
2022-07-05 21:41:49 +00:00
// c is stopped inside VMInsertServer.MustStop
2022-06-23 16:19:36 +00:00
return
}
2023-05-18 17:37:56 +00:00
if ! errors . Is ( err , handshake . ErrIgnoreHealthcheck ) {
logger . Errorf ( "cannot perform vminsert handshake with client %q: %s" , c . RemoteAddr ( ) , err )
}
2022-06-23 16:19:36 +00:00
_ = c . Close ( )
return
}
defer func ( ) {
if ! s . isStopping ( ) {
logger . Infof ( "closing vminsert conn from %s" , c . RemoteAddr ( ) )
}
_ = bc . Close ( )
} ( )
logger . Infof ( "processing vminsert conn from %s" , c . RemoteAddr ( ) )
2023-02-13 18:37:53 +00:00
err = stream . Parse ( bc , func ( rows [ ] storage . MetricRow ) error {
2022-06-23 16:19:36 +00:00
vminsertMetricsRead . Add ( len ( rows ) )
return s . storage . AddRows ( rows , uint8 ( * precisionBits ) )
} , s . storage . IsReadOnly )
if err != nil {
if s . isStopping ( ) {
return
}
vminsertConnErrors . Inc ( )
logger . Errorf ( "cannot process vminsert conn from %s: %s" , c . RemoteAddr ( ) , err )
}
} ( )
}
}
var (
vminsertConns = metrics . NewCounter ( "vm_vminsert_conns" )
vminsertConnErrors = metrics . NewCounter ( "vm_vminsert_conn_errors_total" )
vminsertMetricsRead = metrics . NewCounter ( "vm_vminsert_metrics_read_total" )
)
2022-07-05 21:41:49 +00:00
// MustStop gracefully stops s so it no longer touches s.storage after returning.
func ( s * VMInsertServer ) MustStop ( ) {
2022-06-23 16:19:36 +00:00
// Mark the server as stoping.
s . setIsStopping ( )
// Stop accepting new connections from vminsert.
if err := s . ln . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close vminsert listener: %s" , err )
}
// Close existing connections from vminsert, so the goroutines
// processing these connections are finished.
s . connsMap . CloseAll ( )
// Wait until all the goroutines processing vminsert conns are finished.
s . wg . Wait ( )
}
func ( s * VMInsertServer ) setIsStopping ( ) {
atomic . StoreUint32 ( & s . stopFlag , 1 )
}
func ( s * VMInsertServer ) isStopping ( ) bool {
return atomic . LoadUint32 ( & s . stopFlag ) != 0
}