package remotewrite

import (
	"flag"
	"fmt"
	"sync/atomic"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/metrics"
	xxhash "github.com/cespare/xxhash/v2"
)

var (
	remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+
		"It is recommended to use VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
		"Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems")
	tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to the directory where temporary data for the remote write component is stored")
	queues      = flag.Int("remoteWrite.queues", 1, "The number of concurrent queues for each -remoteWrite.url. Set more queues if a single queue "+
		"isn't enough for sending a high volume of collected data to remote storage")
	showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
		"It is hidden by default, since it can contain sensitive auth info")
)

// Init initializes remotewrite.
//
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
func Init() {
	if len(*remoteWriteURLs) == 0 {
		logger.Panicf("FATAL: at least one `-remoteWrite.url` must be set")
	}

	if !*showRemoteWriteURL {
		// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
		httpserver.RegisterSecretFlag("remoteWrite.url")
	}
	initRelabel()

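	// Estimate how many blocks may be buffered in memory per remote storage
	// from the allowed memory; the bounds below clamp the result to [2, 200].
	// Note: maxRowsPerBlock is defined elsewhere in this package.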
	maxInmemoryBlocks := memory.Allowed() / len(*remoteWriteURLs) / maxRowsPerBlock / 100
	if maxInmemoryBlocks > 200 {
		// There is not much sense in keeping a higher number of blocks in memory,
		// since this means the producer outperforms the consumer and the queue
		// will continue growing. It is better to store the queue on disk.
		maxInmemoryBlocks = 200
	}
	if maxInmemoryBlocks < 2 {
		maxInmemoryBlocks = 2
	}
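	// Set up a file-backed queue, exported gauges and a client for each
	// configured -remoteWrite.url. The queue directory name is derived from
	// the xxhash of the URL, so it stays stable across restarts.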
	for i, remoteWriteURL := range *remoteWriteURLs {
		h := xxhash.Sum64([]byte(remoteWriteURL))
		path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
		fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks)
		urlLabelValue := fmt.Sprintf("secret-url-%d", i+1)
		if *showRemoteWriteURL {
			urlLabelValue = remoteWriteURL
		}
		_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{url=%q, hash="%016X"}`, urlLabelValue, h), func() float64 {
			return float64(fq.GetPendingBytes())
		})
		_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{url=%q}`, urlLabelValue), func() float64 {
			return float64(fq.GetInmemoryQueueLen())
		})
		c := newClient(remoteWriteURL, urlLabelValue, fq)
		fqs = append(fqs, fq)
		cs = append(cs, c)
	}

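	// Create -remoteWrite.queues pendingSeries workers. Incoming data is spread
	// across them in Push and flushed via pushBlockToPersistentQueues.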
	pss = make([]*pendingSeries, *queues)
	for i := range pss {
		pss[i] = newPendingSeries(pushBlockToPersistentQueues)
	}
}

// Stop stops remotewrite.
//
// It is expected that nobody calls Push during and after the call to this func.
func Stop() {
	for _, ps := range pss {
		ps.MustStop()
	}

	// Close all the persistent queues. This should unblock clients waiting in MustReadBlock.
	for _, fq := range fqs {
		fq.MustClose()
	}
	fqs = nil

	// Stop all the clients.
	for _, c := range cs {
		c.MustStop()
	}
	cs = nil

	resetRelabel()
}

// Push sends wr to remote storage systems set via `-remoteWrite.url`.
//
// Each timeseries in wr.Timeseries must contain one sample.
func Push(wr *prompbmarshal.WriteRequest) {
	rctx := relabelCtxPool.Get().(*relabelCtx)
	rctx.applyRelabeling(wr)

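	// Pick the next pendingSeries in round-robin fashion, so concurrent Push
	// calls are spread evenly across the configured -remoteWrite.queues workers.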
	idx := atomic.AddUint64(&pssNextIdx, 1) % uint64(len(pss))
	pss[idx].Push(wr.Timeseries)

	rctx.reset()
	relabelCtxPool.Put(rctx)
}

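// pushBlockToPersistentQueues replicates block to the persistent queue of every
// configured remote storage, so each -remoteWrite.url receives the full data stream.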
func pushBlockToPersistentQueues(block []byte) {
	for _, fq := range fqs {
		fq.MustWriteBlock(block)
	}
}

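// Package-level state created in Init and released in Stop: per-URL persistent
// queues (fqs), their clients (cs) and the pendingSeries workers (pss).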
var fqs []*persistentqueue.FastQueue
var cs []*client

var pssNextIdx uint64
var pss []*pendingSeries