2020-02-23 11:35:47 +00:00
package remotewrite
import (
"flag"
"fmt"
2023-11-24 12:42:11 +00:00
"net/http"
2021-09-28 21:52:07 +00:00
"net/url"
2023-03-28 01:15:28 +00:00
"path/filepath"
2021-05-20 10:13:40 +00:00
"strconv"
2020-05-30 11:36:40 +00:00
"sync"
2020-02-23 11:35:47 +00:00
"sync/atomic"
2021-05-20 10:13:40 +00:00
"time"
2020-02-23 11:35:47 +00:00
2023-11-24 12:42:11 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2021-08-05 06:46:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2021-05-20 10:13:40 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2022-11-21 22:38:43 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2023-03-28 01:15:28 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-30 11:36:40 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2021-05-19 23:12:36 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2023-11-01 22:05:11 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2024-03-30 04:38:29 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
2023-01-04 06:19:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
2023-08-11 12:38:28 +00:00
"github.com/VictoriaMetrics/metrics"
2023-08-11 13:23:00 +00:00
"github.com/cespare/xxhash/v2"
2020-02-23 11:35:47 +00:00
)
var (
2023-02-24 01:36:52 +00:00
remoteWriteURLs = flagutil . NewArrayString ( "remoteWrite.url" , "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol " +
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . " +
2023-07-25 01:15:24 +00:00
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. " +
2023-12-04 23:20:44 +00:00
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set" )
enableMultitenantHandlers = flag . Bool ( "enableMultitenantHandlers" , false , "Whether to process incoming data via multitenant insert handlers according to " +
2024-04-18 00:54:20 +00:00
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers " +
2023-12-04 23:20:44 +00:00
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ." +
2024-04-17 23:31:37 +00:00
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details" )
2024-04-19 09:25:41 +00:00
2023-07-25 01:15:24 +00:00
shardByURL = flag . Bool ( "remoteWrite.shardByURL" , false , "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . " +
2024-04-19 09:25:41 +00:00
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . " +
"See also -remoteWrite.shardByURLReplicas" )
shardByURLReplicas = flag . Int ( "remoteWrite.shardByURLReplicas" , 1 , "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url " +
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages" )
2023-11-01 22:05:11 +00:00
shardByURLLabels = flagutil . NewArrayString ( "remoteWrite.shardByURL.labels" , "Optional list of labels, which must be used for sharding outgoing samples " +
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain " +
2024-04-02 21:36:32 +00:00
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels" )
shardByURLIgnoreLabels = flagutil . NewArrayString ( "remoteWrite.shardByURL.ignoreLabels" , "Optional list of labels, which must be ignored when sharding outgoing samples " +
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain " +
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels" )
2024-04-19 09:25:41 +00:00
2024-04-02 21:36:32 +00:00
tmpDataPath = flag . String ( "remoteWrite.tmpDataPath" , "vmagent-remotewrite-data" , "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . " +
2023-11-25 09:31:30 +00:00
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue" )
2023-03-28 01:15:28 +00:00
keepDanglingQueues = flag . Bool ( "remoteWrite.keepDanglingQueues" , false , "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. " +
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on." )
2021-06-17 10:26:35 +00:00
queues = flag . Int ( "remoteWrite.queues" , cgroup . AvailableCPUs ( ) * 2 , "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues " +
2024-03-06 11:43:08 +00:00
"isn't enough for sending high volume of collected data to remote storage. " +
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage" )
2020-02-23 11:35:47 +00:00
showRemoteWriteURL = flag . Bool ( "remoteWrite.showURL" , false , "Whether to show -remoteWrite.url in the exported metrics. " +
2020-07-10 11:07:02 +00:00
"It is hidden by default, since it can contain sensitive info such as auth key" )
2023-08-12 11:17:55 +00:00
maxPendingBytesPerURL = flagutil . NewArrayBytes ( "remoteWrite.maxDiskUsagePerURL" , 0 , "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath " +
2020-03-03 17:48:46 +00:00
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. " +
2023-04-26 10:23:01 +00:00
"Buffered data is stored in ~500MB chunks. It is recommended to set the value for this flag to a multiple of the block size 500MB. " +
2020-03-03 17:48:46 +00:00
"Disk usage is unlimited if the value is set to 0" )
2023-08-12 11:17:55 +00:00
significantFigures = flagutil . NewArrayInt ( "remoteWrite.significantFigures" , 0 , "The number of significant figures to leave in metric values before writing them " +
2021-02-01 12:27:05 +00:00
"to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. " +
"This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits" )
2023-08-12 11:17:55 +00:00
roundDigits = flagutil . NewArrayInt ( "remoteWrite.roundDigits" , 100 , "Round metric values to this number of decimal digits after the point before " +
"writing them to remote storage. " +
2021-02-01 12:27:05 +00:00
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. " +
2023-05-10 07:50:41 +00:00
"By default, digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. " +
2021-02-01 12:27:05 +00:00
"This option may be used for improving data compression for the stored metrics" )
2021-05-19 23:12:36 +00:00
sortLabels = flag . Bool ( "sortLabels" , false , ` Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. ` +
` This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. ` +
` For example, if m { k1="v1",k2="v2"} may be sent as m { k2="v2",k1="v1"} ` +
` Enabled sorting for labels can slow down ingestion performance a bit ` )
2021-05-20 10:13:40 +00:00
maxHourlySeries = flag . Int ( "remoteWrite.maxHourlySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last hour. " +
2024-04-17 23:31:37 +00:00
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter" )
2021-05-20 10:13:40 +00:00
maxDailySeries = flag . Int ( "remoteWrite.maxDailySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. " +
2024-04-17 23:31:37 +00:00
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter" )
2024-03-30 04:38:29 +00:00
maxIngestionRate = flag . Int ( "maxIngestionRate" , 0 , "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. " +
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit" )
2023-04-01 04:27:45 +00:00
streamAggrConfig = flagutil . NewArrayString ( "remoteWrite.streamAggr.config" , "Optional path to file with stream aggregation config. " +
2024-04-18 00:19:11 +00:00
"See https://docs.victoriametrics.com/stream-aggregation/ . " +
2023-07-24 23:44:09 +00:00
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval" )
streamAggrKeepInput = flagutil . NewArrayBool ( "remoteWrite.streamAggr.keepInput" , "Whether to keep all the input samples after the aggregation " +
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples " +
2024-04-18 00:19:11 +00:00
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/" )
2023-07-24 23:44:09 +00:00
streamAggrDropInput = flagutil . NewArrayBool ( "remoteWrite.streamAggr.dropInput" , "Whether to drop all the input samples after the aggregation " +
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples " +
2024-04-18 00:19:11 +00:00
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/" )
2024-03-04 22:45:22 +00:00
streamAggrDedupInterval = flagutil . NewArrayDuration ( "remoteWrite.streamAggr.dedupInterval" , 0 , "Input samples are de-duplicated with this interval before optional aggregation " +
2024-04-18 00:19:11 +00:00
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication" )
2024-03-17 21:01:44 +00:00
streamAggrIgnoreOldSamples = flagutil . NewArrayBool ( "remoteWrite.streamAggr.ignoreOldSamples" , "Whether to ignore input samples with old timestamps outside the current aggregation interval " +
2024-04-18 00:19:11 +00:00
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples" )
2024-04-22 12:22:59 +00:00
streamAggrIgnoreFirstIntervals = flag . Int ( "remoteWrite.streamAggr.ignoreFirstIntervals" , 0 , "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. " +
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start" )
streamAggrDropInputLabels = flagutil . NewArrayString ( "streamAggr.dropInputLabels" , "An optional list of labels to drop from samples " +
2024-04-18 00:19:11 +00:00
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels" )
2024-03-05 00:13:21 +00:00
2024-05-10 10:09:21 +00:00
disableOnDiskQueue = flagutil . NewArrayBool ( "remoteWrite.disableOnDiskQueue" , "Whether to disable storing pending data to -remoteWrite.tmpDataPath " +
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ." +
2023-11-25 09:31:30 +00:00
"See also -remoteWrite.dropSamplesOnOverload" )
2024-05-10 10:09:21 +00:00
dropSamplesOnOverload = flagutil . NewArrayBool ( "remoteWrite.dropSamplesOnOverload" , "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples " +
"cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence" )
2020-02-23 11:35:47 +00:00
)
2021-08-05 06:46:19 +00:00
var (
2024-05-13 13:22:37 +00:00
// rwctxs contains statically populated entries when -remoteWrite.url is specified.
rwctxs [ ] * remoteWriteCtx
2021-08-05 06:46:19 +00:00
2024-05-13 13:22:37 +00:00
// Data without tenant id is written to defaultAuthToken if -enableMultitenantHandlers is specified.
2021-08-05 06:46:19 +00:00
defaultAuthToken = & auth . Token { }
2023-11-24 12:42:11 +00:00
2023-11-25 09:31:30 +00:00
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
2023-11-24 12:42:11 +00:00
ErrQueueFullHTTPRetry = & httpserver . ErrorWithStatusCode {
2023-11-25 09:31:30 +00:00
Err : fmt . Errorf ( "remote storage systems cannot keep up with the data ingestion rate; retry the request later " +
"or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " +
2024-04-17 23:31:37 +00:00
"see https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence" ) ,
2023-11-24 12:42:11 +00:00
StatusCode : http . StatusTooManyRequests ,
}
2024-05-10 10:09:21 +00:00
// disableOnDiskQueueAll is set to true if all remoteWrite.urls were configured to disable persistent queue via disableOnDiskQueue
disableOnDiskQueueAll bool
2021-08-05 06:46:19 +00:00
)
2024-05-13 13:22:37 +00:00
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
2021-08-05 06:46:19 +00:00
func MultitenancyEnabled ( ) bool {
2024-05-13 13:22:37 +00:00
return * enableMultitenantHandlers
2021-08-05 06:46:19 +00:00
}
2020-03-03 11:08:17 +00:00
2020-05-30 11:36:40 +00:00
// Contains the current relabelConfigs.
2023-07-20 00:37:49 +00:00
var allRelabelConfigs atomic . Pointer [ relabelConfigs ]
2020-05-30 11:36:40 +00:00
2020-08-30 18:23:38 +00:00
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
2021-04-23 19:01:57 +00:00
var maxQueues = cgroup . AvailableCPUs ( ) * 16
2020-08-30 18:23:38 +00:00
2023-03-28 01:33:05 +00:00
const persistentQueueDirname = "persistent-queue"
2023-03-28 01:15:28 +00:00
2020-09-29 16:48:53 +00:00
// InitSecretFlags registers secret flags.
//
// It must be called after flag.Parse and before any logging.
func InitSecretFlags() {
	if *showRemoteWriteURL {
		// The user explicitly asked to expose -remoteWrite.url.
		return
	}
	// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
	flagutil.RegisterSecretFlag("remoteWrite.url")
}
2024-04-02 21:36:32 +00:00
// shardByURLLabelsMap is the set of label names built from -remoteWrite.shardByURL.labels at Init().
var shardByURLLabelsMap map[string]struct{}

// shardByURLIgnoreLabelsMap is the set of label names built from -remoteWrite.shardByURL.ignoreLabels at Init().
var shardByURLIgnoreLabelsMap map[string]struct{}
2023-11-01 22:05:11 +00:00
2020-02-23 11:35:47 +00:00
// Init initializes remotewrite.
//
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
2021-08-05 06:46:19 +00:00
func Init ( ) {
2024-05-13 13:22:37 +00:00
if len ( * remoteWriteURLs ) == 0 {
logger . Fatalf ( "at least one `-remoteWrite.url` command-line flag must be set" )
2021-08-05 06:44:29 +00:00
}
2021-05-20 10:13:40 +00:00
if * maxHourlySeries > 0 {
hourlySeriesLimiter = bloomfilter . NewLimiter ( * maxHourlySeries , time . Hour )
2021-05-20 12:27:06 +00:00
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_max_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_current_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 10:13:40 +00:00
}
if * maxDailySeries > 0 {
dailySeriesLimiter = bloomfilter . NewLimiter ( * maxDailySeries , 24 * time . Hour )
2021-05-20 12:27:06 +00:00
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_max_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_current_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 10:13:40 +00:00
}
2024-03-30 04:38:29 +00:00
2020-08-30 18:23:38 +00:00
if * queues > maxQueues {
* queues = maxQueues
}
if * queues <= 0 {
* queues = 1
2020-02-23 11:35:47 +00:00
}
2024-04-02 21:36:32 +00:00
if len ( * shardByURLLabels ) > 0 && len ( * shardByURLIgnoreLabels ) > 0 {
logger . Fatalf ( "-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
"see https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages" )
2023-11-01 22:05:11 +00:00
}
2024-04-02 21:36:32 +00:00
shardByURLLabelsMap = newMapFromStrings ( * shardByURLLabels )
shardByURLIgnoreLabelsMap = newMapFromStrings ( * shardByURLIgnoreLabels )
2020-05-30 11:36:40 +00:00
initLabelsGlobal ( )
2021-05-21 13:34:03 +00:00
// Register SIGHUP handler for config reload before loadRelabelConfigs.
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil . NewSighupChan ( )
2020-05-30 11:36:40 +00:00
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Fatalf ( "cannot load relabel configs: %s" , err )
}
allRelabelConfigs . Store ( rcs )
2023-03-29 16:05:58 +00:00
relabelConfigSuccess . Set ( 1 )
relabelConfigTimestamp . Set ( fasttime . UnixTimestamp ( ) )
2021-08-05 06:46:19 +00:00
if len ( * remoteWriteURLs ) > 0 {
2024-05-13 13:22:37 +00:00
rwctxs = newRemoteWriteCtxs ( nil , * remoteWriteURLs )
2021-08-05 06:44:29 +00:00
}
2024-05-10 10:09:21 +00:00
disableOnDiskQueueAll = true
for _ , v := range * disableOnDiskQueue {
if ! v {
disableOnDiskQueueAll = false
break
}
}
2023-11-23 18:39:40 +00:00
dropDanglingQueues ( )
2021-08-05 06:44:29 +00:00
2020-05-30 11:36:40 +00:00
// Start config reloader.
configReloaderWG . Add ( 1 )
go func ( ) {
defer configReloaderWG . Done ( )
for {
select {
case <- sighupCh :
2023-04-01 04:27:45 +00:00
case <- configReloaderStopCh :
2020-05-30 11:36:40 +00:00
return
}
2023-04-01 04:27:45 +00:00
reloadRelabelConfigs ( )
reloadStreamAggrConfigs ( )
2020-05-30 11:36:40 +00:00
}
} ( )
2020-02-23 11:35:47 +00:00
}
2023-11-23 18:39:40 +00:00
func dropDanglingQueues ( ) {
if * keepDanglingQueues {
return
}
// Remove dangling persistent queues, if any.
// This is required for the case when the number of queues has been changed or URL have been changed.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
//
2024-04-23 12:49:45 +00:00
// In case if there were many persistent queues with identical *remoteWriteURLs
// the queue with the last index will be dropped.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
2024-05-13 13:22:37 +00:00
existingQueues := make ( map [ string ] struct { } , len ( rwctxs ) )
for _ , rwctx := range rwctxs {
2023-11-23 18:39:40 +00:00
existingQueues [ rwctx . fq . Dirname ( ) ] = struct { } { }
}
queuesDir := filepath . Join ( * tmpDataPath , persistentQueueDirname )
files := fs . MustReadDir ( queuesDir )
removed := 0
for _ , f := range files {
dirname := f . Name ( )
if _ , ok := existingQueues [ dirname ] ; ! ok {
logger . Infof ( "removing dangling queue %q" , dirname )
fullPath := filepath . Join ( queuesDir , dirname )
fs . MustRemoveAll ( fullPath )
removed ++
}
}
if removed > 0 {
2024-05-13 13:22:37 +00:00
logger . Infof ( "removed %d dangling queues from %q, active queues: %d" , removed , * tmpDataPath , len ( rwctxs ) )
2023-11-23 18:39:40 +00:00
}
}
2023-04-01 04:27:45 +00:00
// reloadRelabelConfigs re-reads relabel configs and atomically replaces the active ones.
//
// The previous configs are preserved if the new ones cannot be loaded.
func reloadRelabelConfigs() {
	relabelConfigReloads.Inc()
	logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")

	newConfigs, loadErr := loadRelabelConfigs()
	if loadErr != nil {
		relabelConfigReloadErrors.Inc()
		relabelConfigSuccess.Set(0)
		logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", loadErr)
		return
	}

	allRelabelConfigs.Store(newConfigs)
	relabelConfigSuccess.Set(1)
	relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
	logger.Infof("successfully reloaded relabel configs")
}
2022-11-21 22:38:43 +00:00
var (
2023-03-29 16:05:58 +00:00
relabelConfigReloads = metrics . NewCounter ( ` vmagent_relabel_config_reloads_total ` )
relabelConfigReloadErrors = metrics . NewCounter ( ` vmagent_relabel_config_reloads_errors_total ` )
2023-12-20 12:23:38 +00:00
relabelConfigSuccess = metrics . NewGauge ( ` vmagent_relabel_config_last_reload_successful ` , nil )
2023-03-29 16:05:58 +00:00
relabelConfigTimestamp = metrics . NewCounter ( ` vmagent_relabel_config_last_reload_success_timestamp_seconds ` )
2022-11-21 22:38:43 +00:00
)
2023-04-01 04:27:45 +00:00
// reloadStreamAggrConfigs calls reinitStreamAggr on every remote write context.
func reloadStreamAggrConfigs() {
	for i := range rwctxs {
		rwctxs[i].reinitStreamAggr()
	}
}
2021-08-05 06:46:19 +00:00
func newRemoteWriteCtxs ( at * auth . Token , urls [ ] string ) [ ] * remoteWriteCtx {
if len ( urls ) == 0 {
logger . Panicf ( "BUG: urls must be non-empty" )
}
2021-11-04 13:39:14 +00:00
maxInmemoryBlocks := memory . Allowed ( ) / len ( urls ) / * maxRowsPerBlock / 100
2021-11-05 13:14:49 +00:00
if maxInmemoryBlocks / * queues > 100 {
2021-08-05 06:46:19 +00:00
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
// will continue growing. It is better storing the queue to file.
2021-11-05 13:14:49 +00:00
maxInmemoryBlocks = 100 * * queues
2021-08-05 06:46:19 +00:00
}
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
rwctxs := make ( [ ] * remoteWriteCtx , len ( urls ) )
2021-09-28 21:52:07 +00:00
for i , remoteWriteURLRaw := range urls {
remoteWriteURL , err := url . Parse ( remoteWriteURLRaw )
if err != nil {
logger . Fatalf ( "invalid -remoteWrite.url=%q: %s" , remoteWriteURL , err )
}
2021-08-05 06:46:19 +00:00
sanitizedURL := fmt . Sprintf ( "%d:secret-url" , i + 1 )
if at != nil {
2024-04-18 00:54:20 +00:00
// Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
2021-09-28 21:52:07 +00:00
remoteWriteURL . Path = fmt . Sprintf ( "%s/insert/%d:%d/prometheus/api/v1/write" , remoteWriteURL . Path , at . AccountID , at . ProjectID )
2021-08-05 06:46:19 +00:00
sanitizedURL = fmt . Sprintf ( "%s:%d:%d" , sanitizedURL , at . AccountID , at . ProjectID )
}
if * showRemoteWriteURL {
sanitizedURL = fmt . Sprintf ( "%d:%s" , i + 1 , remoteWriteURL )
}
2023-09-01 07:34:16 +00:00
rwctxs [ i ] = newRemoteWriteCtx ( i , remoteWriteURL , maxInmemoryBlocks , sanitizedURL )
2021-08-05 06:46:19 +00:00
}
return rwctxs
}
2024-05-16 07:25:42 +00:00
// configReloaderStopCh is closed by Stop() in order to stop the config reloader goroutine started by Init().
var configReloaderStopCh = make(chan struct{})

// configReloaderWG is used by Stop() for waiting until the config reloader goroutine is finished.
var configReloaderWG sync.WaitGroup
2020-05-30 11:36:40 +00:00
2024-03-30 04:38:29 +00:00
// StartIngestionRateLimiter starts ingestion rate limiter.
//
// Ingestion rate limiter must be started before Init() call.
//
// StopIngestionRateLimiter must be called before Stop() call in order to unblock all the callers
// to ingestion rate limiter. Otherwise deadlock may occur at Stop() call.
func StartIngestionRateLimiter() {
	if *maxIngestionRate <= 0 {
		// The rate limiter is disabled.
		return
	}
	// The metric name must not contain surrounding whitespace, since such a name is not a valid metric name.
	ingestionRateLimitReached := metrics.NewCounter(`vmagent_max_ingestion_rate_limit_reached_total`)
	ingestionRateLimiterStopCh = make(chan struct{})
	ingestionRateLimiter = ratelimiter.New(int64(*maxIngestionRate), ingestionRateLimitReached, ingestionRateLimiterStopCh)
}
// StopIngestionRateLimiter stops ingestion rate limiter.
//
// It is a no-op if the stop channel isn't initialized.
func StopIngestionRateLimiter() {
	stopCh := ingestionRateLimiterStopCh
	if stopCh == nil {
		return
	}
	ingestionRateLimiterStopCh = nil
	close(stopCh)
}
// ingestionRateLimiter is initialized by StartIngestionRateLimiter when -maxIngestionRate is positive.
var ingestionRateLimiter *ratelimiter.RateLimiter

// ingestionRateLimiterStopCh is closed by StopIngestionRateLimiter.
var ingestionRateLimiterStopCh chan struct{}
2020-02-23 11:35:47 +00:00
// Stop stops remotewrite.
//
2023-11-25 09:31:30 +00:00
// It is expected that nobody calls TryPush during and after the call to this func.
2020-02-23 11:35:47 +00:00
func Stop ( ) {
2023-04-01 04:27:45 +00:00
close ( configReloaderStopCh )
2020-05-30 11:36:40 +00:00
configReloaderWG . Wait ( )
2021-08-05 06:46:19 +00:00
2024-05-13 13:22:37 +00:00
for _ , rwctx := range rwctxs {
2021-08-05 06:46:19 +00:00
rwctx . MustStop ( )
}
2024-05-13 13:22:37 +00:00
rwctxs = nil
2021-09-01 11:14:37 +00:00
if sl := hourlySeriesLimiter ; sl != nil {
sl . MustStop ( )
}
if sl := dailySeriesLimiter ; sl != nil {
sl . MustStop ( )
}
2020-02-23 11:35:47 +00:00
}
2024-05-13 13:22:37 +00:00
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
2023-11-25 12:42:37 +00:00
//
// PushDropSamplesOnFailure can modify wr contents.
func PushDropSamplesOnFailure ( at * auth . Token , wr * prompbmarshal . WriteRequest ) {
_ = tryPush ( at , wr , true )
}
2024-05-13 13:22:37 +00:00
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url
2021-08-05 06:46:19 +00:00
//
2023-11-25 09:31:30 +00:00
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
//
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
func TryPush ( at * auth . Token , wr * prompbmarshal . WriteRequest ) bool {
2024-05-10 10:09:21 +00:00
return tryPush ( at , wr , false )
2023-11-25 12:42:37 +00:00
}
2024-05-10 10:09:21 +00:00
func tryPush ( at * auth . Token , wr * prompbmarshal . WriteRequest , forceDropSamplesOnFailure bool ) bool {
2023-12-04 23:20:44 +00:00
tss := wr . Timeseries
if at == nil && MultitenancyEnabled ( ) {
// Write data to default tenant if at isn't set when multitenancy is enabled.
2021-08-05 06:46:19 +00:00
at = defaultAuthToken
2021-08-05 06:44:29 +00:00
}
2023-12-04 23:20:44 +00:00
var tenantRctx * relabelCtx
2024-05-13 13:22:37 +00:00
if at != nil {
2023-12-04 23:20:44 +00:00
// Convert at to (vm_account_id, vm_project_id) labels.
tenantRctx = getRelabelCtx ( )
defer putRelabelCtx ( tenantRctx )
2021-08-05 06:44:29 +00:00
}
2023-11-25 09:31:30 +00:00
rowsCount := getRowsCount ( tss )
2024-05-10 10:09:21 +00:00
// Quick check whether writes to configured remote storage systems are blocked.
// This allows saving CPU time spent on relabeling and block compression
// if some of remote storage systems cannot keep up with the data ingestion rate.
// this shortcut is only applicable if all remote writes have disableOnDiskQueue = true
if disableOnDiskQueueAll {
2023-11-25 09:31:30 +00:00
for _ , rwctx := range rwctxs {
if rwctx . fq . IsWriteBlocked ( ) {
2024-05-10 10:09:21 +00:00
rwctx . pushFailures . Inc ( )
if forceDropSamplesOnFailure || rwctx . dropSamplesOnOverload {
2023-11-25 09:31:30 +00:00
// Just drop samples
2024-05-10 10:09:21 +00:00
rwctx . rowsDroppedOnPushFailure . Add ( rowsCount )
continue
2023-11-25 09:31:30 +00:00
}
return false
}
2023-11-24 12:42:11 +00:00
}
}
2021-08-05 06:44:29 +00:00
2020-03-03 11:08:17 +00:00
var rctx * relabelCtx
2023-07-20 00:37:49 +00:00
rcs := allRelabelConfigs . Load ( )
2021-02-22 14:33:55 +00:00
pcsGlobal := rcs . global
2023-08-17 12:35:26 +00:00
if pcsGlobal . Len ( ) > 0 {
2020-03-03 11:08:17 +00:00
rctx = getRelabelCtx ( )
2023-12-04 23:20:44 +00:00
defer putRelabelCtx ( rctx )
2020-03-03 11:08:17 +00:00
}
2022-05-06 12:28:59 +00:00
globalRowsPushedBeforeRelabel . Add ( rowsCount )
2021-11-04 13:39:14 +00:00
maxSamplesPerBlock := * maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
2023-11-25 09:31:30 +00:00
2020-02-28 16:57:45 +00:00
for len ( tss ) > 0 {
2020-07-10 12:13:26 +00:00
// Process big tss in smaller blocks in order to reduce the maximum memory usage
2020-09-26 01:07:45 +00:00
samplesCount := 0
2021-03-30 21:44:31 +00:00
labelsCount := 0
2020-09-26 01:07:45 +00:00
i := 0
for i < len ( tss ) {
samplesCount += len ( tss [ i ] . Samples )
2023-11-25 09:31:30 +00:00
labelsCount += len ( tss [ i ] . Samples ) * len ( tss [ i ] . Labels )
2020-09-26 01:07:45 +00:00
i ++
2021-11-04 13:39:14 +00:00
if samplesCount >= maxSamplesPerBlock || labelsCount >= maxLabelsPerBlock {
2020-09-26 01:07:45 +00:00
break
}
}
2024-03-21 16:14:49 +00:00
2024-03-30 04:38:29 +00:00
ingestionRateLimiter . Register ( samplesCount )
2024-03-21 16:14:49 +00:00
2020-02-28 16:57:45 +00:00
tssBlock := tss
2020-09-26 01:07:45 +00:00
if i < len ( tss ) {
tssBlock = tss [ : i ]
tss = tss [ i : ]
2020-02-28 18:03:38 +00:00
} else {
tss = nil
2020-02-28 16:57:45 +00:00
}
2023-12-04 23:20:44 +00:00
if tenantRctx != nil {
tenantRctx . tenantToLabels ( tssBlock , at . AccountID , at . ProjectID )
}
2020-03-03 11:08:17 +00:00
if rctx != nil {
2022-05-06 12:28:59 +00:00
rowsCountBeforeRelabel := getRowsCount ( tssBlock )
2023-08-15 11:47:48 +00:00
tssBlock = rctx . applyRelabeling ( tssBlock , pcsGlobal )
2022-05-06 12:28:59 +00:00
rowsCountAfterRelabel := getRowsCount ( tssBlock )
rowsDroppedByGlobalRelabel . Add ( rowsCountBeforeRelabel - rowsCountAfterRelabel )
2020-03-03 11:08:17 +00:00
}
2021-05-19 23:12:36 +00:00
sortLabelsIfNeeded ( tssBlock )
2021-05-20 10:13:40 +00:00
tssBlock = limitSeriesCardinality ( tssBlock )
2024-05-13 13:22:37 +00:00
if ! tryPushBlockToRemoteStorages ( tssBlock , forceDropSamplesOnFailure ) {
2023-11-24 12:42:11 +00:00
return false
}
2020-02-28 16:57:45 +00:00
}
2023-11-24 12:42:11 +00:00
return true
2020-02-23 11:35:47 +00:00
}
2024-05-13 13:22:37 +00:00
func tryPushBlockToRemoteStorages ( tssBlock [ ] prompbmarshal . TimeSeries , forceDropSamplesOnFailure bool ) bool {
2021-11-04 13:00:51 +00:00
if len ( tssBlock ) == 0 {
// Nothing to push
2023-11-24 12:42:11 +00:00
return true
2021-11-04 13:00:51 +00:00
}
2023-11-24 12:42:11 +00:00
2023-07-25 01:15:24 +00:00
if len ( rwctxs ) == 1 {
// Fast path - just push data to the configured single remote storage
2024-05-10 10:09:21 +00:00
return rwctxs [ 0 ] . TryPush ( tssBlock , forceDropSamplesOnFailure )
2023-07-25 01:15:24 +00:00
}
// We need to push tssBlock to multiple remote storages.
// This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value.
2024-04-19 09:25:41 +00:00
if * shardByURL && * shardByURLReplicas < len ( rwctxs ) {
// Shard tssBlock samples among rwctxs.
replicas := * shardByURLReplicas
if replicas <= 0 {
replicas = 1
2023-07-25 01:15:24 +00:00
}
2024-05-13 13:22:37 +00:00
return tryShardingBlockAmongRemoteStorages ( tssBlock , replicas , forceDropSamplesOnFailure )
2023-07-25 01:15:24 +00:00
}
2024-04-19 09:25:41 +00:00
// Replicate tssBlock samples among rwctxs.
// Push tssBlock to remote storage systems in parallel in order to reduce
2023-07-25 01:15:24 +00:00
// the time needed for sending the data to multiple remote storage systems.
2021-11-04 13:00:51 +00:00
var wg sync . WaitGroup
2023-07-25 01:15:24 +00:00
wg . Add ( len ( rwctxs ) )
2024-02-24 00:44:19 +00:00
var anyPushFailed atomic . Bool
2021-11-04 13:00:51 +00:00
for _ , rwctx := range rwctxs {
2021-11-04 14:58:28 +00:00
go func ( rwctx * remoteWriteCtx ) {
2021-11-04 13:00:51 +00:00
defer wg . Done ( )
2024-05-10 10:09:21 +00:00
if ! rwctx . TryPush ( tssBlock , forceDropSamplesOnFailure ) {
2024-02-24 00:44:19 +00:00
anyPushFailed . Store ( true )
2023-11-24 12:42:11 +00:00
}
2021-11-04 14:58:28 +00:00
} ( rwctx )
2021-11-04 13:00:51 +00:00
}
2021-11-04 14:58:28 +00:00
wg . Wait ( )
2024-02-24 00:44:19 +00:00
return ! anyPushFailed . Load ( )
2021-11-04 13:00:51 +00:00
}
2024-05-13 13:22:37 +00:00
func tryShardingBlockAmongRemoteStorages ( tssBlock [ ] prompbmarshal . TimeSeries , replicas int , forceDropSamplesOnFailure bool ) bool {
2024-04-19 09:25:41 +00:00
x := getTSSShards ( len ( rwctxs ) )
defer putTSSShards ( x )
shards := x . shards
tmpLabels := promutils . GetLabels ( )
for _ , ts := range tssBlock {
hashLabels := ts . Labels
if len ( shardByURLLabelsMap ) > 0 {
hashLabels = tmpLabels . Labels [ : 0 ]
for _ , label := range ts . Labels {
if _ , ok := shardByURLLabelsMap [ label . Name ] ; ok {
hashLabels = append ( hashLabels , label )
}
}
tmpLabels . Labels = hashLabels
} else if len ( shardByURLIgnoreLabelsMap ) > 0 {
hashLabels = tmpLabels . Labels [ : 0 ]
for _ , label := range ts . Labels {
if _ , ok := shardByURLIgnoreLabelsMap [ label . Name ] ; ! ok {
hashLabels = append ( hashLabels , label )
}
}
tmpLabels . Labels = hashLabels
}
h := getLabelsHash ( hashLabels )
idx := h % uint64 ( len ( shards ) )
i := 0
for {
shards [ idx ] = append ( shards [ idx ] , ts )
i ++
if i >= replicas {
break
}
idx ++
if idx >= uint64 ( len ( shards ) ) {
idx = 0
}
}
}
promutils . PutLabels ( tmpLabels )
// Push sharded samples to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync . WaitGroup
var anyPushFailed atomic . Bool
for i , rwctx := range rwctxs {
shard := shards [ i ]
if len ( shard ) == 0 {
continue
}
wg . Add ( 1 )
go func ( rwctx * remoteWriteCtx , tss [ ] prompbmarshal . TimeSeries ) {
defer wg . Done ( )
2024-05-10 10:09:21 +00:00
if ! rwctx . TryPush ( tss , forceDropSamplesOnFailure ) {
2024-04-19 09:25:41 +00:00
anyPushFailed . Store ( true )
}
} ( rwctx , shard )
}
wg . Wait ( )
return ! anyPushFailed . Load ( )
}
// tssShards is a reusable set of per-remote-storage series buffers.
//
// It is obtained via getTSSShards and returned via putTSSShards.
type tssShards struct {
// shards holds one series buffer per shard; tryShardingBlockAmongRemoteStorages
// pushes shards[i] to rwctxs[i].
shards [ ] [ ] prompbmarshal . TimeSeries
}
// getTSSShards returns tssShards with exactly n shards.
//
// The result is taken from tssShardsPool when possible.
// Return it to the pool via putTSSShards when it is no longer needed.
func getTSSShards(n int) *tssShards {
	var x *tssShards
	if v := tssShardsPool.Get(); v != nil {
		x = v.(*tssShards)
	} else {
		x = &tssShards{}
	}
	if cap(x.shards) >= n {
		x.shards = x.shards[:n]
	} else {
		x.shards = make([][]prompbmarshal.TimeSeries, n)
	}
	return x
}
// putTSSShards resets every shard in x and returns x to tssShardsPool.
//
// The shards are cleared before truncation, so the pooled buffers
// do not keep references to the previously stored series.
func putTSSShards(x *tssShards) {
	for i, shard := range x.shards {
		clear(shard)
		x.shards[i] = shard[:0]
	}
	tssShardsPool.Put(x)
}
// tssShardsPool holds *tssShards objects reused by getTSSShards and putTSSShards.
var tssShardsPool sync . Pool
2021-05-19 23:12:36 +00:00
// sortLabelsIfNeeded sorts labels of every series in tss in place
// if -sortLabels command-line flag is set. Otherwise tss is left untouched.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
	if *sortLabels {
		for i := range tss {
			promrelabel.SortLabels(tss[i].Labels)
		}
	}
}
2021-05-20 10:13:40 +00:00
func limitSeriesCardinality ( tss [ ] prompbmarshal . TimeSeries ) [ ] prompbmarshal . TimeSeries {
if hourlySeriesLimiter == nil && dailySeriesLimiter == nil {
return tss
}
dst := make ( [ ] prompbmarshal . TimeSeries , 0 , len ( tss ) )
for i := range tss {
labels := tss [ i ] . Labels
h := getLabelsHash ( labels )
if hourlySeriesLimiter != nil && ! hourlySeriesLimiter . Add ( h ) {
2021-05-20 11:15:19 +00:00
hourlySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxHourlySeries" , hourlySeriesLimiter . MaxItems ( ) )
2021-05-20 10:13:40 +00:00
continue
}
if dailySeriesLimiter != nil && ! dailySeriesLimiter . Add ( h ) {
2021-05-20 11:15:19 +00:00
dailySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxDailySeries" , dailySeriesLimiter . MaxItems ( ) )
2021-05-20 10:13:40 +00:00
continue
}
dst = append ( dst , tss [ i ] )
}
return dst
}
var (
// hourlySeriesLimiter and dailySeriesLimiter are consulted by limitSeriesCardinality.
// A nil limiter means the corresponding limit isn't applied.
hourlySeriesLimiter * bloomfilter . Limiter
dailySeriesLimiter * bloomfilter . Limiter
2021-05-20 11:15:19 +00:00
// The counters below accumulate the number of samples in the series
// rejected by the corresponding limiter.
hourlySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_hourly_series_limit_rows_dropped_total ` )
dailySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_daily_series_limit_rows_dropped_total ` )
2021-05-20 10:13:40 +00:00
)
// getLabelsHash returns the xxhash sum over the concatenated names and values of labels.
//
// The hash depends on the labels order. A pooled buffer is used for the concatenation.
func getLabelsHash(labels []prompbmarshal.Label) uint64 {
	bb := labelsHashBufPool.Get()
	bb.B = bb.B[:0]
	for i := range labels {
		bb.B = append(bb.B, labels[i].Name...)
		bb.B = append(bb.B, labels[i].Value...)
	}
	sum := xxhash.Sum64(bb.B)
	labelsHashBufPool.Put(bb)
	return sum
}
// labelsHashBufPool provides scratch buffers for getLabelsHash.
var labelsHashBufPool bytesutil . ByteBufferPool
func logSkippedSeries ( labels [ ] prompbmarshal . Label , flagName string , flagValue int ) {
select {
case <- logSkippedSeriesTicker . C :
2021-12-21 15:03:25 +00:00
// Do not use logger.WithThrottler() here, since this will increase CPU usage
// because every call to logSkippedSeries will result to a call to labelsToString.
2021-05-20 10:13:40 +00:00
logger . Warnf ( "skip series %s because %s=%d reached" , labelsToString ( labels ) , flagName , flagValue )
default :
}
}
// logSkippedSeriesTicker ticks every 5 seconds; logSkippedSeries emits a log message
// only when a tick is pending, which bounds the rate of these messages.
var logSkippedSeriesTicker = time . NewTicker ( 5 * time . Second )
// labelsToString formats labels as {name1="value1",name2="value2"}.
//
// Label values are quoted with strconv.AppendQuote; label names are written verbatim.
func labelsToString(labels []prompbmarshal.Label) string {
	buf := []byte{'{'}
	for i := range labels {
		if i > 0 {
			buf = append(buf, ',')
		}
		buf = append(buf, labels[i].Name...)
		buf = append(buf, '=')
		buf = strconv.AppendQuote(buf, labels[i].Value)
	}
	buf = append(buf, '}')
	return string(buf)
}
2022-05-06 12:28:59 +00:00
// Counters related to the global relabeling stage, which runs before
// the data is pushed to the individual remote storage systems.
var (
2022-05-06 12:50:50 +00:00
// NOTE(review): presumably counts the rows seen before global relabeling is applied - confirm at the call site.
globalRowsPushedBeforeRelabel = metrics . NewCounter ( "vmagent_remotewrite_global_rows_pushed_before_relabel_total" )
2022-05-06 12:28:59 +00:00
// rowsDroppedByGlobalRelabel is increased by the difference between the rows count
// before and after the global relabeling.
rowsDroppedByGlobalRelabel = metrics . NewCounter ( "vmagent_remotewrite_global_relabel_metrics_dropped_total" )
)
2020-03-03 11:08:17 +00:00
// remoteWriteCtx holds the state needed for pushing data to a single remote storage.
//
// It is created by newRemoteWriteCtx and must be stopped via MustStop.
type remoteWriteCtx struct {
2023-01-04 06:19:18 +00:00
// idx is the index used for looking up per-URL flag values and per-URL relabel configs.
idx int
// fq is the queue shared by pss and by the client c.
fq * persistentqueue . FastQueue
// c is the client created for the remote storage url.
c * client
2024-03-04 22:45:22 +00:00
// sas holds the currently active stream aggregators; it is nil when no
// stream aggregation config is set. It is swapped by reinitStreamAggr.
sas atomic . Pointer [ streamaggr . Aggregators ]
// deduplicator is set only when stream aggregation config is missing
// and a positive dedup interval is configured.
deduplicator * streamaggr . Deduplicator
2024-05-10 10:09:21 +00:00
// streamAggrKeepInput disables dropping of the input series after they are passed to sas.
streamAggrKeepInput bool
// streamAggrDropInput makes TryPush drop all the input series instead of only the matched ones.
streamAggrDropInput bool
// disableOnDiskQueue is the per-URL value of the on-disk queue switch passed to fq.
disableOnDiskQueue bool
// dropSamplesOnOverload makes TryPush report success and count the samples as dropped when the push fails.
dropSamplesOnOverload bool
2023-01-04 06:19:18 +00:00
2020-03-03 11:08:17 +00:00
// pss is the list of pendingSeries the incoming data is distributed among.
pss [ ] * pendingSeries
2024-02-24 00:44:19 +00:00
// pssNextIdx is the counter used for selecting the next pss entry in round-robin manner.
pssNextIdx atomic . Uint64
2020-03-03 11:08:17 +00:00
2023-11-25 09:31:30 +00:00
// rowsPushedAfterRelabel counts the rows remaining after per-URL relabeling.
rowsPushedAfterRelabel * metrics . Counter
// rowsDroppedByRelabel counts the rows removed by per-URL relabeling.
rowsDroppedByRelabel * metrics . Counter
2024-05-10 10:09:21 +00:00
// pushFailures counts unsuccessful push attempts.
pushFailures * metrics . Counter
// rowsDroppedOnPushFailure counts the data dropped because of unsuccessful push attempts.
rowsDroppedOnPushFailure * metrics . Counter
2020-03-03 11:08:17 +00:00
}
2023-09-01 07:34:16 +00:00
// newRemoteWriteCtx creates remoteWriteCtx for the given remoteWriteURL.
//
// argIdx is the index used for fetching per-URL command-line flag values.
// maxInmemoryBlocks is passed to the opened queue.
// sanitizedURL is used in log messages and metric labels instead of the real url.
//
// The function opens the queue, registers the queue-related gauges, creates the client
// together with pendingSeries, and then initializes either stream aggregators
// or the deduplicator depending on the per-URL flags.
//
// The process is terminated via logger.Fatalf on unsupported url scheme
// or on an error while loading the stream aggregation config.
func newRemoteWriteCtx ( argIdx int , remoteWriteURL * url . URL , maxInmemoryBlocks int , sanitizedURL string ) * remoteWriteCtx {
2021-09-28 21:52:07 +00:00
// strip query params, otherwise changing params resets pq
pqURL := * remoteWriteURL
pqURL . RawQuery = ""
pqURL . Fragment = ""
h := xxhash . Sum64 ( [ ] byte ( pqURL . String ( ) ) )
2023-03-28 01:33:05 +00:00
// The queue directory name consists of the 1-based url index and the hash of the stripped url.
queuePath := filepath . Join ( * tmpDataPath , persistentQueueDirname , fmt . Sprintf ( "%d_%016X" , argIdx + 1 , h ) )
2023-08-12 11:17:55 +00:00
maxPendingBytes := maxPendingBytesPerURL . GetOptionalArg ( argIdx )
2023-04-26 10:23:01 +00:00
// Zero is left as is; non-zero values smaller than a single chunk file are rounded up.
if maxPendingBytes != 0 && maxPendingBytes < persistentqueue . DefaultChunkFileSize {
2023-05-08 22:42:28 +00:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4195
logger . Warnf ( "rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d" , maxPendingBytes , persistentqueue . DefaultChunkFileSize )
maxPendingBytes = persistentqueue . DefaultChunkFileSize
2023-04-26 10:23:01 +00:00
}
2024-05-10 10:09:21 +00:00
isPQDisabled := disableOnDiskQueue . GetOptionalArg ( argIdx )
fq := persistentqueue . MustOpenFastQueue ( queuePath , sanitizedURL , maxInmemoryBlocks , maxPendingBytes , isPQDisabled )
2021-09-28 21:52:07 +00:00
// Register gauges exposing the queue state.
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_data_bytes { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
2020-03-03 11:08:17 +00:00
return float64 ( fq . GetPendingBytes ( ) )
} )
2021-09-28 21:52:07 +00:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_inmemory_blocks { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
2020-03-03 11:08:17 +00:00
return float64 ( fq . GetInmemoryQueueLen ( ) )
} )
2023-11-25 09:31:30 +00:00
// The gauge reports 1 while writes to the queue are blocked and 0 otherwise.
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_queue_blocked { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
if fq . IsWriteBlocked ( ) {
return 1
2023-11-24 12:42:11 +00:00
}
return 0
} )
2023-02-24 01:36:52 +00:00
2021-09-28 21:52:07 +00:00
// Only http and https schemes are supported; any other scheme is fatal.
var c * client
switch remoteWriteURL . Scheme {
case "http" , "https" :
2023-02-26 20:07:30 +00:00
c = newHTTPClient ( argIdx , remoteWriteURL . String ( ) , sanitizedURL , fq , * queues )
2021-09-28 21:52:07 +00:00
default :
logger . Fatalf ( "unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`" , remoteWriteURL . Scheme , sanitizedURL )
}
c . init ( argIdx , * queues , sanitizedURL )
2023-01-04 06:19:18 +00:00
// Initialize pss
2023-08-12 11:17:55 +00:00
sf := significantFigures . GetOptionalArg ( argIdx )
rd := roundDigits . GetOptionalArg ( argIdx )
2021-03-31 13:16:26 +00:00
pssLen := * queues
if n := cgroup . AvailableCPUs ( ) ; pssLen > n {
// There is no sense in running more than availableCPUs concurrent pendingSeries,
// since every pendingSeries can saturate up to a single CPU.
pssLen = n
}
pss := make ( [ ] * pendingSeries , pssLen )
2020-03-03 11:08:17 +00:00
for i := range pss {
2023-11-24 12:42:11 +00:00
pss [ i ] = newPendingSeries ( fq , c . useVMProto , sf , rd )
2020-03-03 11:08:17 +00:00
}
2023-01-04 06:19:18 +00:00
rwctx := & remoteWriteCtx {
2020-05-30 11:36:40 +00:00
idx : argIdx ,
fq : fq ,
c : c ,
pss : pss ,
2020-03-03 11:08:17 +00:00
2024-05-10 10:09:21 +00:00
dropSamplesOnOverload : dropSamplesOnOverload . GetOptionalArg ( argIdx ) ,
disableOnDiskQueue : isPQDisabled ,
2023-11-25 09:31:30 +00:00
rowsPushedAfterRelabel : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_rows_pushed_after_relabel_total { path=%q, url=%q} ` , queuePath , sanitizedURL ) ) ,
rowsDroppedByRelabel : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_relabel_metrics_dropped_total { path=%q, url=%q} ` , queuePath , sanitizedURL ) ) ,
2024-05-10 10:09:21 +00:00
pushFailures : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_push_failures_total { path=%q, url=%q} ` , queuePath , sanitizedURL ) ) ,
rowsDroppedOnPushFailure : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_samples_dropped_total { path=%q, url=%q} ` , queuePath , sanitizedURL ) ) ,
2020-02-23 11:35:47 +00:00
}
2023-01-04 06:19:18 +00:00
// Initialize sas
2023-04-01 04:27:45 +00:00
sasFile := streamAggrConfig . GetOptionalArg ( argIdx )
2024-03-04 22:45:22 +00:00
dedupInterval := streamAggrDedupInterval . GetOptionalArg ( argIdx )
2024-03-17 21:01:44 +00:00
ignoreOldSamples := streamAggrIgnoreOldSamples . GetOptionalArg ( argIdx )
2023-04-01 04:27:45 +00:00
if sasFile != "" {
2024-03-04 03:42:55 +00:00
opts := & streamaggr . Options {
2024-04-22 11:52:04 +00:00
DedupInterval : dedupInterval ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : ignoreOldSamples ,
IgnoreFirstIntervals : * streamAggrIgnoreFirstIntervals ,
2024-03-04 03:42:55 +00:00
}
// The aggregators push their output via rwctx.pushInternalTrackDropped.
sas , err := streamaggr . LoadFromFile ( sasFile , rwctx . pushInternalTrackDropped , opts )
2023-01-04 06:19:18 +00:00
if err != nil {
2023-04-01 04:27:45 +00:00
logger . Fatalf ( "cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s" , sasFile , err )
2023-01-04 06:19:18 +00:00
}
2023-04-01 04:27:45 +00:00
rwctx . sas . Store ( sas )
2023-01-04 06:19:18 +00:00
rwctx . streamAggrKeepInput = streamAggrKeepInput . GetOptionalArg ( argIdx )
2023-07-24 23:44:09 +00:00
rwctx . streamAggrDropInput = streamAggrDropInput . GetOptionalArg ( argIdx )
2023-04-01 04:27:45 +00:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_successful { path=%q} ` , sasFile ) ) . Set ( 1 )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_success_timestamp_seconds { path=%q} ` , sasFile ) ) . Set ( fasttime . UnixTimestamp ( ) )
2024-03-04 22:45:22 +00:00
} else if dedupInterval > 0 {
2024-03-05 00:13:21 +00:00
// There is no stream aggregation config, so only deduplication is applied.
rwctx . deduplicator = streamaggr . NewDeduplicator ( rwctx . pushInternalTrackDropped , dedupInterval , * streamAggrDropInputLabels )
2023-01-04 06:19:18 +00:00
}
return rwctx
2020-02-23 11:35:47 +00:00
}
2020-03-03 11:08:17 +00:00
// MustStop stops all the components owned by rwctx and releases references to them.
//
// The components are stopped in the following order: stream aggregators and deduplicator,
// pendingSeries, the client and finally the queue. rwctx cannot be used after the call.
func ( rwctx * remoteWriteCtx ) MustStop ( ) {
2024-03-04 22:45:22 +00:00
// sas and deduplicator must be stopped before rwctx is closed
2023-06-07 13:45:43 +00:00
// because sas can write pending series to rwctx.pss if there are any
sas := rwctx . sas . Swap ( nil )
// NOTE(review): sas is nil when stream aggregation isn't configured;
// this relies on MustStop accepting a nil receiver - confirm.
sas . MustStop ( )
2024-03-04 22:45:22 +00:00
if rwctx . deduplicator != nil {
rwctx . deduplicator . MustStop ( )
rwctx . deduplicator = nil
}
2020-03-03 11:08:17 +00:00
for _ , ps := range rwctx . pss {
ps . MustStop ( )
}
2020-05-30 11:36:40 +00:00
rwctx . idx = 0
2020-03-03 11:08:17 +00:00
rwctx . pss = nil
2021-02-18 22:31:07 +00:00
// Unblock the queue readers before stopping the client.
rwctx . fq . UnblockAllReaders ( )
2020-03-03 11:08:17 +00:00
rwctx . c . MustStop ( )
rwctx . c = nil
2023-04-01 04:27:45 +00:00
2021-02-17 19:23:38 +00:00
// The queue is closed last, after both its writers (pss) and its reader (c) are stopped.
rwctx . fq . MustClose ( )
rwctx . fq = nil
2020-03-03 11:08:17 +00:00
2022-05-06 12:28:59 +00:00
rwctx . rowsPushedAfterRelabel = nil
rwctx . rowsDroppedByRelabel = nil
2020-03-03 11:08:17 +00:00
}
2020-02-23 11:35:47 +00:00
2024-05-06 10:09:51 +00:00
// TryPush sends tss series to the configured remote write endpoint
//
// TryPush can be called concurrently for multiple remoteWriteCtx,
// so it shouldn't modify tss entries.
2024-05-10 10:09:21 +00:00
func ( rwctx * remoteWriteCtx ) TryPush ( tss [ ] prompbmarshal . TimeSeries , forceDropSamplesOnFailure bool ) bool {
2023-01-04 06:19:18 +00:00
// Apply relabeling
2020-03-03 11:08:17 +00:00
var rctx * relabelCtx
2020-07-10 12:13:26 +00:00
var v * [ ] prompbmarshal . TimeSeries
2023-07-20 00:37:49 +00:00
rcs := allRelabelConfigs . Load ( )
2021-02-22 14:33:55 +00:00
pcs := rcs . perURL [ rwctx . idx ]
if pcs . Len ( ) > 0 {
2020-07-10 12:13:26 +00:00
rctx = getRelabelCtx ( )
2020-05-12 19:01:47 +00:00
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
2020-07-10 12:13:26 +00:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
2023-07-24 23:33:30 +00:00
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
2020-07-10 12:13:26 +00:00
tss = append ( * v , tss ... )
2022-05-06 12:28:59 +00:00
rowsCountBeforeRelabel := getRowsCount ( tss )
2023-08-15 11:47:48 +00:00
tss = rctx . applyRelabeling ( tss , pcs )
2022-05-06 12:28:59 +00:00
rowsCountAfterRelabel := getRowsCount ( tss )
rwctx . rowsDroppedByRelabel . Add ( rowsCountBeforeRelabel - rowsCountAfterRelabel )
2020-03-03 11:08:17 +00:00
}
2022-05-06 12:28:59 +00:00
rowsCount := getRowsCount ( tss )
rwctx . rowsPushedAfterRelabel . Add ( rowsCount )
2023-01-04 06:19:18 +00:00
2024-03-04 22:45:22 +00:00
// Apply stream aggregation or deduplication if they are configured
2023-07-24 23:44:09 +00:00
sas := rwctx . sas . Load ( )
if sas != nil {
matchIdxs := matchIdxsPool . Get ( )
matchIdxs . B = sas . Push ( tss , matchIdxs . B )
if ! rwctx . streamAggrKeepInput {
if rctx == nil {
rctx = getRelabelCtx ( )
// Make a copy of tss before dropping aggregated series
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
}
tss = dropAggregatedSeries ( tss , matchIdxs . B , rwctx . streamAggrDropInput )
2023-07-24 23:33:30 +00:00
}
2023-07-24 23:44:09 +00:00
matchIdxsPool . Put ( matchIdxs )
2024-03-04 22:45:22 +00:00
} else if rwctx . deduplicator != nil {
rwctx . deduplicator . Push ( tss )
tss = tss [ : 0 ]
2023-07-24 23:33:30 +00:00
}
2023-11-25 09:31:30 +00:00
// Try pushing the data to remote storage
ok := rwctx . tryPushInternal ( tss )
// Return back relabeling contexts to the pool
if rctx != nil {
* v = prompbmarshal . ResetTimeSeries ( tss )
tssPool . Put ( v )
putRelabelCtx ( rctx )
}
2024-05-10 10:09:21 +00:00
if ! ok {
rwctx . pushFailures . Inc ( )
if forceDropSamplesOnFailure || rwctx . dropSamplesOnOverload {
rwctx . rowsDroppedOnPushFailure . Add ( len ( tss ) )
return true
}
}
2023-11-25 09:31:30 +00:00
return ok
2023-07-24 23:44:09 +00:00
}
2023-07-24 23:33:30 +00:00
2023-07-24 23:44:09 +00:00
// matchIdxsPool provides the byte buffers TryPush passes to stream aggregators
// for storing per-series match flags.
var matchIdxsPool bytesutil . ByteBufferPool
2023-07-24 23:33:30 +00:00
2023-07-24 23:44:09 +00:00
func dropAggregatedSeries ( src [ ] prompbmarshal . TimeSeries , matchIdxs [ ] byte , dropInput bool ) [ ] prompbmarshal . TimeSeries {
dst := src [ : 0 ]
2023-08-10 12:27:21 +00:00
if ! dropInput {
for i , match := range matchIdxs {
if match == 1 {
continue
}
dst = append ( dst , src [ i ] )
2023-07-24 23:44:09 +00:00
}
}
tail := src [ len ( dst ) : ]
2024-03-04 22:45:22 +00:00
clear ( tail )
2023-07-24 23:44:09 +00:00
return dst
2020-03-03 11:08:17 +00:00
}
2020-07-10 12:13:26 +00:00
2023-11-24 12:42:11 +00:00
func ( rwctx * remoteWriteCtx ) pushInternalTrackDropped ( tss [ ] prompbmarshal . TimeSeries ) {
2023-11-25 09:31:30 +00:00
if rwctx . tryPushInternal ( tss ) {
return
}
2024-05-10 10:09:21 +00:00
if ! rwctx . disableOnDiskQueue {
2023-11-25 09:31:30 +00:00
logger . Panicf ( "BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set" )
}
2024-05-10 10:09:21 +00:00
rwctx . pushFailures . Inc ( )
if dropSamplesOnOverload . GetOptionalArg ( rwctx . idx ) {
2023-11-25 09:31:30 +00:00
rowsCount := getRowsCount ( tss )
2024-05-10 10:09:21 +00:00
rwctx . rowsDroppedOnPushFailure . Add ( rowsCount )
2023-11-24 12:42:11 +00:00
}
}
2023-11-25 09:31:30 +00:00
func ( rwctx * remoteWriteCtx ) tryPushInternal ( tss [ ] prompbmarshal . TimeSeries ) bool {
2023-09-08 21:17:16 +00:00
var rctx * relabelCtx
var v * [ ] prompbmarshal . TimeSeries
2023-08-15 11:47:48 +00:00
if len ( labelsGlobal ) > 0 {
2023-09-08 21:17:16 +00:00
// Make a copy of tss before adding extra labels in order to prevent
// from affecting time series for other remoteWrite.url configs.
rctx = getRelabelCtx ( )
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
2023-08-17 12:35:26 +00:00
rctx . appendExtraLabels ( tss , labelsGlobal )
2023-08-15 11:47:48 +00:00
}
2023-08-17 10:15:03 +00:00
2023-01-04 06:19:18 +00:00
pss := rwctx . pss
2024-02-24 00:44:19 +00:00
idx := rwctx . pssNextIdx . Add ( 1 ) % uint64 ( len ( pss ) )
2023-11-25 09:31:30 +00:00
ok := pss [ idx ] . TryPush ( tss )
if rctx != nil {
* v = prompbmarshal . ResetTimeSeries ( tss )
tssPool . Put ( v )
putRelabelCtx ( rctx )
}
return ok
2023-01-04 06:19:18 +00:00
}
2023-03-29 16:05:58 +00:00
func ( rwctx * remoteWriteCtx ) reinitStreamAggr ( ) {
2023-10-16 13:57:24 +00:00
sasFile := streamAggrConfig . GetOptionalArg ( rwctx . idx )
if sasFile == "" {
2023-04-01 04:27:45 +00:00
// There is no stream aggregation for rwctx
2023-03-29 16:05:58 +00:00
return
}
2023-10-16 14:00:24 +00:00
2023-04-01 04:27:45 +00:00
logger . Infof ( "reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q" , sasFile )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reloads_total { path=%q} ` , sasFile ) ) . Inc ( )
2024-03-04 03:42:55 +00:00
opts := & streamaggr . Options {
2024-03-17 21:01:44 +00:00
DedupInterval : streamAggrDedupInterval . GetOptionalArg ( rwctx . idx ) ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : streamAggrIgnoreOldSamples . GetOptionalArg ( rwctx . idx ) ,
2024-03-04 03:42:55 +00:00
}
sasNew , err := streamaggr . LoadFromFile ( sasFile , rwctx . pushInternalTrackDropped , opts )
2023-04-01 04:27:45 +00:00
if err != nil {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reloads_errors_total { path=%q} ` , sasFile ) ) . Inc ( )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_successful { path=%q} ` , sasFile ) ) . Set ( 0 )
logger . Errorf ( "cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s" , sasFile , err )
2023-03-29 16:05:58 +00:00
return
}
2023-10-16 18:52:52 +00:00
sas := rwctx . sas . Load ( )
2023-04-01 04:27:45 +00:00
if ! sasNew . Equal ( sas ) {
sasOld := rwctx . sas . Swap ( sasNew )
sasOld . MustStop ( )
logger . Infof ( "successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q" , sasFile )
} else {
sasNew . MustStop ( )
logger . Infof ( "the config at -remoteWrite.streamAggr.config=%q wasn't changed" , sasFile )
2023-03-29 16:05:58 +00:00
}
2023-04-01 04:27:45 +00:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_successful { path=%q} ` , sasFile ) ) . Set ( 1 )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_success_timestamp_seconds { path=%q} ` , sasFile ) ) . Set ( fasttime . UnixTimestamp ( ) )
2023-03-29 16:05:58 +00:00
}
2023-07-24 23:33:30 +00:00
var tssPool = & sync . Pool {
2020-07-10 12:13:26 +00:00
New : func ( ) interface { } {
2020-07-14 11:27:50 +00:00
a := [ ] prompbmarshal . TimeSeries { }
return & a
2020-07-10 12:13:26 +00:00
} ,
}
2022-05-06 12:28:59 +00:00
// getRowsCount returns the total number of samples across all the series in tss.
func getRowsCount(tss []prompbmarshal.TimeSeries) int {
	n := 0
	for i := range tss {
		n += len(tss[i].Samples)
	}
	return n
}
2023-04-01 04:27:45 +00:00
2024-05-16 07:25:42 +00:00
// HasAnyStreamAggrConfigured reports whether at least one value is passed
// to -remoteWrite.streamAggr.config command-line flag.
func HasAnyStreamAggrConfigured() bool {
	configs := *streamAggrConfig
	return len(configs) != 0
}
2023-04-01 04:27:45 +00:00
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs ( ) error {
2024-04-02 20:16:24 +00:00
pushNoop := func ( _ [ ] prompbmarshal . TimeSeries ) { }
2023-04-01 04:27:45 +00:00
for idx , sasFile := range * streamAggrConfig {
if sasFile == "" {
continue
}
2024-03-04 03:42:55 +00:00
opts := & streamaggr . Options {
2024-03-17 21:01:44 +00:00
DedupInterval : streamAggrDedupInterval . GetOptionalArg ( idx ) ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : streamAggrIgnoreOldSamples . GetOptionalArg ( idx ) ,
2024-03-04 03:42:55 +00:00
}
sas , err := streamaggr . LoadFromFile ( sasFile , pushNoop , opts )
2023-04-01 04:27:45 +00:00
if err != nil {
return fmt . Errorf ( "cannot load -remoteWrite.streamAggr.config=%q: %w" , sasFile , err )
}
sas . MustStop ( )
}
return nil
}
2024-04-02 21:36:32 +00:00
// newMapFromStrings returns a set containing all the strings from a.
//
// Duplicate strings are stored once. The returned map is never nil.
func newMapFromStrings(a []string) map[string]struct{} {
	set := make(map[string]struct{}, len(a))
	for i := range a {
		set[a[i]] = struct{}{}
	}
	return set
}