lib/storage: move the -dedup.minScrapeInterval flag outside lib/storage, so it doesn't show up in vminsert in the cluster version
parent 07c067697e
commit ea66212c93

3 changed files with 24 additions and 8 deletions
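The hunks below touch three files: the vmselect and vmstorage entry points (presumably their main.go files) gain the flag plus a call into lib/storage, while lib/storage itself stops registering the flag. As a compact illustration of the pattern (not the repository code), the single-file Go sketch below imitates it: the library side keeps only an unexported variable plus an exported setter, and each binary that actually needs deduplication registers -dedup.minScrapeInterval itself and forwards the parsed value. Everything besides the flag name and the setter name is a simplified stand-in.

// Illustrative sketch only, not the repository code: it imitates the pattern
// this commit applies. The flag used to be registered inside lib/storage:
//
//	var minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "...")
//
// so every binary importing lib/storage exposed -dedup.minScrapeInterval in
// its -help output, vminsert included. After the change the library keeps only
// an unexported variable plus an exported setter, and each binary that needs
// deduplication registers the flag itself and forwards the parsed value.
package main

import (
	"flag"
	"fmt"
	"time"
)

// Library side (stand-in for lib/storage after this commit).
var minScrapeInterval time.Duration

// SetMinScrapeIntervalForDeduplication mirrors the setter added to lib/storage;
// per its doc comment, it must be called before initializing the storage.
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
	minScrapeInterval = interval
}

// Binary side (stand-in for the vmselect/vmstorage main.go): the flag now
// lives here, not in the library.
var dedupFlag = flag.Duration("dedup.minScrapeInterval", 0,
	"Remove superfluous samples located closer to each other than this duration")

func main() {
	flag.Parse()
	SetMinScrapeIntervalForDeduplication(*dedupFlag)
	fmt.Println("deduplication interval:", minScrapeInterval)
}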
@@ -18,6 +18,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -27,8 +28,11 @@ var (
 	cacheDataPath = flag.String("cacheDataPath", "", "Path to directory for cache files. Cache isn't saved if empty")
 	maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
 		"It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration")
 	maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached")
-	storageNodes = flagutil.NewArray("storageNode", "Addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1:8401 -storageNode=vmstorage-host2:8401")
+	minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
+		"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
+		"Deduplication is disabled if the -dedup.minScrapeInterval is 0")
+	storageNodes = flagutil.NewArray("storageNode", "Addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1:8401 -storageNode=vmstorage-host2:8401")
 )
 
 func getDefaultMaxConcurrentRequests() int {
@@ -52,6 +56,7 @@ func main() {
 
 	logger.Infof("starting netstorage at storageNodes %s", *storageNodes)
 	startTime := time.Now()
+	storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
 	if len(*storageNodes) == 0 {
 		logger.Fatalf("missing -storageNode arg")
 	}
@@ -28,6 +28,9 @@ var (
 
 	bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
 	smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
+	minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
+		"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
+		"Deduplication is disabled if the -dedup.minScrapeInterval is 0")
 )
 
 func main() {
@@ -35,6 +38,7 @@ func main() {
 	buildinfo.Init()
 	logger.Init()
 
+	storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
 	storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
 	storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
 
@@ -1,14 +1,21 @@
 package storage
 
 import (
-	"flag"
+	"time"
 
 	"github.com/VictoriaMetrics/metrics"
 )
 
-var minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
-	"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
-	"Deduplication is disabled if the -dedup.minScrapeInterval is 0")
+// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication.
+//
+// De-duplication is disabled if interval is 0.
+//
+// This function must be called before initializing the storage.
+func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
+	minScrapeInterval = interval
+}
+
+var minScrapeInterval = time.Duration(0)
 
 func getMinDelta() int64 {
 	// Divide minScrapeInterval by 2 in order to preserve proper data points.
@@ -23,7 +30,7 @@ func getMinDelta() int64 {
 
 // DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
 func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
-	if *minScrapeInterval <= 0 {
+	if minScrapeInterval <= 0 {
 		return srcTimestamps, srcValues
 	}
 	minDelta := getMinDelta()
@@ -54,7 +61,7 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
 var dedupsDuringSelect = metrics.NewCounter(`deduplicated_samples_total{type="select"}`)
 
 func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) {
-	if *minScrapeInterval <= 0 {
+	if minScrapeInterval <= 0 {
 		return srcTimestamps, srcValues
 	}
 	if len(srcTimestamps) < 32 {
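For context on the behaviour these hunks configure, here is a rough sketch, not the actual lib/storage implementation: getMinDelta halves minScrapeInterval (as the original comment notes, to preserve proper data points), and any sample closer than that delta to the previously kept sample is dropped. The names carrying the Sketch suffix and the 30s interval are hypothetical, chosen only for the example.

// Rough sketch only: approximates the behaviour described by the comments in
// the hunks above; the real DeduplicateSamples in lib/storage differs in detail.
package main

import (
	"fmt"
	"time"
)

// Value normally set via SetMinScrapeIntervalForDeduplication; 30s is an example.
var minScrapeInterval = 30 * time.Second

// getMinDeltaSketch halves minScrapeInterval so that samples arriving slightly
// ahead of schedule are still preserved.
func getMinDeltaSketch() int64 {
	return minScrapeInterval.Milliseconds() / 2
}

// deduplicateSketch keeps a sample only if it is at least minDelta milliseconds
// away from the previously kept sample; timestamps are assumed to be sorted.
func deduplicateSketch(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
	if minScrapeInterval <= 0 || len(srcTimestamps) == 0 {
		return srcTimestamps, srcValues
	}
	minDelta := getMinDeltaSketch()
	dstTimestamps := []int64{srcTimestamps[0]}
	dstValues := []float64{srcValues[0]}
	for i := 1; i < len(srcTimestamps); i++ {
		if srcTimestamps[i]-dstTimestamps[len(dstTimestamps)-1] < minDelta {
			continue // too close to the previously kept sample; drop it
		}
		dstTimestamps = append(dstTimestamps, srcTimestamps[i])
		dstValues = append(dstValues, srcValues[i])
	}
	return dstTimestamps, dstValues
}

func main() {
	ts := []int64{0, 10_000, 20_000, 30_000, 40_000} // milliseconds, 10s apart
	vs := []float64{1, 2, 3, 4, 5}
	outTs, outVs := deduplicateSketch(ts, vs)
	fmt.Println(outTs, outVs) // with a 30s interval (15s min delta), every other sample survives
}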