VictoriaMetrics/app/vmselect/clusternative/vmselect.go
Aliaksandr Valialkin 0dca3c4025
app/{vmselect,vmstorage}: return compression of the data passed from vmstorage to vmselect
This reverts cd4f641d32 , since it turned out that disabling compression
for vmstorage->vmselect data increases network bandwidth usage by more than 10x on typical production workloads,
while it decreases CPU usage at vmstorage by up to 10% and improves query latency by up to 10%.

The 10x increase in network usage is too high a price for 10% improvements in query latency and vmstorage CPU usage.
This may result in network bandwidth bottlenecks, which can reduce the overall performance and stability
of a VictoriaMetrics cluster. That's why the vmstorage->vmselect data compression is enabled by default again.

The vmstorage->vmselect compression can be disabled by passing -rpc.disableCompression command-line flag to vmstorage.
The vmselect->vmselect compression in multi-level cluster setup can be disabled by passing -clusternative.disableCompression command-line flag.
2024-01-24 13:37:05 +02:00

171 lines
7.3 KiB
Go

package clusternative
import (
"flag"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
// Command-line flags, which configure the server created by NewVMSelectServer.
var (
// Limits on the number of entries returned per search.
maxTagKeys = flag.Int("clusternative.maxTagKeys", 100e3, "The maximum number of tag keys returned per search at -clusternativeListenAddr")
maxTagValues = flag.Int("clusternative.maxTagValues", 100e3, "The maximum number of tag values returned per search at -clusternativeListenAddr")
maxTagValueSuffixesPerSearch = flag.Int("clusternative.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned "+
"from /metrics/find at -clusternativeListenAddr")
// Concurrency limits. The default for maxConcurrentRequests is twice the value returned by cgroup.AvailableCPUs().
maxConcurrentRequests = flag.Int("clusternative.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
"the server can process at -clusternativeListenAddr. It shouldn't be high, since a single request usually saturates a CPU core at the underlying vmstorage nodes, "+
"and many concurrently executed requests may require high amounts of memory. See also -clusternative.maxQueueDuration")
maxQueueDuration = flag.Duration("clusternative.maxQueueDuration", 10*time.Second, "The maximum time the incoming query to -clusternativeListenAddr waits for execution "+
"when -clusternative.maxConcurrentRequests limit is reached")
// Compression is enabled by default; the flag value is passed as is to vmselectapi.NewServer.
disableRPCCompression = flag.Bool("clusternative.disableCompression", false, "Whether to disable compression of the data sent to vmselect via -clusternativeListenAddr. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
)
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from netstorage.
//
// The server limits and the compression mode are taken from the -clusternative.* command-line flags,
// so this function must be called after the flags are parsed.
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
	var lim vmselectapi.Limits

	// Limits on the number of entries returned per search.
	lim.MaxLabelNames = *maxTagKeys
	lim.MaxLabelValues = *maxTagValues
	lim.MaxTagValueSuffixes = *maxTagValueSuffixesPerSearch

	// Concurrency limits. The flag names are passed alongside the values.
	lim.MaxConcurrentRequests = *maxConcurrentRequests
	lim.MaxConcurrentRequestsFlagName = "clusternative.maxConcurrentRequests"
	lim.MaxQueueDuration = *maxQueueDuration
	lim.MaxQueueDurationFlagName = "clusternative.maxQueueDuration"

	return vmselectapi.NewServer(addr, &vmstorageAPI{}, lim, *disableRPCCompression)
}
// vmstorageAPI implements vmselectapi.API
//
// It holds no state: its methods convert the passed deadline via searchutils.DeadlineFromTimestamp
// and delegate the work to the netstorage package.
type vmstorageAPI struct{}
// InitSearch returns an iterator over the blocks, which netstorage.ProcessBlocks yields for sq.
//
// The returned error is always nil; search errors are available via the iterator's Error method.
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
	// There is no HTTP request here, so nil is passed to GetDenyPartialResponse.
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	return newBlockIterator(qt, denyPartial, sq, d), nil
}
// Tenants returns the result of netstorage.Tenants for the time range tr and the given deadline.
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
	return netstorage.Tenants(qt, tr, searchutils.DeadlineFromTimestamp(deadline))
}
// SearchMetricNames returns metric names obtained from netstorage.SearchMetricNames for sq.
//
// The second result of netstorage.SearchMetricNames is dropped
// (presumably the partial response flag - TODO confirm).
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	names, _, err := netstorage.SearchMetricNames(qt, denyPartial, sq, d)
	return names, err
}
// LabelValues returns values obtained from netstorage.LabelValues for labelName, sq and maxLabelValues.
//
// The second result of netstorage.LabelValues is dropped
// (presumably the partial response flag - TODO confirm).
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	values, _, err := netstorage.LabelValues(qt, denyPartial, labelName, sq, maxLabelValues, d)
	return values, err
}
// TagValueSuffixes returns suffixes obtained from netstorage.TagValueSuffixes for the given tenant
// (accountID, projectID), time range, tagKey, tagValuePrefix, delimiter and maxSuffixes.
//
// The second result of netstorage.TagValueSuffixes is dropped
// (presumably the partial response flag - TODO confirm).
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
	maxSuffixes int, deadline uint64) ([]string, error) {
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	result, _, err := netstorage.TagValueSuffixes(qt, accountID, projectID, denyPartial, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, d)
	return result, err
}
// LabelNames returns names obtained from netstorage.LabelNames for sq and maxLabelNames.
//
// The second result of netstorage.LabelNames is dropped
// (presumably the partial response flag - TODO confirm).
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	names, _, err := netstorage.LabelNames(qt, denyPartial, sq, maxLabelNames, d)
	return names, err
}
// SeriesCount returns the count obtained from netstorage.SeriesCount for the given tenant (accountID, projectID).
//
// The second result of netstorage.SeriesCount is dropped
// (presumably the partial response flag - TODO confirm).
func (api *vmstorageAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	n, _, err := netstorage.SeriesCount(qt, accountID, projectID, denyPartial, d)
	return n, err
}
// TSDBStatus returns the status obtained from netstorage.TSDBStatus for sq, focusLabel and topN.
//
// The second result of netstorage.TSDBStatus is dropped
// (presumably the partial response flag - TODO confirm).
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
	denyPartial := httputils.GetDenyPartialResponse(nil)
	d := searchutils.DeadlineFromTimestamp(deadline)
	status, _, err := netstorage.TSDBStatus(qt, denyPartial, sq, focusLabel, topN, d)
	return status, err
}
// DeleteSeries returns the result of netstorage.DeleteSeries for sq and the given deadline.
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
	return netstorage.DeleteSeries(qt, sq, searchutils.DeadlineFromTimestamp(deadline))
}
// RegisterMetricNames passes mrs to netstorage.RegisterMetricNames with the given deadline and returns its error.
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
	return netstorage.RegisterMetricNames(qt, mrs, searchutils.DeadlineFromTimestamp(deadline))
}
// blockIterator implements vmselectapi.BlockIterator
//
// Blocks are produced by the goroutine started in newBlockIterator
// and are handed over to NextBlock via workCh.
type blockIterator struct {
// workCh carries blocks from the goroutine started in newBlockIterator to NextBlock.
// It is closed by that goroutine after netstorage.ProcessBlocks returns.
workCh chan workItem
// wg is released when the goroutine started in newBlockIterator finishes.
wg sync.WaitGroup
// err is the error returned by netstorage.ProcessBlocks.
// It is set before wg is released, so it must be read only after wg.Wait().
err error
}
// workItem is a single block handed over from the netstorage.ProcessBlocks callback to NextBlock.
type workItem struct {
// mb is the block passed to the callback. NextBlock copies it into the block supplied by its caller.
mb *storage.MetricBlock
// doneCh is signaled by NextBlock after mb has been copied. This unblocks the callback.
doneCh chan struct{}
}
// newBlockIterator returns an iterator over the blocks yielded by netstorage.ProcessBlocks for sq.
//
// netstorage.ProcessBlocks runs in a separate goroutine. The goroutine finishes only after
// all the blocks are consumed via NextBlock, so the caller must drain the iterator
// (MustClose does this) in order to release it.
func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) *blockIterator {
	bi := &blockIterator{
		workCh: make(chan workItem, 16),
	}

	// handOver passes the block to NextBlock and waits until NextBlock has copied it.
	// The wait is presumably needed because mb may be reused after the callback returns - TODO confirm.
	handOver := func(mb *storage.MetricBlock, _ uint) error {
		copied := make(chan struct{})
		bi.workCh <- workItem{
			mb:     mb,
			doneCh: copied,
		}
		<-copied
		return nil
	}

	bi.wg.Add(1)
	go func() {
		_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, handOver, deadline)
		// Closing workCh makes NextBlock return false after the remaining items are read.
		close(bi.workCh)
		// The error becomes visible to Error() after wg is released.
		bi.err = err
		bi.wg.Done()
	}()
	return bi
}
// NextBlock copies the next block into mb and returns true.
//
// It returns false and leaves mb untouched after workCh is closed and fully read.
func (bi *blockIterator) NextBlock(mb *storage.MetricBlock) bool {
	item, more := <-bi.workCh
	if more {
		mb.CopyFrom(item.mb)
		// Notify the sender that item.mb is no longer needed.
		item.doneCh <- struct{}{}
	}
	return more
}
// Error waits until the goroutine started in newBlockIterator is finished
// and returns the error it stored, i.e. the error returned by netstorage.ProcessBlocks.
//
// It blocks until all the blocks are consumed via NextBlock.
func (bi *blockIterator) Error() error {
	// bi.err is set before wg is released, so it is safe to read it after Wait.
	bi.wg.Wait()
	err := bi.err
	return err
}
// MustClose drains the remaining blocks and waits until the goroutine
// started in newBlockIterator is finished.
func (bi *blockIterator) MustClose() {
	// Read and drop the pending blocks. Otherwise the goroutine started
	// in newBlockIterator would stay blocked on sending them.
	var dropped storage.MetricBlock
	for bi.NextBlock(&dropped) {
	}

	// workCh is closed at this point; wait for the goroutine to finish.
	bi.wg.Wait()
}