app/vminsert: add support for data ingestion via other vminsert nodes

This commit is contained in:
Aliaksandr Valialkin 2021-05-08 17:55:44 +03:00
parent 237e9f9fd7
commit 12d733dd5d
9 changed files with 37 additions and 27 deletions

View file

@ -44,8 +44,8 @@ var (
) )
var ( var (
influxServer *influxserver.Server
graphiteServer *graphiteserver.Server graphiteServer *graphiteserver.Server
influxServer *influxserver.Server
opentsdbServer *opentsdbserver.Server opentsdbServer *opentsdbserver.Server
opentsdbhttpServer *opentsdbhttpserver.Server opentsdbhttpServer *opentsdbhttpserver.Server
) )
@ -56,12 +56,12 @@ func Init() {
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
common.StartUnmarshalWorkers() common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init() writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
}
if len(*graphiteListenAddr) > 0 { if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)
} }
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
}
if len(*opentsdbListenAddr) > 0 { if len(*opentsdbListenAddr) > 0 {
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler) opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler)
} }
@ -74,12 +74,12 @@ func Init() {
// Stop stops vminsert. // Stop stops vminsert.
func Stop() { func Stop() {
promscrape.Stop() promscrape.Stop()
if len(*influxListenAddr) > 0 {
influxServer.MustStop()
}
if len(*graphiteListenAddr) > 0 { if len(*graphiteListenAddr) > 0 {
graphiteServer.MustStop() graphiteServer.MustStop()
} }
if len(*influxListenAddr) > 0 {
influxServer.MustStop()
}
if len(*opentsdbListenAddr) > 0 { if len(*opentsdbListenAddr) > 0 {
opentsdbServer.MustStop() opentsdbServer.MustStop()
} }

View file

@ -4,6 +4,7 @@ sort: 15
# CHANGELOG # CHANGELOG
* FEATURE: vminsert: add support for data ingestion via other `vminsert` nodes. This allows building multi-level data ingestion paths in VictoriaMetrics cluster by writing data from one level of `vminsert` nodes to another level of `vminsert` nodes. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) for details.
* FEATURE: vmalert: add flag to control behaviour on startup for state restore errors. Alerting rules now can return specific error type ErrStateRestore to indicate whether restore state procedure failed. Such errors were returned and logged before as well. But now user can specify whether to just log these errors (`-remoteRead.ignoreRestoreErrors=true`) or to stop the process (`-remoteRead.ignoreRestoreErrors=false`). The latter is important when VM isn't ready yet to serve queries from vmalert and it needs to wait. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1252). * FEATURE: vmalert: add flag to control behaviour on startup for state restore errors. Alerting rules now can return specific error type ErrStateRestore to indicate whether restore state procedure failed. Such errors were returned and logged before as well. But now user can specify whether to just log these errors (`-remoteRead.ignoreRestoreErrors=true`) or to stop the process (`-remoteRead.ignoreRestoreErrors=false`). The latter is important when VM isn't ready yet to serve queries from vmalert and it needs to wait. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1252).
* BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Prevoiusly `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Prevoiusly `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240).

View file

@ -334,6 +334,13 @@ If you need multi-AZ setup, then it is recommended running independed clusters i
[vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so it could replicate incoming data [vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so it could replicate incoming data
into all the cluster. Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters. into all the cluster. Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters.
Another solution is to use multi-level cluster setup, where the top level of `vminsert` nodes [replicate](#replication-and-data-safety) data among the lower level of `vminsert` nodes located at different availability zones. These `vminsert` nodes then spread the data among `vmstorage` nodes in each AZ. See [these docs](#multi-level-cluster-setup) for more details.
## Multi-level cluster setup
`vminsert` nodes can accept data from another `vminsert` nodes starting from [v1.60.0](https://docs.victoriametrics.com/CHANGELOG.html#v1600) if `-clusternativeListenAddr` command-line flag is set. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400` command-line flag, then it can accept data from another `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies with flexible configs. For example, the top level of `vminsert` nodes can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes located in the same AZ. Such setup guarantees cluster availability if some AZ becomes unavailable. The data from all the `vmstorage` nodes in all the AZs can be read via `vmselect` nodes, which are configured to query all the `vmstorage` nodes in all the availability zones (e.g. all the `vmstorage` addresses are passed via `-storageNode` command-line flag to `vmselect` nodes). Additionally, `-replicationFactor=k+1` must be passed to `vmselect` nodes, where `k` is the lowest number of `vmstorage` nodes in a single AZ. See [replication docs](#replication-and-data-safety) for more details.
## Helm ## Helm
@ -359,7 +366,7 @@ so up to 2 `vmstorage` nodes can be lost without data loss. The minimum number o
the remaining 3 `vmstorage` nodes could provide the `-replicationFactor=3` for newly ingested data. the remaining 3 `vmstorage` nodes could provide the `-replicationFactor=3` for newly ingested data.
When the replication is enabled, `-replicationFactor=N` and `-dedup.minScrapeInterval=1ms` command-line flag must be passed to `vmselect` nodes. When the replication is enabled, `-replicationFactor=N` and `-dedup.minScrapeInterval=1ms` command-line flag must be passed to `vmselect` nodes.
The `-replicationFactor=N` improves query performance when a part of vmstorage nodes respond slowly and/or temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes shouldn't be set. See [this issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details. The `-replicationFactor=N` improves query performance when up to `N-1` vmstorage nodes respond slowly and/or temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes can result in partial responses. See [this issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details.
The `-dedup.minScrapeInterval=1ms` de-duplicates replicated data during queries. It is OK if `-dedup.minScrapeInterval` exceeds 1ms The `-dedup.minScrapeInterval=1ms` de-duplicates replicated data during queries. It is OK if `-dedup.minScrapeInterval` exceeds 1ms
when [deduplication](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#deduplication) is used additionally to replication. when [deduplication](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#deduplication) is used additionally to replication.

View file

@ -26,7 +26,7 @@ func StartUnmarshalWorkers() {
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()") logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
} }
gomaxprocs := cgroup.AvailableCPUs() gomaxprocs := cgroup.AvailableCPUs()
unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs) unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs) unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ { for i := 0; i < gomaxprocs; i++ {
go func() { go func() {

View file

@ -436,7 +436,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) {
// MarshalMetricNameRaw marshals labels to dst and returns the result. // MarshalMetricNameRaw marshals labels to dst and returns the result.
// //
// The result must be unmarshaled with MetricName.unmarshalRaw // The result must be unmarshaled with MetricName.UnmarshalRaw
func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte {
// Calculate the required space for dst. // Calculate the required space for dst.
dstLen := len(dst) dstLen := len(dst)
@ -538,7 +538,7 @@ func labelsToString(labels []prompb.Label) string {
// marshalRaw marshals mn to dst and returns the result. // marshalRaw marshals mn to dst and returns the result.
// //
// The results may be unmarshaled with MetricName.unmarshalRaw. // The results may be unmarshaled with MetricName.UnmarshalRaw.
// //
// This function is for testing purposes. MarshalMetricNameRaw must be used // This function is for testing purposes. MarshalMetricNameRaw must be used
// in prod instead. // in prod instead.
@ -555,8 +555,8 @@ func (mn *MetricName) marshalRaw(dst []byte) []byte {
return dst return dst
} }
// unmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw. // UnmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw.
func (mn *MetricName) unmarshalRaw(src []byte) error { func (mn *MetricName) UnmarshalRaw(src []byte) error {
mn.Reset() mn.Reset()
for len(src) > 0 { for len(src) > 0 {
tail, key, err := unmarshalBytesFast(src) tail, key, err := unmarshalBytesFast(src)

View file

@ -140,7 +140,7 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) {
} }
data := mn.marshalRaw(nil) data := mn.marshalRaw(nil)
var mn1 MetricName var mn1 MetricName
if err := mn1.unmarshalRaw(data); err != nil { if err := mn1.UnmarshalRaw(data); err != nil {
t.Fatalf("cannot unmarshal mn %s: %s", &mn, err) t.Fatalf("cannot unmarshal mn %s: %s", &mn, err)
} }
if !reflect.DeepEqual(&mn, &mn1) { if !reflect.DeepEqual(&mn, &mn1) {
@ -149,13 +149,13 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) {
// Try unmarshaling MetricName without tag value. // Try unmarshaling MetricName without tag value.
brokenData := marshalTagValue(data, []byte("foobar")) brokenData := marshalTagValue(data, []byte("foobar"))
if err := mn1.unmarshalRaw(brokenData); err == nil { if err := mn1.UnmarshalRaw(brokenData); err == nil {
t.Fatalf("expecting non-zero error when unmarshaling MetricName without tag value") t.Fatalf("expecting non-zero error when unmarshaling MetricName without tag value")
} }
// Try unmarshaling MetricName with invalid tag key. // Try unmarshaling MetricName with invalid tag key.
brokenData[len(brokenData)-1] = 123 brokenData[len(brokenData)-1] = 123
if err := mn1.unmarshalRaw(brokenData); err == nil { if err := mn1.UnmarshalRaw(brokenData); err == nil {
t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag key") t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag key")
} }
@ -163,7 +163,7 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) {
brokenData = marshalTagValue(data, []byte("foobar")) brokenData = marshalTagValue(data, []byte("foobar"))
brokenData = marshalTagValue(brokenData, []byte("aaa")) brokenData = marshalTagValue(brokenData, []byte("aaa"))
brokenData[len(brokenData)-1] = 123 brokenData[len(brokenData)-1] = 123
if err := mn1.unmarshalRaw(brokenData); err == nil { if err := mn1.UnmarshalRaw(brokenData); err == nil {
t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag value") t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag value")
} }
} }

View file

@ -187,7 +187,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
if mr.Timestamp < tr.MinTimestamp || mr.Timestamp > tr.MaxTimestamp { if mr.Timestamp < tr.MinTimestamp || mr.Timestamp > tr.MaxTimestamp {
continue continue
} }
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricName: %w", err) return fmt.Errorf("cannot unmarshal MetricName: %w", err)
} }
if !metricGroupRegexp.Match(mn.MetricGroup) { if !metricGroupRegexp.Match(mn.MetricGroup) {

View file

@ -1251,7 +1251,7 @@ func (s *Storage) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (
// MetricRow is a metric to insert into storage. // MetricRow is a metric to insert into storage.
type MetricRow struct { type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded // MetricNameRaw contains raw metric name, which must be decoded
// with MetricName.unmarshalRaw. // with MetricName.UnmarshalRaw.
MetricNameRaw []byte MetricNameRaw []byte
Timestamp int64 Timestamp int64
@ -1269,7 +1269,7 @@ func (mr *MetricRow) CopyFrom(src *MetricRow) {
func (mr *MetricRow) String() string { func (mr *MetricRow) String() string {
metricName := string(mr.MetricNameRaw) metricName := string(mr.MetricNameRaw)
var mn MetricName var mn MetricName
if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err == nil {
metricName = mn.String() metricName = mn.String()
} }
return fmt.Sprintf("%s (Timestamp=%d, Value=%f)", metricName, mr.Timestamp, mr.Value) return fmt.Sprintf("%s (Timestamp=%d, Value=%f)", metricName, mr.Timestamp, mr.Value)
@ -1283,13 +1283,15 @@ func (mr *MetricRow) Marshal(dst []byte) []byte {
return dst return dst
} }
// Unmarshal unmarshals mr from src and returns the remaining tail from src. // UnmarshalX unmarshals mr from src and returns the remaining tail from src.
func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) { //
// mr refers to src, so it remains valid until src changes.
func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) {
tail, metricNameRaw, err := encoding.UnmarshalBytes(src) tail, metricNameRaw, err := encoding.UnmarshalBytes(src)
if err != nil { if err != nil {
return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err) return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err)
} }
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRaw...) mr.MetricNameRaw = metricNameRaw
if len(tail) < 8 { if len(tail) < 8 {
return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail)) return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail))
@ -1391,7 +1393,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
} }
// Slow path - register mr.MetricNameRaw. // Slow path - register mr.MetricNameRaw.
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
} }
mn.sortTags() mn.sortTags()
@ -1572,7 +1574,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
func getUserReadableMetricName(metricNameRaw []byte) string { func getUserReadableMetricName(metricNameRaw []byte) string {
var mn MetricName var mn MetricName
if err := mn.unmarshalRaw(metricNameRaw); err != nil { if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err) return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err)
} }
return mn.String() return mn.String()
@ -1608,7 +1610,7 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
// Do not spend CPU time on re-calculating canonical metricName during bulk import // Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric. // of many rows for the same metric.
if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) { if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil { if err := pmrs.mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
} }
pmrs.mn.sortTags() pmrs.mn.sortTags()

View file

@ -315,7 +315,7 @@ func TestMetricRowMarshalUnmarshal(t *testing.T) {
buf = mr1.Marshal(buf[:0]) buf = mr1.Marshal(buf[:0])
var mr2 MetricRow var mr2 MetricRow
tail, err := mr2.Unmarshal(buf) tail, err := mr2.UnmarshalX(buf)
if err != nil { if err != nil {
t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err) t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
} }