2019-05-22 21:23:23 +00:00
package transport
import (
2020-08-10 10:17:12 +00:00
"errors"
2019-05-22 21:23:23 +00:00
"flag"
"fmt"
"io"
"net"
2020-06-30 21:58:26 +00:00
"net/http"
2019-05-22 21:23:23 +00:00
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-15 12:42:30 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
2020-06-30 21:58:26 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
var (
2020-09-10 21:29:26 +00:00
maxTagKeysPerSearch = flag . Int ( "search.maxTagKeys" , 100e3 , "The maximum number of tag keys returned per search" )
maxTagValuesPerSearch = flag . Int ( "search.maxTagValues" , 100e3 , "The maximum number of tag values returned per search" )
maxTagValueSuffixesPerSearch = flag . Int ( "search.maxTagValueSuffixesPerSearch" , 100e3 , "The maximum number of tag value suffixes returned from /metrics/find" )
maxMetricsPerSearch = flag . Int ( "search.maxUniqueTimeseries" , 300e3 , "The maximum number of unique time series each search can scan" )
2019-05-22 21:23:23 +00:00
2019-06-10 11:52:41 +00:00
precisionBits = flag . Int ( "precisionBits" , 64 , "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss" )
disableRPCCompression = flag . Bool ( ` rpc.disableCompression ` , false , "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage" )
2020-06-30 21:58:26 +00:00
denyQueriesOutsideRetention = flag . Bool ( "denyQueriesOutsideRetention" , false , "Whether to deny queries outside of the configured -retentionPeriod. " +
"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. " +
"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee" )
2019-05-22 21:23:23 +00:00
)
// Server processes connections from vminsert and vmselect.
type Server struct {
2020-05-12 17:20:57 +00:00
// Move stopFlag to the top of the struct in order to fix atomic access to it on 32-bit arches.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
stopFlag uint64
2019-05-22 21:23:23 +00:00
storage * storage . Storage
vminsertLN net . Listener
vmselectLN net . Listener
vminsertWG sync . WaitGroup
vmselectWG sync . WaitGroup
vminsertConnsMap connsMap
vmselectConnsMap connsMap
}
type connsMap struct {
2020-06-05 08:50:11 +00:00
mu sync . Mutex
m map [ net . Conn ] struct { }
isClosed bool
2019-05-22 21:23:23 +00:00
}
func ( cm * connsMap ) Init ( ) {
cm . m = make ( map [ net . Conn ] struct { } )
2020-06-05 08:50:11 +00:00
cm . isClosed = false
2019-05-22 21:23:23 +00:00
}
2020-06-05 08:50:11 +00:00
func ( cm * connsMap ) Add ( c net . Conn ) bool {
2019-05-22 21:23:23 +00:00
cm . mu . Lock ( )
2020-06-05 08:50:11 +00:00
ok := ! cm . isClosed
if ok {
cm . m [ c ] = struct { } { }
}
2019-05-22 21:23:23 +00:00
cm . mu . Unlock ( )
2020-06-05 08:50:11 +00:00
return ok
2019-05-22 21:23:23 +00:00
}
func ( cm * connsMap ) Delete ( c net . Conn ) {
cm . mu . Lock ( )
delete ( cm . m , c )
cm . mu . Unlock ( )
}
func ( cm * connsMap ) CloseAll ( ) {
cm . mu . Lock ( )
for c := range cm . m {
_ = c . Close ( )
}
2020-06-05 08:50:11 +00:00
cm . isClosed = true
2019-05-22 21:23:23 +00:00
cm . mu . Unlock ( )
}
// NewServer returns new Server.
func NewServer ( vminsertAddr , vmselectAddr string , storage * storage . Storage ) ( * Server , error ) {
vminsertLN , err := netutil . NewTCPListener ( "vminsert" , vminsertAddr )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "unable to listen vminsertAddr %s: %w" , vminsertAddr , err )
2019-05-22 21:23:23 +00:00
}
vmselectLN , err := netutil . NewTCPListener ( "vmselect" , vmselectAddr )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "unable to listen vmselectAddr %s: %w" , vmselectAddr , err )
2019-05-22 21:23:23 +00:00
}
if err := encoding . CheckPrecisionBits ( uint8 ( * precisionBits ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "invalid -precisionBits: %w" , err )
2019-05-22 21:23:23 +00:00
}
s := & Server {
storage : storage ,
vminsertLN : vminsertLN ,
vmselectLN : vmselectLN ,
}
s . vminsertConnsMap . Init ( )
s . vmselectConnsMap . Init ( )
return s , nil
}
// RunVMInsert runs a server accepting connections from vminsert.
func ( s * Server ) RunVMInsert ( ) {
logger . Infof ( "accepting vminsert conns at %s" , s . vminsertLN . Addr ( ) )
for {
c , err := s . vminsertLN . Accept ( )
if err != nil {
if pe , ok := err . ( net . Error ) ; ok && pe . Temporary ( ) {
continue
}
if s . isStopping ( ) {
return
}
logger . Panicf ( "FATAL: cannot process vminsert conns at %s: %s" , s . vminsertLN . Addr ( ) , err )
}
logger . Infof ( "accepted vminsert conn from %s" , c . RemoteAddr ( ) )
2020-06-05 08:50:11 +00:00
if ! s . vminsertConnsMap . Add ( c ) {
// The server is closed.
_ = c . Close ( )
return
}
2019-05-22 21:23:23 +00:00
vminsertConns . Inc ( )
s . vminsertWG . Add ( 1 )
go func ( ) {
defer func ( ) {
s . vminsertConnsMap . Delete ( c )
vminsertConns . Dec ( )
s . vminsertWG . Done ( )
} ( )
// There is no need in response compression, since
2020-05-21 11:04:45 +00:00
// vmstorage sends only small packets to vminsert.
2019-05-22 21:23:23 +00:00
compressionLevel := 0
bc , err := handshake . VMInsertServer ( c , compressionLevel )
if err != nil {
if s . isStopping ( ) {
// c is stopped inside Server.MustClose
return
}
logger . Errorf ( "cannot perform vminsert handshake with client %q: %s" , c . RemoteAddr ( ) , err )
_ = c . Close ( )
return
}
defer func ( ) {
if ! s . isStopping ( ) {
logger . Infof ( "closing vminsert conn from %s" , c . RemoteAddr ( ) )
}
_ = bc . Close ( )
} ( )
logger . Infof ( "processing vminsert conn from %s" , c . RemoteAddr ( ) )
if err := s . processVMInsertConn ( bc ) ; err != nil {
if s . isStopping ( ) {
return
}
vminsertConnErrors . Inc ( )
logger . Errorf ( "cannot process vminsert conn from %s: %s" , c . RemoteAddr ( ) , err )
}
} ( )
}
}
var (
vminsertConns = metrics . NewCounter ( "vm_vminsert_conns" )
vminsertConnErrors = metrics . NewCounter ( "vm_vminsert_conn_errors_total" )
)
// RunVMSelect runs a server accepting connections from vmselect.
func ( s * Server ) RunVMSelect ( ) {
logger . Infof ( "accepting vmselect conns at %s" , s . vmselectLN . Addr ( ) )
for {
c , err := s . vmselectLN . Accept ( )
if err != nil {
if pe , ok := err . ( net . Error ) ; ok && pe . Temporary ( ) {
continue
}
if s . isStopping ( ) {
return
}
logger . Panicf ( "FATAL: cannot process vmselect conns at %s: %s" , s . vmselectLN . Addr ( ) , err )
}
logger . Infof ( "accepted vmselect conn from %s" , c . RemoteAddr ( ) )
2020-06-05 08:50:11 +00:00
if ! s . vmselectConnsMap . Add ( c ) {
// The server is closed.
_ = c . Close ( )
return
}
2019-05-22 21:23:23 +00:00
vmselectConns . Inc ( )
s . vmselectWG . Add ( 1 )
go func ( ) {
defer func ( ) {
s . vmselectConnsMap . Delete ( c )
vmselectConns . Dec ( )
s . vmselectWG . Done ( )
} ( )
2019-06-10 11:52:41 +00:00
// Compress responses to vmselect even if they already contain compressed blocks.
// Responses contain uncompressed metric names, which should compress well
// when the response contains high number of time series.
// Additionally, recently added metric blocks are usually uncompressed, so the compression
// should save network bandwidth.
compressionLevel := 1
if * disableRPCCompression {
compressionLevel = 0
}
2019-05-22 21:23:23 +00:00
bc , err := handshake . VMSelectServer ( c , compressionLevel )
if err != nil {
if s . isStopping ( ) {
// c is closed inside Server.MustClose
return
}
logger . Errorf ( "cannot perform vmselect handshake with client %q: %s" , c . RemoteAddr ( ) , err )
_ = c . Close ( )
return
}
defer func ( ) {
if ! s . isStopping ( ) {
logger . Infof ( "closing vmselect conn from %s" , c . RemoteAddr ( ) )
}
_ = bc . Close ( )
} ( )
logger . Infof ( "processing vmselect conn from %s" , c . RemoteAddr ( ) )
if err := s . processVMSelectConn ( bc ) ; err != nil {
if s . isStopping ( ) {
return
}
vmselectConnErrors . Inc ( )
logger . Errorf ( "cannot process vmselect conn %s: %s" , c . RemoteAddr ( ) , err )
}
} ( )
}
}
var (
vmselectConns = metrics . NewCounter ( "vm_vmselect_conns" )
vmselectConnErrors = metrics . NewCounter ( "vm_vmselect_conn_errors_total" )
)
// MustClose gracefully closes the server,
// so it no longer touches s.storage after returning.
func ( s * Server ) MustClose ( ) {
// Mark the server as stoping.
s . setIsStopping ( )
// Stop accepting new connections from vminsert and vmselect.
if err := s . vminsertLN . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close vminsert listener: %s" , err )
}
if err := s . vmselectLN . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close vmselect listener: %s" , err )
}
// Close existing connections from vminsert, so the goroutines
// processing these connections are finished.
s . vminsertConnsMap . CloseAll ( )
// Close existing connections from vmselect, so the goroutines
// processing these connections are finished.
s . vmselectConnsMap . CloseAll ( )
// Wait until all the goroutines processing vminsert and vmselect conns
// are finished.
s . vminsertWG . Wait ( )
s . vmselectWG . Wait ( )
}
func ( s * Server ) setIsStopping ( ) {
atomic . StoreUint64 ( & s . stopFlag , 1 )
}
func ( s * Server ) isStopping ( ) bool {
return atomic . LoadUint64 ( & s . stopFlag ) != 0
}
2020-04-27 06:32:08 +00:00
func ( s * Server ) processVMInsertConn ( bc * handshake . BufferedConn ) error {
2019-05-22 21:23:23 +00:00
sizeBuf := make ( [ ] byte , 8 )
2020-09-28 18:37:58 +00:00
var reqBuf [ ] byte
remoteAddr := bc . RemoteAddr ( ) . String ( )
2019-05-22 21:23:23 +00:00
for {
2020-04-27 06:32:08 +00:00
if _ , err := io . ReadFull ( bc , sizeBuf ) ; err != nil {
2019-05-22 21:23:23 +00:00
if err == io . EOF {
// Remote end gracefully closed the connection.
return nil
}
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read packet size: %w" , err )
2019-05-22 21:23:23 +00:00
}
packetSize := encoding . UnmarshalUint64 ( sizeBuf )
if packetSize > consts . MaxInsertPacketSize {
return fmt . Errorf ( "too big packet size: %d; shouldn't exceed %d" , packetSize , consts . MaxInsertPacketSize )
}
2020-09-28 18:37:58 +00:00
reqBuf = bytesutil . Resize ( reqBuf , int ( packetSize ) )
if n , err := io . ReadFull ( bc , reqBuf ) ; err != nil {
2021-03-02 19:18:32 +00:00
return fmt . Errorf ( "cannot read packet with size %d bytes: %w; read only %d bytes" , packetSize , err , n )
2019-05-22 21:23:23 +00:00
}
2020-09-28 18:37:58 +00:00
// Send `ack` to vminsert that the packet has been received.
2020-04-27 06:32:08 +00:00
deadline := time . Now ( ) . Add ( 5 * time . Second )
if err := bc . SetWriteDeadline ( deadline ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot set write deadline for sending `ack` to vminsert: %w" , err )
2020-04-27 06:32:08 +00:00
}
sizeBuf [ 0 ] = 1
if _ , err := bc . Write ( sizeBuf [ : 1 ] ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send `ack` to vminsert: %w" , err )
2020-04-27 06:32:08 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot flush `ack` to vminsert: %w" , err )
2020-04-27 06:32:08 +00:00
}
2019-05-22 21:23:23 +00:00
vminsertPacketsRead . Inc ( )
2020-09-28 18:37:58 +00:00
uw := getUnmarshalWork ( )
uw . storage = s . storage
uw . remoteAddr = remoteAddr
uw . reqBuf , reqBuf = reqBuf , uw . reqBuf
unmarshalWorkCh <- uw
2019-05-22 21:23:23 +00:00
}
}
var (
vminsertPacketsRead = metrics . NewCounter ( "vm_vminsert_packets_read_total" )
vminsertMetricsRead = metrics . NewCounter ( "vm_vminsert_metrics_read_total" )
)
2020-09-28 18:37:58 +00:00
func getUnmarshalWork ( ) * unmarshalWork {
v := unmarshalWorkPool . Get ( )
if v == nil {
return & unmarshalWork { }
}
return v . ( * unmarshalWork )
}
// StartUnmarshalWorkers starts workers for unmarshaling data obtained from vminsert connections.
//
// This function must be called before servers are created via NewServer.
func StartUnmarshalWorkers ( ) {
2020-12-08 18:49:32 +00:00
gomaxprocs := cgroup . AvailableCPUs ( )
2020-09-28 18:37:58 +00:00
unmarshalWorkCh = make ( chan * unmarshalWork , gomaxprocs )
unmarshalWorkersWG . Add ( gomaxprocs )
for i := 0 ; i < gomaxprocs ; i ++ {
go func ( ) {
defer unmarshalWorkersWG . Done ( )
for uw := range unmarshalWorkCh {
uw . Unmarshal ( )
putUnmarshalWork ( uw )
}
} ( )
}
}
// StopUnmarshalWorkers stops unmarshal workers which were started with StartUnmarshalWorkers.
//
// This function must be called after Server.MustClose().
func StopUnmarshalWorkers ( ) {
close ( unmarshalWorkCh )
unmarshalWorkersWG . Wait ( )
}
var (
unmarshalWorkCh chan * unmarshalWork
unmarshalWorkersWG sync . WaitGroup
)
func putUnmarshalWork ( uw * unmarshalWork ) {
uw . reset ( )
unmarshalWorkPool . Put ( uw )
}
var unmarshalWorkPool sync . Pool
type unmarshalWork struct {
storage * storage . Storage
remoteAddr string
mrs [ ] storage . MetricRow
reqBuf [ ] byte
lastResetTime uint64
}
func ( uw * unmarshalWork ) reset ( ) {
if ( len ( uw . mrs ) * 4 > cap ( uw . mrs ) || len ( uw . reqBuf ) * 4 > cap ( uw . reqBuf ) ) && fasttime . UnixTimestamp ( ) - uw . lastResetTime > 10 {
// Periodically reset mrs and reqBuf in order to prevent from gradual memory usage growth
// when ceratin entries in mr contain too long labels.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
uw . mrs = nil
uw . reqBuf = nil
uw . lastResetTime = fasttime . UnixTimestamp ( )
}
uw . storage = nil
uw . remoteAddr = ""
uw . mrs = uw . mrs [ : 0 ]
uw . reqBuf = uw . reqBuf [ : 0 ]
}
func ( uw * unmarshalWork ) Unmarshal ( ) {
mrs := uw . mrs [ : 0 ]
tail := uw . reqBuf
for len ( tail ) > 0 {
if len ( mrs ) < cap ( mrs ) {
mrs = mrs [ : len ( mrs ) + 1 ]
} else {
mrs = append ( mrs , storage . MetricRow { } )
}
mr := & mrs [ len ( mrs ) - 1 ]
var err error
tail , err = mr . Unmarshal ( tail )
if err != nil {
logger . Errorf ( "cannot unmarshal MetricRow obtained from %s: %s" , uw . remoteAddr , err )
uw . mrs = mrs [ : 0 ]
return
}
if len ( mrs ) >= 10000 {
// Store the collected mrs in order to reduce memory usage
// when too big number of mrs are sent in each packet.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
uw . mrs = mrs
uw . flushRows ( )
mrs = uw . mrs [ : 0 ]
}
}
uw . mrs = mrs
uw . flushRows ( )
}
func ( uw * unmarshalWork ) flushRows ( ) {
vminsertMetricsRead . Add ( len ( uw . mrs ) )
err := uw . storage . AddRows ( uw . mrs , uint8 ( * precisionBits ) )
uw . mrs = uw . mrs [ : 0 ]
if err != nil {
logger . Errorf ( "cannot store metrics obtained from %s: %s" , uw . remoteAddr , err )
}
}
2019-05-22 21:23:23 +00:00
func ( s * Server ) processVMSelectConn ( bc * handshake . BufferedConn ) error {
ctx := & vmselectRequestCtx {
bc : bc ,
sizeBuf : make ( [ ] byte , 8 ) ,
}
for {
2019-12-02 18:44:18 +00:00
if err := s . processVMSelectRequest ( ctx ) ; err != nil {
2019-05-22 21:23:23 +00:00
if err == io . EOF {
// Remote client gracefully closed the connection.
return nil
}
2020-08-10 10:17:12 +00:00
if errors . Is ( err , storage . ErrDeadlineExceeded ) {
return fmt . Errorf ( "cannot process vmselect request in %d seconds: %w" , ctx . timeout , err )
}
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot process vmselect request: %w" , err )
2019-05-22 21:23:23 +00:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot flush compressed buffers: %w" , err )
2019-05-22 21:23:23 +00:00
}
}
}
type vmselectRequestCtx struct {
bc * handshake . BufferedConn
sizeBuf [ ] byte
dataBuf [ ] byte
sq storage . SearchQuery
tfss [ ] * storage . TagFilters
sr storage . Search
2020-04-27 05:13:41 +00:00
mb storage . MetricBlock
2020-07-23 17:42:57 +00:00
2020-08-10 10:17:12 +00:00
// timeout in seconds for the current request
timeout uint64
2020-07-23 17:42:57 +00:00
// deadline in unix timestamp seconds for the current request.
deadline uint64
2019-05-22 21:23:23 +00:00
}
2020-11-04 22:15:43 +00:00
func ( ctx * vmselectRequestCtx ) readTimeRange ( ) ( storage . TimeRange , error ) {
var tr storage . TimeRange
minTimestamp , err := ctx . readUint64 ( )
if err != nil {
return tr , fmt . Errorf ( "cannot read minTimestamp: %w" , err )
}
maxTimestamp , err := ctx . readUint64 ( )
if err != nil {
return tr , fmt . Errorf ( "cannot read maxTimestamp: %w" , err )
}
tr . MinTimestamp = int64 ( minTimestamp )
tr . MaxTimestamp = int64 ( maxTimestamp )
return tr , nil
}
2019-05-22 21:23:23 +00:00
func ( ctx * vmselectRequestCtx ) readUint32 ( ) ( uint32 , error ) {
ctx . sizeBuf = bytesutil . Resize ( ctx . sizeBuf , 4 )
if _ , err := io . ReadFull ( ctx . bc , ctx . sizeBuf ) ; err != nil {
if err == io . EOF {
return 0 , err
}
2020-06-30 19:58:18 +00:00
return 0 , fmt . Errorf ( "cannot read uint32: %w" , err )
2019-05-22 21:23:23 +00:00
}
n := encoding . UnmarshalUint32 ( ctx . sizeBuf )
return n , nil
}
2020-09-10 21:29:26 +00:00
func ( ctx * vmselectRequestCtx ) readUint64 ( ) ( uint64 , error ) {
ctx . sizeBuf = bytesutil . Resize ( ctx . sizeBuf , 8 )
if _ , err := io . ReadFull ( ctx . bc , ctx . sizeBuf ) ; err != nil {
if err == io . EOF {
return 0 , err
}
return 0 , fmt . Errorf ( "cannot read uint64: %w" , err )
}
n := encoding . UnmarshalUint64 ( ctx . sizeBuf )
return n , nil
}
func ( ctx * vmselectRequestCtx ) readAccountIDProjectID ( ) ( uint32 , uint32 , error ) {
accountID , err := ctx . readUint32 ( )
if err != nil {
return 0 , 0 , fmt . Errorf ( "cannot read accountID: %w" , err )
}
projectID , err := ctx . readUint32 ( )
if err != nil {
return 0 , 0 , fmt . Errorf ( "cannot read projectID: %w" , err )
}
return accountID , projectID , nil
}
2020-11-16 08:55:55 +00:00
func ( ctx * vmselectRequestCtx ) readSearchQuery ( ) error {
if err := ctx . readDataBufBytes ( maxSearchQuerySize ) ; err != nil {
return fmt . Errorf ( "cannot read searchQuery: %w" , err )
}
tail , err := ctx . sq . Unmarshal ( ctx . dataBuf )
if err != nil {
return fmt . Errorf ( "cannot unmarshal SearchQuery: %w" , err )
}
if len ( tail ) > 0 {
return fmt . Errorf ( "unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q" , len ( tail ) , tail )
}
return nil
}
2019-05-22 21:23:23 +00:00
func ( ctx * vmselectRequestCtx ) readDataBufBytes ( maxDataSize int ) error {
ctx . sizeBuf = bytesutil . Resize ( ctx . sizeBuf , 8 )
if _ , err := io . ReadFull ( ctx . bc , ctx . sizeBuf ) ; err != nil {
if err == io . EOF {
return err
}
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read data size: %w" , err )
2019-05-22 21:23:23 +00:00
}
dataSize := encoding . UnmarshalUint64 ( ctx . sizeBuf )
if dataSize > uint64 ( maxDataSize ) {
return fmt . Errorf ( "too big data size: %d; it mustn't exceed %d bytes" , dataSize , maxDataSize )
}
ctx . dataBuf = bytesutil . Resize ( ctx . dataBuf , int ( dataSize ) )
if dataSize == 0 {
return nil
}
2019-09-11 11:11:37 +00:00
if n , err := io . ReadFull ( ctx . bc , ctx . dataBuf ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read data with size %d: %w; read only %d bytes" , dataSize , err , n )
2019-05-22 21:23:23 +00:00
}
return nil
}
2019-08-04 19:15:33 +00:00
func ( ctx * vmselectRequestCtx ) readBool ( ) ( bool , error ) {
ctx . dataBuf = bytesutil . Resize ( ctx . dataBuf , 1 )
if _ , err := io . ReadFull ( ctx . bc , ctx . dataBuf ) ; err != nil {
if err == io . EOF {
return false , err
}
2020-06-30 19:58:18 +00:00
return false , fmt . Errorf ( "cannot read bool: %w" , err )
2019-08-04 19:15:33 +00:00
}
v := ctx . dataBuf [ 0 ] != 0
return v , nil
}
2020-09-10 21:29:26 +00:00
func ( ctx * vmselectRequestCtx ) readByte ( ) ( byte , error ) {
ctx . dataBuf = bytesutil . Resize ( ctx . dataBuf , 1 )
if _ , err := io . ReadFull ( ctx . bc , ctx . dataBuf ) ; err != nil {
if err == io . EOF {
return 0 , err
}
return 0 , fmt . Errorf ( "cannot read byte: %w" , err )
}
b := ctx . dataBuf [ 0 ]
return b , nil
}
2019-05-22 21:23:23 +00:00
func ( ctx * vmselectRequestCtx ) writeDataBufBytes ( ) error {
if err := ctx . writeUint64 ( uint64 ( len ( ctx . dataBuf ) ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write data size: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( ctx . dataBuf ) == 0 {
return nil
}
if _ , err := ctx . bc . Write ( ctx . dataBuf ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write data with size %d: %w" , len ( ctx . dataBuf ) , err )
2019-05-22 21:23:23 +00:00
}
return nil
}
2020-02-13 15:32:54 +00:00
// maxErrorMessageSize is the maximum size of error message to send to clients.
const maxErrorMessageSize = 64 * 1024
func ( ctx * vmselectRequestCtx ) writeErrorMessage ( err error ) error {
2020-08-10 10:17:12 +00:00
if errors . Is ( err , storage . ErrDeadlineExceeded ) {
err = fmt . Errorf ( "cannot execute request in %d seconds: %w" , ctx . timeout , err )
}
2020-02-13 15:32:54 +00:00
errMsg := err . Error ( )
if len ( errMsg ) > maxErrorMessageSize {
// Trim too long error message.
errMsg = errMsg [ : maxErrorMessageSize ]
}
if err := ctx . writeString ( errMsg ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send error message %q to client: %w" , errMsg , err )
2020-02-13 15:32:54 +00:00
}
return nil
}
2019-05-22 21:23:23 +00:00
func ( ctx * vmselectRequestCtx ) writeString ( s string ) error {
ctx . dataBuf = append ( ctx . dataBuf [ : 0 ] , s ... )
return ctx . writeDataBufBytes ( )
}
func ( ctx * vmselectRequestCtx ) writeUint64 ( n uint64 ) error {
ctx . sizeBuf = encoding . MarshalUint64 ( ctx . sizeBuf [ : 0 ] , n )
if _ , err := ctx . bc . Write ( ctx . sizeBuf ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write uint64 %d: %w" , n , err )
2019-05-22 21:23:23 +00:00
}
return nil
}
const maxRPCNameSize = 128
var zeroTime time . Time
func ( s * Server ) processVMSelectRequest ( ctx * vmselectRequestCtx ) error {
// Read rpcName
// Do not set deadline on reading rpcName, since it may take a
// lot of time for idle connection.
if err := ctx . readDataBufBytes ( maxRPCNameSize ) ; err != nil {
if err == io . EOF {
// Remote client gracefully closed the connection.
return err
}
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read rpcName: %w" , err )
2019-05-22 21:23:23 +00:00
}
2020-07-23 17:42:57 +00:00
rpcName := string ( ctx . dataBuf )
2019-05-22 21:23:23 +00:00
// Limit the time required for reading request args.
if err := ctx . bc . SetReadDeadline ( time . Now ( ) . Add ( 5 * time . Second ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot set read deadline for reading request args: %w" , err )
2019-05-22 21:23:23 +00:00
}
defer func ( ) {
_ = ctx . bc . SetReadDeadline ( zeroTime )
} ( )
2020-07-23 17:42:57 +00:00
// Read the timeout for request execution.
timeout , err := ctx . readUint32 ( )
if err != nil {
return fmt . Errorf ( "cannot read timeout for the request %q: %w" , rpcName , err )
}
2020-08-10 10:17:12 +00:00
ctx . timeout = uint64 ( timeout )
2020-07-23 17:42:57 +00:00
ctx . deadline = fasttime . UnixTimestamp ( ) + uint64 ( timeout )
switch rpcName {
case "search_v4" :
2020-11-16 08:55:55 +00:00
return s . processVMSelectSearch ( ctx )
case "searchMetricNames_v1" :
return s . processVMSelectSearchMetricNames ( ctx )
2020-11-04 22:15:43 +00:00
case "labelValuesOnTimeRange_v1" :
return s . processVMSelectLabelValuesOnTimeRange ( ctx )
2020-07-23 17:42:57 +00:00
case "labelValues_v2" :
2019-05-22 21:23:23 +00:00
return s . processVMSelectLabelValues ( ctx )
2020-09-10 21:29:26 +00:00
case "tagValueSuffixes_v1" :
return s . processVMSelectTagValueSuffixes ( ctx )
2020-07-23 17:42:57 +00:00
case "labelEntries_v2" :
2019-06-10 15:55:20 +00:00
return s . processVMSelectLabelEntries ( ctx )
2020-11-04 22:15:43 +00:00
case "labelsOnTimeRange_v1" :
return s . processVMSelectLabelsOnTimeRange ( ctx )
2020-07-23 17:42:57 +00:00
case "labels_v2" :
2019-05-22 21:23:23 +00:00
return s . processVMSelectLabels ( ctx )
2020-07-23 17:42:57 +00:00
case "seriesCount_v2" :
2019-05-22 21:23:23 +00:00
return s . processVMSelectSeriesCount ( ctx )
2020-07-23 17:42:57 +00:00
case "tsdbStatus_v2" :
2020-04-22 16:57:36 +00:00
return s . processVMSelectTSDBStatus ( ctx )
2020-07-23 17:42:57 +00:00
case "deleteMetrics_v3" :
2019-05-22 21:23:23 +00:00
return s . processVMSelectDeleteMetrics ( ctx )
2020-11-23 10:33:17 +00:00
case "registerMetricNames_v1" :
return s . processVMSelectRegisterMetricNames ( ctx )
2019-05-22 21:23:23 +00:00
default :
return fmt . Errorf ( "unsupported rpcName: %q" , ctx . dataBuf )
}
}
2020-11-23 10:33:17 +00:00
const maxMetricNameRawSize = 1024 * 1024
const maxMetricNamesPerRequest = 1024 * 1024
func ( s * Server ) processVMSelectRegisterMetricNames ( ctx * vmselectRequestCtx ) error {
vmselectRegisterMetricNamesRequests . Inc ( )
// Read request
metricsCount , err := ctx . readUint64 ( )
if err != nil {
return fmt . Errorf ( "cannot read metricsCount: %w" , err )
}
if metricsCount > maxMetricNamesPerRequest {
return fmt . Errorf ( "too many metric names in a single request; got %d; mustn't exceed %d" , metricsCount , maxMetricNamesPerRequest )
}
mrs := make ( [ ] storage . MetricRow , metricsCount )
for i := 0 ; i < int ( metricsCount ) ; i ++ {
if err := ctx . readDataBufBytes ( maxMetricNameRawSize ) ; err != nil {
return fmt . Errorf ( "cannot read metricNameRaw: %w" , err )
}
mr := & mrs [ i ]
mr . MetricNameRaw = append ( mr . MetricNameRaw [ : 0 ] , ctx . dataBuf ... )
n , err := ctx . readUint64 ( )
if err != nil {
return fmt . Errorf ( "cannot read timestamp: %w" , err )
}
mr . Timestamp = int64 ( n )
}
// Register metric names from mrs.
if err := s . storage . RegisterMetricNames ( mrs ) ; err != nil {
return ctx . writeErrorMessage ( err )
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send empty error message: %w" , err )
}
return nil
}
2019-05-22 21:23:23 +00:00
const maxTagFiltersSize = 64 * 1024
func ( s * Server ) processVMSelectDeleteMetrics ( ctx * vmselectRequestCtx ) error {
vmselectDeleteMetricsRequests . Inc ( )
// Read request
if err := ctx . readDataBufBytes ( maxTagFiltersSize ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read labelName: %w" , err )
2019-05-22 21:23:23 +00:00
}
tail , err := ctx . sq . Unmarshal ( ctx . dataBuf )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot unmarshal SearchQuery: %w" , err )
2019-05-22 21:23:23 +00:00
}
if len ( tail ) > 0 {
return fmt . Errorf ( "unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q" , len ( tail ) , tail )
}
// Setup ctx.tfss
2021-02-02 22:24:05 +00:00
tr := storage . TimeRange {
MinTimestamp : 0 ,
MaxTimestamp : time . Now ( ) . UnixNano ( ) / 1e6 ,
}
if err := ctx . setupTfss ( s . storage , tr ) ; err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
// Delete the given metrics.
deletedCount , err := s . storage . DeleteMetrics ( ctx . tfss )
if err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Send deletedCount to vmselect.
if err := ctx . writeUint64 ( uint64 ( deletedCount ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send deletedCount=%d: %w" , deletedCount , err )
2019-05-22 21:23:23 +00:00
}
return nil
}
2020-11-04 22:15:43 +00:00
func ( s * Server ) processVMSelectLabelsOnTimeRange ( ctx * vmselectRequestCtx ) error {
vmselectLabelsOnTimeRangeRequests . Inc ( )
// Read request
accountID , projectID , err := ctx . readAccountIDProjectID ( )
if err != nil {
return err
}
tr , err := ctx . readTimeRange ( )
if err != nil {
return err
}
// Search for tag keys
labels , err := s . storage . SearchTagKeysOnTimeRange ( accountID , projectID , tr , * maxTagKeysPerSearch , ctx . deadline )
if err != nil {
return ctx . writeErrorMessage ( err )
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send empty error message: %w" , err )
}
// Send labels to vmselect
for _ , label := range labels {
if len ( label ) == 0 {
// Do this substitution in order to prevent clashing with 'end of response' marker.
label = "__name__"
}
if err := ctx . writeString ( label ) ; err != nil {
return fmt . Errorf ( "cannot write label %q: %w" , label , err )
}
}
// Send 'end of response' marker
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send 'end of response' marker" )
}
return nil
}
2019-05-22 21:23:23 +00:00
func ( s * Server ) processVMSelectLabels ( ctx * vmselectRequestCtx ) error {
vmselectLabelsRequests . Inc ( )
// Read request
2020-09-10 21:29:26 +00:00
accountID , projectID , err := ctx . readAccountIDProjectID ( )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-09-10 21:29:26 +00:00
return err
2019-05-22 21:23:23 +00:00
}
// Search for tag keys
2020-07-23 17:42:57 +00:00
labels , err := s . storage . SearchTagKeys ( accountID , projectID , * maxTagKeysPerSearch , ctx . deadline )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Send labels to vmselect
for _ , label := range labels {
if len ( label ) == 0 {
// Do this substitution in order to prevent clashing with 'end of response' marker.
label = "__name__"
}
if err := ctx . writeString ( label ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write label %q: %w" , label , err )
2019-05-22 21:23:23 +00:00
}
}
// Send 'end of response' marker
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send 'end of response' marker" )
}
return nil
}
const maxLabelValueSize = 16 * 1024
2020-11-04 22:15:43 +00:00
func ( s * Server ) processVMSelectLabelValuesOnTimeRange ( ctx * vmselectRequestCtx ) error {
vmselectLabelValuesOnTimeRangeRequests . Inc ( )
// Read request
accountID , projectID , err := ctx . readAccountIDProjectID ( )
if err != nil {
return err
}
2020-11-05 00:07:59 +00:00
if err := ctx . readDataBufBytes ( maxLabelValueSize ) ; err != nil {
return fmt . Errorf ( "cannot read labelName: %w" , err )
}
labelName := string ( ctx . dataBuf )
2020-11-04 22:15:43 +00:00
tr , err := ctx . readTimeRange ( )
if err != nil {
return err
}
// Search for tag values
2020-11-05 00:07:59 +00:00
labelValues , err := s . storage . SearchTagValuesOnTimeRange ( accountID , projectID , [ ] byte ( labelName ) , tr , * maxTagValuesPerSearch , ctx . deadline )
2020-11-04 22:15:43 +00:00
if err != nil {
return ctx . writeErrorMessage ( err )
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send empty error message: %w" , err )
}
return writeLabelValues ( ctx , labelValues )
}
2019-05-22 21:23:23 +00:00
func ( s * Server ) processVMSelectLabelValues ( ctx * vmselectRequestCtx ) error {
vmselectLabelValuesRequests . Inc ( )
// Read request
2020-09-10 21:29:26 +00:00
accountID , projectID , err := ctx . readAccountIDProjectID ( )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-09-10 21:29:26 +00:00
return err
2019-05-22 21:23:23 +00:00
}
if err := ctx . readDataBufBytes ( maxLabelValueSize ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read labelName: %w" , err )
2019-05-22 21:23:23 +00:00
}
labelName := ctx . dataBuf
// Search for tag values
2020-07-23 17:42:57 +00:00
labelValues , err := s . storage . SearchTagValues ( accountID , projectID , labelName , * maxTagValuesPerSearch , ctx . deadline )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
2019-06-10 15:55:20 +00:00
return writeLabelValues ( ctx , labelValues )
}
2020-09-10 21:29:26 +00:00
func ( s * Server ) processVMSelectTagValueSuffixes ( ctx * vmselectRequestCtx ) error {
vmselectTagValueSuffixesRequests . Inc ( )
// read request
accountID , projectID , err := ctx . readAccountIDProjectID ( )
if err != nil {
return err
}
2020-11-04 22:15:43 +00:00
tr , err := ctx . readTimeRange ( )
2020-09-10 21:29:26 +00:00
if err != nil {
2020-11-04 22:15:43 +00:00
return err
2020-09-10 21:29:26 +00:00
}
if err := ctx . readDataBufBytes ( maxLabelValueSize ) ; err != nil {
return fmt . Errorf ( "cannot read tagKey: %w" , err )
}
tagKey := append ( [ ] byte { } , ctx . dataBuf ... )
if err := ctx . readDataBufBytes ( maxLabelValueSize ) ; err != nil {
return fmt . Errorf ( "cannot read tagValuePrefix: %w" , err )
}
tagValuePrefix := append ( [ ] byte { } , ctx . dataBuf ... )
delimiter , err := ctx . readByte ( )
if err != nil {
2020-09-23 19:46:24 +00:00
return fmt . Errorf ( "cannot read delimiter: %w" , err )
2020-09-10 21:29:26 +00:00
}
// Search for tag value suffixes
suffixes , err := s . storage . SearchTagValueSuffixes ( accountID , projectID , tr , tagKey , tagValuePrefix , delimiter , * maxTagValueSuffixesPerSearch , ctx . deadline )
if err != nil {
return ctx . writeErrorMessage ( err )
}
2021-02-02 22:24:05 +00:00
if len ( suffixes ) >= * maxTagValueSuffixesPerSearch {
err := fmt . Errorf ( "more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found " +
"for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; " +
"either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value" ,
* maxTagValueSuffixesPerSearch , tagKey , tagValuePrefix , delimiter , tr . String ( ) )
return ctx . writeErrorMessage ( err )
}
2020-09-10 21:29:26 +00:00
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send empty error message: %w" , err )
}
// Send suffixes to vmselect.
// Suffixes may contain empty string, so prepend suffixes with suffixCount.
if err := ctx . writeUint64 ( uint64 ( len ( suffixes ) ) ) ; err != nil {
return fmt . Errorf ( "cannot write suffixesCount: %w" , err )
}
for i , suffix := range suffixes {
if err := ctx . writeString ( suffix ) ; err != nil {
return fmt . Errorf ( "cannot write suffix #%d: %w" , i + 1 , err )
}
}
return nil
}
2019-06-10 15:55:20 +00:00
func writeLabelValues ( ctx * vmselectRequestCtx , labelValues [ ] string ) error {
2019-05-22 21:23:23 +00:00
for _ , labelValue := range labelValues {
if len ( labelValue ) == 0 {
// Skip empty label values, since they have no sense for prometheus.
continue
}
if err := ctx . writeString ( labelValue ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write labelValue %q: %w" , labelValue , err )
2019-05-22 21:23:23 +00:00
}
}
2019-06-10 15:55:20 +00:00
// Send 'end of label values' marker
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send 'end of response' marker" )
}
return nil
}
func ( s * Server ) processVMSelectLabelEntries ( ctx * vmselectRequestCtx ) error {
vmselectLabelEntriesRequests . Inc ( )
// Read request
2020-09-10 21:29:26 +00:00
accountID , projectID , err := ctx . readAccountIDProjectID ( )
2019-06-10 15:55:20 +00:00
if err != nil {
2020-09-10 21:29:26 +00:00
return err
2019-06-10 15:55:20 +00:00
}
// Perform the request
2020-07-23 17:42:57 +00:00
labelEntries , err := s . storage . SearchTagEntries ( accountID , projectID , * maxTagKeysPerSearch , * maxTagValuesPerSearch , ctx . deadline )
2019-06-10 15:55:20 +00:00
if err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-06-10 15:55:20 +00:00
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2019-06-10 15:55:20 +00:00
}
// Send labelEntries to vmselect
for i := range labelEntries {
e := & labelEntries [ i ]
2019-06-10 16:51:05 +00:00
label := e . Key
if label == "" {
// Do this substitution in order to prevent clashing with 'end of response' marker.
label = "__name__"
}
if err := ctx . writeString ( label ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write label %q: %w" , label , err )
2019-06-10 15:55:20 +00:00
}
if err := writeLabelValues ( ctx , e . Values ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write label values for %q: %w" , label , err )
2019-06-10 15:55:20 +00:00
}
}
2019-05-22 21:23:23 +00:00
// Send 'end of response' marker
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send 'end of response' marker" )
}
return nil
}
func ( s * Server ) processVMSelectSeriesCount ( ctx * vmselectRequestCtx ) error {
vmselectSeriesCountRequests . Inc ( )
// Read request
2020-09-10 21:29:26 +00:00
accountID , projectID , err := ctx . readAccountIDProjectID ( )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-09-10 21:29:26 +00:00
return err
2019-05-22 21:23:23 +00:00
}
// Execute the request
2020-07-23 17:42:57 +00:00
n , err := s . storage . GetSeriesCount ( accountID , projectID , ctx . deadline )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Send series count to vmselect.
if err := ctx . writeUint64 ( n ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write series count to vmselect: %w" , err )
2019-05-22 21:23:23 +00:00
}
return nil
}
2020-04-22 16:57:36 +00:00
func ( s * Server ) processVMSelectTSDBStatus ( ctx * vmselectRequestCtx ) error {
vmselectTSDBStatusRequests . Inc ( )
// Read request
2020-09-10 21:29:26 +00:00
accountID , projectID , err := ctx . readAccountIDProjectID ( )
2020-04-22 16:57:36 +00:00
if err != nil {
2020-09-10 21:29:26 +00:00
return err
2020-04-22 16:57:36 +00:00
}
date , err := ctx . readUint32 ( )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read date: %w" , err )
2020-04-22 16:57:36 +00:00
}
topN , err := ctx . readUint32 ( )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read topN: %w" , err )
2020-04-22 16:57:36 +00:00
}
// Execute the request
2020-07-23 17:42:57 +00:00
status , err := s . storage . GetTSDBStatusForDate ( accountID , projectID , uint64 ( date ) , int ( topN ) , ctx . deadline )
2020-04-22 16:57:36 +00:00
if err != nil {
return ctx . writeErrorMessage ( err )
}
// Send an empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2020-04-22 16:57:36 +00:00
}
// Send status to vmselect.
if err := writeTopHeapEntries ( ctx , status . SeriesCountByMetricName ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write seriesCountByMetricName to vmselect: %w" , err )
2020-04-22 16:57:36 +00:00
}
if err := writeTopHeapEntries ( ctx , status . LabelValueCountByLabelName ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write labelValueCountByLabelName to vmselect: %w" , err )
2020-04-22 16:57:36 +00:00
}
if err := writeTopHeapEntries ( ctx , status . SeriesCountByLabelValuePair ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write seriesCountByLabelValuePair to vmselect: %w" , err )
2020-04-22 16:57:36 +00:00
}
return nil
}
func writeTopHeapEntries ( ctx * vmselectRequestCtx , a [ ] storage . TopHeapEntry ) error {
if err := ctx . writeUint64 ( uint64 ( len ( a ) ) ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write topHeapEntries size: %w" , err )
2020-04-22 16:57:36 +00:00
}
for _ , e := range a {
if err := ctx . writeString ( e . Name ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write topHeapEntry name: %w" , err )
2020-04-22 16:57:36 +00:00
}
if err := ctx . writeUint64 ( e . Count ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot write topHeapEntry count: %w" , err )
2020-04-22 16:57:36 +00:00
}
}
return nil
}
2019-05-22 21:23:23 +00:00
// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes.
const maxSearchQuerySize = 1024 * 1024
2020-11-16 08:55:55 +00:00
func ( s * Server ) processVMSelectSearchMetricNames ( ctx * vmselectRequestCtx ) error {
vmselectSearchMetricNamesRequests . Inc ( )
2019-05-22 21:23:23 +00:00
2020-11-16 08:55:55 +00:00
// Read request.
if err := ctx . readSearchQuery ( ) ; err != nil {
return err
2019-05-22 21:23:23 +00:00
}
2020-11-16 08:55:55 +00:00
// Search metric names.
tr := storage . TimeRange {
MinTimestamp : ctx . sq . MinTimestamp ,
MaxTimestamp : ctx . sq . MaxTimestamp ,
}
2021-02-02 22:24:05 +00:00
if err := ctx . setupTfss ( s . storage , tr ) ; err != nil {
return ctx . writeErrorMessage ( err )
}
2020-11-16 16:00:50 +00:00
mns , err := s . storage . SearchMetricNames ( ctx . tfss , tr , * maxMetricsPerSearch , ctx . deadline )
2019-05-22 21:23:23 +00:00
if err != nil {
2020-11-16 08:55:55 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
2020-11-16 08:55:55 +00:00
// Send empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send empty error message: %w" , err )
}
// Send response.
metricNamesCount := len ( mns )
if err := ctx . writeUint64 ( uint64 ( metricNamesCount ) ) ; err != nil {
return fmt . Errorf ( "cannot send metricNamesCount: %w" , err )
}
for i , mn := range mns {
ctx . dataBuf = mn . Marshal ( ctx . dataBuf [ : 0 ] )
if err := ctx . writeDataBufBytes ( ) ; err != nil {
return fmt . Errorf ( "cannot send metricName #%d: %w" , i + 1 , err )
}
}
return nil
}
func ( s * Server ) processVMSelectSearch ( ctx * vmselectRequestCtx ) error {
vmselectSearchRequests . Inc ( )
// Read request.
if err := ctx . readSearchQuery ( ) ; err != nil {
return err
2019-05-22 21:23:23 +00:00
}
2019-08-04 19:15:33 +00:00
fetchData , err := ctx . readBool ( )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot read `fetchData` bool: %w" , err )
2019-08-04 19:15:33 +00:00
}
2019-05-22 21:23:23 +00:00
// Setup search.
tr := storage . TimeRange {
MinTimestamp : ctx . sq . MinTimestamp ,
MaxTimestamp : ctx . sq . MaxTimestamp ,
}
2021-02-02 22:24:05 +00:00
if err := ctx . setupTfss ( s . storage , tr ) ; err != nil {
return ctx . writeErrorMessage ( err )
}
2020-06-30 21:58:26 +00:00
if err := checkTimeRange ( s . storage , tr ) ; err != nil {
return ctx . writeErrorMessage ( err )
}
2021-03-16 23:12:28 +00:00
startTime := time . Now ( )
2020-07-23 17:42:57 +00:00
ctx . sr . Init ( s . storage , ctx . tfss , tr , * maxMetricsPerSearch , ctx . deadline )
2021-03-16 23:12:28 +00:00
indexSearchDuration . UpdateDuration ( startTime )
2019-05-22 21:23:23 +00:00
defer ctx . sr . MustClose ( )
if err := ctx . sr . Error ( ) ; err != nil {
2020-02-13 15:32:54 +00:00
return ctx . writeErrorMessage ( err )
2019-05-22 21:23:23 +00:00
}
// Send empty error message to vmselect.
if err := ctx . writeString ( "" ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send empty error message: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Send found blocks to vmselect.
for ctx . sr . NextMetricBlock ( ) {
2020-04-27 05:13:41 +00:00
ctx . mb . MetricName = ctx . sr . MetricBlockRef . MetricName
ctx . sr . MetricBlockRef . BlockRef . MustReadBlock ( & ctx . mb . Block , fetchData )
2019-05-22 21:23:23 +00:00
vmselectMetricBlocksRead . Inc ( )
2020-04-27 05:13:41 +00:00
vmselectMetricRowsRead . Add ( ctx . mb . Block . RowsCount ( ) )
2019-05-22 21:23:23 +00:00
2020-04-27 05:13:41 +00:00
ctx . dataBuf = ctx . mb . Marshal ( ctx . dataBuf [ : 0 ] )
2019-05-22 21:23:23 +00:00
if err := ctx . writeDataBufBytes ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot send MetricBlock: %w" , err )
2019-05-22 21:23:23 +00:00
}
}
if err := ctx . sr . Error ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "search error: %w" , err )
2019-05-22 21:23:23 +00:00
}
// Send 'end of response' marker
if err := ctx . writeString ( "" ) ; err != nil {
return fmt . Errorf ( "cannot send 'end of response' marker" )
}
return nil
}
2021-03-16 23:12:28 +00:00
var indexSearchDuration = metrics . NewHistogram ( ` vm_index_search_duration_seconds ` )
2020-06-30 21:58:26 +00:00
// checkTimeRange returns true if the given tr is denied for querying.
func checkTimeRange ( s * storage . Storage , tr storage . TimeRange ) error {
if ! * denyQueriesOutsideRetention {
return nil
}
2020-10-20 13:10:46 +00:00
retentionMsecs := s . RetentionMsecs ( )
minAllowedTimestamp := int64 ( fasttime . UnixTimestamp ( ) * 1000 ) - retentionMsecs
2020-06-30 21:58:26 +00:00
if tr . MinTimestamp > minAllowedTimestamp {
return nil
}
return & httpserver . ErrorWithStatusCode {
2020-10-20 13:10:46 +00:00
Err : fmt . Errorf ( "the given time range %s is outside the allowed retention %.3f days according to -denyQueriesOutsideRetention" ,
& tr , float64 ( retentionMsecs ) / ( 24 * 3600 * 1000 ) ) ,
2020-06-30 21:58:26 +00:00
StatusCode : http . StatusServiceUnavailable ,
}
}
2019-05-22 21:23:23 +00:00
var (
2020-11-23 10:33:17 +00:00
vmselectRegisterMetricNamesRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="register_metric_names"} ` )
vmselectDeleteMetricsRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="delete_metrics"} ` )
vmselectLabelsOnTimeRangeRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="labels_on_time_range"} ` )
vmselectLabelsRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="labels"} ` )
vmselectLabelValuesOnTimeRangeRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="label_values_on_time_range"} ` )
vmselectLabelValuesRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="label_values"} ` )
vmselectTagValueSuffixesRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="tag_value_suffixes"} ` )
vmselectLabelEntriesRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="label_entries"} ` )
vmselectSeriesCountRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="series_count"} ` )
vmselectTSDBStatusRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="tsdb_status"} ` )
vmselectSearchMetricNamesRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="search_metric_names"} ` )
vmselectSearchRequests = metrics . NewCounter ( ` vm_vmselect_rpc_requests_total { name="search"} ` )
vmselectMetricBlocksRead = metrics . NewCounter ( ` vm_vmselect_metric_blocks_read_total ` )
vmselectMetricRowsRead = metrics . NewCounter ( ` vm_vmselect_metric_rows_read_total ` )
2019-05-22 21:23:23 +00:00
)
2021-02-02 22:24:05 +00:00
func ( ctx * vmselectRequestCtx ) setupTfss ( s * storage . Storage , tr storage . TimeRange ) error {
2019-05-22 21:23:23 +00:00
tfss := ctx . tfss [ : 0 ]
2021-02-02 22:24:05 +00:00
accountID := ctx . sq . AccountID
projectID := ctx . sq . ProjectID
2019-05-22 21:23:23 +00:00
for _ , tagFilters := range ctx . sq . TagFilterss {
2021-02-02 22:24:05 +00:00
tfs := storage . NewTagFilters ( accountID , projectID )
2019-05-22 21:23:23 +00:00
for i := range tagFilters {
tf := & tagFilters [ i ]
2021-02-02 22:24:05 +00:00
if string ( tf . Key ) == "__graphite__" {
query := tf . Value
paths , err := s . SearchGraphitePaths ( accountID , projectID , tr , query , * maxMetricsPerSearch , ctx . deadline )
if err != nil {
return fmt . Errorf ( "error when searching for Graphite paths for query %q: %w" , query , err )
}
if len ( paths ) >= * maxMetricsPerSearch {
return fmt . Errorf ( "more than -search.maxUniqueTimeseries=%d time series match Graphite query %q; " +
"either narrow down the query or increase -search.maxUniqueTimeseries command-line flag value" , * maxMetricsPerSearch , query )
}
tfs . AddGraphiteQuery ( query , paths , tf . IsNegative )
continue
}
2019-05-22 21:23:23 +00:00
if err := tfs . Add ( tf . Key , tf . Value , tf . IsNegative , tf . IsRegexp ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot parse tag filter %s: %w" , tf , err )
2019-05-22 21:23:23 +00:00
}
}
2020-06-05 10:17:23 +00:00
tfss = append ( tfss , tfs )
2020-03-30 15:34:51 +00:00
tfss = append ( tfss , tfs . Finalize ( ) ... )
2019-05-22 21:23:23 +00:00
}
ctx . tfss = tfss
return nil
}