app/vmselect: add ability to query vmselect from another vmselect

Aliaksandr Valialkin 2022-07-06 13:19:45 +03:00
parent ea4ab0df36
commit 195dccf678
8 changed files with 212 additions and 11 deletions

View file

@@ -366,12 +366,14 @@ error rates comparing the network inside a single AZ.
If you need a multi-AZ setup, then it is recommended to run independent clusters in each AZ and to set up
[vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so that it can replicate incoming data
into all the clusters - see [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) for details.
Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters.
Then additional `vmselect` nodes can be configured for reading the data from multiple clusters according to [these docs](#multi-level-cluster-setup).
## Multi-level cluster setup
`vminsert` nodes can accept data from other `vminsert` nodes starting from [v1.60.0](https://docs.victoriametrics.com/CHANGELOG.html#v1600) if the `-clusternativeListenAddr` command-line flag is set. For example, if `vminsert` is started with the `-clusternativeListenAddr=:8400` command-line flag, then it can accept data from other `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies with flexible configs. For example, the top level of `vminsert` nodes can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes located in the same AZ. Such a setup guarantees cluster availability if some AZ becomes unavailable. The data from all the `vmstorage` nodes in all the AZs can be read via `vmselect` nodes, which are configured to query all the `vmstorage` nodes in all the availability zones (e.g. all the `vmstorage` addresses are passed via the `-storageNode` command-line flag to `vmselect` nodes).
`vmselect` nodes can be queried by other `vmselect` nodes if they run with the `-clusternativeListenAddr` command-line flag. For example, if `vmselect` is started with `-clusternativeListenAddr=:8401`, then it can accept queries from other `vmselect` nodes at TCP port 8401 in the same way as `vmstorage` nodes do. This allows chaining `vmselect` nodes and building multi-level cluster topologies. For example, the top-level `vmselect` node can query second-level `vmselect` nodes in different availability zones (AZ), while the second-level `vmselect` nodes can query `vmstorage` nodes in the local AZ.
`vminsert` nodes can accept data from other `vminsert` nodes if they run with the `-clusternativeListenAddr` command-line flag. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400`, then it can accept data from other `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies. For example, the top-level `vminsert` node can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes in the local AZ.
The multi-level cluster setup for `vminsert` nodes has the following shortcomings because of synchronous replication and data sharding:
@@ -380,6 +382,7 @@ The multi-level cluster setup for `vminsert` nodes has the following shortcoming
These issues are addressed by [vmagent](https://docs.victoriametrics.com/vmagent.html) when it runs in [multitenancy mode](https://docs.victoriametrics.com/vmagent.html#multitenancy). `vmagent` buffers the data that must be sent to a particular AZ while this AZ is temporarily unavailable. The buffer is stored on disk. The buffered data is sent to the AZ as soon as it becomes available.
## Helm
The Helm chart simplifies managing the cluster version of VictoriaMetrics in Kubernetes.

View file

@@ -0,0 +1,154 @@
package clusternative
import (
"flag"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
maxTagKeys = flag.Int("clusternative.maxTagKeys", 100e3, "The maximum number of tag keys returned per search at -clusternativeListenAddr")
maxTagValues = flag.Int("clusternative.maxTagValues", 100e3, "The maximum number of tag values returned per search at -clusternativeListenAddr")
maxTagValueSuffixesPerSearch = flag.Int("clusternative.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned "+
"from /metrics/find at -clusternativeListenAddr")
disableRPCCompression = flag.Bool(`clusternative.disableCompression`, false, "Whether to disable compression of the data sent to vmselect via -clusternativeListenAddr. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
)
// NewVMSelectServer starts a new server at the given addr, which serves requests from other vmselect nodes by fetching the data via netstorage.
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
api := &vmstorageAPI{}
limits := vmselectapi.Limits{
MaxLabelNames: *maxTagKeys,
MaxLabelValues: *maxTagValues,
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
}
return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression)
}
// vmstorageAPI implements vmselectapi.API
type vmstorageAPI struct{}
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
bi := newBlockIterator(qt, denyPartialResponse, sq, dl)
return bi, nil
}
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
metricNames, _, err := netstorage.SearchMetricNames(qt, denyPartialResponse, sq, dl)
return metricNames, err
}
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
labelValues, _, err := netstorage.LabelValues(qt, denyPartialResponse, labelName, sq, maxLabelValues, dl)
return labelValues, err
}
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
suffixes, _, err := netstorage.TagValueSuffixes(qt, accountID, projectID, denyPartialResponse, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, dl)
return suffixes, err
}
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
labelNames, _, err := netstorage.LabelNames(qt, denyPartialResponse, sq, maxLabelNames, dl)
return labelNames, err
}
func (api *vmstorageAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
seriesCount, _, err := netstorage.SeriesCount(qt, accountID, projectID, denyPartialResponse, dl)
return seriesCount, err
}
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
denyPartialResponse := searchutils.GetDenyPartialResponse(nil)
dl := searchutils.DeadlineFromTimestamp(deadline)
tsdbStatus, _, err := netstorage.TSDBStatus(qt, denyPartialResponse, sq, focusLabel, topN, dl)
return tsdbStatus, err
}
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
dl := searchutils.DeadlineFromTimestamp(deadline)
return netstorage.DeleteSeries(qt, sq, dl)
}
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
dl := searchutils.DeadlineFromTimestamp(deadline)
return netstorage.RegisterMetricNames(qt, mrs, dl)
}
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
workCh chan workItem
wg sync.WaitGroup
err error
}
type workItem struct {
mb *storage.MetricBlock
doneCh chan struct{}
}
func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) *blockIterator {
var bi blockIterator
bi.workCh = make(chan workItem, 16)
bi.wg.Add(1)
go func() {
_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock) error {
wi := workItem{
mb: mb,
doneCh: make(chan struct{}),
}
bi.workCh <- wi
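// Block here until NextBlock has copied mb: the caller of processBlock may reuse mb's memory once this callback returns.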
<-wi.doneCh
return nil
}, deadline)
close(bi.workCh)
bi.err = err
bi.wg.Done()
}()
return &bi
}
func (bi *blockIterator) NextBlock(mb *storage.MetricBlock) bool {
wi, ok := <-bi.workCh
if !ok {
return false
}
mb.CopyFrom(wi.mb)
wi.doneCh <- struct{}{}
return true
}
func (bi *blockIterator) Error() error {
bi.wg.Wait()
return bi.err
}
func (bi *blockIterator) MustClose() {
var mb storage.MetricBlock
for bi.NextBlock(&mb) {
// Drain pending blocks before exit in order to free up
// the goroutine started at newBlockIterator
}
// Wait until the goroutine from newBlockIterator is finished.
bi.wg.Wait()
}
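// drainBlocks is a hypothetical helper (an illustration, not part of this commit)
// showing how a consumer of the vmselectapi.BlockIterator returned by InitSearch
// could be written. handleBlock is a caller-supplied placeholder.
func drainBlocks(bi vmselectapi.BlockIterator, handleBlock func(*storage.MetricBlock) error) error {
	defer bi.MustClose()
	var mb storage.MetricBlock
	for bi.NextBlock(&mb) {
		// NextBlock has copied the block into mb, so the data remains valid
		// after the producer goroutine inside blockIterator is released via doneCh.
		if err := handleBlock(&mb); err != nil {
			return err
		}
	}
	// Error waits for the ProcessBlocks goroutine to finish and reports its final error, if any.
	return bi.Error()
}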

View file

@@ -12,6 +12,7 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
@@ -30,10 +31,13 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
"github.com/VictoriaMetrics/metrics"
)
var (
clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for requests from other vmselect nodes in multi-level cluster setup. "+
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8401 must be set. Doesn't work if empty")
httpListenAddr = flag.String("httpListenAddr", ":8481", "Address to listen for http connections")
cacheDataPath = flag.String("cacheDataPath", "", "Path to directory for cache files. Cache isn't saved if empty")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
@@ -92,6 +96,16 @@ func main() {
}
concurrencyCh = make(chan struct{}, *maxConcurrentRequests)
initVMAlertProxy()
var vmselectapiServer *vmselectapi.Server
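// Optionally start the vmselect-facing API server, so that upper-level vmselect nodes in a multi-level cluster setup can query this node in the same way as they query vmstorage nodes.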
if *clusternativeListenAddr != "" {
logger.Infof("starting vmselectapi server at %q", *clusternativeListenAddr)
s, err := clusternative.NewVMSelectServer(*clusternativeListenAddr)
if err != nil {
logger.Fatalf("cannot initialize vmselectapi server: %s", err)
}
vmselectapiServer = s
logger.Infof("started vmselectapi server at %q", *clusternativeListenAddr)
}
go func() {
httpserver.Serve(*httpListenAddr, requestHandler)
@@ -107,6 +121,12 @@ func main() {
}
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
if vmselectapiServer != nil {
logger.Infof("stopping vmselectapi server...")
vmselectapiServer.MustStop()
logger.Infof("stopped vmselectapi server")
}
logger.Infof("shutting down netstorage...")
startTime = time.Now()
netstorage.Stop()

View file

@@ -879,7 +879,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
sn.tagValueSuffixesErrors.Inc()
err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
err = fmt.Errorf("cannot get tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err)
}
return &nodeResult{
@@ -1159,7 +1159,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
return nil
}
_, err := processSearchQuery(qt, true, sq, processBlock, deadline)
_, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
// Make sure processBlock isn't called anymore in order to prevent from data races.
atomic.StoreUint32(&stopped, 1)
@@ -1265,7 +1265,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
}
return nil
}
isPartial, err := processSearchQuery(qt, denyPartialResponse, sq, processBlock, deadline)
isPartial, err := ProcessBlocks(qt, denyPartialResponse, sq, processBlock, deadline)
// Make sure processBlock isn't called anymore in order to protect from data races.
atomic.StoreUint32(&stopped, 1)
@@ -1296,7 +1296,8 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
return &rss, isPartial, nil
}
func processSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
// ProcessBlocks calls processBlock for each block matching the given sq.
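//
// processBlock may be called concurrently from multiple goroutines (roughly one per queried vmstorage node), so it must be safe for concurrent use, and it must not retain mb after returning, since the underlying memory may be reused for the next block.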
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
requestData := sq.Marshal(nil)
@@ -2136,7 +2137,7 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
}
tail, err := mb.Unmarshal(buf)
if err != nil {
return fmt.Errorf("cannot unmarshal MetricBlock #%d: %w", blocksRead, err)
return fmt.Errorf("cannot unmarshal MetricBlock #%d from %d bytes: %w", blocksRead, len(buf), err)
}
if len(tail) != 0 {
return fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail)

View file

@@ -181,6 +181,9 @@ func GetDenyPartialResponse(r *http.Request) bool {
if *denyPartialResponse {
return true
}
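// r may be nil when the request comes from the vmselectapi server listening at -clusternativeListenAddr (see app/vmselect/clusternative), which has no http.Request to inspect.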
if r == nil {
return false
}
return GetBool(r, "deny_partial_response")
}
@@ -204,6 +207,13 @@ func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) De
}
}
// DeadlineFromTimestamp returns a deadline constructed from the given unix timestamp in seconds.
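//
// A hypothetical round trip (illustration only, not part of this commit): the querying vmselect
// encodes its deadline as unix seconds via the Deadline method, and the queried vmselect
// rebuilds an equivalent Deadline from that value:
//
//	d := NewDeadline(time.Now(), 30*time.Second, "search.maxQueryDuration")
//	ts := d.Deadline()              // unix seconds when the query must complete
//	d2 := DeadlineFromTimestamp(ts) // the remaining timeout is recomputed against the local clock
//
// The reconstructed timeout is slightly smaller than the original one, since the transfer time is subtracted.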
func DeadlineFromTimestamp(timestamp uint64) Deadline {
startTime := time.Now()
timeout := time.Unix(int64(timestamp), 0).Sub(startTime)
return NewDeadline(startTime, timeout, "")
}
// Exceeded returns true if deadline is exceeded.
func (d *Deadline) Exceeded() bool {
return fasttime.UnixTimestamp() > d.deadline
@@ -218,7 +228,11 @@ func (d *Deadline) Deadline() uint64 {
func (d *Deadline) String() string {
startTime := time.Unix(int64(d.deadline), 0).Add(-d.timeout)
elapsed := time.Since(startTime)
return fmt.Sprintf("%.3f seconds (elapsed %.3f seconds); the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), elapsed.Seconds(), d.flagHint)
msg := fmt.Sprintf("%.3f seconds (elapsed %.3f seconds)", d.timeout.Seconds(), elapsed.Seconds())
if d.flagHint != "" {
msg += fmt.Sprintf("; the timeout can be adjusted with `%s` command-line flag", d.flagHint)
}
return msg
}
// GetExtraTagFilters returns additional label filters from request.

View file

@@ -18,7 +18,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
**Update notes:** this release introduces backwards-incompatible changes to `vm_partial_results_total` metric by changing its labels to be consistent with `vm_requests_total` metric.
If you use alerting rules or Grafana dashboards, which rely on this metric, then they must be updated. The official dashboards for VictoriaMetrics don't use this metric.
* FEATURE: [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html): allow accessing [vmalert's](https://docs.victoriametrics.com/vmalert.html) UI when `-vmalert.proxyURL` command-line flag is set. See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#vmalert) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2825).
* FEATURE: [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add support for querying lower-level `vmselect` nodes from upper-level `vmselect` nodes. This makes it possible to build multi-level cluster setups for a global query view and HA purposes without the need to use [Promxy](https://github.com/jacksontj/promxy). See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) for details.
* FEATURE: add `-search.setLookbackToStep` command-line flag, which enables InfluxDB-like gap filling during querying. See [these docs](https://docs.victoriametrics.com/guides/migrate-from-influx.html) for details.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add a UI for [query tracing](https://docs.victoriametrics.com/#query-tracing). It can be enabled by clicking the `enable query tracing` checkbox and re-running the query. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2703).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.headers` command-line option for specifying optional HTTP headers to send to the configured `-remoteWrite.url`. For example, `-remoteWrite.headers='Foo:Bar^^Baz:x'` would send `Foo: Bar` and `Baz: x` HTTP headers with every request to `-remoteWrite.url`. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2805).

View file

@@ -370,12 +370,14 @@ error rates comparing the network inside a single AZ.
If you need a multi-AZ setup, then it is recommended to run independent clusters in each AZ and to set up
[vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so that it can replicate incoming data
into all the clusters - see [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) for details.
Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters.
Then additional `vmselect` nodes can be configured for reading the data from multiple clusters according to [these docs](#multi-level-cluster-setup).
## Multi-level cluster setup
`vminsert` nodes can accept data from other `vminsert` nodes starting from [v1.60.0](https://docs.victoriametrics.com/CHANGELOG.html#v1600) if the `-clusternativeListenAddr` command-line flag is set. For example, if `vminsert` is started with the `-clusternativeListenAddr=:8400` command-line flag, then it can accept data from other `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies with flexible configs. For example, the top level of `vminsert` nodes can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes located in the same AZ. Such a setup guarantees cluster availability if some AZ becomes unavailable. The data from all the `vmstorage` nodes in all the AZs can be read via `vmselect` nodes, which are configured to query all the `vmstorage` nodes in all the availability zones (e.g. all the `vmstorage` addresses are passed via the `-storageNode` command-line flag to `vmselect` nodes).
`vmselect` nodes can be queried by other `vmselect` nodes if they run with the `-clusternativeListenAddr` command-line flag. For example, if `vmselect` is started with `-clusternativeListenAddr=:8401`, then it can accept queries from other `vmselect` nodes at TCP port 8401 in the same way as `vmstorage` nodes do. This allows chaining `vmselect` nodes and building multi-level cluster topologies. For example, the top-level `vmselect` node can query second-level `vmselect` nodes in different availability zones (AZ), while the second-level `vmselect` nodes can query `vmstorage` nodes in the local AZ.
`vminsert` nodes can accept data from other `vminsert` nodes if they run with the `-clusternativeListenAddr` command-line flag. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400`, then it can accept data from other `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies. For example, the top-level `vminsert` node can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes in the local AZ.
The multi-level cluster setup for `vminsert` nodes has the following shortcomings because of synchronous replication and data sharding:
@@ -384,6 +386,7 @@ The multi-level cluster setup for `vminsert` nodes has the following shortcoming
These issues are addressed by [vmagent](https://docs.victoriametrics.com/vmagent.html) when it runs in [multitenancy mode](https://docs.victoriametrics.com/vmagent.html#multitenancy). `vmagent` buffers the data that must be sent to a particular AZ while this AZ is temporarily unavailable. The buffer is stored on disk. The buffered data is sent to the AZ as soon as it becomes available.
## Helm
The Helm chart simplifies managing the cluster version of VictoriaMetrics in Kubernetes.

View file

@@ -68,6 +68,12 @@ func (mb *MetricBlock) Marshal(dst []byte) []byte {
return MarshalBlock(dst, &mb.Block)
}
// CopyFrom copies src to mb.
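// This lets the caller keep using mb after src is returned to its producer and reused (see blockIterator.NextBlock in app/vmselect/clusternative).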
func (mb *MetricBlock) CopyFrom(src *MetricBlock) {
mb.MetricName = append(mb.MetricName[:0], src.MetricName...)
mb.Block.CopyFrom(&src.Block)
}
// MarshalBlock marshals b to dst.
//
// b.MarshalData must be called on b before calling MarshalBlock.