mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
427ce69426
While at it, move app/vmselect/bufferedwriter to lib/bufferedwriter, since it is going to be used in VictoriaLogs
171 lines
7.3 KiB
Go
171 lines
7.3 KiB
Go
package clusternative
|
|
|
|
import (
|
|
"flag"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
|
)
|
|
|
|
var (
|
|
maxTagKeys = flag.Int("clusternative.maxTagKeys", 100e3, "The maximum number of tag keys returned per search at -clusternativeListenAddr")
|
|
maxTagValues = flag.Int("clusternative.maxTagValues", 100e3, "The maximum number of tag values returned per search at -clusternativeListenAddr")
|
|
maxTagValueSuffixesPerSearch = flag.Int("clusternative.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned "+
|
|
"from /metrics/find at -clusternativeListenAddr")
|
|
maxConcurrentRequests = flag.Int("clusternative.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
|
|
"the server can process at -clusternativeListenAddr. It shouldn't be high, since a single request usually saturates a CPU core at the underlying vmstorage nodes, "+
|
|
"and many concurrently executed requests may require high amounts of memory. See also -clusternative.maxQueueDuration")
|
|
maxQueueDuration = flag.Duration("clusternative.maxQueueDuration", 10*time.Second, "The maximum time the incoming query to -clusternativeListenAddr waits for execution "+
|
|
"when -clusternative.maxConcurrentRequests limit is reached")
|
|
|
|
disableRPCCompression = flag.Bool(`clusternative.disableCompression`, false, "Whether to disable compression of the data sent to vmselect via -clusternativeListenAddr. "+
|
|
"This reduces CPU usage at the cost of higher network bandwidth usage")
|
|
)
|
|
|
|
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from netstorage.
|
|
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
|
|
api := &vmstorageAPI{}
|
|
limits := vmselectapi.Limits{
|
|
MaxLabelNames: *maxTagKeys,
|
|
MaxLabelValues: *maxTagValues,
|
|
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
|
|
MaxConcurrentRequests: *maxConcurrentRequests,
|
|
MaxConcurrentRequestsFlagName: "clusternative.maxConcurrentRequests",
|
|
MaxQueueDuration: *maxQueueDuration,
|
|
MaxQueueDurationFlagName: "clusternative.maxQueueDuration",
|
|
}
|
|
return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression)
|
|
}
|
|
|
|
// vmstorageAPI impelements vmselectapi.API
|
|
type vmstorageAPI struct{}
|
|
|
|
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
bi := newBlockIterator(qt, denyPartialResponse, sq, dl)
|
|
return bi, nil
|
|
}
|
|
|
|
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
return netstorage.Tenants(qt, tr, dl)
|
|
}
|
|
|
|
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
metricNames, _, err := netstorage.SearchMetricNames(qt, denyPartialResponse, sq, dl)
|
|
return metricNames, err
|
|
}
|
|
|
|
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
labelValues, _, err := netstorage.LabelValues(qt, denyPartialResponse, labelName, sq, maxLabelValues, dl)
|
|
return labelValues, err
|
|
}
|
|
|
|
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
|
|
maxSuffixes int, deadline uint64) ([]string, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
suffixes, _, err := netstorage.TagValueSuffixes(qt, accountID, projectID, denyPartialResponse, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, dl)
|
|
return suffixes, err
|
|
}
|
|
|
|
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
labelNames, _, err := netstorage.LabelNames(qt, denyPartialResponse, sq, maxLabelNames, dl)
|
|
return labelNames, err
|
|
}
|
|
|
|
func (api *vmstorageAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
seriesCount, _, err := netstorage.SeriesCount(qt, accountID, projectID, denyPartialResponse, dl)
|
|
return seriesCount, err
|
|
}
|
|
|
|
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
|
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
tsdbStatus, _, err := netstorage.TSDBStatus(qt, denyPartialResponse, sq, focusLabel, topN, dl)
|
|
return tsdbStatus, err
|
|
}
|
|
|
|
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
return netstorage.DeleteSeries(qt, sq, dl)
|
|
}
|
|
|
|
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
|
|
dl := searchutils.DeadlineFromTimestamp(deadline)
|
|
return netstorage.RegisterMetricNames(qt, mrs, dl)
|
|
}
|
|
|
|
// blockIterator implements vmselectapi.BlockIterator
|
|
type blockIterator struct {
|
|
workCh chan workItem
|
|
wg sync.WaitGroup
|
|
err error
|
|
}
|
|
|
|
type workItem struct {
|
|
mb *storage.MetricBlock
|
|
doneCh chan struct{}
|
|
}
|
|
|
|
func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) *blockIterator {
|
|
var bi blockIterator
|
|
bi.workCh = make(chan workItem, 16)
|
|
bi.wg.Add(1)
|
|
go func() {
|
|
_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerID uint) error {
|
|
wi := workItem{
|
|
mb: mb,
|
|
doneCh: make(chan struct{}),
|
|
}
|
|
bi.workCh <- wi
|
|
<-wi.doneCh
|
|
return nil
|
|
}, deadline)
|
|
close(bi.workCh)
|
|
bi.err = err
|
|
bi.wg.Done()
|
|
}()
|
|
return &bi
|
|
}
|
|
|
|
func (bi *blockIterator) NextBlock(mb *storage.MetricBlock) bool {
|
|
wi, ok := <-bi.workCh
|
|
if !ok {
|
|
return false
|
|
}
|
|
mb.CopyFrom(wi.mb)
|
|
wi.doneCh <- struct{}{}
|
|
return true
|
|
}
|
|
|
|
func (bi *blockIterator) Error() error {
|
|
bi.wg.Wait()
|
|
return bi.err
|
|
}
|
|
|
|
func (bi *blockIterator) MustClose() {
|
|
var mb storage.MetricBlock
|
|
for bi.NextBlock(&mb) {
|
|
// Drain pending blocks before exit in order to free up
|
|
// the goroutine started at newBlockIterator
|
|
}
|
|
// Wait until the goroutine from newBlockIterator is finished.
|
|
bi.wg.Wait()
|
|
}
|