package vmselectapi

import (
	"errors"
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
	"github.com/VictoriaMetrics/metrics"
)

// Server processes vmselect requests.
//
// A Server is created and started by NewServer and must be stopped with MustStop.
type Server struct {
	// api contains the implementation of the server API for vmselect requests.
	api API

	// limits contains various limits for the Server.
	limits Limits

	// disableResponseCompression disables compression of responses sent to vmselect when set to true.
	disableResponseCompression bool

	// ln is the listener for incoming connections to the server.
	ln net.Listener

	// concurrencyLimitCh limits the number of concurrently executed requests.
	// Its capacity equals limits.MaxConcurrentRequests; a request occupies a slot
	// between beginConcurrentRequest and endConcurrentRequest calls.
	concurrencyLimitCh chan struct{}

	// connsMap is a map of currently established connections to the server.
	// It is used for closing the connections when MustStop() is called.
	connsMap ingestserver.ConnsMap

	// wg is used for waiting for worker goroutines to stop when MustStop() is called.
	wg sync.WaitGroup

	// stopFlag is set to a non-zero value when the server needs to stop.
	// It must be accessed via setIsStopping and isStopping only.
	stopFlag uint32

	// concurrencyLimitReached counts requests, which had to wait for a free slot in concurrencyLimitCh.
	// concurrencyLimitTimeout counts requests, which failed to obtain the slot during the wait.
	concurrencyLimitReached *metrics.Counter
	concurrencyLimitTimeout *metrics.Counter

	// vmselectConns is incremented for every accepted connection and decremented when the connection is finished.
	// vmselectConnErrors counts connections finished with an error while the server wasn't stopping.
	vmselectConns      *metrics.Counter
	vmselectConnErrors *metrics.Counter

	// indexSearchDuration tracks the duration of successful api.InitSearch calls.
	indexSearchDuration *metrics.Histogram

	// Per-RPC request counters. Each of them is incremented at the start of the corresponding process* handler.
	registerMetricNamesRequests *metrics.Counter
	deleteSeriesRequests        *metrics.Counter
	labelNamesRequests          *metrics.Counter
	labelValuesRequests         *metrics.Counter
	tagValueSuffixesRequests    *metrics.Counter
	seriesCountRequests         *metrics.Counter
	tsdbStatusRequests          *metrics.Counter
	searchMetricNamesRequests   *metrics.Counter
	searchRequests              *metrics.Counter
	tenantsRequests             *metrics.Counter

	// metricBlocksRead and metricRowsRead count the blocks and the rows sent to vmselect by the search RPC.
	metricBlocksRead *metrics.Counter
	metricRowsRead   *metrics.Counter
}

// Limits contains various limits for Server.
type Limits struct {
	// MaxLabelNames is the maximum number of label names, which may be returned from labelNames request.
	//
	// It is used when the client passes no limit or a limit exceeding this value.
	MaxLabelNames int

	// MaxLabelValues is the maximum number of label values, which may be returned from labelValues request.
	//
	// It is used when the client passes no limit or a limit exceeding this value.
	MaxLabelValues int

	// MaxTagValueSuffixes is the maximum number of entries, which can be returned from tagValueSuffixes request.
	//
	// The request fails if the number of found suffixes reaches this value.
	MaxTagValueSuffixes int

	// MaxConcurrentRequests is the maximum number of concurrent requests a server can process.
	//
	// The remaining requests wait for up to MaxQueueDuration for their execution.
	MaxConcurrentRequests int

	// MaxConcurrentRequestsFlagName is the name for the flag containing the MaxConcurrentRequests value.
	//
	// It is used only in messages shown to the user.
	MaxConcurrentRequestsFlagName string

	// MaxQueueDuration is the maximum duration to wait if MaxConcurrentRequests are executed.
	MaxQueueDuration time.Duration

	// MaxQueueDurationFlagName is the name for the flag containing the MaxQueueDuration value.
	//
	// It is used only in messages shown to the user.
	MaxQueueDurationFlagName string
}

// NewServer starts new Server at the given addr, which serves the given api with the given limits.
//
// If disableResponseCompression is set to true, then the returned server doesn't compress responses.
//
// The returned server accepts connections in a background goroutine.
// MustStop must be called on the returned server when it is no longer needed.
//
// An error is returned if the TCP listener cannot be created at addr.
//
// NOTE(review): the seriesCount counter is registered with action="seriesSount", which looks
// like a typo. Fixing it changes the exported metric label, so confirm with dashboard owners first.
//
// NOTE(review): the two concurrency gauges below are registered without the addr label,
// unlike the remaining metrics - confirm this is fine if multiple servers are started in one process.
func NewServer(addr string, api API, limits Limits, disableResponseCompression bool) (*Server, error) {
	ln, err := netutil.NewTCPListener("vmselect", addr, false, nil)
	if err != nil {
		return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err)
	}
	// The channel capacity defines the maximum number of concurrently executed requests.
	concurrencyLimitCh := make(chan struct{}, limits.MaxConcurrentRequests)
	_ = metrics.NewGauge(`vm_vmselect_concurrent_requests_capacity`, func() float64 {
		return float64(cap(concurrencyLimitCh))
	})
	_ = metrics.NewGauge(`vm_vmselect_concurrent_requests_current`, func() float64 {
		return float64(len(concurrencyLimitCh))
	})
	s := &Server{
		api:                        api,
		limits:                     limits,
		disableResponseCompression: disableResponseCompression,
		ln:                         ln,

		concurrencyLimitCh: concurrencyLimitCh,

		concurrencyLimitReached: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_concurrent_requests_limit_reached_total{addr=%q}`, addr)),
		concurrencyLimitTimeout: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_concurrent_requests_limit_timeout_total{addr=%q}`, addr)),

		vmselectConns:      metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conns{addr=%q}`, addr)),
		vmselectConnErrors: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conn_errors_total{addr=%q}`, addr)),

		indexSearchDuration: metrics.NewHistogram(fmt.Sprintf(`vm_index_search_duration_seconds{addr=%q}`, addr)),

		registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="registerMetricNames",addr=%q}`, addr)),
		deleteSeriesRequests:        metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="deleteSeries",addr=%q}`, addr)),
		labelNamesRequests:          metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelNames",addr=%q}`, addr)),
		labelValuesRequests:         metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelValues",addr=%q}`, addr)),
		tagValueSuffixesRequests:    metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tagValueSuffixes",addr=%q}`, addr)),
		seriesCountRequests:         metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="seriesSount",addr=%q}`, addr)),
		tsdbStatusRequests:          metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tsdbStatus",addr=%q}`, addr)),
		searchMetricNamesRequests:   metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="searchMetricNames",addr=%q}`, addr)),
		searchRequests:              metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="search",addr=%q}`, addr)),
		tenantsRequests:             metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tenants",addr=%q}`, addr)),

		metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_blocks_read_total{addr=%q}`, addr)),
		metricRowsRead:   metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_rows_read_total{addr=%q}`, addr)),
	}

	s.connsMap.Init("vmselect")
	// Run the accept loop in the background. It is tracked by s.wg, so MustStop can wait for it.
	s.wg.Add(1)
	go func() {
		s.run()
		s.wg.Done()
	}()
	return s, nil
}

// run accepts incoming connections at s.ln and processes every connection in a dedicated goroutine.
//
// It returns when the listener fails while the server is stopping or when s.connsMap
// refuses to register a new connection. Any other non-temporary accept error is fatal.
func (s *Server) run() {
	logger.Infof("accepting vmselect conns at %s", s.ln.Addr())
	for {
		c, err := s.ln.Accept()
		if err != nil {
			// Retry on temporary errors.
			if pe, ok := err.(net.Error); ok && pe.Temporary() {
				continue
			}
			// The listener is closed by MustStop - this is the expected way to exit the loop.
			if s.isStopping() {
				return
			}
			logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.ln.Addr(), err)
		}
		// Do not log connection accept from vmselect, since this can generate too many lines
		// in the log because vmselect tends to re-establish idle connections.

		if !s.connsMap.Add(c) {
			// The server is closed.
			_ = c.Close()
			return
		}
		s.vmselectConns.Inc()
		s.wg.Add(1)
		go func() {
			// Unregister the connection and release the wait group slot when the goroutine finishes.
			defer func() {
				s.connsMap.Delete(c)
				s.vmselectConns.Dec()
				s.wg.Done()
			}()

			// Compress responses to vmselect even if they already contain compressed blocks.
			// Responses contain uncompressed metric names, which should compress well
			// when the response contains high number of time series.
			// Additionally, recently added metric blocks are usually uncompressed, so the compression
			// should save network bandwidth.
			compressionLevel := 1
			if s.disableResponseCompression {
				compressionLevel = 0
			}
			bc, err := handshake.VMSelectServer(c, compressionLevel)
			if err != nil {
				if s.isStopping() {
					// c is closed inside Server.MustStop
					return
				}
				// Failed healthchecks are not logged in order to avoid log noise.
				if !errors.Is(err, handshake.ErrIgnoreHealthcheck) {
					logger.Errorf("cannot perform vmselect handshake with client %q: %s", c.RemoteAddr(), err)
				}
				_ = c.Close()
				return
			}

			// bc is closed after the connection is processed.
			defer func() {
				_ = bc.Close()
			}()
			if err := s.processConn(bc); err != nil {
				// Errors are expected while the server is stopping, since MustStop closes the connections.
				if s.isStopping() {
					return
				}
				s.vmselectConnErrors.Inc()
				logger.Errorf("cannot process vmselect conn %s: %s", c.RemoteAddr(), err)
			}
		}()
	}
}

// MustStop gracefully stops s, so it no longer touches s.api after returning.
//
// It closes the listener and all the established connections and then waits
// until all the goroutines started by s are finished.
func (s *Server) MustStop() {
	// Mark the server as stopping, so errors caused by the closed listener
	// and the closed connections aren't reported.
	s.setIsStopping()

	// Stop accepting new connections from vmselect.
	if err := s.ln.Close(); err != nil {
		logger.Panicf("FATAL: cannot close vmselect listener: %s", err)
	}

	// Close existing connections from vmselect, so the goroutines
	// processing these connections are finished.
	// NOTE(review): the zero arg presumably means "close without delay" - confirm against ConnsMap.CloseAll.
	s.connsMap.CloseAll(0)

	// Wait until all the goroutines processing vmselect conns are finished.
	s.wg.Wait()
}

// setIsStopping atomically marks s as stopping.
func (s *Server) setIsStopping() {
	atomic.StoreUint32(&s.stopFlag, 1)
}

// isStopping returns true if setIsStopping has been called on s.
func (s *Server) isStopping() bool {
	return atomic.LoadUint32(&s.stopFlag) != 0
}

// processConn serves requests arriving over bc one by one until an error occurs.
//
// Nil is returned if the connection is terminated in an expected way (see isExpectedError).
// The response buffers are flushed to bc after every successfully processed request.
func (s *Server) processConn(bc *handshake.BufferedConn) error {
	reqCtx := &vmselectRequestCtx{
		bc:      bc,
		sizeBuf: make([]byte, 8),
	}
	for {
		reqErr := s.processRequest(reqCtx)
		switch {
		case reqErr == nil:
			// The request has been processed; flush the response below.
		case isExpectedError(reqErr):
			return nil
		case errors.Is(reqErr, storage.ErrDeadlineExceeded):
			return fmt.Errorf("cannot process vmselect request in %d seconds: %w", reqCtx.timeout, reqErr)
		default:
			return fmt.Errorf("cannot process vmselect request: %w", reqErr)
		}

		flushErr := bc.Flush()
		if flushErr != nil {
			return fmt.Errorf("cannot flush compressed buffers: %w", flushErr)
		}
	}
}

// isExpectedError reports whether err denotes an ordinary connection termination,
// which doesn't need to be reported.
//
// The following errors are expected:
//   - io.EOF itself (not wrapped), i.e. the remote client gracefully closed the connection;
//   - any error wrapping net.ErrClosed;
//   - any error mentioning "broken pipe" or "connection reset by peer".
func isExpectedError(err error) bool {
	switch {
	case err == io.EOF:
		// Remote client gracefully closed the connection.
		return true
	case errors.Is(err, net.ErrClosed):
		return true
	}

	// The connection may be interrupted abruptly because of a network glitch
	// or because the remote client dropped it. The remote client notices the breach
	// and handles it on its own in both cases, so there is no need in mirroring the error here.
	msg := err.Error()
	for _, marker := range [...]string{"broken pipe", "connection reset by peer"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}

// vmselectRequestCtx holds the state of a single vmselect connection.
//
// It is created once per connection and is re-used for all the requests read from this connection.
type vmselectRequestCtx struct {
	// bc is the connection to vmselect.
	bc *handshake.BufferedConn

	// sizeBuf is a scratch buffer for reading and writing fixed-size integers.
	sizeBuf []byte

	// dataBuf is a scratch buffer for reading and writing variable-sized data, bools and single bytes.
	// Its contents is overwritten by every read* and write* call using it.
	dataBuf []byte

	// qt is the query tracer for the current request.
	qt *querytracer.Tracer

	// sq is the search query read by readSearchQuery.
	sq storage.SearchQuery

	// mb is the block re-used while sending search results to vmselect.
	mb storage.MetricBlock

	// timeout in seconds for the current request
	timeout uint64

	// deadline in unix timestamp seconds for the current request.
	deadline uint64
}

// readTimeRange reads a time range from ctx.bc.
//
// The time range is encoded as two consecutive uint64 values: minTimestamp and maxTimestamp.
// Zero time range is returned on error.
func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) {
	minTS, err := ctx.readUint64()
	if err != nil {
		return storage.TimeRange{}, fmt.Errorf("cannot read minTimestamp: %w", err)
	}
	maxTS, err := ctx.readUint64()
	if err != nil {
		return storage.TimeRange{}, fmt.Errorf("cannot read maxTimestamp: %w", err)
	}
	return storage.TimeRange{
		MinTimestamp: int64(minTS),
		MaxTimestamp: int64(maxTS),
	}, nil
}

// readLimit reads uint32 limit from ctx.bc and returns it as int.
//
// The returned limit is capped by 1<<31-1, so it fits int on 32-bit architectures.
func (ctx *vmselectRequestCtx) readLimit() (int, error) {
	const maxLimit = 1<<31 - 1

	v, err := ctx.readUint32()
	if err != nil {
		return 0, fmt.Errorf("cannot read limit: %w", err)
	}
	if v > maxLimit {
		return maxLimit, nil
	}
	return int(v), nil
}

// readUint32 reads 4 bytes from ctx.bc into ctx.sizeBuf and unmarshals them as uint32.
//
// io.EOF is returned as is, so callers can detect it. Other errors are wrapped.
func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
	buf := bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 4)
	ctx.sizeBuf = buf
	_, err := io.ReadFull(ctx.bc, buf)
	switch {
	case err == nil:
		return encoding.UnmarshalUint32(buf), nil
	case err == io.EOF:
		return 0, err
	default:
		return 0, fmt.Errorf("cannot read uint32: %w", err)
	}
}

// readUint64 reads 8 bytes from ctx.bc into ctx.sizeBuf and unmarshals them as uint64.
//
// io.EOF is returned as is, so callers can detect it. Other errors are wrapped.
func (ctx *vmselectRequestCtx) readUint64() (uint64, error) {
	buf := bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8)
	ctx.sizeBuf = buf
	_, err := io.ReadFull(ctx.bc, buf)
	switch {
	case err == nil:
		return encoding.UnmarshalUint64(buf), nil
	case err == io.EOF:
		return 0, err
	default:
		return 0, fmt.Errorf("cannot read uint64: %w", err)
	}
}

// readAccountIDProjectID reads the tenant from ctx.bc.
//
// The tenant is encoded as two consecutive uint32 values: accountID followed by projectID.
// Zero values are returned on error.
func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) {
	var ids [2]uint32
	for i, name := range [...]string{"accountID", "projectID"} {
		v, err := ctx.readUint32()
		if err != nil {
			return 0, 0, fmt.Errorf("cannot read %s: %w", name, err)
		}
		ids[i] = v
	}
	return ids[0], ids[1], nil
}

// maxSearchQuerySize is the maximum size of the marshaled SearchQuery packet in bytes.
// Bigger packets are rejected by readSearchQuery.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154#issuecomment-1757216612
const maxSearchQuerySize = 5 * 1024 * 1024

// readSearchQuery reads a size-prefixed SearchQuery from ctx.bc and unmarshals it into ctx.sq.
//
// An error is returned if the packet exceeds maxSearchQuerySize, cannot be unmarshaled
// or contains unexpected data after the SearchQuery.
func (ctx *vmselectRequestCtx) readSearchQuery() error {
	readErr := ctx.readDataBufBytes(maxSearchQuerySize)
	if readErr != nil {
		return fmt.Errorf("cannot read searchQuery: %w", readErr)
	}

	rest, err := ctx.sq.Unmarshal(ctx.dataBuf)
	switch {
	case err != nil:
		return fmt.Errorf("cannot unmarshal SearchQuery: %w", err)
	case len(rest) > 0:
		return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(rest), rest)
	}
	return nil
}

// readDataBufBytes reads size-prefixed data from ctx.bc into ctx.dataBuf.
//
// The data is encoded as uint64 size followed by size bytes of payload.
// An error is returned if the size exceeds maxDataSize.
// io.EOF obtained while reading the size is returned as is, so callers can detect it.
func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error {
	sizeBuf := bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8)
	ctx.sizeBuf = sizeBuf
	if _, err := io.ReadFull(ctx.bc, sizeBuf); err != nil {
		if err != io.EOF {
			err = fmt.Errorf("cannot read data size: %w", err)
		}
		return err
	}

	size := encoding.UnmarshalUint64(sizeBuf)
	if limit := uint64(maxDataSize); size > limit {
		return fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", size, maxDataSize)
	}

	ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, int(size))
	if size > 0 {
		n, err := io.ReadFull(ctx.bc, ctx.dataBuf)
		if err != nil {
			return fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", size, err, n)
		}
	}
	return nil
}

// readBool reads a single byte from ctx.bc into ctx.dataBuf and returns true if the byte is non-zero.
//
// io.EOF is returned as is, so callers can detect it. Other errors are wrapped.
func (ctx *vmselectRequestCtx) readBool() (bool, error) {
	buf := bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1)
	ctx.dataBuf = buf
	_, err := io.ReadFull(ctx.bc, buf)
	switch {
	case err == nil:
		return buf[0] != 0, nil
	case err == io.EOF:
		return false, err
	default:
		return false, fmt.Errorf("cannot read bool: %w", err)
	}
}

// readByte reads a single byte from ctx.bc into ctx.dataBuf and returns it.
//
// io.EOF is returned as is, so callers can detect it. Other errors are wrapped.
func (ctx *vmselectRequestCtx) readByte() (byte, error) {
	buf := bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1)
	ctx.dataBuf = buf
	_, err := io.ReadFull(ctx.bc, buf)
	switch {
	case err == nil:
		return buf[0], nil
	case err == io.EOF:
		return 0, err
	default:
		return 0, fmt.Errorf("cannot read byte: %w", err)
	}
}

// writeDataBufBytes writes ctx.dataBuf to ctx.bc as uint64 size followed by the payload.
//
// Only the zero size is written for empty ctx.dataBuf.
func (ctx *vmselectRequestCtx) writeDataBufBytes() error {
	size := len(ctx.dataBuf)
	if err := ctx.writeUint64(uint64(size)); err != nil {
		return fmt.Errorf("cannot write data size: %w", err)
	}
	if size == 0 {
		return nil
	}
	_, err := ctx.bc.Write(ctx.dataBuf)
	if err != nil {
		return fmt.Errorf("cannot write data with size %d: %w", size, err)
	}
	return nil
}

// maxErrorMessageSize is the maximum size in bytes of error message to send to clients.
// Longer messages are trimmed by writeErrorMessage.
const maxErrorMessageSize = 64 * 1024

// writeErrorMessage sends the text of err to the client.
//
// Deadline errors are augmented with the request timeout.
// Messages longer than maxErrorMessageSize bytes are trimmed.
// Nil is returned if the message has been successfully written to ctx.bc.
func (ctx *vmselectRequestCtx) writeErrorMessage(err error) error {
	if errors.Is(err, storage.ErrDeadlineExceeded) {
		err = fmt.Errorf("cannot execute request in %d seconds: %w", ctx.timeout, err)
	}

	msg := err.Error()
	if len(msg) > maxErrorMessageSize {
		// Trim too long error message.
		msg = msg[:maxErrorMessageSize]
	}

	sendErr := ctx.writeString(msg)
	if sendErr == nil {
		return nil
	}
	return fmt.Errorf("cannot send error message %q to client: %w", msg, sendErr)
}

// writeString copies s into ctx.dataBuf and writes it to ctx.bc as size-prefixed data.
func (ctx *vmselectRequestCtx) writeString(s string) error {
	buf := ctx.dataBuf[:0]
	buf = append(buf, s...)
	ctx.dataBuf = buf
	return ctx.writeDataBufBytes()
}

// writeUint64 marshals n into ctx.sizeBuf and writes it to ctx.bc.
func (ctx *vmselectRequestCtx) writeUint64(n uint64) error {
	buf := encoding.MarshalUint64(ctx.sizeBuf[:0], n)
	ctx.sizeBuf = buf
	_, err := ctx.bc.Write(buf)
	if err == nil {
		return nil
	}
	return fmt.Errorf("cannot write uint64 %d: %w", n, err)
}

// maxRPCNameSize is the maximum size in bytes of the rpcName read by processRequest.
const maxRPCNameSize = 128

// processRequest reads a single request from ctx.bc, executes it and writes the response to ctx.bc.
//
// The request consists of size-prefixed rpcName, traceEnabled bool, uint32 timeout in seconds
// and rpc-specific args. The response written by the rpc handler is followed by the query trace in JSON.
//
// io.EOF is returned as is when the client closes the connection before sending rpcName.
func (s *Server) processRequest(ctx *vmselectRequestCtx) error {
	// Read rpcName
	// Do not set deadline on reading rpcName, since it may take a
	// lot of time for idle connection.
	if err := ctx.readDataBufBytes(maxRPCNameSize); err != nil {
		if err == io.EOF {
			// Remote client gracefully closed the connection.
			return err
		}
		return fmt.Errorf("cannot read rpcName: %w", err)
	}
	// Copy rpcName out of ctx.dataBuf, since the buffer is overwritten by subsequent reads.
	rpcName := string(ctx.dataBuf)

	// Initialize query tracing.
	traceEnabled, err := ctx.readBool()
	if err != nil {
		return fmt.Errorf("cannot read traceEnabled: %w", err)
	}
	ctx.qt = querytracer.New(traceEnabled, "rpc call %s() at vmstorage", rpcName)

	// Limit the time required for reading request args.
	if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return fmt.Errorf("cannot set read deadline for reading request args: %w", err)
	}
	// Reset the read deadline on exit, so the next rpcName can be awaited without time limits.
	defer func() {
		_ = ctx.bc.SetReadDeadline(time.Time{})
	}()

	// Read the timeout for request execution.
	timeout, err := ctx.readUint32()
	if err != nil {
		return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err)
	}
	ctx.timeout = uint64(timeout)
	ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout)

	// Process the rpcName call.
	if err := s.processRPC(ctx, rpcName); err != nil {
		return fmt.Errorf("cannot execute %q: %w", rpcName, err)
	}

	// Finish query trace.
	ctx.qt.Done()
	traceJSON := ctx.qt.ToJSON()
	if err := ctx.writeString(traceJSON); err != nil {
		return fmt.Errorf("cannot send trace with length %d bytes to vmselect: %w", len(traceJSON), err)
	}
	return nil
}

// beginConcurrentRequest obtains a slot for executing the request from s.concurrencyLimitCh.
//
// If all the slots are busy, then it waits for a free slot for up to the request timeout
// capped by s.limits.MaxQueueDuration. An error is returned if no slot is freed during the wait.
//
// Every successful call must be paired with endConcurrentRequest call.
func (s *Server) beginConcurrentRequest(ctx *vmselectRequestCtx) error {
	// Fast path: a free slot is available right now.
	select {
	case s.concurrencyLimitCh <- struct{}{}:
		return nil
	default:
	}

	// Slow path: wait until a slot is freed or the wait duration expires.
	d := time.Duration(ctx.timeout) * time.Second
	if d > s.limits.MaxQueueDuration {
		d = s.limits.MaxQueueDuration
	}
	t := timerpool.Get(d)
	defer timerpool.Put(t)
	s.concurrencyLimitReached.Inc()
	select {
	case s.concurrencyLimitCh <- struct{}{}:
		ctx.qt.Printf("wait in queue because -%s=%d concurrent requests are executed", s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests)
		return nil
	case <-t.C:
		s.concurrencyLimitTimeout.Inc()
		// MaxQueueDuration is formatted with %s, so it is shown in a human-readable form (e.g. 10s)
		// instead of the raw number of nanoseconds.
		return fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+
			"are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+
			"to increase -%s=%s; to increase -%s",
			d.Seconds(), s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests,
			s.limits.MaxQueueDurationFlagName, s.limits.MaxQueueDuration, s.limits.MaxConcurrentRequestsFlagName)
	}
}

// endConcurrentRequest releases the slot obtained by a successful beginConcurrentRequest call.
func (s *Server) endConcurrentRequest() {
	<-s.concurrencyLimitCh
}

// processRPC dispatches the request to the handler registered for the given rpcName.
//
// The rpcName includes the protocol version suffix. An error is returned for unknown rpcName.
func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
	var handler func(*vmselectRequestCtx) error
	switch rpcName {
	case "search_v7":
		handler = s.processSearch
	case "searchMetricNames_v3":
		handler = s.processSearchMetricNames
	case "labelValues_v5":
		handler = s.processLabelValues
	case "tagValueSuffixes_v4":
		handler = s.processTagValueSuffixes
	case "labelNames_v5":
		handler = s.processLabelNames
	case "seriesCount_v4":
		handler = s.processSeriesCount
	case "tsdbStatus_v5":
		handler = s.processTSDBStatus
	case "deleteSeries_v5":
		handler = s.processDeleteSeries
	case "registerMetricNames_v3":
		handler = s.processRegisterMetricNames
	case "tenants_v1":
		handler = s.processTenants
	}
	if handler == nil {
		return fmt.Errorf("unsupported rpcName: %q", rpcName)
	}
	return handler(ctx)
}

// maxMetricNameRawSize is the maximum size in bytes of a single metricNameRaw in registerMetricNames request.
const maxMetricNameRawSize = 1024 * 1024

// maxMetricNamesPerRequest is the maximum number of metric names in a single registerMetricNames request.
const maxMetricNamesPerRequest = 1024 * 1024

// processRegisterMetricNames handles the registerMetricNames RPC.
//
// The request consists of uint64 metricsCount followed by metricsCount pairs
// of size-prefixed metricNameRaw and uint64 timestamp.
// The response is an error message, which is empty on success.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error {
	s.registerMetricNamesRequests.Inc()

	// Read request
	count, err := ctx.readUint64()
	if err != nil {
		return fmt.Errorf("cannot read metricsCount: %w", err)
	}
	if count > maxMetricNamesPerRequest {
		return fmt.Errorf("too many metric names in a single request; got %d; mustn't exceed %d", count, maxMetricNamesPerRequest)
	}
	rows := make([]storage.MetricRow, count)
	for i := range rows {
		row := &rows[i]
		if err := ctx.readDataBufBytes(maxMetricNameRawSize); err != nil {
			return fmt.Errorf("cannot read metricNameRaw: %w", err)
		}
		// Copy the name, since ctx.dataBuf is overwritten on the next iteration.
		row.MetricNameRaw = append(row.MetricNameRaw[:0], ctx.dataBuf...)
		ts, err := ctx.readUint64()
		if err != nil {
			return fmt.Errorf("cannot read timestamp: %w", err)
		}
		row.Timestamp = int64(ts)
	}

	if err := s.beginConcurrentRequest(ctx); err != nil {
		return ctx.writeErrorMessage(err)
	}
	defer s.endConcurrentRequest()

	// Register metric names from rows.
	apiErr := s.api.RegisterMetricNames(ctx.qt, rows, ctx.deadline)
	if apiErr != nil {
		return ctx.writeErrorMessage(apiErr)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}
	return nil
}

// processDeleteSeries handles the deleteSeries RPC.
//
// The request consists of a SearchQuery. The response is an error message
// (empty on success) followed by uint64 number of deleted series.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error {
	s.deleteSeriesRequests.Inc()

	// Read request
	readErr := ctx.readSearchQuery()
	if readErr != nil {
		return readErr
	}

	limitErr := s.beginConcurrentRequest(ctx)
	if limitErr != nil {
		return ctx.writeErrorMessage(limitErr)
	}
	defer s.endConcurrentRequest()

	// Execute the request.
	n, apiErr := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline)
	if apiErr != nil {
		return ctx.writeErrorMessage(apiErr)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}
	// Send the number of deleted series to vmselect.
	if err := ctx.writeUint64(uint64(n)); err != nil {
		return fmt.Errorf("cannot send deletedCount=%d: %w", n, err)
	}
	return nil
}

// processLabelNames handles the labelNames RPC.
//
// The request consists of a SearchQuery followed by uint32 limit on the number of returned label names.
// The limit is replaced by s.limits.MaxLabelNames if it is zero or exceeds s.limits.MaxLabelNames.
//
// The response is an error message (empty on success) followed by size-prefixed non-empty
// label names and an empty string serving as 'end of response' marker.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error {
	s.labelNamesRequests.Inc()

	// Read request
	if err := ctx.readSearchQuery(); err != nil {
		return err
	}
	maxLabelNames, err := ctx.readLimit()
	if err != nil {
		return fmt.Errorf("cannot read maxLabelNames: %w", err)
	}
	if maxLabelNames <= 0 || maxLabelNames > s.limits.MaxLabelNames {
		maxLabelNames = s.limits.MaxLabelNames
	}

	if err := s.beginConcurrentRequest(ctx); err != nil {
		return ctx.writeErrorMessage(err)
	}
	defer s.endConcurrentRequest()

	// Execute the request
	labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline)
	if err != nil {
		return ctx.writeErrorMessage(err)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send labelNames to vmselect
	for _, labelName := range labelNames {
		if len(labelName) == 0 {
			// Skip empty label names, since they may break RPC communication with vmselect
			continue
		}
		if err := ctx.writeString(labelName); err != nil {
			return fmt.Errorf("cannot write label name %q: %w", labelName, err)
		}
	}
	// Send 'end of response' marker.
	// The underlying error is wrapped, so the caller can inspect the failure reason.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
	}
	return nil
}

// maxLabelValueSize is the maximum size in bytes of label names, tag keys, tag value prefixes
// and focus labels read from vmselect requests.
const maxLabelValueSize = 16 * 1024

// processLabelValues handles the labelValues RPC.
//
// The request consists of size-prefixed labelName, a SearchQuery and uint32 limit
// on the number of returned label values. The limit is replaced by s.limits.MaxLabelValues
// if it is zero or exceeds s.limits.MaxLabelValues.
//
// The response is an error message (empty on success) followed by size-prefixed non-empty
// label values and an empty string serving as 'end of response' marker.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error {
	s.labelValuesRequests.Inc()

	// Read request
	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
		return fmt.Errorf("cannot read labelName: %w", err)
	}
	// Copy labelName out of ctx.dataBuf, since the buffer is overwritten by subsequent reads.
	labelName := string(ctx.dataBuf)
	if err := ctx.readSearchQuery(); err != nil {
		return err
	}
	maxLabelValues, err := ctx.readLimit()
	if err != nil {
		return fmt.Errorf("cannot read maxLabelValues: %w", err)
	}
	if maxLabelValues <= 0 || maxLabelValues > s.limits.MaxLabelValues {
		maxLabelValues = s.limits.MaxLabelValues
	}

	if err := s.beginConcurrentRequest(ctx); err != nil {
		return ctx.writeErrorMessage(err)
	}
	defer s.endConcurrentRequest()

	// Execute the request
	labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline)
	if err != nil {
		return ctx.writeErrorMessage(err)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send labelValues to vmselect
	for _, labelValue := range labelValues {
		if len(labelValue) == 0 {
			// Skip empty label values, since they may break RPC communication with vmselect
			continue
		}
		if err := ctx.writeString(labelValue); err != nil {
			return fmt.Errorf("cannot write labelValue %q: %w", labelValue, err)
		}
	}
	// Send 'end of label values' marker.
	// The underlying error is wrapped, so the caller can inspect the failure reason.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
	}
	return nil
}

// processTagValueSuffixes handles the tagValueSuffixes RPC.
//
// The request consists of accountID, projectID, a time range, size-prefixed tagKey,
// size-prefixed tagValuePrefix, a delimiter byte and uint32 limit on the number of returned suffixes.
// The limit is replaced by s.limits.MaxTagValueSuffixes if it is zero or exceeds s.limits.MaxTagValueSuffixes.
//
// The response is an error message (empty on success) followed by uint64 suffixes count
// and size-prefixed suffixes. An error message is sent to the client if the number
// of found suffixes reaches s.limits.MaxTagValueSuffixes.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
	s.tagValueSuffixesRequests.Inc()

	// read request
	accountID, projectID, err := ctx.readAccountIDProjectID()
	if err != nil {
		return err
	}
	tr, err := ctx.readTimeRange()
	if err != nil {
		return err
	}
	// tagKey and tagValuePrefix are copied out of ctx.dataBuf, since the buffer is overwritten by subsequent reads.
	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
		return fmt.Errorf("cannot read tagKey: %w", err)
	}
	tagKey := string(ctx.dataBuf)
	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
		return fmt.Errorf("cannot read tagValuePrefix: %w", err)
	}
	tagValuePrefix := string(ctx.dataBuf)
	delimiter, err := ctx.readByte()
	if err != nil {
		return fmt.Errorf("cannot read delimiter: %w", err)
	}
	maxSuffixes, err := ctx.readLimit()
	if err != nil {
		// Wrap the error with %w like the other read errors do.
		return fmt.Errorf("cannot read maxTagValueSuffixes: %w", err)
	}
	if maxSuffixes <= 0 || maxSuffixes > s.limits.MaxTagValueSuffixes {
		maxSuffixes = s.limits.MaxTagValueSuffixes
	}

	if err := s.beginConcurrentRequest(ctx); err != nil {
		return ctx.writeErrorMessage(err)
	}
	defer s.endConcurrentRequest()

	// Execute the request
	suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, ctx.deadline)
	if err != nil {
		return ctx.writeErrorMessage(err)
	}

	if len(suffixes) >= s.limits.MaxTagValueSuffixes {
		err := fmt.Errorf("more than %d tag value suffixes found "+
			"for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+
			"either narrow down the query or increase -search.max* command-line flag value; see https://docs.victoriametrics.com/#resource-usage-limits",
			s.limits.MaxTagValueSuffixes, tagKey, tagValuePrefix, delimiter, tr.String())
		return ctx.writeErrorMessage(err)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send suffixes to vmselect.
	// Suffixes may contain empty string, so prepend suffixes with suffixCount.
	if err := ctx.writeUint64(uint64(len(suffixes))); err != nil {
		return fmt.Errorf("cannot write suffixesCount: %w", err)
	}
	for i, suffix := range suffixes {
		if err := ctx.writeString(suffix); err != nil {
			return fmt.Errorf("cannot write suffix #%d: %w", i+1, err)
		}
	}
	return nil
}

// processSeriesCount handles the seriesCount RPC.
//
// The request consists of accountID and projectID. The response is an error message
// (empty on success) followed by uint64 series count.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processSeriesCount(ctx *vmselectRequestCtx) error {
	s.seriesCountRequests.Inc()

	// Read request
	accountID, projectID, readErr := ctx.readAccountIDProjectID()
	if readErr != nil {
		return readErr
	}

	limitErr := s.beginConcurrentRequest(ctx)
	if limitErr != nil {
		return ctx.writeErrorMessage(limitErr)
	}
	defer s.endConcurrentRequest()

	// Execute the request
	seriesCount, apiErr := s.api.SeriesCount(ctx.qt, accountID, projectID, ctx.deadline)
	if apiErr != nil {
		return ctx.writeErrorMessage(apiErr)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send series count to vmselect.
	if err := ctx.writeUint64(seriesCount); err != nil {
		return fmt.Errorf("cannot write series count to vmselect: %w", err)
	}
	return nil
}

// processTSDBStatus handles the tsdbStatus RPC.
//
// The request consists of a SearchQuery, size-prefixed focusLabel and uint32 topN.
// The response is an error message (empty on success) followed by the status
// written by writeTSDBStatus.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error {
	s.tsdbStatusRequests.Inc()

	// Read request
	if err := ctx.readSearchQuery(); err != nil {
		return err
	}
	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
		return fmt.Errorf("cannot read focusLabel: %w", err)
	}
	// Copy focusLabel out of ctx.dataBuf, since the buffer is overwritten by subsequent reads and writes.
	focusLabel := string(ctx.dataBuf)
	n, readErr := ctx.readUint32()
	if readErr != nil {
		return fmt.Errorf("cannot read topN: %w", readErr)
	}
	topN := int(n)

	limitErr := s.beginConcurrentRequest(ctx)
	if limitErr != nil {
		return ctx.writeErrorMessage(limitErr)
	}
	defer s.endConcurrentRequest()

	// Execute the request
	status, apiErr := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, topN, ctx.deadline)
	if apiErr != nil {
		return ctx.writeErrorMessage(apiErr)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send status to vmselect.
	return writeTSDBStatus(ctx, status)
}

// processTenants handles the tenants RPC.
//
// The request consists of a time range. The response is an error message (empty on success)
// followed by size-prefixed tenant names and an empty string serving as 'end of response' marker.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processTenants(ctx *vmselectRequestCtx) error {
	s.tenantsRequests.Inc()

	// Read request
	tr, err := ctx.readTimeRange()
	if err != nil {
		return err
	}

	if err := s.beginConcurrentRequest(ctx); err != nil {
		return ctx.writeErrorMessage(err)
	}
	defer s.endConcurrentRequest()

	// Execute the request
	tenants, err := s.api.Tenants(ctx.qt, tr, ctx.deadline)
	if err != nil {
		return ctx.writeErrorMessage(err)
	}

	// Send an empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send tenants to vmselect
	for _, tenant := range tenants {
		if len(tenant) == 0 {
			// An empty tenant name would be treated as the 'end of response' marker.
			logger.Panicf("BUG: unexpected empty tenant name")
		}
		if err := ctx.writeString(tenant); err != nil {
			return fmt.Errorf("cannot write tenant %q: %w", tenant, err)
		}
	}
	// Send 'end of response' marker.
	// The underlying error is wrapped, so the caller can inspect the failure reason.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
	}
	return nil
}

// writeTSDBStatus writes status to ctx.bc.
//
// The status is encoded as uint64 totalSeries, uint64 totalLabelValuePairs and the following
// top heap entries lists: seriesCountByMetricName, seriesCountByLabelName,
// seriesCountByFocusLabelValue, seriesCountByLabelValuePair and labelValueCountByLabelName.
func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error {
	var err error

	// Write totals.
	err = ctx.writeUint64(status.TotalSeries)
	if err != nil {
		return fmt.Errorf("cannot write totalSeries to vmselect: %w", err)
	}
	err = ctx.writeUint64(status.TotalLabelValuePairs)
	if err != nil {
		return fmt.Errorf("cannot write totalLabelValuePairs to vmselect: %w", err)
	}

	// Write top heap entries lists.
	err = writeTopHeapEntries(ctx, status.SeriesCountByMetricName)
	if err != nil {
		return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err)
	}
	err = writeTopHeapEntries(ctx, status.SeriesCountByLabelName)
	if err != nil {
		return fmt.Errorf("cannot write seriesCountByLabelName to vmselect: %w", err)
	}
	err = writeTopHeapEntries(ctx, status.SeriesCountByFocusLabelValue)
	if err != nil {
		return fmt.Errorf("cannot write seriesCountByFocusLabelValue to vmselect: %w", err)
	}
	err = writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair)
	if err != nil {
		return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err)
	}
	err = writeTopHeapEntries(ctx, status.LabelValueCountByLabelName)
	if err != nil {
		return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err)
	}
	return nil
}

// writeTopHeapEntries writes a to ctx.bc.
//
// The entries are encoded as uint64 entries count followed by pairs
// of size-prefixed name and uint64 count.
func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {
	entriesCount := uint64(len(a))
	if err := ctx.writeUint64(entriesCount); err != nil {
		return fmt.Errorf("cannot write topHeapEntries size: %w", err)
	}
	for i := range a {
		entry := &a[i]
		nameErr := ctx.writeString(entry.Name)
		if nameErr != nil {
			return fmt.Errorf("cannot write topHeapEntry name: %w", nameErr)
		}
		countErr := ctx.writeUint64(entry.Count)
		if countErr != nil {
			return fmt.Errorf("cannot write topHeapEntry count: %w", countErr)
		}
	}
	return nil
}

// processSearchMetricNames handles the searchMetricNames RPC.
//
// The request consists of a SearchQuery. The response is an error message (empty on success)
// followed by uint64 metric names count and size-prefixed metric names.
//
// A non-nil error is returned if the request cannot be read or the response cannot be written.
func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
	s.searchMetricNamesRequests.Inc()

	// Read request.
	readErr := ctx.readSearchQuery()
	if readErr != nil {
		return readErr
	}

	limitErr := s.beginConcurrentRequest(ctx)
	if limitErr != nil {
		return ctx.writeErrorMessage(limitErr)
	}
	defer s.endConcurrentRequest()

	// Execute request.
	names, apiErr := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline)
	if apiErr != nil {
		return ctx.writeErrorMessage(apiErr)
	}

	// Send empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send response: the number of metric names followed by the names.
	total := len(names)
	if err := ctx.writeUint64(uint64(total)); err != nil {
		return fmt.Errorf("cannot send metricNamesCount: %w", err)
	}
	for i := range names {
		if err := ctx.writeString(names[i]); err != nil {
			return fmt.Errorf("cannot send metricName #%d: %w", i+1, err)
		}
	}
	ctx.qt.Printf("sent %d series to vmselect", total)
	return nil
}

// processSearch handles the search RPC.
//
// The request consists of a SearchQuery. The response is an error message (empty on success)
// followed by size-prefixed marshaled MetricBlock entries and an empty string serving
// as 'end of response' marker.
//
// A non-nil error is returned if the request cannot be read, the response cannot be written
// or the search fails after the empty error message has been already sent to the client.
func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
	s.searchRequests.Inc()

	// Read request.
	if err := ctx.readSearchQuery(); err != nil {
		return err
	}

	if err := s.beginConcurrentRequest(ctx); err != nil {
		return ctx.writeErrorMessage(err)
	}
	defer s.endConcurrentRequest()

	// Initialize the search.
	startTime := time.Now()
	bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)
	if err != nil {
		return ctx.writeErrorMessage(err)
	}
	// Only successful searches are accounted in the histogram.
	s.indexSearchDuration.UpdateDuration(startTime)
	defer bi.MustClose()

	// Send empty error message to vmselect.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send empty error message: %w", err)
	}

	// Send found blocks to vmselect.
	blocksRead := 0
	for bi.NextBlock(&ctx.mb) {
		blocksRead++
		s.metricBlocksRead.Inc()
		s.metricRowsRead.Add(ctx.mb.Block.RowsCount())

		ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
		if err := ctx.writeDataBufBytes(); err != nil {
			return fmt.Errorf("cannot send MetricBlock: %w", err)
		}
	}
	// The error cannot be sent to the client at this point, since the empty error message
	// has been already sent. So return it to the caller.
	if err := bi.Error(); err != nil {
		return fmt.Errorf("search error: %w", err)
	}
	ctx.qt.Printf("sent %d blocks to vmselect", blocksRead)

	// Send 'end of response' marker.
	// The underlying error is wrapped, so the caller can inspect the failure reason.
	if err := ctx.writeString(""); err != nil {
		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
	}
	return nil
}