From 12d733dd5da6892a8cc1148083b81d1b04bf1641 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 8 May 2021 17:55:44 +0300 Subject: [PATCH] app/vminsert: add support for data ingestion via other vminsert nodes --- app/vminsert/main.go | 14 +++++++------- docs/CHANGELOG.md | 1 + docs/Cluster-VictoriaMetrics.md | 9 ++++++++- lib/protoparser/common/unmarshal_work.go | 2 +- lib/storage/metric_name.go | 8 ++++---- lib/storage/metric_name_test.go | 8 ++++---- lib/storage/search_test.go | 2 +- lib/storage/storage.go | 18 ++++++++++-------- lib/storage/storage_test.go | 2 +- 9 files changed, 37 insertions(+), 27 deletions(-) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index cafa8d31b..e428c4020 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -44,8 +44,8 @@ var ( ) var ( - influxServer *influxserver.Server graphiteServer *graphiteserver.Server + influxServer *influxserver.Server opentsdbServer *opentsdbserver.Server opentsdbhttpServer *opentsdbhttpserver.Server ) @@ -56,12 +56,12 @@ func Init() { storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) common.StartUnmarshalWorkers() writeconcurrencylimiter.Init() - if len(*influxListenAddr) > 0 { - influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) - } if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) } + if len(*influxListenAddr) > 0 { + influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) + } if len(*opentsdbListenAddr) > 0 { opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler) } @@ -74,12 +74,12 @@ func Init() { // Stop stops vminsert. func Stop() { promscrape.Stop() - if len(*influxListenAddr) > 0 { - influxServer.MustStop() - } if len(*graphiteListenAddr) > 0 { graphiteServer.MustStop() } + if len(*influxListenAddr) > 0 { + influxServer.MustStop() + } if len(*opentsdbListenAddr) > 0 { opentsdbServer.MustStop() } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ae4d999af..a1ca57c28 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,7 @@ sort: 15 # CHANGELOG +* FEATURE: vminsert: add support for data ingestion via other `vminsert` nodes. This allows building multi-level data ingestion paths in VictoriaMetrics cluster by writing data from one level of `vminsert` nodes to another level of `vminsert` nodes. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) for details. * FEATURE: vmalert: add flag to control behaviour on startup for state restore errors. Alerting rules now can return specific error type ErrStateRestore to indicate whether restore state procedure failed. Such errors were returned and logged before as well. But now user can specify whether to just log these errors (`-remoteRead.ignoreRestoreErrors=true`) or to stop the process (`-remoteRead.ignoreRestoreErrors=false`). The latter is important when VM isn't ready yet to serve queries from vmalert and it needs to wait. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1252). * BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Prevoiusly `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 5a8e92615..ec971cad2 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -334,6 +334,13 @@ If you need multi-AZ setup, then it is recommended running independed clusters i [vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so it could replicate incoming data into all the cluster. Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters. +Another solution is to use multi-level cluster setup, where the top level of `vminsert` nodes [replicate](#replication-and-data-safety) data among the lower level of `vminsert` nodes located at different availability zones. These `vminsert` nodes then spread the data among `vmstorage` nodes in each AZ. See [these docs](#multi-level-cluster-setup) for more details. + + +## Multi-level cluster setup + +`vminsert` nodes can accept data from another `vminsert` nodes starting from [v1.60.0](https://docs.victoriametrics.com/CHANGELOG.html#v1600) if `-clusternativeListenAddr` command-line flag is set. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400` command-line flag, then it can accept data from another `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies with flexible configs. For example, the top level of `vminsert` nodes can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes located in the same AZ. Such setup guarantees cluster availability if some AZ becomes unavailable. The data from all the `vmstorage` nodes in all the AZs can be read via `vmselect` nodes, which are configured to query all the `vmstorage` nodes in all the availability zones (e.g. all the `vmstorage` addresses are passed via `-storageNode` command-line flag to `vmselect` nodes). Additionally, `-replicationFactor=k+1` must be passed to `vmselect` nodes, where `k` is the lowest number of `vmstorage` nodes in a single AZ. See [replication docs](#replication-and-data-safety) for more details. + ## Helm @@ -359,7 +366,7 @@ so up to 2 `vmstorage` nodes can be lost without data loss. The minimum number o the remaining 3 `vmstorage` nodes could provide the `-replicationFactor=3` for newly ingested data. When the replication is enabled, `-replicationFactor=N` and `-dedup.minScrapeInterval=1ms` command-line flag must be passed to `vmselect` nodes. -The `-replicationFactor=N` improves query performance when a part of vmstorage nodes respond slowly and/or temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes shouldn't be set. See [this issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details. +The `-replicationFactor=N` improves query performance when up to `N-1` vmstorage nodes respond slowly and/or temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes can result in partial responses. See [this issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details. The `-dedup.minScrapeInterval=1ms` de-duplicates replicated data during queries. It is OK if `-dedup.minScrapeInterval` exceeds 1ms when [deduplication](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#deduplication) is used additionally to replication. diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/common/unmarshal_work.go index 049644f15..55758e933 100644 --- a/lib/protoparser/common/unmarshal_work.go +++ b/lib/protoparser/common/unmarshal_work.go @@ -26,7 +26,7 @@ func StartUnmarshalWorkers() { logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()") } gomaxprocs := cgroup.AvailableCPUs() - unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs) + unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs) unmarshalWorkersWG.Add(gomaxprocs) for i := 0; i < gomaxprocs; i++ { go func() { diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index 3e232ee45..d5f486744 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -436,7 +436,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) { // MarshalMetricNameRaw marshals labels to dst and returns the result. // -// The result must be unmarshaled with MetricName.unmarshalRaw +// The result must be unmarshaled with MetricName.UnmarshalRaw func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { // Calculate the required space for dst. dstLen := len(dst) @@ -538,7 +538,7 @@ func labelsToString(labels []prompb.Label) string { // marshalRaw marshals mn to dst and returns the result. // -// The results may be unmarshaled with MetricName.unmarshalRaw. +// The results may be unmarshaled with MetricName.UnmarshalRaw. // // This function is for testing purposes. MarshalMetricNameRaw must be used // in prod instead. @@ -555,8 +555,8 @@ func (mn *MetricName) marshalRaw(dst []byte) []byte { return dst } -// unmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw. -func (mn *MetricName) unmarshalRaw(src []byte) error { +// UnmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw. +func (mn *MetricName) UnmarshalRaw(src []byte) error { mn.Reset() for len(src) > 0 { tail, key, err := unmarshalBytesFast(src) diff --git a/lib/storage/metric_name_test.go b/lib/storage/metric_name_test.go index ec11e421d..c2da64a41 100644 --- a/lib/storage/metric_name_test.go +++ b/lib/storage/metric_name_test.go @@ -140,7 +140,7 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) { } data := mn.marshalRaw(nil) var mn1 MetricName - if err := mn1.unmarshalRaw(data); err != nil { + if err := mn1.UnmarshalRaw(data); err != nil { t.Fatalf("cannot unmarshal mn %s: %s", &mn, err) } if !reflect.DeepEqual(&mn, &mn1) { @@ -149,13 +149,13 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) { // Try unmarshaling MetricName without tag value. brokenData := marshalTagValue(data, []byte("foobar")) - if err := mn1.unmarshalRaw(brokenData); err == nil { + if err := mn1.UnmarshalRaw(brokenData); err == nil { t.Fatalf("expecting non-zero error when unmarshaling MetricName without tag value") } // Try unmarshaling MetricName with invalid tag key. brokenData[len(brokenData)-1] = 123 - if err := mn1.unmarshalRaw(brokenData); err == nil { + if err := mn1.UnmarshalRaw(brokenData); err == nil { t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag key") } @@ -163,7 +163,7 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) { brokenData = marshalTagValue(data, []byte("foobar")) brokenData = marshalTagValue(brokenData, []byte("aaa")) brokenData[len(brokenData)-1] = 123 - if err := mn1.unmarshalRaw(brokenData); err == nil { + if err := mn1.UnmarshalRaw(brokenData); err == nil { t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag value") } } diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 712f8d1a1..1bd2ec3ce 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -187,7 +187,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun if mr.Timestamp < tr.MinTimestamp || mr.Timestamp > tr.MaxTimestamp { continue } - if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil { + if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { return fmt.Errorf("cannot unmarshal MetricName: %w", err) } if !metricGroupRegexp.Match(mn.MetricGroup) { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 7c5c82911..81bd3b18e 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1251,7 +1251,7 @@ func (s *Storage) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) ( // MetricRow is a metric to insert into storage. type MetricRow struct { // MetricNameRaw contains raw metric name, which must be decoded - // with MetricName.unmarshalRaw. + // with MetricName.UnmarshalRaw. MetricNameRaw []byte Timestamp int64 @@ -1269,7 +1269,7 @@ func (mr *MetricRow) CopyFrom(src *MetricRow) { func (mr *MetricRow) String() string { metricName := string(mr.MetricNameRaw) var mn MetricName - if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil { + if err := mn.UnmarshalRaw(mr.MetricNameRaw); err == nil { metricName = mn.String() } return fmt.Sprintf("%s (Timestamp=%d, Value=%f)", metricName, mr.Timestamp, mr.Value) @@ -1283,13 +1283,15 @@ func (mr *MetricRow) Marshal(dst []byte) []byte { return dst } -// Unmarshal unmarshals mr from src and returns the remaining tail from src. -func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) { +// UnmarshalX unmarshals mr from src and returns the remaining tail from src. +// +// mr refers to src, so it remains valid until src changes. +func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) { tail, metricNameRaw, err := encoding.UnmarshalBytes(src) if err != nil { return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err) } - mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRaw...) + mr.MetricNameRaw = metricNameRaw if len(tail) < 8 { return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail)) @@ -1391,7 +1393,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { } // Slow path - register mr.MetricNameRaw. - if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil { + if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } mn.sortTags() @@ -1572,7 +1574,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra func getUserReadableMetricName(metricNameRaw []byte) string { var mn MetricName - if err := mn.unmarshalRaw(metricNameRaw); err != nil { + if err := mn.UnmarshalRaw(metricNameRaw); err != nil { return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err) } return mn.String() @@ -1608,7 +1610,7 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error { // Do not spend CPU time on re-calculating canonical metricName during bulk import // of many rows for the same metric. if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) { - if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil { + if err := pmrs.mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } pmrs.mn.sortTags() diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index f2891841e..2e418d960 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -315,7 +315,7 @@ func TestMetricRowMarshalUnmarshal(t *testing.T) { buf = mr1.Marshal(buf[:0]) var mr2 MetricRow - tail, err := mr2.Unmarshal(buf) + tail, err := mr2.UnmarshalX(buf) if err != nil { t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err) }