app/vminsert: add support for data ingestion via other vminsert nodes

This commit is contained in:
Aliaksandr Valialkin 2021-05-08 17:55:44 +03:00
parent 07bc021f58
commit 4a5f45c77e
16 changed files with 452 additions and 187 deletions

View file

@ -330,6 +330,13 @@ If you need multi-AZ setup, then it is recommended running independed clusters i
[vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so it could replicate incoming data
into all the cluster. Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters.
Another solution is to use multi-level cluster setup, where the top level of `vminsert` nodes [replicate](#replication-and-data-safety) data among the lower level of `vminsert` nodes located at different availability zones. These `vminsert` nodes then spread the data among `vmstorage` nodes in each AZ. See [these docs](#multi-level-cluster-setup) for more details.
## Multi-level cluster setup
`vminsert` nodes can accept data from other `vminsert` nodes starting from [v1.60.0](https://docs.victoriametrics.com/CHANGELOG.html#v1600) if `-clusternativeListenAddr` command-line flag is set. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400` command-line flag, then it can accept data from other `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies with flexible configs. For example, the top level of `vminsert` nodes can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes located in the same AZ. Such a setup guarantees cluster availability if some AZ becomes unavailable. The data from all the `vmstorage` nodes in all the AZs can be read via `vmselect` nodes, which are configured to query all the `vmstorage` nodes in all the availability zones (e.g. all the `vmstorage` addresses are passed via `-storageNode` command-line flag to `vmselect` nodes). Additionally, `-replicationFactor=k+1` must be passed to `vmselect` nodes, where `k` is the lowest number of `vmstorage` nodes in a single AZ. See [replication docs](#replication-and-data-safety) for more details.
## Helm
@ -355,7 +362,7 @@ so up to 2 `vmstorage` nodes can be lost without data loss. The minimum number o
the remaining 3 `vmstorage` nodes could provide the `-replicationFactor=3` for newly ingested data.
When the replication is enabled, `-replicationFactor=N` and `-dedup.minScrapeInterval=1ms` command-line flags must be passed to `vmselect` nodes.
The `-replicationFactor=N` improves query performance when a part of vmstorage nodes respond slowly and/or temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes shouldn't be set. See [this issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details.
The `-replicationFactor=N` improves query performance when up to `N-1` vmstorage nodes respond slowly and/or are temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes can result in partial responses. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details.
The `-dedup.minScrapeInterval=1ms` de-duplicates replicated data during queries. It is OK if `-dedup.minScrapeInterval` exceeds 1ms
when [deduplication](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#deduplication) is used additionally to replication.

View file

@ -0,0 +1,78 @@
package clusternative
import (
"fmt"
"net"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// Metrics for data ingested from other vminsert nodes via the clusternative protocol.
var (
// rowsInserted is increased by the number of rows in every successfully processed insertRows batch.
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="clusternative"}`)
// rowsTenantInserted holds per-tenant counters, which are incremented for every data point written by insertRows.
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="clusternative"}`)
// rowsPerInsert tracks the distribution of batch sizes passed to insertRows.
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="clusternative"}`)
)
// InsertHandler serves a single connection c established by another vminsert node.
//
// It performs the server side of the vminsert handshake on c and then passes every
// batch of rows parsed from the connection to insertRows. The whole stream processing
// is executed via writeconcurrencylimiter.Do.
//
// A non-nil error is returned if the handshake fails or if the stream processing fails.
func InsertHandler(c net.Conn) error {
	bc, err := handshake.VMInsertServer(c, 0)
	if err != nil {
		return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
	}
	// NOTE(review): the limiter callback covers the whole connection lifetime,
	// not a single block - confirm this is the intended limiting granularity.
	processStream := func() error {
		return parser.ParseStream(bc, insertRows)
	}
	return writeconcurrencylimiter.Do(processStream)
}
// insertRows re-ingests rows received from another vminsert node.
//
// Every row is decoded from its raw metric name into a tenant (accountID, projectID)
// and a set of labels, optionally relabeled and then written via the insert context.
// Rows that end up without labels are skipped. The buffered data is flushed at the end.
//
// The first decoding or write error aborts the processing and is returned to the caller.
func insertRows(rows []storage.MetricRow) error {
	ic := netstorage.GetInsertCtx()
	defer netstorage.PutInsertCtx(ic)
	ic.Reset() // This call is required for initializing the internals of ic.

	var (
		tenant        auth.Token
		tenantCounter *metrics.Counter
		name          storage.MetricName
	)
	relabelingEnabled := relabel.HasRelabeling()
	for i := range rows {
		row := &rows[i]
		if err := name.UnmarshalRaw(row.MetricNameRaw); err != nil {
			return fmt.Errorf("cannot unmarshal MetricNameRaw: %w", err)
		}

		// Look up the per-tenant counter only when the tenant differs from the previous row.
		sameTenant := tenantCounter != nil && name.AccountID == tenant.AccountID && name.ProjectID == tenant.ProjectID
		if !sameTenant {
			tenant.AccountID = name.AccountID
			tenant.ProjectID = name.ProjectID
			tenantCounter = rowsTenantInserted.Get(&tenant)
		}

		// Rebuild the label set: the metric group goes first under the nil key, then all the tags.
		ic.Labels = ic.Labels[:0]
		ic.AddLabelBytes(nil, name.MetricGroup)
		for _, tag := range name.Tags {
			ic.AddLabelBytes(tag.Key, tag.Value)
		}
		if relabelingEnabled {
			ic.ApplyRelabeling()
		}
		if len(ic.Labels) == 0 {
			// Skip metric without labels.
			continue
		}
		ic.SortLabelsIfNeeded()

		err := ic.WriteDataPoint(&tenant, ic.Labels, row.Timestamp, row.Value)
		if err != nil {
			return err
		}
		tenantCounter.Inc()
	}

	rowsCount := len(rows)
	rowsInserted.Add(rowsCount)
	rowsPerInsert.Update(float64(rowsCount))
	return ic.FlushBufs()
}

View file

@ -4,11 +4,13 @@ import (
"flag"
"fmt"
"io"
"net"
"net/http"
"os"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
@ -27,6 +29,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/influxutils"
clusternativeserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/clusternative"
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
@ -40,6 +43,8 @@ import (
)
var (
clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for data from other vminsert nodes in multi-level cluster setup. "+
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 must be set. Doesn't work if empty")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for Influx line protocol data. Usually :8189 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vminsert>:8480/insert/<accountID>/influx/write")
@ -53,10 +58,11 @@ var (
)
var (
influxServer *influxserver.Server
graphiteServer *graphiteserver.Server
opentsdbServer *opentsdbserver.Server
opentsdbhttpServer *opentsdbhttpserver.Server
clusternativeServer *clusternativeserver.Server
graphiteServer *graphiteserver.Server
influxServer *influxserver.Server
opentsdbServer *opentsdbserver.Server
opentsdbhttpServer *opentsdbhttpserver.Server
)
func main() {
@ -79,10 +85,9 @@ func main() {
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
var at auth.Token // TODO: properly initialize auth token
return influx.InsertHandlerForReader(&at, r)
if len(*clusternativeListenAddr) > 0 {
clusternativeServer = clusternativeserver.MustStart(*clusternativeListenAddr, func(c net.Conn) error {
return clusternative.InsertHandler(c)
})
}
if len(*graphiteListenAddr) > 0 {
@ -91,6 +96,12 @@ func main() {
return graphite.InsertHandler(&at, r)
})
}
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
var at auth.Token // TODO: properly initialize auth token
return influx.InsertHandlerForReader(&at, r)
})
}
if len(*opentsdbListenAddr) > 0 {
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
var at auth.Token // TODO: properly initialize auth token
@ -115,12 +126,15 @@ func main() {
}
logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
if len(*influxListenAddr) > 0 {
influxServer.MustStop()
if len(*clusternativeListenAddr) > 0 {
clusternativeServer.MustStop()
}
if len(*graphiteListenAddr) > 0 {
graphiteServer.MustStop()
}
if len(*influxListenAddr) > 0 {
influxServer.MustStop()
}
if len(*opentsdbListenAddr) > 0 {
opentsdbServer.MustStop()
}

View file

@ -596,7 +596,7 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows
}
src := br.buf
for len(src) > 0 {
tail, err := mr.Unmarshal(src)
tail, err := mr.UnmarshalX(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal MetricRow from reroutedBR.buf: %s", err)
}
@ -610,6 +610,7 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows
// they all go to the original or to the next sn.
h = xxhash.Sum64(mr.MetricNameRaw)
}
mr.ResetX()
for {
idx := h % uint64(len(sns))
sn := sns[idx]

View file

@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
@ -77,7 +78,7 @@ func main() {
registerStorageMetrics(strg)
transport.StartUnmarshalWorkers()
common.StartUnmarshalWorkers()
srv, err := transport.NewServer(*vminsertAddr, *vmselectAddr, strg)
if err != nil {
logger.Fatalf("cannot create a server with vminsertAddr=%s, vmselectAddr=%s: %s", *vminsertAddr, *vmselectAddr, err)
@ -104,7 +105,7 @@ func main() {
logger.Infof("gracefully shutting down the service")
startTime = time.Now()
srv.MustClose()
transport.StopUnmarshalWorkers()
common.StopUnmarshalWorkers()
logger.Infof("successfully shut down the service in %.3f seconds", time.Since(startTime).Seconds())
logger.Infof("gracefully closing the storage at %s", *storageDataPath)

View file

@ -12,14 +12,13 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
@ -294,159 +293,13 @@ func (s *Server) isStopping() bool {
}
func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
sizeBuf := make([]byte, 8)
var reqBuf []byte
remoteAddr := bc.RemoteAddr().String()
for {
if _, err := io.ReadFull(bc, sizeBuf); err != nil {
if err == io.EOF {
// Remote end gracefully closed the connection.
return nil
}
return fmt.Errorf("cannot read packet size: %w", err)
}
packetSize := encoding.UnmarshalUint64(sizeBuf)
if packetSize > consts.MaxInsertPacketSize {
return fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize)
}
reqBuf = bytesutil.Resize(reqBuf, int(packetSize))
if n, err := io.ReadFull(bc, reqBuf); err != nil {
return fmt.Errorf("cannot read packet with size %d bytes: %w; read only %d bytes", packetSize, err, n)
}
// Send `ack` to vminsert that the packet has been received.
deadline := time.Now().Add(5 * time.Second)
if err := bc.SetWriteDeadline(deadline); err != nil {
return fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
}
sizeBuf[0] = 1
if _, err := bc.Write(sizeBuf[:1]); err != nil {
return fmt.Errorf("cannot send `ack` to vminsert: %w", err)
}
if err := bc.Flush(); err != nil {
return fmt.Errorf("cannot flush `ack` to vminsert: %w", err)
}
vminsertPacketsRead.Inc()
uw := getUnmarshalWork()
uw.storage = s.storage
uw.remoteAddr = remoteAddr
uw.reqBuf, reqBuf = reqBuf, uw.reqBuf
unmarshalWorkCh <- uw
}
return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
})
}
var (
vminsertPacketsRead = metrics.NewCounter("vm_vminsert_packets_read_total")
vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
)
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
// StartUnmarshalWorkers starts workers for unmarshaling data obtained from vminsert connections.
//
// This function must be called before servers are created via NewServer.
func StartUnmarshalWorkers() {
gomaxprocs := cgroup.AvailableCPUs()
unmarshalWorkCh = make(chan *unmarshalWork, gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer unmarshalWorkersWG.Done()
for uw := range unmarshalWorkCh {
uw.Unmarshal()
putUnmarshalWork(uw)
}
}()
}
}
// StopUnmarshalWorkers stops unmarshal workers which were started with StartUnmarshalWorkers.
//
// This function must be called after Server.MustClose().
func StopUnmarshalWorkers() {
close(unmarshalWorkCh)
unmarshalWorkersWG.Wait()
}
var (
unmarshalWorkCh chan *unmarshalWork
unmarshalWorkersWG sync.WaitGroup
)
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool
type unmarshalWork struct {
storage *storage.Storage
remoteAddr string
mrs []storage.MetricRow
reqBuf []byte
lastResetTime uint64
}
func (uw *unmarshalWork) reset() {
if len(uw.reqBuf)*4 < cap(uw.reqBuf) && fasttime.UnixTimestamp()-uw.lastResetTime > 10 {
// Periodically reset mrs and reqBuf in order to prevent from gradual memory usage growth
// when certain entries in mr contain too long labels.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
uw.mrs = nil
uw.reqBuf = nil
uw.lastResetTime = fasttime.UnixTimestamp()
}
uw.storage = nil
uw.remoteAddr = ""
uw.mrs = uw.mrs[:0]
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) Unmarshal() {
mrs := uw.mrs[:0]
tail := uw.reqBuf
for len(tail) > 0 {
if len(mrs) < cap(mrs) {
mrs = mrs[:len(mrs)+1]
} else {
mrs = append(mrs, storage.MetricRow{})
}
mr := &mrs[len(mrs)-1]
var err error
tail, err = mr.Unmarshal(tail)
if err != nil {
logger.Errorf("cannot unmarshal MetricRow obtained from %s: %s", uw.remoteAddr, err)
uw.mrs = mrs[:0]
return
}
if len(mrs) >= 10000 {
// Store the collected mrs in order to reduce memory usage
// when too big number of mrs are sent in each packet.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
uw.mrs = mrs
uw.flushRows()
mrs = uw.mrs[:0]
}
}
uw.mrs = mrs
uw.flushRows()
}
func (uw *unmarshalWork) flushRows() {
vminsertMetricsRead.Add(len(uw.mrs))
err := uw.storage.AddRows(uw.mrs, uint8(*precisionBits))
uw.mrs = uw.mrs[:0]
if err != nil {
logger.Errorf("cannot store metrics obtained from %s: %s", uw.remoteAddr, err)
}
}
var vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")
func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error {
ctx := &vmselectRequestCtx{

View file

@ -4,6 +4,7 @@ sort: 15
# CHANGELOG
* FEATURE: vminsert: add support for data ingestion via other `vminsert` nodes. This allows building multi-level data ingestion paths in VictoriaMetrics cluster by writing data from one level of `vminsert` nodes to another level of `vminsert` nodes. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) for details.
* FEATURE: vmalert: add flag to control behaviour on startup for state restore errors. Alerting rules now can return specific error type ErrStateRestore to indicate whether restore state procedure failed. Such errors were returned and logged before as well. But now user can specify whether to just log these errors (`-remoteRead.ignoreRestoreErrors=true`) or to stop the process (`-remoteRead.ignoreRestoreErrors=false`). The latter is important when VM isn't ready yet to serve queries from vmalert and it needs to wait. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1252).
* BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Previously `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240).

View file

@ -334,6 +334,13 @@ If you need multi-AZ setup, then it is recommended running independed clusters i
[vmagent](https://docs.victoriametrics.com/vmagent.html) in front of these clusters, so it could replicate incoming data
into all the cluster. Then [promxy](https://github.com/jacksontj/promxy) could be used for querying the data from multiple clusters.
Another solution is to use multi-level cluster setup, where the top level of `vminsert` nodes [replicate](#replication-and-data-safety) data among the lower level of `vminsert` nodes located at different availability zones. These `vminsert` nodes then spread the data among `vmstorage` nodes in each AZ. See [these docs](#multi-level-cluster-setup) for more details.
## Multi-level cluster setup
`vminsert` nodes can accept data from other `vminsert` nodes starting from [v1.60.0](https://docs.victoriametrics.com/CHANGELOG.html#v1600) if `-clusternativeListenAddr` command-line flag is set. For example, if `vminsert` is started with `-clusternativeListenAddr=:8400` command-line flag, then it can accept data from other `vminsert` nodes at TCP port 8400 in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies with flexible configs. For example, the top level of `vminsert` nodes can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ), while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes located in the same AZ. Such a setup guarantees cluster availability if some AZ becomes unavailable. The data from all the `vmstorage` nodes in all the AZs can be read via `vmselect` nodes, which are configured to query all the `vmstorage` nodes in all the availability zones (e.g. all the `vmstorage` addresses are passed via `-storageNode` command-line flag to `vmselect` nodes). Additionally, `-replicationFactor=k+1` must be passed to `vmselect` nodes, where `k` is the lowest number of `vmstorage` nodes in a single AZ. See [replication docs](#replication-and-data-safety) for more details.
## Helm
@ -359,7 +366,7 @@ so up to 2 `vmstorage` nodes can be lost without data loss. The minimum number o
the remaining 3 `vmstorage` nodes could provide the `-replicationFactor=3` for newly ingested data.
When the replication is enabled, `-replicationFactor=N` and `-dedup.minScrapeInterval=1ms` command-line flags must be passed to `vmselect` nodes.
The `-replicationFactor=N` improves query performance when a part of vmstorage nodes respond slowly and/or temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes shouldn't be set. See [this issues](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details.
The `-replicationFactor=N` improves query performance when up to `N-1` vmstorage nodes respond slowly and/or are temporarily unavailable. Sometimes `-replicationFactor` at `vmselect` nodes can result in partial responses. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1207) for details.
The `-dedup.minScrapeInterval=1ms` de-duplicates replicated data during queries. It is OK if `-dedup.minScrapeInterval` exceeds 1ms
when [deduplication](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#deduplication) is used additionally to replication.

View file

@ -0,0 +1,88 @@
package clusternative
import (
"errors"
"net"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
// Metrics for the TCP clusternative ingest server.
var (
// writeRequestsTCP is incremented by serveTCP for every accepted connection.
writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="clusternative", net="tcp"}`)
// writeErrorsTCP is incremented by serveTCP when the insert handler returns an error for a connection.
writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="clusternative", net="tcp"}`)
)
// Server accepts data from vminsert over TCP in the same way as vmstorage does.
//
// It must be created via MustStart and stopped via MustStop.
type Server struct {
// addr is the listen address passed to MustStart; it is used in log messages.
addr string
// lnTCP is the TCP listener for incoming connections; MustStop closes it.
lnTCP net.Listener
// wg tracks the accept-loop goroutine started by MustStart; MustStop waits on it.
wg sync.WaitGroup
}
// MustStart creates a clusternative server listening on the given TCP addr.
//
// Every accepted connection is passed to insertHandler.
// A fatal error is logged if the listener cannot be created.
//
// The returned server must be stopped with MustStop when it is no longer needed.
func MustStart(addr string, insertHandler func(c net.Conn) error) *Server {
	logger.Infof("starting TCP clusternative server at %q", addr)
	ln, err := netutil.NewTCPListener("clusternative", addr)
	if err != nil {
		logger.Fatalf("cannot start TCP clusternative server at %q: %s", addr, err)
	}

	srv := new(Server)
	srv.addr = addr
	srv.lnTCP = ln

	// Run the accept loop in the background; MustStop waits for it via srv.wg.
	srv.wg.Add(1)
	go func() {
		defer srv.wg.Done()
		serveTCP(ln, insertHandler)
		logger.Infof("stopped TCP clusternative server at %q", addr)
	}()
	return srv
}
// MustStop closes the TCP listener of s and waits until the accept loop exits.
//
// A failure to close the listener is logged and doesn't prevent from waiting.
func (s *Server) MustStop() {
	logger.Infof("stopping TCP clusternative server at %q...", s.addr)
	closeErr := s.lnTCP.Close()
	if closeErr != nil {
		logger.Errorf("cannot close TCP clusternative server: %s", closeErr)
	}
	// Wait for the goroutine started in MustStart.
	s.wg.Wait()
	logger.Infof("TCP clusternative server at %q has been stopped", s.addr)
}
func serveTCP(ln net.Listener, insertHandler func(c net.Conn) error) {
for {
c, err := ln.Accept()
if err != nil {
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("clusternative: temporary error when listening for TCP addr %q: %s", ln.Addr(), err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
logger.Fatalf("unrecoverable error when accepting TCP clusternative connections: %s", err)
}
logger.Fatalf("unexpected error when accepting TCP clusternative connections: %s", err)
}
go func() {
writeRequestsTCP.Inc()
if err := insertHandler(c); err != nil {
writeErrorsTCP.Inc()
logger.Errorf("error in TCP clusternative conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
}
_ = c.Close()
}()
}
}

View file

@ -0,0 +1,186 @@
package clusternative
import (
"fmt"
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// ParseStream reads data blocks sent by vminsert over bc and calls callback for the parsed rows.
//
// The callback can be called concurrently multiple times for the streamed data,
// since blocks are unmarshaled and processed by the shared unmarshal workers.
//
// callback shouldn't hold rows after returning, since the underlying buffers are reused.
//
// ParseStream returns after the remote side closes the connection or after a block
// cannot be read. All the scheduled callback calls are finished before it returns.
// On graceful connection close it returns the first error returned by callback, if any.
// Otherwise the error from reading the block is returned.
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
	var wg sync.WaitGroup
	var (
		callbackErrLock sync.Mutex
		callbackErr     error
	)
	for {
		uw := getUnmarshalWork()
		uw.callback = func(rows []storage.MetricRow) {
			if err := callback(rows); err != nil {
				processErrors.Inc()
				// Remember only the first callback error.
				callbackErrLock.Lock()
				if callbackErr == nil {
					callbackErr = fmt.Errorf("error when processing native block: %w", err)
				}
				callbackErrLock.Unlock()
			}
		}
		uw.wg = &wg
		var err error
		uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc)
		if err != nil {
			// uw hasn't been scheduled, so it is safe to return it to the pool on every error path.
			putUnmarshalWork(uw)
			// Wait for the outstanding callback calls, so callbackErr can be read without the lock.
			wg.Wait()
			if err == io.EOF {
				// Remote end gracefully closed the connection.
				// Report the callback error instead of silently dropping it.
				return callbackErr
			}
			// readBlock already describes the failed step, so do not wrap the error
			// with a misleading "cannot read packet size" prefix.
			return err
		}
		blocksRead.Inc()
		wg.Add(1)
		common.ScheduleUnmarshalWork(uw)
	}
}
// readBlock reads the next data block from the vminsert-initiated bc, appends it to dst
// and returns the result.
//
// The block is prefixed with its 8-byte size. After the block is read, a single-byte
// `ack` is sent back to vminsert with a 5-second write deadline.
//
// io.EOF is returned as is when the connection is closed before the next block starts.
func readBlock(dst []byte, bc *handshake.BufferedConn) ([]byte, error) {
	hdr := sizeBufPool.Get()
	defer sizeBufPool.Put(hdr)
	hdr.B = bytesutil.Resize(hdr.B, 8)

	// Read and validate the block size.
	_, err := io.ReadFull(bc, hdr.B)
	if err == io.EOF {
		return dst, err
	}
	if err != nil {
		readErrors.Inc()
		return dst, fmt.Errorf("cannot read packet size: %w", err)
	}
	blockSize := encoding.UnmarshalUint64(hdr.B)
	if blockSize > consts.MaxInsertPacketSize {
		parseErrors.Inc()
		return dst, fmt.Errorf("too big packet size: %d; shouldn't exceed %d", blockSize, consts.MaxInsertPacketSize)
	}

	// Read the block body into the tail of dst.
	offset := len(dst)
	dst = bytesutil.Resize(dst, offset+int(blockSize))
	n, err := io.ReadFull(bc, dst[offset:])
	if err != nil {
		readErrors.Inc()
		return dst, fmt.Errorf("cannot read packet with size %d bytes: %w; read only %d bytes", blockSize, err, n)
	}

	// Send `ack` to vminsert that the packet has been received.
	ackDeadline := time.Now().Add(5 * time.Second)
	if err = bc.SetWriteDeadline(ackDeadline); err != nil {
		writeErrors.Inc()
		return dst, fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
	}
	hdr.B[0] = 1
	if _, err = bc.Write(hdr.B[:1]); err != nil {
		writeErrors.Inc()
		return dst, fmt.Errorf("cannot send `ack` to vminsert: %w", err)
	}
	if err = bc.Flush(); err != nil {
		writeErrors.Inc()
		return dst, fmt.Errorf("cannot flush `ack` to vminsert: %w", err)
	}
	return dst, nil
}
// sizeBufPool is a pool of small buffers, which readBlock uses for the packet size prefix and for the `ack` byte.
var sizeBufPool bytesutil.ByteBufferPool
// Metrics for the clusternative protocol parser.
var (
// readErrors counts failures to read the packet size or the packet body in readBlock.
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="clusternative"}`)
// writeErrors counts failures to send the `ack` in readBlock.
writeErrors = metrics.NewCounter(`vm_protoparser_write_errors_total{type="clusternative"}`)
// rowsRead counts successfully unmarshaled rows.
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="clusternative"}`)
// blocksRead counts blocks successfully read by ParseStream.
blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="clusternative"}`)
// parseErrors counts too big packets and blocks, which couldn't be unmarshaled.
parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="clusternative"}`)
// processErrors counts errors returned by the callback passed to ParseStream.
processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="clusternative"}`)
)
// unmarshalWork is a unit of work for a single block read by ParseStream.
//
// Instances are obtained via getUnmarshalWork and returned via putUnmarshalWork.
type unmarshalWork struct {
// wg is marked as done when Unmarshal finishes.
wg *sync.WaitGroup
// callback is called by Unmarshal for the unmarshaled rows.
callback func(rows []storage.MetricRow)
// reqBuf holds the raw block read from the connection.
reqBuf []byte
// mrs holds the rows unmarshaled from reqBuf.
mrs []storage.MetricRow
// lastResetTime is the fasttime.UnixTimestamp() value recorded when reqBuf and mrs were last dropped in reset.
lastResetTime uint64
}
// reset prepares uw for reuse.
//
// It clears wg and callback, truncates reqBuf and resets every row in mrs before truncating it.
func (uw *unmarshalWork) reset() {
	if len(uw.reqBuf)*4 < cap(uw.reqBuf) {
		if fasttime.UnixTimestamp()-uw.lastResetTime > 10 {
			// Periodically drop reqBuf and mrs in order to prevent from gradual memory usage growth
			// when certain entries in mrs contain too long labels.
			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
			uw.reqBuf = nil
			uw.mrs = nil
			uw.lastResetTime = fasttime.UnixTimestamp()
		}
	}

	uw.callback = nil
	uw.wg = nil

	// Reset every row before truncating the slice.
	for i := range uw.mrs {
		uw.mrs[i].ResetX()
	}
	uw.mrs = uw.mrs[:0]
	uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
//
// It unmarshals rows from uw.reqBuf and passes them to uw.callback in chunks
// of up to maxRowsPerCallback rows. Unmarshal errors are counted and logged.
// uw is returned to the pool and uw.wg is marked as done at the end.
func (uw *unmarshalWork) Unmarshal() {
	defer uw.wg.Done()

	err := uw.unmarshal()
	if err == nil {
		rows := uw.mrs
		for len(rows) > maxRowsPerCallback {
			// Limit the number of rows passed to callback in order to reduce memory usage
			// when processing big packets of rows.
			chunk := rows[:maxRowsPerCallback]
			rows = rows[maxRowsPerCallback:]
			uw.callback(chunk)
		}
		uw.callback(rows)
	} else {
		parseErrors.Inc()
		logger.Errorf("error when unmarshaling clusternative block: %s", err)
	}
	putUnmarshalWork(uw)
}
// maxRowsPerCallback is the maximum number of rows Unmarshal passes to a single callback call.
const maxRowsPerCallback = 10000
// unmarshal decodes rows from uw.reqBuf into uw.mrs, reusing the existing uw.mrs storage.
//
// It adds the number of decoded rows to rowsRead on success.
// The returned error wraps the underlying unmarshal error, so it can be inspected
// with errors.Is and errors.As.
func (uw *unmarshalWork) unmarshal() error {
	var err error
	uw.mrs, err = storage.UnmarshalMetricRows(uw.mrs[:0], uw.reqBuf)
	if err != nil {
		// Use %w instead of %s in order to preserve the error chain, like the rest of the file does.
		return fmt.Errorf("cannot unmarshal MetricRow from clusternative block: %w", err)
	}
	rowsRead.Add(len(uw.mrs))
	return nil
}
// getUnmarshalWork returns an unmarshalWork from unmarshalWorkPool
// or a newly allocated one if the pool is empty.
//
// The returned value must be released with putUnmarshalWork.
func getUnmarshalWork() *unmarshalWork {
	pooled := unmarshalWorkPool.Get()
	if pooled != nil {
		return pooled.(*unmarshalWork)
	}
	return new(unmarshalWork)
}
// putUnmarshalWork resets uw and returns it to unmarshalWorkPool.
//
// uw mustn't be used after the call.
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
// unmarshalWorkPool holds reusable *unmarshalWork objects; see getUnmarshalWork and putUnmarshalWork.
var unmarshalWorkPool sync.Pool

View file

@ -26,7 +26,7 @@ func StartUnmarshalWorkers() {
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
}
gomaxprocs := cgroup.AvailableCPUs()
unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {

View file

@ -504,7 +504,7 @@ func SetMaxLabelsPerTimeseries(maxLabels int) {
// MarshalMetricNameRaw marshals labels to dst and returns the result.
//
// The result must be unmarshaled with MetricName.unmarshalRaw
// The result must be unmarshaled with MetricName.UnmarshalRaw
func MarshalMetricNameRaw(dst []byte, accountID, projectID uint32, labels []prompb.Label) []byte {
// Calculate the required space for dst.
dstLen := len(dst)
@ -615,7 +615,7 @@ func MarshalMetricLabelRaw(dst []byte, label *prompb.Label) []byte {
// marshalRaw marshals mn to dst and returns the result.
//
// The results may be unmarshaled with MetricName.unmarshalRaw.
// The results may be unmarshaled with MetricName.UnmarshalRaw.
//
// This function is for testing purposes. MarshalMetricNameRaw must be used
// in prod instead.
@ -634,8 +634,8 @@ func (mn *MetricName) marshalRaw(dst []byte) []byte {
return dst
}
// unmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw.
func (mn *MetricName) unmarshalRaw(src []byte) error {
// UnmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw.
func (mn *MetricName) UnmarshalRaw(src []byte) error {
mn.Reset()
if len(src) < 4 {
return fmt.Errorf("not enough data for decoding accountID; got %d bytes; %X; want at least 4 bytes", len(src), src)

View file

@ -150,7 +150,7 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) {
}
data := mn.marshalRaw(nil)
var mn1 MetricName
if err := mn1.unmarshalRaw(data); err != nil {
if err := mn1.UnmarshalRaw(data); err != nil {
t.Fatalf("cannot unmarshal mn %s: %s", &mn, err)
}
if !reflect.DeepEqual(&mn, &mn1) {
@ -159,13 +159,13 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) {
// Try unmarshaling MetricName without tag value.
brokenData := marshalTagValue(data, []byte("foobar"))
if err := mn1.unmarshalRaw(brokenData); err == nil {
if err := mn1.UnmarshalRaw(brokenData); err == nil {
t.Fatalf("expecting non-zero error when unmarshaling MetricName without tag value")
}
// Try unmarshaling MetricName with invalid tag key.
brokenData[len(brokenData)-1] = 123
if err := mn1.unmarshalRaw(brokenData); err == nil {
if err := mn1.UnmarshalRaw(brokenData); err == nil {
t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag key")
}
@ -173,7 +173,7 @@ func TestMetricNameMarshalUnmarshalRaw(t *testing.T) {
brokenData = marshalTagValue(data, []byte("foobar"))
brokenData = marshalTagValue(brokenData, []byte("aaa"))
brokenData[len(brokenData)-1] = 123
if err := mn1.unmarshalRaw(brokenData); err == nil {
if err := mn1.UnmarshalRaw(brokenData); err == nil {
t.Fatalf("expecting non-zero error when unmarshaling MetricName with invalid tag value")
}
}

View file

@ -194,7 +194,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
if mr.Timestamp < tr.MinTimestamp || mr.Timestamp > tr.MaxTimestamp {
continue
}
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricName: %w", err)
}
if !metricGroupRegexp.Match(mn.MetricGroup) {

View file

@ -1327,13 +1327,20 @@ func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64,
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded
// with MetricName.unmarshalRaw.
// with MetricName.UnmarshalRaw.
MetricNameRaw []byte
Timestamp int64
Value float64
}
// ResetX returns mr to its zero state, so it no longer refers to the buffer
// it was populated from by UnmarshalX or UnmarshalMetricRows.
//
// MetricNameRaw is set to nil, Timestamp and Value are set to zero.
func (mr *MetricRow) ResetX() {
	// Assigning the zero value clears all the fields at once.
	*mr = MetricRow{}
}
// CopyFrom copies src to mr.
func (mr *MetricRow) CopyFrom(src *MetricRow) {
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], src.MetricNameRaw...)
@ -1345,7 +1352,7 @@ func (mr *MetricRow) CopyFrom(src *MetricRow) {
func (mr *MetricRow) String() string {
metricName := string(mr.MetricNameRaw)
var mn MetricName
if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err == nil {
metricName = mn.String()
}
return fmt.Sprintf("%s (Timestamp=%d, Value=%f)", metricName, mr.Timestamp, mr.Value)
@ -1364,13 +1371,35 @@ func MarshalMetricRow(dst []byte, metricNameRaw []byte, timestamp int64, value f
return dst
}
// Unmarshal unmarshals mr from src and returns the remaining tail from src.
func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) {
// UnmarshalMetricRows decodes consecutive MetricRow entries from src,
// appends them to dst and returns the extended dst.
//
// Decoding continues until src is exhausted. On the first decoding error
// the function stops and returns the error together with dst, which at that
// point also contains the slot used for the failed entry.
//
// The appended MetricRow items refer to src, so they become invalid
// as soon as src changes.
func UnmarshalMetricRows(dst []MetricRow, src []byte) ([]MetricRow, error) {
	for len(src) != 0 {
		// Make room for the next row, re-using spare capacity in dst when available.
		n := len(dst)
		if n == cap(dst) {
			dst = append(dst, MetricRow{})
		} else {
			dst = dst[:n+1]
		}
		rest, err := dst[n].UnmarshalX(src)
		if err != nil {
			return dst, err
		}
		src = rest
	}
	return dst, nil
}
// UnmarshalX unmarshals mr from src and returns the remaining tail from src.
//
// mr refers to src, so it remains valid until src changes.
func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) {
tail, metricNameRaw, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err)
}
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRaw...)
mr.MetricNameRaw = metricNameRaw
if len(tail) < 8 {
return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail))
@ -1472,7 +1501,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
}
// Slow path - register mr.MetricNameRaw.
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
mn.sortTags()
@ -1658,7 +1687,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
func getUserReadableMetricName(metricNameRaw []byte) string {
var mn MetricName
if err := mn.unmarshalRaw(metricNameRaw); err != nil {
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err)
}
return mn.String()
@ -1694,7 +1723,7 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
if err := pmrs.mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
pmrs.mn.sortTags()

View file

@ -351,7 +351,7 @@ func TestMetricRowMarshalUnmarshal(t *testing.T) {
buf = mr1.Marshal(buf[:0])
var mr2 MetricRow
tail, err := mr2.Unmarshal(buf)
tail, err := mr2.UnmarshalX(buf)
if err != nil {
t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
}