From 0f91f83639805a733d8a8aa984dfb298eca5667e Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 13 Dec 2023 00:06:30 +0200
Subject: [PATCH] app/vmselect: add support for vmstorage groups with
 independent -replicationFactor per group

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5197

See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#vmstorage-groups-at-vmselect

Thanks to @zekker6 for the initial pull request at https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/718
---
 docs/CHANGELOG.md               |   1 +
 docs/Cluster-VictoriaMetrics.md |  79 +++++++++++++++++++++----
 lib/flagutil/dict.go            | 100 ++++++++++++++++++++++++++++++++
 lib/flagutil/dict_test.go       |  69 ++++++++++++++++++++++
 4 files changed, 239 insertions(+), 10 deletions(-)
 create mode 100644 lib/flagutil/dict.go
 create mode 100644 lib/flagutil/dict_test.go

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f1d26bddf..04f9089a9 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -34,6 +34,7 @@ The sandbox cluster installation is running under the constant load generated by
 
 * SECURITY: upgrade Go builder from Go1.21.4 to Go1.21.5. See [the list of issues addressed in Go1.21.5](https://github.com/golang/go/issues?q=milestone%3AGo1.21.5+label%3ACherryPickApproved).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to send requests to the first available backend and fall back to other `hot standby` backends when the first backend is unavailable. This allows building highly available setups as shown in [these docs](https://docs.victoriametrics.com/vmauth.html#high-availability). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4792).
+* FEATURE: `vmselect`: allow specifying multiple groups of `vmstorage` nodes with independent `-replicationFactor` per group. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#vmstorage-groups-at-vmselect) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5197) for details.
 * FEATURE: `vmselect`: allow opening [vmui](https://docs.victoriametrics.com/#vmui) and investigating [Top queries](https://docs.victoriametrics.com/#top-queries) and [Active queries](https://docs.victoriametrics.com/#active-queries) when the `vmselect` is overloaded with concurrent queries (e.g. when more than `-search.maxConcurrentRequests` concurrent queries are executed). Previously an attempt to open `Top queries` or `Active queries` at `vmui` could result in `couldn't start executing the request in ... seconds, since -search.maxConcurrentRequests=... concurrent requests are executed` error, which could complicate debugging of overloaded `vmselect` or single-node VictoriaMetrics.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-enableMultitenantHandlers` command-line flag, which allows receiving data via [VictoriaMetrics cluster urls](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) at `vmagent` and converting [tenant ids](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) to (`vm_account_id`, `vm_project_id`) labels before sending the data to the configured `-remoteWrite.url`. See [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) for details.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).

diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md
index 8a46a40df..5b7816824 100644
--- a/docs/Cluster-VictoriaMetrics.md
+++ b/docs/Cluster-VictoriaMetrics.md
@@ -567,6 +567,8 @@ The cluster works in the following way when some of `vmstorage` nodes are unavai
   if less than `-replicationFactor` vmstorage nodes are unavailable during querying, since it assumes that the remaining `vmstorage` nodes contain the full data.
   See [these docs](#replication-and-data-safety) for details.
 
+  It is also possible to configure an independent replication factor per distinct `vmstorage` group - see [these docs](#vmstorage-groups-at-vmselect).
+
   `vmselect` doesn't serve partial responses for API handlers returning raw datapoints - [`/api/v1/export*` endpoints](https://docs.victoriametrics.com/#how-to-export-time-series), since users usually expect this data is always complete.
 
 Data replication can be used for increasing storage durability. See [these docs](#replication-and-data-safety) for details.
@@ -678,17 +680,61 @@ Then an additional `vmselect` nodes can be configured for reading the data from
 
 ## Multi-level cluster setup
 
-`vmselect` nodes can be queried by other `vmselect` nodes if they run with `-clusternativeListenAddr` command-line flag. For example, if `vmselect` is started with `-clusternativeListenAddr=:8401`, then it can accept queries from another `vmselect` nodes at TCP port 8401 in the same way as `vmstorage` nodes do. This allows chaining `vmselect` nodes and building multi-level cluster topologies. For example, the top-level `vmselect` node can query second-level `vmselect` nodes in different availability zones (AZ), while the second-level `vmselect` nodes can query `vmstorage` nodes in local AZ.
+`vmselect` nodes can be queried by other `vmselect` nodes if they run with `-clusternativeListenAddr` command-line flag.
+For example, if `vmselect` is started with `-clusternativeListenAddr=:8401`, then it can accept queries from other `vmselect` nodes at TCP port 8401
+in the same way as `vmstorage` nodes do. This allows chaining `vmselect` nodes and building multi-level cluster topologies.
+For example, the top-level `vmselect` node can query second-level `vmselect` nodes in different availability zones (AZ),
+while the second-level `vmselect` nodes can query `vmstorage` nodes in local AZ. See also [vmstorage groups at vmselect](#vmstorage-groups-at-vmselect).
 
-`vminsert` nodes can accept data from another `vminsert` nodes if they run with `-clusternativeListenAddr` command-line flag. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400`, then it can accept data from another `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies. For example, the top-level `vminsert` node can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes in local AZ.
+`vminsert` nodes can accept data from other `vminsert` nodes if they run with `-clusternativeListenAddr` command-line flag.
+For example, if `vminsert` is started with `-clusternativeListenAddr=:8400`, then it can accept data from other `vminsert` nodes at TCP port 8400
+in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies.
+For example, the top-level `vminsert` node can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ),
+while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes in local AZ.
 
 The multi-level cluster setup for `vminsert` nodes has the following shortcomings because of synchronous replication and data sharding:
 
 * Data ingestion speed is limited by the slowest link to AZ.
 * `vminsert` nodes at top level re-route incoming data to the remaining AZs when some AZs are temporarily unavailable. This results in data gaps at AZs which were temporarily unavailable.
 
-These issues are addressed by [vmagent](https://docs.victoriametrics.com/vmagent.html) when it runs in [multitenancy mode](https://docs.victoriametrics.com/vmagent.html#multitenancy). `vmagent` buffers data, which must be sent to a particular AZ, when this AZ is temporarily unavailable. The buffer is stored on disk. The buffered data is sent to AZ as soon as it becomes available.
+These issues are addressed by [vmagent](https://docs.victoriametrics.com/vmagent.html) when it runs in [multitenancy mode](https://docs.victoriametrics.com/vmagent.html#multitenancy).
+`vmagent` buffers data, which must be sent to a particular AZ, when this AZ is temporarily unavailable. The buffer is stored on disk. The buffered data is sent to the AZ as soon as it becomes available.
+## vmstorage groups at vmselect
+
+`vmselect` can be configured to query multiple distinct groups of `vmstorage` nodes with an individual `-replicationFactor` per group.
+The following format of the `-storageNode` command-line flag value should be used for assigning a particular `vmstorage` address `addr` to a particular group `groupName` -
+`-storageNode=groupName/addr`. For example, the following command runs `vmselect`, which continues returning full responses if up to one node per group is temporarily unavailable
+because the given `-replicationFactor=2` is applied individually per group:
+
+```
+/path/to/vmselect \
+ -replicationFactor=2 \
+ -storageNode=group1/host1 \
+ -storageNode=group1/host2 \
+ -storageNode=group1/host3 \
+ -storageNode=group2/host4 \
+ -storageNode=group2/host5 \
+ -storageNode=group2/host6 \
+ -storageNode=group3/host7 \
+ -storageNode=group3/host8 \
+ -storageNode=group3/host9
+```
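As a side note, the `groupName/addr` convention shown above is easy to split programmatically. The following minimal Go sketch is illustrative only - the `parseStorageNode` helper and the `main` wiring are assumptions made for this example and are not the actual `vmselect` code:

```go
package main

import (
	"fmt"
	"strings"
)

// parseStorageNode is a hypothetical helper: it splits a -storageNode value of the
// form "groupName/addr" into the group name and the vmstorage address.
// A value without '/' is treated as belonging to the default (unnamed) group.
func parseStorageNode(value string) (group, addr string) {
	if n := strings.IndexByte(value, '/'); n >= 0 {
		return value[:n], value[n+1:]
	}
	return "", value
}

func main() {
	// Values taken from the example above, plus one node without an explicit group.
	flagValues := []string{"group1/host1", "group1/host2", "group2/host4", "host7"}

	// Collect the addresses per group.
	nodesPerGroup := make(map[string][]string)
	for _, v := range flagValues {
		group, addr := parseStorageNode(v)
		nodesPerGroup[group] = append(nodesPerGroup[group], addr)
	}
	fmt.Println(nodesPerGroup) // map[:[host7] group1:[host1 host2] group2:[host4]]
}
```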
+
+It is possible to specify a distinct `-replicationFactor` per group via the following format - `-replicationFactor=groupName:rf`.
+For example, the following command runs `vmselect`, which uses `-replicationFactor=3` for `group1`, while it uses `-replicationFactor=1` for `group2`:
+
+```
+/path/to/vmselect \
+ -replicationFactor=group1:3 \
+ -storageNode=group1/host1 \
+ -storageNode=group1/host2 \
+ -storageNode=group1/host3 \
+ -replicationFactor=group2:1 \
+ -storageNode=group2/host4 \
+ -storageNode=group2/host5 \
+ -storageNode=group2/host6
+```
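Before moving on, here is an illustrative Go sketch of the availability rule described in this section: a query response can stay complete as long as every group has fewer unavailable `vmstorage` nodes than its replication factor. The `groupState` type and `isFullResponse` helper are assumptions made for this example and do not mirror `vmselect` internals:

```go
package main

import "fmt"

// groupState is an assumed illustrative structure: the replication factor configured
// for one vmstorage group and the number of its nodes that are currently unreachable.
type groupState struct {
	replicationFactor int
	unavailableNodes  int
}

// isFullResponse models the documented rule: the response can be treated as complete
// only if every group has fewer unavailable nodes than its replication factor.
func isFullResponse(groups map[string]groupState) bool {
	for _, g := range groups {
		if g.unavailableNodes >= g.replicationFactor {
			return false
		}
	}
	return true
}

func main() {
	groups := map[string]groupState{
		"group1": {replicationFactor: 3, unavailableNodes: 2}, // still OK: 2 < 3
		"group2": {replicationFactor: 1, unavailableNodes: 0}, // OK: all nodes are reachable
	}
	fmt.Println(isFullResponse(groups)) // true

	// Losing a single node in group2 exceeds its replication factor of 1.
	groups["group2"] = groupState{replicationFactor: 1, unavailableNodes: 1}
	fmt.Println(isFullResponse(groups)) // false
}
```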
 
 ## Helm
 
@@ -701,19 +747,32 @@ It is available in the [helm-charts](https://github.com/VictoriaMetrics/helm-cha
 
 ## Replication and data safety
 
-By default, VictoriaMetrics offloads replication to the underlying storage pointed by `-storageDataPath` such as [Google compute persistent disk](https://cloud.google.com/compute/docs/disks#pdspecs), which guarantees data durability. VictoriaMetrics supports application-level replication if replicated durable persistent disks cannot be used for some reason.
+By default, VictoriaMetrics offloads replication to the underlying storage pointed to by `-storageDataPath` such as [Google compute persistent disk](https://cloud.google.com/compute/docs/disks#pdspecs),
+which guarantees data durability. VictoriaMetrics supports application-level replication if replicated durable persistent disks cannot be used for some reason.
 
-The replication can be enabled by passing `-replicationFactor=N` command-line flag to `vminsert`. This instructs `vminsert` to store `N` copies for every ingested sample on `N` distinct `vmstorage` nodes. This guarantees that all the stored data remains available for querying if up to `N-1` `vmstorage` nodes are unavailable.
+The replication can be enabled by passing the `-replicationFactor=N` command-line flag to `vminsert`. This instructs `vminsert` to store `N` copies of every ingested sample
+on `N` distinct `vmstorage` nodes. This guarantees that all the stored data remains available for querying if up to `N-1` `vmstorage` nodes are unavailable.
 
-Passing `-replicationFactor=N` command-line flag to `vmselect` instructs it to not mark responses as `partial` if less than `-replicationFactor` vmstorage nodes are unavailable during the query. See [cluster availability docs](#cluster-availability) for details.
+Passing the `-replicationFactor=N` command-line flag to `vmselect` instructs it to not mark responses as `partial` if less than `-replicationFactor` vmstorage nodes are unavailable during the query.
+See [cluster availability docs](#cluster-availability) for details.
 
-The cluster must contain at least `2*N-1` `vmstorage` nodes, where `N` is replication factor, in order to maintain the given replication factor for newly ingested data when `N-1` of storage nodes are unavailable.
+The cluster must contain at least `2*N-1` `vmstorage` nodes, where `N` is the replication factor, in order to maintain the given replication factor
+for newly ingested data when `N-1` of the storage nodes are unavailable.
 
-VictoriaMetrics stores timestamps with millisecond precision, so `-dedup.minScrapeInterval=1ms` command-line flag must be passed to `vmselect` nodes when the replication is enabled, so they could de-duplicate replicated samples obtained from distinct `vmstorage` nodes during querying. If duplicate data is pushed to VictoriaMetrics from identically configured [vmagent](https://docs.victoriametrics.com/vmagent.html) instances or Prometheus instances, then the `-dedup.minScrapeInterval` must be set to `scrape_interval` from scrape configs according to [deduplication docs](#deduplication).
+VictoriaMetrics stores timestamps with millisecond precision, so the `-dedup.minScrapeInterval=1ms` command-line flag must be passed to `vmselect` nodes when the replication is enabled,
+so that they can de-duplicate replicated samples obtained from distinct `vmstorage` nodes during querying. If duplicate data is pushed to VictoriaMetrics
+from identically configured [vmagent](https://docs.victoriametrics.com/vmagent.html) instances or Prometheus instances, then the `-dedup.minScrapeInterval` must be set
+to `scrape_interval` from scrape configs according to [deduplication docs](#deduplication).
 
-Note that [replication doesn't save from disaster](https://medium.com/@valyala/speeding-up-backups-for-big-time-series-databases-533c1a927883), so it is recommended performing regular backups. See [these docs](#backups) for details.
+Note that [replication doesn't save from disaster](https://medium.com/@valyala/speeding-up-backups-for-big-time-series-databases-533c1a927883),
+so it is recommended to perform regular backups. See [these docs](#backups) for details.
 
-Note that the replication increases resource usage - CPU, RAM, disk space, network bandwidth - by up to `-replicationFactor=N` times, because `vminsert` stores `N` copies of incoming data to distinct `vmstorage` nodes and `vmselect` needs to de-duplicate the replicated data obtained from `vmstorage` nodes during querying. So it is more cost-effective to offload the replication to underlying replicated durable storage pointed by `-storageDataPath` such as [Google Compute Engine persistent disk](https://cloud.google.com/compute/docs/disks/#pdspecs), which is protected from data loss and data corruption. It also provides consistently high performance and [may be resized](https://cloud.google.com/compute/docs/disks/add-persistent-disk) without downtime. HDD-based persistent disks should be enough for the majority of use cases. It is recommended using durable replicated persistent volumes in Kubernetes.
+Note that the replication increases resource usage - CPU, RAM, disk space, network bandwidth - by up to `-replicationFactor=N` times, because `vminsert` stores `N` copies
+of incoming data to distinct `vmstorage` nodes and `vmselect` needs to de-duplicate the replicated data obtained from `vmstorage` nodes during querying.
+So it is more cost-effective to offload the replication to underlying replicated durable storage pointed to by `-storageDataPath`
+such as [Google Compute Engine persistent disk](https://cloud.google.com/compute/docs/disks/#pdspecs), which is protected from data loss and data corruption.
+It also provides consistently high performance and [may be resized](https://cloud.google.com/compute/docs/disks/add-persistent-disk) without downtime.
+HDD-based persistent disks should be enough for the majority of use cases. It is recommended to use durable replicated persistent volumes in Kubernetes.
 
 ## Deduplication
 
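To illustrate why `-dedup.minScrapeInterval=1ms` matters when replication is enabled: the same sample is returned by several `vmstorage` nodes with an identical timestamp, and deduplication collapses such duplicates back into a single sample. The following toy Go sketch models this behavior; the `sample` type and `dedupByTimestamp` helper are illustrative assumptions, not the actual deduplication code:

```go
package main

import (
	"fmt"
	"sort"
)

// sample is a toy representation of a data point: a millisecond timestamp and a value.
type sample struct {
	timestampMs int64
	value       float64
}

// dedupByTimestamp keeps a single sample per timestamp, which is roughly what happens
// to replicated samples when deduplication is enabled with a 1ms interval.
func dedupByTimestamp(samples []sample) []sample {
	sort.Slice(samples, func(i, j int) bool { return samples[i].timestampMs < samples[j].timestampMs })
	result := samples[:0]
	var prevTs int64 = -1
	for _, s := range samples {
		if s.timestampMs == prevTs {
			continue // drop the duplicate obtained from another replica
		}
		result = append(result, s)
		prevTs = s.timestampMs
	}
	return result
}

func main() {
	// The same two samples arrive from two vmstorage replicas.
	replicated := []sample{
		{1000, 42}, {2000, 43}, // replica A
		{1000, 42}, {2000, 43}, // replica B
	}
	fmt.Println(dedupByTimestamp(replicated)) // [{1000 42} {2000 43}]
}
```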
diff --git a/lib/flagutil/dict.go b/lib/flagutil/dict.go
new file mode 100644
index 000000000..eceade66d
--- /dev/null
+++ b/lib/flagutil/dict.go
@@ -0,0 +1,100 @@
+package flagutil
+
+import (
+	"flag"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+// DictInt allows specifying a dictionary of named ints in the form `name1:value1,...,nameN:valueN`.
+type DictInt struct {
+	defaultValue int
+	kvs          []kIntValue
+}
+
+type kIntValue struct {
+	k string
+	v int
+}
+
+// NewDictInt creates DictInt with the given name, defaultValue and description.
+func NewDictInt(name string, defaultValue int, description string) *DictInt {
+	description += fmt.Sprintf(" (default %d)", defaultValue)
+	description += "\nSupports an `array` of `key:value` entries separated by comma or specified via multiple flags."
+	di := &DictInt{
+		defaultValue: defaultValue,
+	}
+	flag.Var(di, name, description)
+	return di
+}
+
+// String implements flag.Value interface
+func (di *DictInt) String() string {
+	kvs := di.kvs
+	if len(kvs) == 1 && kvs[0].k == "" {
+		// Short form - a single int value
+		return strconv.Itoa(kvs[0].v)
+	}
+
+	formattedResults := make([]string, len(kvs))
+	for i, kv := range kvs {
+		formattedResults[i] = fmt.Sprintf("%s:%d", kv.k, kv.v)
+	}
+	return strings.Join(formattedResults, ",")
+}
+
+// Set implements flag.Value interface
+func (di *DictInt) Set(value string) error {
+	values := parseArrayValues(value)
+	if len(di.kvs) == 0 && len(values) == 1 && strings.IndexByte(values[0], ':') < 0 {
+		v, err := strconv.Atoi(values[0])
+		if err != nil {
+			return err
+		}
+		di.kvs = append(di.kvs, kIntValue{
+			v: v,
+		})
+		return nil
+	}
+	for _, x := range values {
+		n := strings.IndexByte(x, ':')
+		if n < 0 {
+			return fmt.Errorf("missing ':' in %q", x)
+		}
+		k := x[:n]
+		v, err := strconv.Atoi(x[n+1:])
+		if err != nil {
+			return fmt.Errorf("cannot parse value for key=%q: %w", k, err)
+		}
+		if di.contains(k) {
+			return fmt.Errorf("duplicate value for key=%q: %d", k, v)
+		}
+		di.kvs = append(di.kvs, kIntValue{
+			k: k,
+			v: v,
+		})
+	}
+	return nil
+}
+
+func (di *DictInt) contains(key string) bool {
+	for _, kv := range di.kvs {
+		if kv.k == key {
+			return true
+		}
+	}
+	return false
+}
+
+// Get returns value for the given key.
+//
+// Default value is returned if key isn't found in di.
+func (di *DictInt) Get(key string) int {
+	for _, kv := range di.kvs {
+		if kv.k == key {
+			return kv.v
+		}
+	}
+	return di.defaultValue
+}
diff --git a/lib/flagutil/dict_test.go b/lib/flagutil/dict_test.go
new file mode 100644
index 000000000..31e326ab4
--- /dev/null
+++ b/lib/flagutil/dict_test.go
@@ -0,0 +1,69 @@
+package flagutil
+
+import (
+	"testing"
+)
+
+func TestDictIntSetSuccess(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		var di DictInt
+		if err := di.Set(s); err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		result := di.String()
+		if result != s {
+			t.Fatalf("unexpected DictInt.String(); got %q; want %q", result, s)
+		}
+	}
+
+	f("")
+	f("123")
+	f("-234")
+	f("foo:123")
+	f("foo:123,bar:-42,baz:0,aa:43")
+}
+
+func TestDictIntFailure(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		var di DictInt
+		if err := di.Set(s); err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+	}
+
+	// missing values
+	f("foo")
+	f("foo:")
+
+	// non-integer values
+	f("foo:bar")
+	f("12.34")
+	f("foo:123.34")
+
+	// duplicate keys
+	f("a:234,k:123,k:432")
+}
+
+func TestDictIntGet(t *testing.T) {
+	f := func(s, key string, defaultValue, expectedValue int) {
+		t.Helper()
+		var di DictInt
+		di.defaultValue = defaultValue
+		if err := di.Set(s); err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		value := di.Get(key)
+		if value != expectedValue {
+			t.Fatalf("unexpected value; got %d; want %d", value, expectedValue)
+		}
+	}
+
+	f("", "", 123, 123)
+	f("", "foo", 123, 123)
+	f("foo:42", "", 123, 123)
+	f("foo:42", "foo", 123, 42)
+	f("532", "", 123, 532)
+	f("532", "foo", 123, 123)
+}
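Finally, the new `DictInt` flag type above is what makes per-group `-replicationFactor` values such as `group1:3` possible. The sketch below shows how a component could declare and query such a flag using the `NewDictInt` and `Get` APIs added in this patch; the flag wiring in `main` is illustrative only and does not reproduce the actual `vmselect` setup:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)

// replicationFactor shows how a component could declare a per-group integer flag
// backed by the new DictInt type. The default value of 1 is used for groups
// without an explicit override.
var replicationFactor = flagutil.NewDictInt("replicationFactor", 1, "Replication factor for the ingested data")

func main() {
	// Equivalent to passing -replicationFactor=group1:3 -replicationFactor=group2:1 on the command line.
	if err := flag.Set("replicationFactor", "group1:3"); err != nil {
		panic(err)
	}
	if err := flag.Set("replicationFactor", "group2:1"); err != nil {
		panic(err)
	}

	fmt.Println(replicationFactor.Get("group1")) // 3
	fmt.Println(replicationFactor.Get("group2")) // 1
	fmt.Println(replicationFactor.Get("group3")) // 1 - falls back to the default value
}
```

This mirrors the `-replicationFactor=groupName:rf` syntax documented above, while the plain `-replicationFactor=2` form keeps working as the key-less short form handled in `DictInt.Set`.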