mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
23e53bdb80
it should simplify migration and keep good performance for vmstorage component
1061 lines
33 KiB
Go
1061 lines
33 KiB
Go
package vmselectapi
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Server processes vmselect requests.
|
|
type Server struct {
|
|
// api contains the implementation of the server API for vmselect requests.
|
|
api API
|
|
|
|
// limits contains various limits for the Server.
|
|
limits Limits
|
|
|
|
// disableResponseCompression controls whether vmselect server must compress responses.
|
|
disableResponseCompression bool
|
|
|
|
// ln is the listener for incoming connections to the server.
|
|
ln net.Listener
|
|
|
|
// The channel for limiting the number of concurrently executed requests.
|
|
concurrencyLimitCh chan struct{}
|
|
|
|
// connsMap is a map of currently established connections to the server.
|
|
// It is used for closing the connections when MustStop() is called.
|
|
connsMap ingestserver.ConnsMap
|
|
|
|
// wg is used for waiting for worker goroutines to stop when MustStop() is called.
|
|
wg sync.WaitGroup
|
|
|
|
// stopFlag is set to true when the server needs to stop.
|
|
stopFlag uint32
|
|
|
|
concurrencyLimitReached *metrics.Counter
|
|
concurrencyLimitTimeout *metrics.Counter
|
|
|
|
vmselectConns *metrics.Counter
|
|
vmselectConnErrors *metrics.Counter
|
|
|
|
indexSearchDuration *metrics.Histogram
|
|
|
|
registerMetricNamesRequests *metrics.Counter
|
|
deleteSeriesRequests *metrics.Counter
|
|
labelNamesRequests *metrics.Counter
|
|
labelValuesRequests *metrics.Counter
|
|
tagValueSuffixesRequests *metrics.Counter
|
|
seriesCountRequests *metrics.Counter
|
|
tsdbStatusRequests *metrics.Counter
|
|
searchMetricNamesRequests *metrics.Counter
|
|
searchRequests *metrics.Counter
|
|
tenantsRequests *metrics.Counter
|
|
|
|
metricBlocksRead *metrics.Counter
|
|
metricRowsRead *metrics.Counter
|
|
}
|
|
|
|
// Limits groups the limits, which a Server applies to incoming requests.
type Limits struct {
	// MaxLabelNames caps the number of label names returned from a labelNames request.
	MaxLabelNames int

	// MaxLabelValues caps the number of label values returned from a labelValues request.
	MaxLabelValues int

	// MaxTagValueSuffixes caps the number of entries returned from a tagValueSuffixes request.
	MaxTagValueSuffixes int

	// MaxConcurrentRequests is the number of requests the server may execute at the same time.
	//
	// Requests over this limit wait for a free slot for up to MaxQueueDuration.
	MaxConcurrentRequests int

	// MaxConcurrentRequestsFlagName is the name of the command-line flag holding MaxConcurrentRequests.
	// It is used in messages shown to the user.
	MaxConcurrentRequestsFlagName string

	// MaxQueueDuration is how long a request may wait for a free slot
	// when MaxConcurrentRequests requests are already being executed.
	MaxQueueDuration time.Duration

	// MaxQueueDurationFlagName is the name of the command-line flag holding MaxQueueDuration.
	// It is used in messages shown to the user.
	MaxQueueDurationFlagName string
}
|
|
|
|
// NewServer starts new Server at the given addr, which serves the given api with the given limits.
|
|
//
|
|
// If disableResponseCompression is set to true, then the returned server doesn't compress responses.
|
|
func NewServer(addr string, api API, limits Limits, disableResponseCompression bool) (*Server, error) {
|
|
ln, err := netutil.NewTCPListener("vmselect", addr, false, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err)
|
|
}
|
|
concurrencyLimitCh := make(chan struct{}, limits.MaxConcurrentRequests)
|
|
_ = metrics.NewGauge(`vm_vmselect_concurrent_requests_capacity`, func() float64 {
|
|
return float64(cap(concurrencyLimitCh))
|
|
})
|
|
_ = metrics.NewGauge(`vm_vmselect_concurrent_requests_current`, func() float64 {
|
|
return float64(len(concurrencyLimitCh))
|
|
})
|
|
s := &Server{
|
|
api: api,
|
|
limits: limits,
|
|
disableResponseCompression: disableResponseCompression,
|
|
ln: ln,
|
|
|
|
concurrencyLimitCh: concurrencyLimitCh,
|
|
|
|
concurrencyLimitReached: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_concurrent_requests_limit_reached_total{addr=%q}`, addr)),
|
|
concurrencyLimitTimeout: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_concurrent_requests_limit_timeout_total{addr=%q}`, addr)),
|
|
|
|
vmselectConns: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conns{addr=%q}`, addr)),
|
|
vmselectConnErrors: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conn_errors_total{addr=%q}`, addr)),
|
|
|
|
indexSearchDuration: metrics.NewHistogram(fmt.Sprintf(`vm_index_search_duration_seconds{addr=%q}`, addr)),
|
|
|
|
registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="registerMetricNames",addr=%q}`, addr)),
|
|
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="deleteSeries",addr=%q}`, addr)),
|
|
labelNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelNames",addr=%q}`, addr)),
|
|
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelValues",addr=%q}`, addr)),
|
|
tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tagValueSuffixes",addr=%q}`, addr)),
|
|
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="seriesSount",addr=%q}`, addr)),
|
|
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tsdbStatus",addr=%q}`, addr)),
|
|
searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="searchMetricNames",addr=%q}`, addr)),
|
|
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="search",addr=%q}`, addr)),
|
|
tenantsRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tenants",addr=%q}`, addr)),
|
|
|
|
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_blocks_read_total{addr=%q}`, addr)),
|
|
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_rows_read_total{addr=%q}`, addr)),
|
|
}
|
|
|
|
s.connsMap.Init()
|
|
s.wg.Add(1)
|
|
go func() {
|
|
s.run()
|
|
s.wg.Done()
|
|
}()
|
|
return s, nil
|
|
}
|
|
|
|
func (s *Server) run() {
|
|
logger.Infof("accepting vmselect conns at %s", s.ln.Addr())
|
|
for {
|
|
c, err := s.ln.Accept()
|
|
if err != nil {
|
|
if pe, ok := err.(net.Error); ok && pe.Temporary() {
|
|
continue
|
|
}
|
|
if s.isStopping() {
|
|
return
|
|
}
|
|
logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.ln.Addr(), err)
|
|
}
|
|
// Do not log connection accept from vmselect, since this can generate too many lines
|
|
// in the log because vmselect tends to re-establish idle connections.
|
|
|
|
if !s.connsMap.Add(c) {
|
|
// The server is closed.
|
|
_ = c.Close()
|
|
return
|
|
}
|
|
s.vmselectConns.Inc()
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer func() {
|
|
s.connsMap.Delete(c)
|
|
s.vmselectConns.Dec()
|
|
s.wg.Done()
|
|
}()
|
|
|
|
// Compress responses to vmselect even if they already contain compressed blocks.
|
|
// Responses contain uncompressed metric names, which should compress well
|
|
// when the response contains high number of time series.
|
|
// Additionally, recently added metric blocks are usually uncompressed, so the compression
|
|
// should save network bandwidth.
|
|
compressionLevel := 1
|
|
if s.disableResponseCompression {
|
|
compressionLevel = 0
|
|
}
|
|
bc, err := handshake.VMSelectServer(c, compressionLevel)
|
|
if err != nil {
|
|
if s.isStopping() {
|
|
// c is closed inside Server.MustStop
|
|
return
|
|
}
|
|
if !errors.Is(err, handshake.ErrIgnoreHealthcheck) {
|
|
logger.Errorf("cannot perform vmselect handshake with client %q: %s", c.RemoteAddr(), err)
|
|
}
|
|
_ = c.Close()
|
|
return
|
|
}
|
|
|
|
defer func() {
|
|
_ = bc.Close()
|
|
}()
|
|
if err := s.processConn(bc); err != nil {
|
|
if s.isStopping() {
|
|
return
|
|
}
|
|
s.vmselectConnErrors.Inc()
|
|
logger.Errorf("cannot process vmselect conn %s: %s", c.RemoteAddr(), err)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// MustStop gracefully stops s, so it no longer touches s.api after returning.
|
|
func (s *Server) MustStop() {
|
|
// Mark the server as stoping.
|
|
s.setIsStopping()
|
|
|
|
// Stop accepting new connections from vmselect.
|
|
if err := s.ln.Close(); err != nil {
|
|
logger.Panicf("FATAL: cannot close vmselect listener: %s", err)
|
|
}
|
|
|
|
// Close existing connections from vmselect, so the goroutines
|
|
// processing these connections are finished.
|
|
s.connsMap.CloseAll()
|
|
|
|
// Wait until all the goroutines processing vmselect conns are finished.
|
|
s.wg.Wait()
|
|
}
|
|
|
|
func (s *Server) setIsStopping() {
|
|
atomic.StoreUint32(&s.stopFlag, 1)
|
|
}
|
|
|
|
func (s *Server) isStopping() bool {
|
|
return atomic.LoadUint32(&s.stopFlag) != 0
|
|
}
|
|
|
|
func (s *Server) processConn(bc *handshake.BufferedConn) error {
|
|
ctx := &vmselectRequestCtx{
|
|
bc: bc,
|
|
sizeBuf: make([]byte, 8),
|
|
}
|
|
for {
|
|
if err := s.processRequest(ctx); err != nil {
|
|
if isExpectedError(err) {
|
|
return nil
|
|
}
|
|
if errors.Is(err, storage.ErrDeadlineExceeded) {
|
|
return fmt.Errorf("cannot process vmselect request in %d seconds: %w", ctx.timeout, err)
|
|
}
|
|
return fmt.Errorf("cannot process vmselect request: %w", err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return fmt.Errorf("cannot flush compressed buffers: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// isExpectedError reports whether err describes a connection termination,
// which doesn't need to be reported.
//
// This covers a bare io.EOF (the remote client gracefully closed the connection),
// errors wrapping net.ErrClosed and errors mentioning "broken pipe" or
// "connection reset by peer". The latter two may happen on network glitches or when
// the remote client interrupts the connection; the client notices the breach and
// handles it on its own, so there is no need in mirroring the error here.
//
// err must be non-nil.
func isExpectedError(err error) bool {
	if err == io.EOF || errors.Is(err, net.ErrClosed) {
		return true
	}
	msg := err.Error()
	for _, marker := range [...]string{"broken pipe", "connection reset by peer"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|
|
|
|
type vmselectRequestCtx struct {
|
|
bc *handshake.BufferedConn
|
|
sizeBuf []byte
|
|
dataBuf []byte
|
|
|
|
qt *querytracer.Tracer
|
|
sq storage.SearchQuery
|
|
mb storage.MetricBlock
|
|
|
|
// timeout in seconds for the current request
|
|
timeout uint64
|
|
|
|
// deadline in unix timestamp seconds for the current request.
|
|
deadline uint64
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) {
|
|
var tr storage.TimeRange
|
|
minTimestamp, err := ctx.readUint64()
|
|
if err != nil {
|
|
return tr, fmt.Errorf("cannot read minTimestamp: %w", err)
|
|
}
|
|
maxTimestamp, err := ctx.readUint64()
|
|
if err != nil {
|
|
return tr, fmt.Errorf("cannot read maxTimestamp: %w", err)
|
|
}
|
|
tr.MinTimestamp = int64(minTimestamp)
|
|
tr.MaxTimestamp = int64(maxTimestamp)
|
|
return tr, nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readLimit() (int, error) {
|
|
n, err := ctx.readUint32()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read limit: %w", err)
|
|
}
|
|
if n > 1<<31-1 {
|
|
n = 1<<31 - 1
|
|
}
|
|
return int(n), nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
|
|
ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 4)
|
|
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
|
|
if err == io.EOF {
|
|
return 0, err
|
|
}
|
|
return 0, fmt.Errorf("cannot read uint32: %w", err)
|
|
}
|
|
n := encoding.UnmarshalUint32(ctx.sizeBuf)
|
|
return n, nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readUint64() (uint64, error) {
|
|
ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8)
|
|
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
|
|
if err == io.EOF {
|
|
return 0, err
|
|
}
|
|
return 0, fmt.Errorf("cannot read uint64: %w", err)
|
|
}
|
|
n := encoding.UnmarshalUint64(ctx.sizeBuf)
|
|
return n, nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) {
|
|
accountID, err := ctx.readUint32()
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("cannot read accountID: %w", err)
|
|
}
|
|
projectID, err := ctx.readUint32()
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("cannot read projectID: %w", err)
|
|
}
|
|
return accountID, projectID, nil
|
|
}
|
|
|
|
// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes.
|
|
const maxSearchQuerySize = 1 << 20 // 1 MiB
|
|
|
|
func (ctx *vmselectRequestCtx) readSearchQuery() error {
|
|
if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil {
|
|
return fmt.Errorf("cannot read searchQuery: %w", err)
|
|
}
|
|
tail, err := ctx.sq.Unmarshal(ctx.dataBuf)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal SearchQuery: %w", err)
|
|
}
|
|
if len(tail) > 0 {
|
|
return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error {
|
|
ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8)
|
|
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
|
|
if err == io.EOF {
|
|
return err
|
|
}
|
|
return fmt.Errorf("cannot read data size: %w", err)
|
|
}
|
|
dataSize := encoding.UnmarshalUint64(ctx.sizeBuf)
|
|
if dataSize > uint64(maxDataSize) {
|
|
return fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize)
|
|
}
|
|
ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, int(dataSize))
|
|
if dataSize == 0 {
|
|
return nil
|
|
}
|
|
if n, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil {
|
|
return fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", dataSize, err, n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readBool() (bool, error) {
|
|
ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1)
|
|
if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil {
|
|
if err == io.EOF {
|
|
return false, err
|
|
}
|
|
return false, fmt.Errorf("cannot read bool: %w", err)
|
|
}
|
|
v := ctx.dataBuf[0] != 0
|
|
return v, nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) readByte() (byte, error) {
|
|
ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1)
|
|
if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil {
|
|
if err == io.EOF {
|
|
return 0, err
|
|
}
|
|
return 0, fmt.Errorf("cannot read byte: %w", err)
|
|
}
|
|
b := ctx.dataBuf[0]
|
|
return b, nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) writeDataBufBytes() error {
|
|
if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil {
|
|
return fmt.Errorf("cannot write data size: %w", err)
|
|
}
|
|
if len(ctx.dataBuf) == 0 {
|
|
return nil
|
|
}
|
|
if _, err := ctx.bc.Write(ctx.dataBuf); err != nil {
|
|
return fmt.Errorf("cannot write data with size %d: %w", len(ctx.dataBuf), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// maxErrorMessageSize is the maximum size of error message to send to clients.
|
|
const maxErrorMessageSize = 64 << 10 // 64 KiB
|
|
|
|
func (ctx *vmselectRequestCtx) writeErrorMessage(err error) error {
|
|
if errors.Is(err, storage.ErrDeadlineExceeded) {
|
|
err = fmt.Errorf("cannot execute request in %d seconds: %w", ctx.timeout, err)
|
|
}
|
|
errMsg := err.Error()
|
|
if len(errMsg) > maxErrorMessageSize {
|
|
// Trim too long error message.
|
|
errMsg = errMsg[:maxErrorMessageSize]
|
|
}
|
|
if err := ctx.writeString(errMsg); err != nil {
|
|
return fmt.Errorf("cannot send error message %q to client: %w", errMsg, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) writeString(s string) error {
|
|
ctx.dataBuf = append(ctx.dataBuf[:0], s...)
|
|
return ctx.writeDataBufBytes()
|
|
}
|
|
|
|
func (ctx *vmselectRequestCtx) writeUint64(n uint64) error {
|
|
ctx.sizeBuf = encoding.MarshalUint64(ctx.sizeBuf[:0], n)
|
|
if _, err := ctx.bc.Write(ctx.sizeBuf); err != nil {
|
|
return fmt.Errorf("cannot write uint64 %d: %w", n, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// maxRPCNameSize is the maximum size in bytes of the rpcName read at the start of every request.
const maxRPCNameSize = 128
|
|
|
|
func (s *Server) processRequest(ctx *vmselectRequestCtx) error {
|
|
// Read rpcName
|
|
// Do not set deadline on reading rpcName, since it may take a
|
|
// lot of time for idle connection.
|
|
if err := ctx.readDataBufBytes(maxRPCNameSize); err != nil {
|
|
if err == io.EOF {
|
|
// Remote client gracefully closed the connection.
|
|
return err
|
|
}
|
|
return fmt.Errorf("cannot read rpcName: %w", err)
|
|
}
|
|
rpcName := string(ctx.dataBuf)
|
|
|
|
// Initialize query tracing.
|
|
traceEnabled, err := ctx.readBool()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read traceEnabled: %w", err)
|
|
}
|
|
ctx.qt = querytracer.New(traceEnabled, "rpc call %s() at vmstorage", rpcName)
|
|
|
|
// Limit the time required for reading request args.
|
|
if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
|
|
return fmt.Errorf("cannot set read deadline for reading request args: %w", err)
|
|
}
|
|
defer func() {
|
|
_ = ctx.bc.SetReadDeadline(time.Time{})
|
|
}()
|
|
|
|
// Read the timeout for request execution.
|
|
timeout, err := ctx.readUint32()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err)
|
|
}
|
|
ctx.timeout = uint64(timeout)
|
|
ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout)
|
|
|
|
// Process the rpcName call.
|
|
if err := s.processRPC(ctx, rpcName); err != nil {
|
|
return fmt.Errorf("cannot execute %q: %s", rpcName, err)
|
|
}
|
|
|
|
// Finish query trace.
|
|
ctx.qt.Done()
|
|
traceJSON := ctx.qt.ToJSON()
|
|
if err := ctx.writeString(traceJSON); err != nil {
|
|
return fmt.Errorf("cannot send trace with length %d bytes to vmselect: %w", len(traceJSON), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) beginConcurrentRequest(ctx *vmselectRequestCtx) error {
|
|
select {
|
|
case s.concurrencyLimitCh <- struct{}{}:
|
|
return nil
|
|
default:
|
|
d := time.Duration(ctx.timeout) * time.Second
|
|
if d > s.limits.MaxQueueDuration {
|
|
d = s.limits.MaxQueueDuration
|
|
}
|
|
t := timerpool.Get(d)
|
|
s.concurrencyLimitReached.Inc()
|
|
select {
|
|
case s.concurrencyLimitCh <- struct{}{}:
|
|
timerpool.Put(t)
|
|
ctx.qt.Printf("wait in queue because -%s=%d concurrent requests are executed", s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests)
|
|
return nil
|
|
case <-t.C:
|
|
timerpool.Put(t)
|
|
s.concurrencyLimitTimeout.Inc()
|
|
return fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+
|
|
"are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+
|
|
"to increase -%s=%d; to increase -%s",
|
|
d.Seconds(), s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests,
|
|
s.limits.MaxQueueDurationFlagName, s.limits.MaxQueueDuration, s.limits.MaxConcurrentRequestsFlagName)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) endConcurrentRequest() {
|
|
<-s.concurrencyLimitCh
|
|
}
|
|
|
|
func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
|
|
switch rpcName {
|
|
case "search_v7":
|
|
return s.processSearch(ctx)
|
|
case "searchMetricNames_v3":
|
|
return s.processSearchMetricNames(ctx)
|
|
case "labelValues_v5":
|
|
return s.processLabelValues(ctx)
|
|
case "tagValueSuffixes_v4":
|
|
return s.processTagValueSuffixes(ctx)
|
|
case "labelNames_v5":
|
|
return s.processLabelNames(ctx)
|
|
case "seriesCount_v4":
|
|
return s.processSeriesCount(ctx)
|
|
case "tsdbStatus_v5":
|
|
return s.processTSDBStatus(ctx)
|
|
case "deleteSeries_v5":
|
|
return s.processDeleteSeries(ctx)
|
|
case "registerMetricNames_v3":
|
|
return s.processRegisterMetricNames(ctx)
|
|
case "tenants_v1":
|
|
return s.processTenants(ctx)
|
|
default:
|
|
return fmt.Errorf("unsupported rpcName: %q", rpcName)
|
|
}
|
|
}
|
|
|
|
// maxMetricNameRawSize is the maximum size in bytes of a single metricNameRaw in registerMetricNames request.
const maxMetricNameRawSize = 1024 * 1024
|
|
// maxMetricNamesPerRequest is the maximum number of metric names accepted in a single registerMetricNames request.
const maxMetricNamesPerRequest = 1024 * 1024
|
|
|
|
func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error {
|
|
s.registerMetricNamesRequests.Inc()
|
|
|
|
// Read request
|
|
metricsCount, err := ctx.readUint64()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read metricsCount: %w", err)
|
|
}
|
|
if metricsCount > maxMetricNamesPerRequest {
|
|
return fmt.Errorf("too many metric names in a single request; got %d; mustn't exceed %d", metricsCount, maxMetricNamesPerRequest)
|
|
}
|
|
mrs := make([]storage.MetricRow, metricsCount)
|
|
for i := 0; i < int(metricsCount); i++ {
|
|
if err := ctx.readDataBufBytes(maxMetricNameRawSize); err != nil {
|
|
return fmt.Errorf("cannot read metricNameRaw: %w", err)
|
|
}
|
|
mr := &mrs[i]
|
|
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], ctx.dataBuf...)
|
|
n, err := ctx.readUint64()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read timestamp: %w", err)
|
|
}
|
|
mr.Timestamp = int64(n)
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Register metric names from mrs.
|
|
if err := s.api.RegisterMetricNames(ctx.qt, mrs, ctx.deadline); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error {
|
|
s.deleteSeriesRequests.Inc()
|
|
|
|
// Read request
|
|
if err := ctx.readSearchQuery(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request.
|
|
deletedCount, err := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
// Send deletedCount to vmselect.
|
|
if err := ctx.writeUint64(uint64(deletedCount)); err != nil {
|
|
return fmt.Errorf("cannot send deletedCount=%d: %w", deletedCount, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error {
|
|
s.labelNamesRequests.Inc()
|
|
|
|
// Read request
|
|
if err := ctx.readSearchQuery(); err != nil {
|
|
return err
|
|
}
|
|
maxLabelNames, err := ctx.readLimit()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read maxLabelNames: %w", err)
|
|
}
|
|
if maxLabelNames <= 0 || maxLabelNames > s.limits.MaxLabelNames {
|
|
maxLabelNames = s.limits.MaxLabelNames
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request
|
|
labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send labelNames to vmselect
|
|
for _, labelName := range labelNames {
|
|
if err := ctx.writeString(labelName); err != nil {
|
|
return fmt.Errorf("cannot write label name %q: %w", labelName, err)
|
|
}
|
|
}
|
|
// Send 'end of response' marker
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send 'end of response' marker")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// maxLabelValueSize is the maximum size in bytes of label-like request args
// such as labelName, tagKey, tagValuePrefix and focusLabel.
const maxLabelValueSize = 16 * 1024
|
|
|
|
func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error {
|
|
s.labelValuesRequests.Inc()
|
|
|
|
// Read request
|
|
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
|
return fmt.Errorf("cannot read labelName: %w", err)
|
|
}
|
|
labelName := string(ctx.dataBuf)
|
|
if err := ctx.readSearchQuery(); err != nil {
|
|
return err
|
|
}
|
|
maxLabelValues, err := ctx.readLimit()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read maxLabelValues: %w", err)
|
|
}
|
|
if maxLabelValues <= 0 || maxLabelValues > s.limits.MaxLabelValues {
|
|
maxLabelValues = s.limits.MaxLabelValues
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request
|
|
labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send labelValues to vmselect
|
|
for _, labelValue := range labelValues {
|
|
if len(labelValue) == 0 {
|
|
// Skip empty label values, since they have no sense for prometheus.
|
|
continue
|
|
}
|
|
if err := ctx.writeString(labelValue); err != nil {
|
|
return fmt.Errorf("cannot write labelValue %q: %w", labelValue, err)
|
|
}
|
|
}
|
|
// Send 'end of label values' marker
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send 'end of response' marker")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
|
|
s.tagValueSuffixesRequests.Inc()
|
|
|
|
// read request
|
|
accountID, projectID, err := ctx.readAccountIDProjectID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tr, err := ctx.readTimeRange()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
|
return fmt.Errorf("cannot read tagKey: %w", err)
|
|
}
|
|
tagKey := string(ctx.dataBuf)
|
|
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
|
return fmt.Errorf("cannot read tagValuePrefix: %w", err)
|
|
}
|
|
tagValuePrefix := string(ctx.dataBuf)
|
|
delimiter, err := ctx.readByte()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read delimiter: %w", err)
|
|
}
|
|
maxSuffixes, err := ctx.readLimit()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read maxTagValueSuffixes: %d", err)
|
|
}
|
|
if maxSuffixes <= 0 || maxSuffixes > s.limits.MaxTagValueSuffixes {
|
|
maxSuffixes = s.limits.MaxTagValueSuffixes
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request
|
|
suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
if len(suffixes) >= s.limits.MaxTagValueSuffixes {
|
|
err := fmt.Errorf("more than %d tag value suffixes found "+
|
|
"for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+
|
|
"either narrow down the query or increase -search.max* command-line flag value; see https://docs.victoriametrics.com/#resource-usage-limits",
|
|
s.limits.MaxTagValueSuffixes, tagKey, tagValuePrefix, delimiter, tr.String())
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send suffixes to vmselect.
|
|
// Suffixes may contain empty string, so prepend suffixes with suffixCount.
|
|
if err := ctx.writeUint64(uint64(len(suffixes))); err != nil {
|
|
return fmt.Errorf("cannot write suffixesCount: %w", err)
|
|
}
|
|
for i, suffix := range suffixes {
|
|
if err := ctx.writeString(suffix); err != nil {
|
|
return fmt.Errorf("cannot write suffix #%d: %w", i+1, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processSeriesCount(ctx *vmselectRequestCtx) error {
|
|
s.seriesCountRequests.Inc()
|
|
|
|
// Read request
|
|
accountID, projectID, err := ctx.readAccountIDProjectID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request
|
|
n, err := s.api.SeriesCount(ctx.qt, accountID, projectID, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send series count to vmselect.
|
|
if err := ctx.writeUint64(n); err != nil {
|
|
return fmt.Errorf("cannot write series count to vmselect: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error {
|
|
s.tsdbStatusRequests.Inc()
|
|
|
|
// Read request
|
|
if err := ctx.readSearchQuery(); err != nil {
|
|
return err
|
|
}
|
|
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
|
return fmt.Errorf("cannot read focusLabel: %w", err)
|
|
}
|
|
focusLabel := string(ctx.dataBuf)
|
|
topN, err := ctx.readUint32()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read topN: %w", err)
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request
|
|
status, err := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, int(topN), ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send status to vmselect.
|
|
return writeTSDBStatus(ctx, status)
|
|
}
|
|
|
|
func (s *Server) processTenants(ctx *vmselectRequestCtx) error {
|
|
s.tenantsRequests.Inc()
|
|
|
|
// Read request
|
|
tr, err := ctx.readTimeRange()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute the request
|
|
tenants, err := s.api.Tenants(ctx.qt, tr, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send an empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send tenants to vmselect
|
|
for _, tenant := range tenants {
|
|
if err := ctx.writeString(tenant); err != nil {
|
|
return fmt.Errorf("cannot write tenant %q: %w", tenant, err)
|
|
}
|
|
}
|
|
// Send 'end of response' marker
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send 'end of response' marker")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error {
|
|
if err := ctx.writeUint64(status.TotalSeries); err != nil {
|
|
return fmt.Errorf("cannot write totalSeries to vmselect: %w", err)
|
|
}
|
|
if err := ctx.writeUint64(status.TotalLabelValuePairs); err != nil {
|
|
return fmt.Errorf("cannot write totalLabelValuePairs to vmselect: %w", err)
|
|
}
|
|
if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil {
|
|
return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err)
|
|
}
|
|
if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelName); err != nil {
|
|
return fmt.Errorf("cannot write seriesCountByLabelName to vmselect: %w", err)
|
|
}
|
|
if err := writeTopHeapEntries(ctx, status.SeriesCountByFocusLabelValue); err != nil {
|
|
return fmt.Errorf("cannot write seriesCountByFocusLabelValue to vmselect: %w", err)
|
|
}
|
|
if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil {
|
|
return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err)
|
|
}
|
|
if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil {
|
|
return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {
|
|
if err := ctx.writeUint64(uint64(len(a))); err != nil {
|
|
return fmt.Errorf("cannot write topHeapEntries size: %w", err)
|
|
}
|
|
for _, e := range a {
|
|
if err := ctx.writeString(e.Name); err != nil {
|
|
return fmt.Errorf("cannot write topHeapEntry name: %w", err)
|
|
}
|
|
if err := ctx.writeUint64(e.Count); err != nil {
|
|
return fmt.Errorf("cannot write topHeapEntry count: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
|
|
s.searchMetricNamesRequests.Inc()
|
|
|
|
// Read request.
|
|
if err := ctx.readSearchQuery(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Execute request.
|
|
metricNames, err := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
|
|
// Send empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send response.
|
|
metricNamesCount := len(metricNames)
|
|
if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil {
|
|
return fmt.Errorf("cannot send metricNamesCount: %w", err)
|
|
}
|
|
for i, metricName := range metricNames {
|
|
if err := ctx.writeString(metricName); err != nil {
|
|
return fmt.Errorf("cannot send metricName #%d: %w", i+1, err)
|
|
}
|
|
}
|
|
ctx.qt.Printf("sent %d series to vmselect", len(metricNames))
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
|
|
s.searchRequests.Inc()
|
|
|
|
// Read request.
|
|
if err := ctx.readSearchQuery(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.beginConcurrentRequest(ctx); err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
defer s.endConcurrentRequest()
|
|
|
|
// Initiaialize the search.
|
|
startTime := time.Now()
|
|
bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)
|
|
if err != nil {
|
|
return ctx.writeErrorMessage(err)
|
|
}
|
|
s.indexSearchDuration.UpdateDuration(startTime)
|
|
defer bi.MustClose()
|
|
|
|
// Send empty error message to vmselect.
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
|
}
|
|
|
|
// Send found blocks to vmselect.
|
|
blocksRead := 0
|
|
for bi.NextBlock(&ctx.mb) {
|
|
blocksRead++
|
|
s.metricBlocksRead.Inc()
|
|
s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
|
|
|
|
ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
|
|
|
|
if err := ctx.writeDataBufBytes(); err != nil {
|
|
return fmt.Errorf("cannot send MetricBlock: %w", err)
|
|
}
|
|
}
|
|
if err := bi.Error(); err != nil {
|
|
return fmt.Errorf("search error: %w", err)
|
|
}
|
|
ctx.qt.Printf("sent %d blocks to vmselect", blocksRead)
|
|
|
|
// Send 'end of response' marker
|
|
if err := ctx.writeString(""); err != nil {
|
|
return fmt.Errorf("cannot send 'end of response' marker")
|
|
}
|
|
return nil
|
|
}
|