diff --git a/app/vmstorage/transport/vmselect.go b/app/vmstorage/transport/vmselect.go index f625889b0..20bf71109 100644 --- a/app/vmstorage/transport/vmselect.go +++ b/app/vmstorage/transport/vmselect.go @@ -1,34 +1,23 @@ package transport import ( - "errors" "flag" "fmt" - "io" - "net" "net/http" "sync" - "sync/atomic" - "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi" ) var ( - maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search") - maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search") + maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. This allows protecting against heavy queries, which select unexpectedly high number of series. Zero means 'no limit'. See also -search.max* command-line flags at vmselect") + maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search") + maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search") maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find") - maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. This allows protecting against heavy queries, which select unexpectedly high number of series. Zero means 'no limit'. See also -search.max* command-line flags at vmselect") disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+ "This reduces CPU usage at the cost of higher network bandwidth usage") @@ -37,879 +26,112 @@ var ( "This may be useful when multiple data sources with distinct retentions are hidden behind query-tee") ) -// VMSelectServer processes connections from vmselect. -type VMSelectServer struct { - // storage is a pointer to the underlying storage. - storage *storage.Storage - - // ln is the listener for incoming connections to the server. - ln net.Listener - - // connsMap is a map of currently established connections to the server. - // It is used for closing the connections when MustStop() is called. - connsMap ingestserver.ConnsMap - - // wg is used for waiting for worker goroutines to stop when MustStop() is called. - wg sync.WaitGroup - - // stopFlag is set to true when the server needs to stop. - stopFlag uint32 +// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s. 
+func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, error) { + api := &vmstorageAPI{ + s: s, + } + limits := vmselectapi.Limits{ + MaxMetrics: *maxUniqueTimeseries, + MaxLabelNames: *maxTagKeys, + MaxLabelValues: *maxTagValues, + MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch, + } + return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression) } -// NewVMSelectServer starts new VMSelectServer at the given addr, which serves the given storage. -func NewVMSelectServer(addr string, storage *storage.Storage) (*VMSelectServer, error) { - ln, err := netutil.NewTCPListener("vmselect", addr, nil) - if err != nil { - return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err) - } - s := &VMSelectServer{ - storage: storage, - ln: ln, - } - s.connsMap.Init() - s.wg.Add(1) - go func() { - s.run() - s.wg.Done() - }() - return s, nil +// vmstorageAPI impelemnts vmselectapi.API +type vmstorageAPI struct { + s *storage.Storage } -func (s *VMSelectServer) run() { - logger.Infof("accepting vmselect conns at %s", s.ln.Addr()) - for { - c, err := s.ln.Accept() - if err != nil { - if pe, ok := err.(net.Error); ok && pe.Temporary() { - continue - } - if s.isStopping() { - return - } - logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.ln.Addr(), err) - } - logger.Infof("accepted vmselect conn from %s", c.RemoteAddr()) - - if !s.connsMap.Add(c) { - // The server is closed. - _ = c.Close() - return - } - vmselectConns.Inc() - s.wg.Add(1) - go func() { - defer func() { - s.connsMap.Delete(c) - vmselectConns.Dec() - s.wg.Done() - }() - - // Compress responses to vmselect even if they already contain compressed blocks. - // Responses contain uncompressed metric names, which should compress well - // when the response contains high number of time series. - // Additionally, recently added metric blocks are usually uncompressed, so the compression - // should save network bandwidth. - compressionLevel := 1 - if *disableRPCCompression { - compressionLevel = 0 - } - bc, err := handshake.VMSelectServer(c, compressionLevel) - if err != nil { - if s.isStopping() { - // c is closed inside VMSelectServer.MustClose - return - } - logger.Errorf("cannot perform vmselect handshake with client %q: %s", c.RemoteAddr(), err) - _ = c.Close() - return - } - - defer func() { - if !s.isStopping() { - logger.Infof("closing vmselect conn from %s", c.RemoteAddr()) - } - _ = bc.Close() - }() - - logger.Infof("processing vmselect conn from %s", c.RemoteAddr()) - if err := s.processConn(bc); err != nil { - if s.isStopping() { - return - } - vmselectConnErrors.Inc() - logger.Errorf("cannot process vmselect conn %s: %s", c.RemoteAddr(), err) - } - }() +func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (vmselectapi.BlockIterator, error) { + if err := checkTimeRange(api.s, tr); err != nil { + return nil, err } + bi := getBlockIterator() + bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline) + if err := bi.sr.Error(); err != nil { + bi.MustClose() + return nil, err + } + return bi, nil } -var ( - vmselectConns = metrics.NewCounter("vm_vmselect_conns") - vmselectConnErrors = metrics.NewCounter("vm_vmselect_conn_errors_total") -) - -// MustClose gracefully closes s, so it no longer touches s.storage after returning. -func (s *VMSelectServer) MustClose() { - // Mark the server as stoping. - s.setIsStopping() - - // Stop accepting new connections from vmselect. 
- if err := s.ln.Close(); err != nil { - logger.Panicf("FATAL: cannot close vmselect listener: %s", err) - } - - // Close existing connections from vmselect, so the goroutines - // processing these connections are finished. - s.connsMap.CloseAll() - - // Wait until all the goroutines processing vmselect conns are finished. - s.wg.Wait() +func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error) { + return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline) } -func (s *VMSelectServer) setIsStopping() { - atomic.StoreUint32(&s.stopFlag, 1) +func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, + maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) { + return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline) } -func (s *VMSelectServer) isStopping() bool { - return atomic.LoadUint32(&s.stopFlag) != 0 +func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, + maxSuffixes int, deadline uint64) ([]string, error) { + return api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline) } -func (s *VMSelectServer) processConn(bc *handshake.BufferedConn) error { - ctx := &vmselectRequestCtx{ - bc: bc, - sizeBuf: make([]byte, 8), - } - for { - if err := s.processRequest(ctx); err != nil { - if err == io.EOF { - // Remote client gracefully closed the connection. - return nil - } - if errors.Is(err, storage.ErrDeadlineExceeded) { - return fmt.Errorf("cannot process vmselect request in %d seconds: %w", ctx.timeout, err) - } - return fmt.Errorf("cannot process vmselect request: %w", err) - } - if err := bc.Flush(); err != nil { - return fmt.Errorf("cannot flush compressed buffers: %w", err) - } - } +func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, maxLabelNames, + maxMetrics int, deadline uint64) ([]string, error) { + return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, accountID, projectID, tfss, tr, maxLabelNames, maxMetrics, deadline) } -type vmselectRequestCtx struct { - bc *handshake.BufferedConn - sizeBuf []byte - dataBuf []byte - - qt *querytracer.Tracer - sq storage.SearchQuery - tfss []*storage.TagFilters - sr storage.Search - mb storage.MetricBlock - - // timeout in seconds for the current request - timeout uint64 - - // deadline in unix timestamp seconds for the current request. 
- deadline uint64 +func (api *vmstorageAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) { + return api.s.GetSeriesCount(accountID, projectID, deadline) } -func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) { - var tr storage.TimeRange - minTimestamp, err := ctx.readUint64() - if err != nil { - return tr, fmt.Errorf("cannot read minTimestamp: %w", err) - } - maxTimestamp, err := ctx.readUint64() - if err != nil { - return tr, fmt.Errorf("cannot read maxTimestamp: %w", err) - } - tr.MinTimestamp = int64(minTimestamp) - tr.MaxTimestamp = int64(maxTimestamp) - return tr, nil +func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, date uint64, focusLabel string, + topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) { + return api.s.GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline) } -func (ctx *vmselectRequestCtx) readLimit() (int, error) { - n, err := ctx.readUint32() - if err != nil { - return 0, fmt.Errorf("cannot read limit: %w", err) - } - if n > 1<<31-1 { - n = 1<<31 - 1 - } - return int(n), nil +func (api *vmstorageAPI) DeleteMetrics(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int, deadline uint64) (int, error) { + return api.s.DeleteMetrics(qt, tfss) } -func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { - ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 4) - if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { - if err == io.EOF { - return 0, err - } - return 0, fmt.Errorf("cannot read uint32: %w", err) - } - n := encoding.UnmarshalUint32(ctx.sizeBuf) - return n, nil +func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error { + return api.s.RegisterMetricNames(qt, mrs) } -func (ctx *vmselectRequestCtx) readUint64() (uint64, error) { - ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) - if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { - if err == io.EOF { - return 0, err - } - return 0, fmt.Errorf("cannot read uint64: %w", err) - } - n := encoding.UnmarshalUint64(ctx.sizeBuf) - return n, nil +func (api *vmstorageAPI) SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, query []byte, + maxMetrics int, deadline uint64) ([]string, error) { + return api.s.SearchGraphitePaths(qt, accountID, projectID, tr, query, maxMetrics, deadline) } -func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) { - accountID, err := ctx.readUint32() - if err != nil { - return 0, 0, fmt.Errorf("cannot read accountID: %w", err) - } - projectID, err := ctx.readUint32() - if err != nil { - return 0, 0, fmt.Errorf("cannot read projectID: %w", err) - } - return accountID, projectID, nil +// blockIterator implements vmselectapi.BlockIterator +type blockIterator struct { + sr storage.Search } -// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes. 
-const maxSearchQuerySize = 1024 * 1024 +var blockIteratorsPool sync.Pool -func (ctx *vmselectRequestCtx) readSearchQuery() error { - if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil { - return fmt.Errorf("cannot read searchQuery: %w", err) - } - tail, err := ctx.sq.Unmarshal(ctx.dataBuf) - if err != nil { - return fmt.Errorf("cannot unmarshal SearchQuery: %w", err) - } - if len(tail) > 0 { - return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail) - } - return nil +func (bi *blockIterator) MustClose() { + bi.sr.MustClose() + blockIteratorsPool.Put(bi) } -func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { - ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) - if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { - if err == io.EOF { - return err - } - return fmt.Errorf("cannot read data size: %w", err) +func getBlockIterator() *blockIterator { + v := blockIteratorsPool.Get() + if v == nil { + v = &blockIterator{} } - dataSize := encoding.UnmarshalUint64(ctx.sizeBuf) - if dataSize > uint64(maxDataSize) { - return fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize) - } - ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, int(dataSize)) - if dataSize == 0 { - return nil - } - if n, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { - return fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", dataSize, err, n) - } - return nil + return v.(*blockIterator) } -func (ctx *vmselectRequestCtx) readBool() (bool, error) { - ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1) - if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { - if err == io.EOF { - return false, err - } - return false, fmt.Errorf("cannot read bool: %w", err) +func (bi *blockIterator) NextBlock(mb *storage.MetricBlock, fetchData bool) bool { + if !bi.sr.NextMetricBlock() { + return false } - v := ctx.dataBuf[0] != 0 - return v, nil + mb.MetricName = bi.sr.MetricBlockRef.MetricName + bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block, fetchData) + return true } -func (ctx *vmselectRequestCtx) readByte() (byte, error) { - ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1) - if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { - if err == io.EOF { - return 0, err - } - return 0, fmt.Errorf("cannot read byte: %w", err) - } - b := ctx.dataBuf[0] - return b, nil +func (bi *blockIterator) Error() error { + return bi.sr.Error() } -func (ctx *vmselectRequestCtx) writeDataBufBytes() error { - if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil { - return fmt.Errorf("cannot write data size: %w", err) - } - if len(ctx.dataBuf) == 0 { - return nil - } - if _, err := ctx.bc.Write(ctx.dataBuf); err != nil { - return fmt.Errorf("cannot write data with size %d: %w", len(ctx.dataBuf), err) - } - return nil -} - -// maxErrorMessageSize is the maximum size of error message to send to clients. -const maxErrorMessageSize = 64 * 1024 - -func (ctx *vmselectRequestCtx) writeErrorMessage(err error) error { - if errors.Is(err, storage.ErrDeadlineExceeded) { - err = fmt.Errorf("cannot execute request in %d seconds: %w", ctx.timeout, err) - } - errMsg := err.Error() - if len(errMsg) > maxErrorMessageSize { - // Trim too long error message. 
- errMsg = errMsg[:maxErrorMessageSize] - } - if err := ctx.writeString(errMsg); err != nil { - return fmt.Errorf("cannot send error message %q to client: %w", errMsg, err) - } - return nil -} - -func (ctx *vmselectRequestCtx) writeString(s string) error { - ctx.dataBuf = append(ctx.dataBuf[:0], s...) - return ctx.writeDataBufBytes() -} - -func (ctx *vmselectRequestCtx) writeUint64(n uint64) error { - ctx.sizeBuf = encoding.MarshalUint64(ctx.sizeBuf[:0], n) - if _, err := ctx.bc.Write(ctx.sizeBuf); err != nil { - return fmt.Errorf("cannot write uint64 %d: %w", n, err) - } - return nil -} - -const maxRPCNameSize = 128 - -func (s *VMSelectServer) processRequest(ctx *vmselectRequestCtx) error { - // Read rpcName - // Do not set deadline on reading rpcName, since it may take a - // lot of time for idle connection. - if err := ctx.readDataBufBytes(maxRPCNameSize); err != nil { - if err == io.EOF { - // Remote client gracefully closed the connection. - return err - } - return fmt.Errorf("cannot read rpcName: %w", err) - } - rpcName := string(ctx.dataBuf) - - // Initialize query tracing. - traceEnabled, err := ctx.readBool() - if err != nil { - return fmt.Errorf("cannot read traceEnabled: %w", err) - } - ctx.qt = querytracer.New(traceEnabled, "%s() at vmstorage", rpcName) - - // Limit the time required for reading request args. - if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { - return fmt.Errorf("cannot set read deadline for reading request args: %w", err) - } - defer func() { - _ = ctx.bc.SetReadDeadline(time.Time{}) - }() - - // Read the timeout for request execution. - timeout, err := ctx.readUint32() - if err != nil { - return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err) - } - ctx.timeout = uint64(timeout) - ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout) - - // Process the rpcName call. - if err := s.processRPC(ctx, rpcName); err != nil { - return fmt.Errorf("cannot execute %q: %s", rpcName, err) - } - - // Finish query trace. 
- ctx.qt.Done() - traceJSON := ctx.qt.ToJSON() - if err := ctx.writeString(traceJSON); err != nil { - return fmt.Errorf("cannot send trace with length %d bytes to vmselect: %w", len(traceJSON), err) - } - return nil -} - -func (s *VMSelectServer) processRPC(ctx *vmselectRequestCtx, rpcName string) error { - switch rpcName { - case "search_v6": - return s.processSearch(ctx) - case "searchMetricNames_v3": - return s.processSearchMetricNames(ctx) - case "labelValues_v5": - return s.processLabelValues(ctx) - case "tagValueSuffixes_v3": - return s.processTagValueSuffixes(ctx) - case "labelNames_v5": - return s.processLabelNames(ctx) - case "seriesCount_v4": - return s.processSeriesCount(ctx) - case "tsdbStatus_v5": - return s.processTSDBStatus(ctx) - case "deleteMetrics_v5": - return s.processDeleteMetrics(ctx) - case "registerMetricNames_v3": - return s.processRegisterMetricNames(ctx) - default: - return fmt.Errorf("unsupported rpcName: %q", rpcName) - } -} - -const maxMetricNameRawSize = 1024 * 1024 -const maxMetricNamesPerRequest = 1024 * 1024 - -func (s *VMSelectServer) processRegisterMetricNames(ctx *vmselectRequestCtx) error { - vmselectRegisterMetricNamesRequests.Inc() - - // Read request - metricsCount, err := ctx.readUint64() - if err != nil { - return fmt.Errorf("cannot read metricsCount: %w", err) - } - if metricsCount > maxMetricNamesPerRequest { - return fmt.Errorf("too many metric names in a single request; got %d; mustn't exceed %d", metricsCount, maxMetricNamesPerRequest) - } - mrs := make([]storage.MetricRow, metricsCount) - for i := 0; i < int(metricsCount); i++ { - if err := ctx.readDataBufBytes(maxMetricNameRawSize); err != nil { - return fmt.Errorf("cannot read metricNameRaw: %w", err) - } - mr := &mrs[i] - mr.MetricNameRaw = append(mr.MetricNameRaw[:0], ctx.dataBuf...) - n, err := ctx.readUint64() - if err != nil { - return fmt.Errorf("cannot read timestamp: %w", err) - } - mr.Timestamp = int64(n) - } - - // Register metric names from mrs. - if err := s.storage.RegisterMetricNames(ctx.qt, mrs); err != nil { - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - return nil -} - -func (s *VMSelectServer) processDeleteMetrics(ctx *vmselectRequestCtx) error { - vmselectDeleteMetricsRequests.Inc() - - // Read request - if err := ctx.readSearchQuery(); err != nil { - return err - } - - // Setup ctx.tfss - tr := storage.TimeRange{ - MinTimestamp: 0, - MaxTimestamp: time.Now().UnixNano() / 1e6, - } - if err := ctx.setupTfss(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - - // Delete the given metrics. - deletedCount, err := s.storage.DeleteMetrics(ctx.qt, ctx.tfss) - if err != nil { - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - // Send deletedCount to vmselect. 
- if err := ctx.writeUint64(uint64(deletedCount)); err != nil { - return fmt.Errorf("cannot send deletedCount=%d: %w", deletedCount, err) - } - return nil -} - -func (s *VMSelectServer) processLabelNames(ctx *vmselectRequestCtx) error { - vmselectLabelNamesRequests.Inc() - - // Read request - if err := ctx.readSearchQuery(); err != nil { - return err - } - maxLabelNames, err := ctx.readLimit() - if err != nil { - return fmt.Errorf("cannot read maxLabelNames: %w", err) - } - if maxLabelNames <= 0 || maxLabelNames > *maxTagKeysPerSearch { - maxLabelNames = *maxTagKeysPerSearch - } - - // Execute the request - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - if err := ctx.setupTfss(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - maxMetrics := ctx.getMaxMetrics() - labelNames, err := s.storage.SearchLabelNamesWithFiltersOnTimeRange(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, tr, maxLabelNames, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send labelNames to vmselect - for _, labelName := range labelNames { - if err := ctx.writeString(labelName); err != nil { - return fmt.Errorf("cannot write label name %q: %w", labelName, err) - } - } - // Send 'end of response' marker - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send 'end of response' marker") - } - return nil -} - -const maxLabelValueSize = 16 * 1024 - -func (s *VMSelectServer) processLabelValues(ctx *vmselectRequestCtx) error { - vmselectLabelValuesRequests.Inc() - - // Read request - if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { - return fmt.Errorf("cannot read labelName: %w", err) - } - labelName := string(ctx.dataBuf) - if err := ctx.readSearchQuery(); err != nil { - return err - } - maxLabelValues, err := ctx.readLimit() - if err != nil { - return fmt.Errorf("cannot read maxLabelValues: %w", err) - } - if maxLabelValues <= 0 || maxLabelValues > *maxTagValuesPerSearch { - maxLabelValues = *maxTagValuesPerSearch - } - - // Execute the request - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - if err := ctx.setupTfss(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - maxMetrics := ctx.getMaxMetrics() - labelValues, err := s.storage.SearchLabelValuesWithFiltersOnTimeRange(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, labelName, ctx.tfss, tr, - maxLabelValues, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send labelValues to vmselect - for _, labelValue := range labelValues { - if len(labelValue) == 0 { - // Skip empty label values, since they have no sense for prometheus. 
- continue - } - if err := ctx.writeString(labelValue); err != nil { - return fmt.Errorf("cannot write labelValue %q: %w", labelValue, err) - } - } - // Send 'end of label values' marker - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send 'end of response' marker") - } - return nil -} - -func (s *VMSelectServer) processTagValueSuffixes(ctx *vmselectRequestCtx) error { - vmselectTagValueSuffixesRequests.Inc() - - // read request - accountID, projectID, err := ctx.readAccountIDProjectID() - if err != nil { - return err - } - tr, err := ctx.readTimeRange() - if err != nil { - return err - } - if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { - return fmt.Errorf("cannot read tagKey: %w", err) - } - tagKey := append([]byte{}, ctx.dataBuf...) - if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { - return fmt.Errorf("cannot read tagValuePrefix: %w", err) - } - tagValuePrefix := append([]byte{}, ctx.dataBuf...) - delimiter, err := ctx.readByte() - if err != nil { - return fmt.Errorf("cannot read delimiter: %w", err) - } - - // Search for tag value suffixes - suffixes, err := s.storage.SearchTagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - if len(suffixes) >= *maxTagValueSuffixesPerSearch { - err := fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found "+ - "for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+ - "either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value", - *maxTagValueSuffixesPerSearch, tagKey, tagValuePrefix, delimiter, tr.String()) - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send suffixes to vmselect. - // Suffixes may contain empty string, so prepend suffixes with suffixCount. - if err := ctx.writeUint64(uint64(len(suffixes))); err != nil { - return fmt.Errorf("cannot write suffixesCount: %w", err) - } - for i, suffix := range suffixes { - if err := ctx.writeString(suffix); err != nil { - return fmt.Errorf("cannot write suffix #%d: %w", i+1, err) - } - } - return nil -} - -func (s *VMSelectServer) processSeriesCount(ctx *vmselectRequestCtx) error { - vmselectSeriesCountRequests.Inc() - - // Read request - accountID, projectID, err := ctx.readAccountIDProjectID() - if err != nil { - return err - } - - // Execute the request - n, err := s.storage.GetSeriesCount(accountID, projectID, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send series count to vmselect. 
- if err := ctx.writeUint64(n); err != nil { - return fmt.Errorf("cannot write series count to vmselect: %w", err) - } - return nil -} - -func (s *VMSelectServer) processTSDBStatus(ctx *vmselectRequestCtx) error { - vmselectTSDBStatusRequests.Inc() - - // Read request - if err := ctx.readSearchQuery(); err != nil { - return err - } - if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { - return fmt.Errorf("cannot read focusLabel: %w", err) - } - focusLabel := string(ctx.dataBuf) - topN, err := ctx.readUint32() - if err != nil { - return fmt.Errorf("cannot read topN: %w", err) - } - - // Execute the request - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - if err := ctx.setupTfss(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - maxMetrics := ctx.getMaxMetrics() - date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000) - status, err := s.storage.GetTSDBStatus(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, focusLabel, int(topN), maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - - // Send an empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send status to vmselect. - return writeTSDBStatus(ctx, status) -} - -func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error { - if err := ctx.writeUint64(status.TotalSeries); err != nil { - return fmt.Errorf("cannot write totalSeries to vmselect: %w", err) - } - if err := ctx.writeUint64(status.TotalLabelValuePairs); err != nil { - return fmt.Errorf("cannot write totalLabelValuePairs to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil { - return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelName); err != nil { - return fmt.Errorf("cannot write seriesCountByLabelName to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.SeriesCountByFocusLabelValue); err != nil { - return fmt.Errorf("cannot write seriesCountByFocusLabelValue to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil { - return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil { - return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err) - } - return nil -} - -func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error { - if err := ctx.writeUint64(uint64(len(a))); err != nil { - return fmt.Errorf("cannot write topHeapEntries size: %w", err) - } - for _, e := range a { - if err := ctx.writeString(e.Name); err != nil { - return fmt.Errorf("cannot write topHeapEntry name: %w", err) - } - if err := ctx.writeUint64(e.Count); err != nil { - return fmt.Errorf("cannot write topHeapEntry count: %w", err) - } - } - return nil -} - -func (s *VMSelectServer) processSearchMetricNames(ctx *vmselectRequestCtx) error { - vmselectSearchMetricNamesRequests.Inc() - - // Read request. - if err := ctx.readSearchQuery(); err != nil { - return err - } - - // Search metric names. 
- tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - if err := ctx.setupTfss(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - maxMetrics := ctx.getMaxMetrics() - mns, err := s.storage.SearchMetricNames(ctx.qt, ctx.tfss, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - - // Send empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send response. - metricNamesCount := len(mns) - if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil { - return fmt.Errorf("cannot send metricNamesCount: %w", err) - } - for i, mn := range mns { - ctx.dataBuf = mn.Marshal(ctx.dataBuf[:0]) - if err := ctx.writeDataBufBytes(); err != nil { - return fmt.Errorf("cannot send metricName #%d: %w", i+1, err) - } - } - ctx.qt.Printf("sent %d series to vmselect", len(mns)) - return nil -} - -func (s *VMSelectServer) processSearch(ctx *vmselectRequestCtx) error { - vmselectSearchRequests.Inc() - - // Read request. - if err := ctx.readSearchQuery(); err != nil { - return err - } - fetchData, err := ctx.readBool() - if err != nil { - return fmt.Errorf("cannot read `fetchData` bool: %w", err) - } - - // Setup search. - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - if err := ctx.setupTfss(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - if err := checkTimeRange(s.storage, tr); err != nil { - return ctx.writeErrorMessage(err) - } - startTime := time.Now() - maxMetrics := ctx.getMaxMetrics() - ctx.sr.Init(ctx.qt, s.storage, ctx.tfss, tr, maxMetrics, ctx.deadline) - indexSearchDuration.UpdateDuration(startTime) - defer ctx.sr.MustClose() - if err := ctx.sr.Error(); err != nil { - return ctx.writeErrorMessage(err) - } - - // Send empty error message to vmselect. - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send empty error message: %w", err) - } - - // Send found blocks to vmselect. - blocksRead := 0 - for ctx.sr.NextMetricBlock() { - blocksRead++ - ctx.mb.MetricName = ctx.sr.MetricBlockRef.MetricName - ctx.sr.MetricBlockRef.BlockRef.MustReadBlock(&ctx.mb.Block, fetchData) - - vmselectMetricBlocksRead.Inc() - vmselectMetricRowsRead.Add(ctx.mb.Block.RowsCount()) - - ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0]) - if err := ctx.writeDataBufBytes(); err != nil { - return fmt.Errorf("cannot send MetricBlock: %w", err) - } - } - if err := ctx.sr.Error(); err != nil { - return fmt.Errorf("search error: %w", err) - } - ctx.qt.Printf("sent %d blocks to vmselect", blocksRead) - - // Send 'end of response' marker - if err := ctx.writeString(""); err != nil { - return fmt.Errorf("cannot send 'end of response' marker") - } - return nil -} - -var indexSearchDuration = metrics.NewHistogram(`vm_index_search_duration_seconds`) - // checkTimeRange returns true if the given tr is denied for querying. 
func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error { if !*denyQueriesOutsideRetention { @@ -926,62 +148,3 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error { StatusCode: http.StatusServiceUnavailable, } } - -var ( - vmselectRegisterMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="register_metric_names"}`) - vmselectDeleteMetricsRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="delete_metrics"}`) - vmselectLabelNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_names"}`) - vmselectLabelValuesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_values"}`) - vmselectTagValueSuffixesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tag_value_suffixes"}`) - vmselectSeriesCountRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="series_count"}`) - vmselectTSDBStatusRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status"}`) - vmselectSearchMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search_metric_names"}`) - vmselectSearchRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search"}`) - - vmselectMetricBlocksRead = metrics.NewCounter(`vm_vmselect_metric_blocks_read_total`) - vmselectMetricRowsRead = metrics.NewCounter(`vm_vmselect_metric_rows_read_total`) -) - -func (ctx *vmselectRequestCtx) getMaxMetrics() int { - maxMetrics := ctx.sq.MaxMetrics - maxMetricsLimit := *maxMetricsPerSearch - if maxMetricsLimit <= 0 { - maxMetricsLimit = 2e9 - } - if maxMetrics <= 0 || maxMetrics > maxMetricsLimit { - maxMetrics = maxMetricsLimit - } - return maxMetrics -} - -func (ctx *vmselectRequestCtx) setupTfss(s *storage.Storage, tr storage.TimeRange) error { - tfss := ctx.tfss[:0] - accountID := ctx.sq.AccountID - projectID := ctx.sq.ProjectID - for _, tagFilters := range ctx.sq.TagFilterss { - tfs := storage.NewTagFilters(accountID, projectID) - for i := range tagFilters { - tf := &tagFilters[i] - if string(tf.Key) == "__graphite__" { - query := tf.Value - maxMetrics := ctx.getMaxMetrics() - paths, err := s.SearchGraphitePaths(ctx.qt, accountID, projectID, tr, query, maxMetrics, ctx.deadline) - if err != nil { - return fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) - } - if len(paths) >= maxMetrics { - return fmt.Errorf("more than %d time series match Graphite query %q; "+ - "either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query) - } - tfs.AddGraphiteQuery(query, paths, tf.IsNegative) - continue - } - if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil { - return fmt.Errorf("cannot parse tag filter %s: %w", tf, err) - } - } - tfss = append(tfss, tfs) - } - ctx.tfss = tfss - return nil -} diff --git a/lib/vmselectapi/api.go b/lib/vmselectapi/api.go new file mode 100644 index 000000000..6fd8f498b --- /dev/null +++ b/lib/vmselectapi/api.go @@ -0,0 +1,59 @@ +package vmselectapi + +import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" +) + +// API must implement vmselect API. +type API interface { + // InitSearch initialize series search for the given tfss. + // + // The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed. 
+	InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (BlockIterator, error)
+
+	// SearchMetricNames returns metric names matching the given tfss.
+	SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error)
+
+	// LabelValues returns values for the labelName label across series matching the given tfss.
+	LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error)
+
+	// TagValueSuffixes returns tag value suffixes for the given args.
+	TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error)
+
+	// LabelNames returns label names for series matching the given tfss.
+	LabelNames(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, maxLabelNames, maxMetrics int, deadline uint64) ([]string, error)
+
+	// SeriesCount returns the number of series for the given (accountID, projectID).
+	SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error)
+
+	// TSDBStatus returns TSDB status for the given tfss on the given date.
+	TSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error)
+
+	// DeleteMetrics deletes metrics matching the given tfss.
+	DeleteMetrics(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int, deadline uint64) (int, error)
+
+	// RegisterMetricNames registers the given mrs in the storage.
+	RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error
+
+	// SearchGraphitePaths searches for Graphite paths for the given query.
+	SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, query []byte, maxMetrics int, deadline uint64) ([]string, error)
+}
+
+// BlockIterator must iterate through series blocks found by API.InitSearch.
+//
+// MustClose must be called in order to free up allocated resources when BlockIterator is no longer needed.
+type BlockIterator interface {
+	// NextBlock reads the next block into mb.
+	//
+	// It returns true on success and false on error or if there are no more blocks to read.
+	//
+	// If fetchData is false, then only mb.MetricName is read. Otherwise mb.Block is also read.
+	NextBlock(mb *storage.MetricBlock, fetchData bool) bool
+
+	// MustClose frees up resources allocated by BlockIterator.
+	MustClose()
+
+	// Error returns the last error that occurred in NextBlock(), which caused it to return false.
+ Error() error +} diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go new file mode 100644 index 000000000..c491e7da6 --- /dev/null +++ b/lib/vmselectapi/server.go @@ -0,0 +1,989 @@ +package vmselectapi + +import ( + "errors" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" +) + +// Server processes vmselect requests. +type Server struct { + // api contains the implementation of the server API for vmselect requests. + api API + + // limits contains various limits for the Server. + limits Limits + + // disableResponseCompression controls whether vmselect server must compress responses. + disableResponseCompression bool + + // ln is the listener for incoming connections to the server. + ln net.Listener + + // connsMap is a map of currently established connections to the server. + // It is used for closing the connections when MustStop() is called. + connsMap ingestserver.ConnsMap + + // wg is used for waiting for worker goroutines to stop when MustStop() is called. + wg sync.WaitGroup + + // stopFlag is set to true when the server needs to stop. + stopFlag uint32 + + vmselectConns *metrics.Counter + vmselectConnErrors *metrics.Counter + + indexSearchDuration *metrics.Histogram + + registerMetricNamesRequests *metrics.Counter + deleteMetricsRequests *metrics.Counter + labelNamesRequests *metrics.Counter + labelValuesRequests *metrics.Counter + tagValueSuffixesRequests *metrics.Counter + seriesCountRequests *metrics.Counter + tsdbStatusRequests *metrics.Counter + searchMetricNamesRequests *metrics.Counter + searchRequests *metrics.Counter + + metricBlocksRead *metrics.Counter + metricRowsRead *metrics.Counter +} + +// Limits contains various limits for Server. +type Limits struct { + // MaxMetrics is the maximum number of time series, which may be returned from various API calls. + MaxMetrics int + + // MaxLabelNames is the maximum label names, which may be returned from labelNames request. + MaxLabelNames int + + // MaxLabelValues is the maximum label values, which may be returned from labelValues request. + MaxLabelValues int + + // MaxTagValueSuffixes is the maximum number of entries, which can be returned from tagValueSuffixes request. + MaxTagValueSuffixes int +} + +// NewServer starts new Server at the given addr, which serves the given api with the given limits. +// +// If disableResponseCompression is set to true, then the returned server doesn't compress responses. 
+func NewServer(addr string, api API, limits Limits, disableResponseCompression bool) (*Server, error) {
+	ln, err := netutil.NewTCPListener("vmselect", addr, nil)
+	if err != nil {
+		return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err)
+	}
+	s := &Server{
+		api:                        api,
+		limits:                     limits,
+		disableResponseCompression: disableResponseCompression,
+		ln:                         ln,
+
+		vmselectConns:      metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conns{addr=%q}`, addr)),
+		vmselectConnErrors: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conn_errors_total{addr=%q}`, addr)),
+
+		indexSearchDuration: metrics.NewHistogram(fmt.Sprintf(`vm_index_search_duration_seconds{addr=%q}`, addr)),
+
+		registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="registerMetricNames",addr=%q}`, addr)),
+		deleteMetricsRequests:       metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="deleteMetrics",addr=%q}`, addr)),
+		labelNamesRequests:          metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelNames",addr=%q}`, addr)),
+		labelValuesRequests:         metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelValues",addr=%q}`, addr)),
+		tagValueSuffixesRequests:    metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tagValueSuffixes",addr=%q}`, addr)),
+		seriesCountRequests:         metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="seriesCount",addr=%q}`, addr)),
+		tsdbStatusRequests:          metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tsdbStatus",addr=%q}`, addr)),
+		searchMetricNamesRequests:   metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="searchMetricNames",addr=%q}`, addr)),
+		searchRequests:              metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="search",addr=%q}`, addr)),
+
+		metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_blocks_read_total{addr=%q}`, addr)),
+		metricRowsRead:   metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_rows_read_total{addr=%q}`, addr)),
+	}
+	s.connsMap.Init()
+	s.wg.Add(1)
+	go func() {
+		s.run()
+		s.wg.Done()
+	}()
+	return s, nil
+}
+
+func (s *Server) run() {
+	logger.Infof("accepting vmselect conns at %s", s.ln.Addr())
+	for {
+		c, err := s.ln.Accept()
+		if err != nil {
+			if pe, ok := err.(net.Error); ok && pe.Temporary() {
+				continue
+			}
+			if s.isStopping() {
+				return
+			}
+			logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.ln.Addr(), err)
+		}
+		logger.Infof("accepted vmselect conn from %s", c.RemoteAddr())
+
+		if !s.connsMap.Add(c) {
+			// The server is closed.
+			_ = c.Close()
+			return
+		}
+		s.vmselectConns.Inc()
+		s.wg.Add(1)
+		go func() {
+			defer func() {
+				s.connsMap.Delete(c)
+				s.vmselectConns.Dec()
+				s.wg.Done()
+			}()
+
+			// Compress responses to vmselect even if they already contain compressed blocks.
+			// Responses contain uncompressed metric names, which should compress well
+			// when the response contains a high number of time series.
+			// Additionally, recently added metric blocks are usually uncompressed, so the compression
+			// should save network bandwidth.
+			compressionLevel := 1
+			if s.disableResponseCompression {
+				compressionLevel = 0
+			}
+			bc, err := handshake.VMSelectServer(c, compressionLevel)
+			if err != nil {
+				if s.isStopping() {
+					// c is closed inside Server.MustClose
+					return
+				}
+				logger.Errorf("cannot perform vmselect handshake with client %q: %s", c.RemoteAddr(), err)
+				_ = c.Close()
+				return
+			}
+
+			defer func() {
+				if !s.isStopping() {
+					logger.Infof("closing vmselect conn from %s", c.RemoteAddr())
+				}
+				_ = bc.Close()
+			}()
+
+			logger.Infof("processing vmselect conn from %s", c.RemoteAddr())
+			if err := s.processConn(bc); err != nil {
+				if s.isStopping() {
+					return
+				}
+				s.vmselectConnErrors.Inc()
+				logger.Errorf("cannot process vmselect conn %s: %s", c.RemoteAddr(), err)
+			}
+		}()
+	}
+}
+
+// MustClose gracefully closes s, so it no longer touches s.api after returning.
+func (s *Server) MustClose() {
+	// Mark the server as stopping.
+	s.setIsStopping()
+
+	// Stop accepting new connections from vmselect.
+	if err := s.ln.Close(); err != nil {
+		logger.Panicf("FATAL: cannot close vmselect listener: %s", err)
+	}
+
+	// Close existing connections from vmselect, so the goroutines
+	// processing these connections are finished.
+	s.connsMap.CloseAll()
+
+	// Wait until all the goroutines processing vmselect conns are finished.
+	s.wg.Wait()
+}
+
+func (s *Server) setIsStopping() {
+	atomic.StoreUint32(&s.stopFlag, 1)
+}
+
+func (s *Server) isStopping() bool {
+	return atomic.LoadUint32(&s.stopFlag) != 0
+}
+
+func (s *Server) processConn(bc *handshake.BufferedConn) error {
+	ctx := &vmselectRequestCtx{
+		bc:      bc,
+		sizeBuf: make([]byte, 8),
+	}
+	for {
+		if err := s.processRequest(ctx); err != nil {
+			if err == io.EOF {
+				// Remote client gracefully closed the connection.
+				return nil
+			}
+			if errors.Is(err, storage.ErrDeadlineExceeded) {
+				return fmt.Errorf("cannot process vmselect request in %d seconds: %w", ctx.timeout, err)
+			}
+			return fmt.Errorf("cannot process vmselect request: %w", err)
+		}
+		if err := bc.Flush(); err != nil {
+			return fmt.Errorf("cannot flush compressed buffers: %w", err)
+		}
+	}
+}
+
+type vmselectRequestCtx struct {
+	bc      *handshake.BufferedConn
+	sizeBuf []byte
+	dataBuf []byte
+
+	qt *querytracer.Tracer
+	sq storage.SearchQuery
+	mb storage.MetricBlock
+
+	// timeout in seconds for the current request
+	timeout uint64
+
+	// deadline in unix timestamp seconds for the current request.
+ deadline uint64 +} + +func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) { + var tr storage.TimeRange + minTimestamp, err := ctx.readUint64() + if err != nil { + return tr, fmt.Errorf("cannot read minTimestamp: %w", err) + } + maxTimestamp, err := ctx.readUint64() + if err != nil { + return tr, fmt.Errorf("cannot read maxTimestamp: %w", err) + } + tr.MinTimestamp = int64(minTimestamp) + tr.MaxTimestamp = int64(maxTimestamp) + return tr, nil +} + +func (ctx *vmselectRequestCtx) readLimit() (int, error) { + n, err := ctx.readUint32() + if err != nil { + return 0, fmt.Errorf("cannot read limit: %w", err) + } + if n > 1<<31-1 { + n = 1<<31 - 1 + } + return int(n), nil +} + +func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 4) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read uint32: %w", err) + } + n := encoding.UnmarshalUint32(ctx.sizeBuf) + return n, nil +} + +func (ctx *vmselectRequestCtx) readUint64() (uint64, error) { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read uint64: %w", err) + } + n := encoding.UnmarshalUint64(ctx.sizeBuf) + return n, nil +} + +func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) { + accountID, err := ctx.readUint32() + if err != nil { + return 0, 0, fmt.Errorf("cannot read accountID: %w", err) + } + projectID, err := ctx.readUint32() + if err != nil { + return 0, 0, fmt.Errorf("cannot read projectID: %w", err) + } + return accountID, projectID, nil +} + +// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes. 
+const maxSearchQuerySize = 1024 * 1024 + +func (ctx *vmselectRequestCtx) readSearchQuery() error { + if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil { + return fmt.Errorf("cannot read searchQuery: %w", err) + } + tail, err := ctx.sq.Unmarshal(ctx.dataBuf) + if err != nil { + return fmt.Errorf("cannot unmarshal SearchQuery: %w", err) + } + if len(tail) > 0 { + return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail) + } + return nil +} + +func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return err + } + return fmt.Errorf("cannot read data size: %w", err) + } + dataSize := encoding.UnmarshalUint64(ctx.sizeBuf) + if dataSize > uint64(maxDataSize) { + return fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize) + } + ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, int(dataSize)) + if dataSize == 0 { + return nil + } + if n, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { + return fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", dataSize, err, n) + } + return nil +} + +func (ctx *vmselectRequestCtx) readBool() (bool, error) { + ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1) + if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { + if err == io.EOF { + return false, err + } + return false, fmt.Errorf("cannot read bool: %w", err) + } + v := ctx.dataBuf[0] != 0 + return v, nil +} + +func (ctx *vmselectRequestCtx) readByte() (byte, error) { + ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1) + if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read byte: %w", err) + } + b := ctx.dataBuf[0] + return b, nil +} + +func (ctx *vmselectRequestCtx) writeDataBufBytes() error { + if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil { + return fmt.Errorf("cannot write data size: %w", err) + } + if len(ctx.dataBuf) == 0 { + return nil + } + if _, err := ctx.bc.Write(ctx.dataBuf); err != nil { + return fmt.Errorf("cannot write data with size %d: %w", len(ctx.dataBuf), err) + } + return nil +} + +// maxErrorMessageSize is the maximum size of error message to send to clients. +const maxErrorMessageSize = 64 * 1024 + +func (ctx *vmselectRequestCtx) writeErrorMessage(err error) error { + if errors.Is(err, storage.ErrDeadlineExceeded) { + err = fmt.Errorf("cannot execute request in %d seconds: %w", ctx.timeout, err) + } + errMsg := err.Error() + if len(errMsg) > maxErrorMessageSize { + // Trim too long error message. + errMsg = errMsg[:maxErrorMessageSize] + } + if err := ctx.writeString(errMsg); err != nil { + return fmt.Errorf("cannot send error message %q to client: %w", errMsg, err) + } + return nil +} + +func (ctx *vmselectRequestCtx) writeString(s string) error { + ctx.dataBuf = append(ctx.dataBuf[:0], s...) 
+ return ctx.writeDataBufBytes() +} + +func (ctx *vmselectRequestCtx) writeUint64(n uint64) error { + ctx.sizeBuf = encoding.MarshalUint64(ctx.sizeBuf[:0], n) + if _, err := ctx.bc.Write(ctx.sizeBuf); err != nil { + return fmt.Errorf("cannot write uint64 %d: %w", n, err) + } + return nil +} + +const maxRPCNameSize = 128 + +func (s *Server) processRequest(ctx *vmselectRequestCtx) error { + // Read rpcName + // Do not set deadline on reading rpcName, since it may take a + // lot of time for idle connection. + if err := ctx.readDataBufBytes(maxRPCNameSize); err != nil { + if err == io.EOF { + // Remote client gracefully closed the connection. + return err + } + return fmt.Errorf("cannot read rpcName: %w", err) + } + rpcName := string(ctx.dataBuf) + + // Initialize query tracing. + traceEnabled, err := ctx.readBool() + if err != nil { + return fmt.Errorf("cannot read traceEnabled: %w", err) + } + ctx.qt = querytracer.New(traceEnabled, "%s() at vmstorage", rpcName) + + // Limit the time required for reading request args. + if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { + return fmt.Errorf("cannot set read deadline for reading request args: %w", err) + } + defer func() { + _ = ctx.bc.SetReadDeadline(time.Time{}) + }() + + // Read the timeout for request execution. + timeout, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err) + } + ctx.timeout = uint64(timeout) + ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout) + + // Process the rpcName call. + if err := s.processRPC(ctx, rpcName); err != nil { + return fmt.Errorf("cannot execute %q: %s", rpcName, err) + } + + // Finish query trace. + ctx.qt.Done() + traceJSON := ctx.qt.ToJSON() + if err := ctx.writeString(traceJSON); err != nil { + return fmt.Errorf("cannot send trace with length %d bytes to vmselect: %w", len(traceJSON), err) + } + return nil +} + +func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error { + switch rpcName { + case "search_v6": + return s.processSeriesSearch(ctx) + case "searchMetricNames_v3": + return s.processSearchMetricNames(ctx) + case "labelValues_v5": + return s.processLabelValues(ctx) + case "tagValueSuffixes_v3": + return s.processTagValueSuffixes(ctx) + case "labelNames_v5": + return s.processLabelNames(ctx) + case "seriesCount_v4": + return s.processSeriesCount(ctx) + case "tsdbStatus_v5": + return s.processTSDBStatus(ctx) + case "deleteMetrics_v5": + return s.processDeleteMetrics(ctx) + case "registerMetricNames_v3": + return s.processRegisterMetricNames(ctx) + default: + return fmt.Errorf("unsupported rpcName: %q", rpcName) + } +} + +const maxMetricNameRawSize = 1024 * 1024 +const maxMetricNamesPerRequest = 1024 * 1024 + +func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error { + s.registerMetricNamesRequests.Inc() + + // Read request + metricsCount, err := ctx.readUint64() + if err != nil { + return fmt.Errorf("cannot read metricsCount: %w", err) + } + if metricsCount > maxMetricNamesPerRequest { + return fmt.Errorf("too many metric names in a single request; got %d; mustn't exceed %d", metricsCount, maxMetricNamesPerRequest) + } + mrs := make([]storage.MetricRow, metricsCount) + for i := 0; i < int(metricsCount); i++ { + if err := ctx.readDataBufBytes(maxMetricNameRawSize); err != nil { + return fmt.Errorf("cannot read metricNameRaw: %w", err) + } + mr := &mrs[i] + mr.MetricNameRaw = append(mr.MetricNameRaw[:0], ctx.dataBuf...) 
+ n, err := ctx.readUint64() + if err != nil { + return fmt.Errorf("cannot read timestamp: %w", err) + } + mr.Timestamp = int64(n) + } + + // Register metric names from mrs. + if err := s.api.RegisterMetricNames(ctx.qt, mrs); err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + return nil +} + +func (s *Server) processDeleteMetrics(ctx *vmselectRequestCtx) error { + s.deleteMetricsRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + + // Execute the request. + tr := storage.TimeRange{ + MinTimestamp: 0, + MaxTimestamp: time.Now().UnixNano() / 1e6, + } + maxMetrics := s.getMaxMetrics(ctx) + tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + deletedCount, err := s.api.DeleteMetrics(ctx.qt, tfss, maxMetrics, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + // Send deletedCount to vmselect. + if err := ctx.writeUint64(uint64(deletedCount)); err != nil { + return fmt.Errorf("cannot send deletedCount=%d: %w", deletedCount, err) + } + return nil +} + +func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error { + s.labelNamesRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + maxLabelNames, err := ctx.readLimit() + if err != nil { + return fmt.Errorf("cannot read maxLabelNames: %w", err) + } + if maxLabelNames <= 0 || maxLabelNames > s.limits.MaxLabelNames { + maxLabelNames = s.limits.MaxLabelNames + } + + // Execute the request + tr := storage.TimeRange{ + MinTimestamp: ctx.sq.MinTimestamp, + MaxTimestamp: ctx.sq.MaxTimestamp, + } + maxMetrics := s.getMaxMetrics(ctx) + tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + labelNames, err := s.api.LabelNames(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. 
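+	// An empty error string signals success to vmselect; the label names follow.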
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send empty error message: %w", err)
+	}
+
+	// Send labelNames to vmselect
+	for _, labelName := range labelNames {
+		if err := ctx.writeString(labelName); err != nil {
+			return fmt.Errorf("cannot write label name %q: %w", labelName, err)
+		}
+	}
+	// Send 'end of response' marker
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
+	}
+	return nil
+}
+
+const maxLabelValueSize = 16 * 1024
+
+func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error {
+	s.labelValuesRequests.Inc()
+
+	// Read request
+	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
+		return fmt.Errorf("cannot read labelName: %w", err)
+	}
+	labelName := string(ctx.dataBuf)
+	if err := ctx.readSearchQuery(); err != nil {
+		return err
+	}
+	maxLabelValues, err := ctx.readLimit()
+	if err != nil {
+		return fmt.Errorf("cannot read maxLabelValues: %w", err)
+	}
+	if maxLabelValues <= 0 || maxLabelValues > s.limits.MaxLabelValues {
+		maxLabelValues = s.limits.MaxLabelValues
+	}
+
+	// Execute the request
+	tr := storage.TimeRange{
+		MinTimestamp: ctx.sq.MinTimestamp,
+		MaxTimestamp: ctx.sq.MaxTimestamp,
+	}
+	maxMetrics := s.getMaxMetrics(ctx)
+	tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
+	if err != nil {
+		return ctx.writeErrorMessage(err)
+	}
+	labelValues, err := s.api.LabelValues(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, tr, labelName, maxLabelValues, maxMetrics, ctx.deadline)
+	if err != nil {
+		return ctx.writeErrorMessage(err)
+	}
+
+	// Send an empty error message to vmselect.
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send empty error message: %w", err)
+	}
+
+	// Send labelValues to vmselect
+	for _, labelValue := range labelValues {
+		if len(labelValue) == 0 {
+			// Skip empty label values, since they make no sense for Prometheus.
+			continue
+		}
+		if err := ctx.writeString(labelValue); err != nil {
+			return fmt.Errorf("cannot write labelValue %q: %w", labelValue, err)
+		}
+	}
+	// Send 'end of label values' marker
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
+	}
+	return nil
+}
+
+func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
+	s.tagValueSuffixesRequests.Inc()
+
+	// Read request
+	accountID, projectID, err := ctx.readAccountIDProjectID()
+	if err != nil {
+		return err
+	}
+	tr, err := ctx.readTimeRange()
+	if err != nil {
+		return err
+	}
+	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
+		return fmt.Errorf("cannot read tagKey: %w", err)
+	}
+	tagKey := append([]byte{}, ctx.dataBuf...)
+	if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
+		return fmt.Errorf("cannot read tagValuePrefix: %w", err)
+	}
+	tagValuePrefix := append([]byte{}, ctx.dataBuf...)
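+	// dataBuf is reused by every read, so tagKey and tagValuePrefix are copied out above before the next read overwrites it.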
+ delimiter, err := ctx.readByte() + if err != nil { + return fmt.Errorf("cannot read delimiter: %w", err) + } + + // Execute the request + suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, s.limits.MaxTagValueSuffixes, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + if len(suffixes) >= s.limits.MaxTagValueSuffixes { + err := fmt.Errorf("more than %d tag value suffixes found "+ + "for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+ + "either narrow down the query or increase -search.max* command-line flag value; see https://docs.victoriametrics.com/#resource-usage-limits", + s.limits.MaxTagValueSuffixes, tagKey, tagValuePrefix, delimiter, tr.String()) + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send suffixes to vmselect. + // Suffixes may contain empty string, so prepend suffixes with suffixCount. + if err := ctx.writeUint64(uint64(len(suffixes))); err != nil { + return fmt.Errorf("cannot write suffixesCount: %w", err) + } + for i, suffix := range suffixes { + if err := ctx.writeString(suffix); err != nil { + return fmt.Errorf("cannot write suffix #%d: %w", i+1, err) + } + } + return nil +} + +func (s *Server) processSeriesCount(ctx *vmselectRequestCtx) error { + s.seriesCountRequests.Inc() + + // Read request + accountID, projectID, err := ctx.readAccountIDProjectID() + if err != nil { + return err + } + + // Execute the request + n, err := s.api.SeriesCount(ctx.qt, accountID, projectID, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send series count to vmselect. + if err := ctx.writeUint64(n); err != nil { + return fmt.Errorf("cannot write series count to vmselect: %w", err) + } + return nil +} + +func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error { + s.tsdbStatusRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { + return fmt.Errorf("cannot read focusLabel: %w", err) + } + focusLabel := string(ctx.dataBuf) + topN, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read topN: %w", err) + } + + // Execute the request + tr := storage.TimeRange{ + MinTimestamp: ctx.sq.MinTimestamp, + MaxTimestamp: ctx.sq.MaxTimestamp, + } + maxMetrics := s.getMaxMetrics(ctx) + tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000) + status, err := s.api.TSDBStatus(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, date, focusLabel, int(topN), maxMetrics, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send status to vmselect. 
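+	// writeTSDBStatus sends the totals followed by the topN cardinality tables.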
+	return writeTSDBStatus(ctx, status)
+}
+
+func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error {
+	if err := ctx.writeUint64(status.TotalSeries); err != nil {
+		return fmt.Errorf("cannot write totalSeries to vmselect: %w", err)
+	}
+	if err := ctx.writeUint64(status.TotalLabelValuePairs); err != nil {
+		return fmt.Errorf("cannot write totalLabelValuePairs to vmselect: %w", err)
+	}
+	if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil {
+		return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err)
+	}
+	if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelName); err != nil {
+		return fmt.Errorf("cannot write seriesCountByLabelName to vmselect: %w", err)
+	}
+	if err := writeTopHeapEntries(ctx, status.SeriesCountByFocusLabelValue); err != nil {
+		return fmt.Errorf("cannot write seriesCountByFocusLabelValue to vmselect: %w", err)
+	}
+	if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil {
+		return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err)
+	}
+	if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil {
+		return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err)
+	}
+	return nil
+}
+
+func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {
+	if err := ctx.writeUint64(uint64(len(a))); err != nil {
+		return fmt.Errorf("cannot write topHeapEntries size: %w", err)
+	}
+	for _, e := range a {
+		if err := ctx.writeString(e.Name); err != nil {
+			return fmt.Errorf("cannot write topHeapEntry name: %w", err)
+		}
+		if err := ctx.writeUint64(e.Count); err != nil {
+			return fmt.Errorf("cannot write topHeapEntry count: %w", err)
+		}
+	}
+	return nil
+}
+
+func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
+	s.searchMetricNamesRequests.Inc()
+
+	// Read request.
+	if err := ctx.readSearchQuery(); err != nil {
+		return err
+	}
+
+	// Execute request.
+	tr := storage.TimeRange{
+		MinTimestamp: ctx.sq.MinTimestamp,
+		MaxTimestamp: ctx.sq.MaxTimestamp,
+	}
+	maxMetrics := s.getMaxMetrics(ctx)
+	tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
+	if err != nil {
+		return ctx.writeErrorMessage(err)
+	}
+	mns, err := s.api.SearchMetricNames(ctx.qt, tfss, tr, maxMetrics, ctx.deadline)
+	if err != nil {
+		return ctx.writeErrorMessage(err)
+	}
+
+	// Send empty error message to vmselect.
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send empty error message: %w", err)
+	}
+
+	// Send response.
+	metricNamesCount := len(mns)
+	if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil {
+		return fmt.Errorf("cannot send metricNamesCount: %w", err)
+	}
+	for i, mn := range mns {
+		ctx.dataBuf = mn.Marshal(ctx.dataBuf[:0])
+		if err := ctx.writeDataBufBytes(); err != nil {
+			return fmt.Errorf("cannot send metricName #%d: %w", i+1, err)
+		}
+	}
+	ctx.qt.Printf("sent %d series to vmselect", len(mns))
+	return nil
+}
+
+func (s *Server) processSeriesSearch(ctx *vmselectRequestCtx) error {
+	s.searchRequests.Inc()
+
+	// Read request.
+	if err := ctx.readSearchQuery(); err != nil {
+		return err
+	}
+	fetchData, err := ctx.readBool()
+	if err != nil {
+		return fmt.Errorf("cannot read `fetchData` bool: %w", err)
+	}
+
+	// Initialize the search.
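+	// The search is initialized before the success marker is sent, so setup errors can still be reported via writeErrorMessage.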
+	startTime := time.Now()
+	tr := storage.TimeRange{
+		MinTimestamp: ctx.sq.MinTimestamp,
+		MaxTimestamp: ctx.sq.MaxTimestamp,
+	}
+	maxMetrics := s.getMaxMetrics(ctx)
+	tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
+	if err != nil {
+		return ctx.writeErrorMessage(err)
+	}
+	bi, err := s.api.InitSearch(ctx.qt, tfss, tr, maxMetrics, ctx.deadline)
+	if err != nil {
+		return ctx.writeErrorMessage(err)
+	}
+	s.indexSearchDuration.UpdateDuration(startTime)
+	defer bi.MustClose()
+
+	// Send empty error message to vmselect.
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send empty error message: %w", err)
+	}
+
+	// Send found blocks to vmselect.
+	blocksRead := 0
+	for bi.NextBlock(&ctx.mb, fetchData) {
+		blocksRead++
+		s.metricBlocksRead.Inc()
+		s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
+
+		ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
+		if err := ctx.writeDataBufBytes(); err != nil {
+			return fmt.Errorf("cannot send MetricBlock: %w", err)
+		}
+	}
+	if err := bi.Error(); err != nil {
+		return fmt.Errorf("search error: %w", err)
+	}
+	ctx.qt.Printf("sent %d blocks to vmselect", blocksRead)
+
+	// Send 'end of response' marker
+	if err := ctx.writeString(""); err != nil {
+		return fmt.Errorf("cannot send 'end of response' marker: %w", err)
+	}
+	return nil
+}
+
+func (s *Server) getMaxMetrics(ctx *vmselectRequestCtx) int {
+	maxMetrics := ctx.sq.MaxMetrics
+	maxMetricsLimit := s.limits.MaxMetrics
+	if maxMetricsLimit <= 0 {
+		maxMetricsLimit = 2e9
+	}
+	if maxMetrics <= 0 || maxMetrics > maxMetricsLimit {
+		maxMetrics = maxMetricsLimit
+	}
+	return maxMetrics
+}
+
+func (s *Server) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
+	tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
+	accountID := sq.AccountID
+	projectID := sq.ProjectID
+	for _, tagFilters := range sq.TagFilterss {
+		tfs := storage.NewTagFilters(accountID, projectID)
+		for i := range tagFilters {
+			tf := &tagFilters[i]
+			if string(tf.Key) == "__graphite__" {
+				query := tf.Value
+				qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
+				paths, err := s.api.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
+				qtChild.Donef("found %d series", len(paths))
+				if err != nil {
+					return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
+				}
+				if len(paths) >= maxMetrics {
+					return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
+						"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query)
+				}
+				tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
+				continue
+			}
+			if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
+				return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
+			}
+		}
+		tfss = append(tfss, tfs)
+	}
+	return tfss, nil
+}
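For reference, below is a minimal sketch of the frame format implemented by readDataBufBytes and writeDataBufBytes above: an 8-byte size prefix followed by the payload. The writeFrame and readFrame helpers are illustrative only and not part of this change; the big-endian size encoding is assumed to match lib/encoding's MarshalUint64 and UnmarshalUint64.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame sends one length-prefixed frame: an 8-byte big-endian size
// followed by the payload, mirroring writeDataBufBytes.
func writeFrame(w io.Writer, payload []byte) error {
	var sizeBuf [8]byte
	binary.BigEndian.PutUint64(sizeBuf[:], uint64(len(payload)))
	if _, err := w.Write(sizeBuf[:]); err != nil {
		return fmt.Errorf("cannot write frame size: %w", err)
	}
	if len(payload) == 0 {
		return nil
	}
	if _, err := w.Write(payload); err != nil {
		return fmt.Errorf("cannot write frame payload with size %d: %w", len(payload), err)
	}
	return nil
}

// readFrame reads one frame written by writeFrame, rejecting frames bigger
// than maxSize, mirroring readDataBufBytes.
func readFrame(r io.Reader, maxSize int) ([]byte, error) {
	var sizeBuf [8]byte
	if _, err := io.ReadFull(r, sizeBuf[:]); err != nil {
		return nil, err
	}
	size := binary.BigEndian.Uint64(sizeBuf[:])
	if size > uint64(maxSize) {
		return nil, fmt.Errorf("too big frame size: %d; mustn't exceed %d bytes", size, maxSize)
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, fmt.Errorf("cannot read frame payload with size %d: %w", size, err)
	}
	return buf, nil
}

func main() {
	// A request starts with the rpcName frame, e.g. "labelNames_v5";
	// 128 matches maxRPCNameSize above.
	var conn bytes.Buffer
	if err := writeFrame(&conn, []byte("labelNames_v5")); err != nil {
		panic(err)
	}
	rpcName, err := readFrame(&conn, 128)
	if err != nil {
		panic(err)
	}
	fmt.Printf("rpcName=%q\n", rpcName)
}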