Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git

Commit 2847c84a7b: Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

18 changed files with 286 additions and 105 deletions

README.md (26 changed lines)
@@ -1171,20 +1171,13 @@ See [these docs](https://docs.victoriametrics.com/guides/guide-vmcluster-multipl

## Downsampling

There is no downsampling support at the moment, but:
[VictoriaMetrics Enterprise](https://victoriametrics.com/enterprise.html) supports multi-level downsampling with `-downsampling.period` command-line flag. For example:

* VictoriaMetrics is optimized for querying big amounts of raw data. See benchmark results for heavy queries
in [this article](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae).
* VictoriaMetrics has good compression for on-disk data. See [this article](https://medium.com/@valyala/victoriametrics-achieving-better-compression-for-time-series-data-than-gorilla-317bc1f95932)
for details.
* The downsampling doesn't improve query performance on a long time range if the time range contains big number of time series due to [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). The query performance depends on the number of unique time series on the selected time range, while downsampling doesn't reduce the number of unique time series in the database - it can reduce only the number of samples per each time series.
* `-downsampling.period=30d:5m` instructs VictoriaMetrics to [deduplicate](#deduplication) samples older than 30 days with 5 minutes interval.

These properties reduce the need of downsampling. We plan to implement downsampling in the future.
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/36) for details.
* `-downsampling.period=30d:5m,180d:1h` instructs VictoriaMetrics to deduplicate samples older than 30 days with 5 minutes interval and to deduplicate samples older than 180 days with 1 hour interval.

It is possible to (ab)use [-dedup.minScrapeInterval](#deduplication) for basic downsampling.
For instance, if interval between the ingested data points is 15s, then `-dedup.minScrapeInterval=5m` will leave
only a single data point out of 20 initial data points per each 5m interval.
Downsampling is applied independently per each time series. It can reduce disk space usage and improve query performance if it is applied to time series with big number of samples per each series. The downsampling doesn't improve query performance if the database contains big number of time series with small number of samples per each series (aka [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate)), since downsampling doesn't reduce the number of time series. So the majority of time is spent on searching for the matching time series.


## Multi-tenancy
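The `offset:period` semantics documented in the downsampling hunk above can be illustrated with a short sketch. The `downsamplingRule` type and `pickInterval` helper below are hypothetical names used only for illustration; they are not part of VictoriaMetrics and do not reproduce the enterprise implementation, only the documented behavior of `-downsampling.period=30d:5m,180d:1h`: older samples match a larger offset and therefore get a coarser interval.

```go
package main

import (
	"fmt"
	"time"
)

// downsamplingRule mirrors one "offset:period" entry from -downsampling.period.
// Illustration only; not the actual VictoriaMetrics Enterprise code.
type downsamplingRule struct {
	offset time.Duration // applies to samples older than this
	period time.Duration // keep at most one sample per this interval
}

// pickInterval returns the dedup interval to apply to a sample of the given age.
// Rules are assumed to be sorted by ascending offset; the rule with the largest
// matching offset wins, so older data gets the coarser interval.
func pickInterval(rules []downsamplingRule, age time.Duration) time.Duration {
	var interval time.Duration
	for _, r := range rules {
		if age >= r.offset {
			interval = r.period
		}
	}
	return interval
}

func main() {
	// -downsampling.period=30d:5m,180d:1h
	rules := []downsamplingRule{
		{offset: 30 * 24 * time.Hour, period: 5 * time.Minute},
		{offset: 180 * 24 * time.Hour, period: time.Hour},
	}
	fmt.Println(pickInterval(rules, 10*24*time.Hour))  // 0s  - raw samples are kept
	fmt.Println(pickInterval(rules, 60*24*time.Hour))  // 5m0s
	fmt.Println(pickInterval(rules, 365*24*time.Hour)) // 1h0m0s
}
```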
@@ -1575,11 +1568,14 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

The maximum size in bytes of a single DataDog POST request to /api/v1/series
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864)
-dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
-deleteAuthKey string
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
-denyQueriesOutsideRetention
Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee
-downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags.
-dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse
-enableTCP6
@@ -1588,6 +1584,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

Whether to enable reading flags from environment variables additionally to command line. Command line flag values have priority over values from environment vars. Flags are read only from command line if this flag isn't set. See https://docs.victoriametrics.com/#environment-variables for more details
-envflag.prefix string
Prefix for environment variables if -envflag.enable is set
-eula
By specifying this flag, you confirm that you have an enterprise license and accept the EULA https://victoriametrics.com/assets/VM_EULA.pdf
-finalMergeDelay duration
The delay before starting final merge for per-month partition after no new data is ingested into it. Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. Zero value disables final merge
-forceFlushAuthKey string
@@ -1772,6 +1770,10 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

Whether to disable automatic response cache reset if a sample with timestamp outside -search.cacheTimestampOffset is inserted into VictoriaMetrics
-search.disableCache
Whether to disable response caching. This may be useful during data backfilling
-search.graphiteMaxPointsPerSeries int
The maximum number of points per series Graphite render API can return (default 1000000)
-search.graphiteStorageStep duration
The interval between datapoints stored in the database. It is used at Graphite Render API handler for normalizing the interval between datapoints in case it isn't normalized. It can be overridden by sending 'storage_step' query arg to /render API or by sending the desired interval via 'Storage-Step' http header during querying /render API (default 10s)
-search.latencyOffset duration
The time when data points become visible in query results after the collection. Too small value can result in incomplete last points for query results (default 30s)
-search.logSlowQueryDuration duration
@@ -25,7 +25,7 @@ import (

var (
	httpListenAddr    = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections")
	minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the first sample in every time series per each discrete interval "+
		"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details")
		"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
	dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
		"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
)

@@ -57,7 +57,7 @@ func main() {

	logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
	startTime := time.Now()
	storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
	storage.SetDedupInterval(*minScrapeInterval)
	vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
	vmselect.Init()
	vminsert.Init()
@@ -483,7 +483,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.

		dst.Values = append(dst.Values, pts.pd.values...)
		dst.Timestamps = append(dst.Timestamps, pts.pd.timestamps...)
	}
	mergeSortBlocks(dst, sbs)
	dedupInterval := storage.GetDedupInterval()
	mergeSortBlocks(dst, sbs, dedupInterval)
	if pts.pd != nil {
		if !sort.IsSorted(dst) {
			sort.Sort(dst)

@@ -531,7 +532,7 @@ var sbPool sync.Pool

var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)

func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
	// Skip empty sort blocks, since they cannot be passed to heap.Init.
	src := sbh
	sbh = sbh[:0]

@@ -574,8 +575,7 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {

			putSortBlock(top)
		}
	}

	timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values)
	timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values, dedupInterval)
	dedups := len(dst.Timestamps) - len(timestamps)
	dedupsDuringSelect.Add(dedups)
	dst.Timestamps = timestamps
@@ -6,20 +6,22 @@ sort: 15

## tip

* FEATURE: [VictoriaMetrics enterprise](https://victoriametrics.com/enterprise.html): add multi-level downsampling support. See [these docs](https://docs.victoriametrics.com/#downsampling) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/36).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to analyze the correlation between two queries on a single graph. Just click `+Query` button, enter the second query in the newly appeared input field and press `Ctrl+Enter`. Results for both queries should be displayed simultaneously on the same graph. Every query has its own vertical scale, which is displayed on the left and the right side of the graph. Lines for the second query are dashed. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1916).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to override the interval between returned datapoints. By default it is automatically calculated depending on the selected time range and horizontal resolution of the graph. Now it is possible to override it with custom values. This may be useful during data exploration and debugging.
* FEATURE: accept optional `extra_filters[]=series_selector` query args at Prometheus query APIs additionally to `extra_label` query args. This allows enforcing additional filters for all the Prometheus query APIs by using [vmgateway](https://docs.victoriametrics.com/vmgateway.html) or [vmauth](https://docs.victoriametrics.com/vmauth.html). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1863).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): allow specifying `http` and `https` urls in `-auth.config` command-line flag. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1898). Thanks to @TFM93.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow specifying `http` and `https` urls in the following command-line flags: `-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`.
* FEATURE: vminsert: allow specifying `http` and `https` urls in `-relabelConfig` command-line flag.
* FEATURE: vminsert: add `-maxLabelValueLen` command-line flag for the ability to configure the maximum length of label value. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1908).
* FEATURE: preserve the order of time series passed to [limit_offset](https://docs.victoriametrics.com/MetricsQL.html#limit_offset) function. This allows implementing series paging via `limit_offset(limit, offset, sort_by_label(...))`. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1920) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/951) issues.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to override the interval between returned datapoints. By default it is automatically calculated depending on the selected time range and horizontal resolution of the graph. Now it is possible to override it with custom values. This may be useful during data exploration and debugging.
* FEATURE: automatically convert `(value1|...|valueN)` into `{value1,...,valueN}` inside `__graphite__` pseudo-label. This allows using [Grafana multi-value template variables](https://grafana.com/docs/grafana/latest/variables/formatting-multi-value-variables/) inside `__graphite__` pseudo-label. For example, `{__graphite__=~"foo.($bar)"}` is expanded to `{__graphite__=~"foo.{x,y}"}` if both `x` and `y` are selected for `$bar` template variable. See [these docs](https://docs.victoriametrics.com/#selecting-graphite-metrics) for details.

* BUGFIX: fix `unaligned 64-bit atomic operation` panic on 32-bit architectures, which has been introduced in v1.70.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1944).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): restore the ability to use `$labels.alertname` in labels templating. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1921).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add missing `query` caption to the input field for the query. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1900).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix navigation over query history with `Ctrl+up/down` and fix zoom relatively to the cursor position. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1936).
* BUGFIX: deduplicate samples more thoroughly if [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. Previously some duplicate samples may be left on disk for time series with high churn rate. This may result in bigger storage space requirements.


## [v1.70.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.70.0)
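As a rough illustration of the `extra_filters[]=series_selector` query arg mentioned in the changelog hunk above, the sketch below builds a query URL that carries extra series selectors in addition to the query itself. The host, port, metric name and selector values are made up for the example; the changelog only states that the arg is accepted by the Prometheus query APIs alongside `extra_label`, typically injected by vmauth or vmgateway.

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Attach additional enforced series selectors to an /api/v1/query request
	// via repeated extra_filters[] query args (values here are illustrative).
	params := url.Values{}
	params.Set("query", `rate(http_requests_total[5m])`)
	params.Add("extra_filters[]", `{env="prod"}`)
	params.Add("extra_filters[]", `{team!="qa"}`)

	fmt.Println("http://victoriametrics:8428/api/v1/query?" + params.Encode())
}
```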
@@ -407,6 +407,11 @@ Restoring from backup:

3. Start `vmstorage` node.


## Downsampling

Downsampling is available in [enterprise version of VictoriaMetrics](https://victoriametrics.com/enterprise.html). It is configured with `-downsampling.period` command-line flag. The same flag value must be passed to both `vmstorage` and `vmselect` nodes. See [these docs](https://docs.victoriametrics.com/#downsampling) for details.


## Profiling

All the cluster components provide the following handlers for [profiling](https://blog.golang.org/profiling-go-programs):

@@ -484,6 +489,8 @@ Below is the output for `/path/to/vminsert -help`:

Whether to enable reading flags from environment variables additionally to command line. Command line flag values have priority over values from environment vars. Flags are read only from command line if this flag isn't set. See https://docs.victoriametrics.com/#environment-variables for more details
-envflag.prefix string
Prefix for environment variables if -envflag.enable is set
-eula
By specifying this flag, you confirm that you have an enterprise license and accept the EULA https://victoriametrics.com/assets/VM_EULA.pdf
-fs.disableMmap
Whether to use pread() instead of mmap() for reading data files. By default mmap() is used for 64-bit arches and pread() is used for 32-bit arches, since they cannot read data files bigger than 2^32 bytes in memory. mmap() is usually faster for reading small data chunks than pread()
-graphiteListenAddr string
@@ -596,12 +603,17 @@ Below is the output for `/path/to/vmselect -help`:

Path to directory for cache files. Cache isn't saved if empty
-dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
-downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags.
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable
Whether to enable reading flags from environment variables additionally to command line. Command line flag values have priority over values from environment vars. Flags are read only from command line if this flag isn't set. See https://docs.victoriametrics.com/#environment-variables for more details
-envflag.prefix string
Prefix for environment variables if -envflag.enable is set
-eula
By specifying this flag, you confirm that you have an enterprise license and accept the EULA https://victoriametrics.com/assets/VM_EULA.pdf
-fs.disableMmap
Whether to use pread() instead of mmap() for reading data files. By default mmap() is used for 64-bit arches and pread() is used for 32-bit arches, since they cannot read data files bigger than 2^32 bytes in memory. mmap() is usually faster for reading small data chunks than pread()
-graphiteTrimTimestamp duration

@@ -647,6 +659,10 @@ Below is the output for `/path/to/vmselect -help`:

Whether to deny partial responses if a part of -storageNode instances fail to perform queries; this trades availability over consistency; see also -search.maxQueryDuration
-search.disableCache
Whether to disable response caching. This may be useful during data backfilling
-search.graphiteMaxPointsPerSeries int
The maximum number of points per series Graphite render API can return (default 1000000)
-search.graphiteStorageStep duration
The interval between datapoints stored in the database. It is used at Graphite Render API handler for normalizing the interval between datapoints in case it isn't normalized. It can be overridden by sending 'storage_step' query arg to /render API or by sending the desired interval via 'Storage-Step' http header during querying /render API (default 10s)
-search.latencyOffset duration
The time when data points become visible in query results after the collection. Too small value can result in incomplete last points for query results (default 30s)
-search.logSlowQueryDuration duration
@@ -715,12 +731,17 @@ Below is the output for `/path/to/vmstorage -help`:

Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
-denyQueriesOutsideRetention
Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee
-downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags.
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable
Whether to enable reading flags from environment variables additionally to command line. Command line flag values have priority over values from environment vars. Flags are read only from command line if this flag isn't set. See https://docs.victoriametrics.com/#environment-variables for more details
-envflag.prefix string
Prefix for environment variables if -envflag.enable is set
-eula
By specifying this flag, you confirm that you have an enterprise license and accept the EULA https://victoriametrics.com/assets/VM_EULA.pdf
-finalMergeDelay duration
The delay before starting final merge for per-month partition after no new data is ingested into it. Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. Zero value disables final merge
-forceFlushAuthKey string
@@ -423,7 +423,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t

VictoriaMetrics supports `__graphite__` pseudo-label for selecting time series with Graphite-compatible filters in [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). For example, `{__graphite__="foo.*.bar"}` is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster and it is easier to use when migrating from Graphite to VictoriaMetrics. See [docs for Graphite paths and wildcards](https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards). VictoriaMetrics also supports [label_graphite_group](https://docs.victoriametrics.com/MetricsQL.html#label_graphite_group) function for extracting the given groups from Graphite metric name.

The `__graphite__` pseudo-label supports e.g. alternate regexp filters such as `(value1|...|valueN)`. They are transparently converted to `{value1,...,valueN}` syntax [used in Graphite](https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards). This allows using [multi-value template variables in Grafana](https://grafana.com/docs/grafana/latest/variables/formatting-multi-value-variables/) inside `__graphite__` pseudo-label. For example, Grafana expands `{__graphite__=~"foo.$bar.baz"}` into `{__graphite__=~"foo.(x|y).baz"}` if `$bar` template variable contains `x` and `y` values. In this case the query is automatically converted into `{__graphite__=~"foo.{x,y}.baz"}` before execution.
The `__graphite__` pseudo-label supports e.g. alternate regexp filters such as `(value1|...|valueN)`. They are transparently converted to `{value1,...,valueN}` syntax [used in Graphite](https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards). This allows using [multi-value template variables in Grafana](https://grafana.com/docs/grafana/latest/variables/formatting-multi-value-variables/) inside `__graphite__` pseudo-label. For example, Grafana expands `{__graphite__=~"foo.($bar).baz"}` into `{__graphite__=~"foo.(x|y).baz"}` if `$bar` template variable contains `x` and `y` values. In this case the query is automatically converted into `{__graphite__=~"foo.{x,y}.baz"}` before execution.

## How to send data from OpenTSDB-compatible agents
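The `(value1|...|valueN)` to `{value1,...,valueN}` conversion described in the `__graphite__` hunk above can be sketched with a small regexp-based helper. This is not the actual VictoriaMetrics code and the function name is made up; it ignores escaping and nested groups and only demonstrates the documented transformation on the Grafana multi-value example.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// altGroup matches simple alternation groups such as (x|y|z).
// Sketch only: the real conversion inside VictoriaMetrics is more careful.
var altGroup = regexp.MustCompile(`\(([^()]+\|[^()]+)\)`)

// graphiteAltToBraces rewrites regexp alternations into Graphite brace syntax.
func graphiteAltToBraces(filter string) string {
	return altGroup.ReplaceAllStringFunc(filter, func(g string) string {
		inner := strings.Trim(g, "()")
		return "{" + strings.ReplaceAll(inner, "|", ",") + "}"
	})
}

func main() {
	// Grafana expands {__graphite__=~"foo.($bar).baz"} into the filter below
	// when $bar contains x and y:
	fmt.Println(graphiteAltToBraces(`foo.(x|y).baz`)) // foo.{x,y}.baz
}
```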
@@ -1171,20 +1171,13 @@ See [these docs](https://docs.victoriametrics.com/guides/guide-vmcluster-multipl

## Downsampling

There is no downsampling support at the moment, but:
[VictoriaMetrics Enterprise](https://victoriametrics.com/enterprise.html) supports multi-level downsampling with `-downsampling.period` command-line flag. For example:

* VictoriaMetrics is optimized for querying big amounts of raw data. See benchmark results for heavy queries
in [this article](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae).
* VictoriaMetrics has good compression for on-disk data. See [this article](https://medium.com/@valyala/victoriametrics-achieving-better-compression-for-time-series-data-than-gorilla-317bc1f95932)
for details.
* The downsampling doesn't improve query performance on a long time range if the time range contains big number of time series due to [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). The query performance depends on the number of unique time series on the selected time range, while downsampling doesn't reduce the number of unique time series in the database - it can reduce only the number of samples per each time series.
* `-downsampling.period=30d:5m` instructs VictoriaMetrics to [deduplicate](#deduplication) samples older than 30 days with 5 minutes interval.

These properties reduce the need of downsampling. We plan to implement downsampling in the future.
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/36) for details.
* `-downsampling.period=30d:5m,180d:1h` instructs VictoriaMetrics to deduplicate samples older than 30 days with 5 minutes interval and to deduplicate samples older than 180 days with 1 hour interval.

It is possible to (ab)use [-dedup.minScrapeInterval](#deduplication) for basic downsampling.
For instance, if interval between the ingested data points is 15s, then `-dedup.minScrapeInterval=5m` will leave
only a single data point out of 20 initial data points per each 5m interval.
Downsampling is applied independently per each time series. It can reduce disk space usage and improve query performance if it is applied to time series with big number of samples per each series. The downsampling doesn't improve query performance if the database contains big number of time series with small number of samples per each series (aka [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate)), since downsampling doesn't reduce the number of time series. So the majority of time is spent on searching for the matching time series.


## Multi-tenancy
@@ -1575,11 +1568,14 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

The maximum size in bytes of a single DataDog POST request to /api/v1/series
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864)
-dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
-deleteAuthKey string
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
-denyQueriesOutsideRetention
Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee
-downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags.
-dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse
-enableTCP6
@@ -1588,6 +1584,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

Whether to enable reading flags from environment variables additionally to command line. Command line flag values have priority over values from environment vars. Flags are read only from command line if this flag isn't set. See https://docs.victoriametrics.com/#environment-variables for more details
-envflag.prefix string
Prefix for environment variables if -envflag.enable is set
-eula
By specifying this flag, you confirm that you have an enterprise license and accept the EULA https://victoriametrics.com/assets/VM_EULA.pdf
-finalMergeDelay duration
The delay before starting final merge for per-month partition after no new data is ingested into it. Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. Zero value disables final merge
-forceFlushAuthKey string
@@ -1772,6 +1770,10 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

Whether to disable automatic response cache reset if a sample with timestamp outside -search.cacheTimestampOffset is inserted into VictoriaMetrics
-search.disableCache
Whether to disable response caching. This may be useful during data backfilling
-search.graphiteMaxPointsPerSeries int
The maximum number of points per series Graphite render API can return (default 1000000)
-search.graphiteStorageStep duration
The interval between datapoints stored in the database. It is used at Graphite Render API handler for normalizing the interval between datapoints in case it isn't normalized. It can be overridden by sending 'storage_step' query arg to /render API or by sending the desired interval via 'Storage-Step' http header during querying /render API (default 10s)
-search.latencyOffset duration
The time when data points become visible in query results after the collection. Too small value can result in incomplete last points for query results (default 30s)
-search.logSlowQueryDuration duration
@@ -427,7 +427,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t

VictoriaMetrics supports `__graphite__` pseudo-label for selecting time series with Graphite-compatible filters in [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). For example, `{__graphite__="foo.*.bar"}` is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster and it is easier to use when migrating from Graphite to VictoriaMetrics. See [docs for Graphite paths and wildcards](https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards). VictoriaMetrics also supports [label_graphite_group](https://docs.victoriametrics.com/MetricsQL.html#label_graphite_group) function for extracting the given groups from Graphite metric name.

The `__graphite__` pseudo-label supports e.g. alternate regexp filters such as `(value1|...|valueN)`. They are transparently converted to `{value1,...,valueN}` syntax [used in Graphite](https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards). This allows using [multi-value template variables in Grafana](https://grafana.com/docs/grafana/latest/variables/formatting-multi-value-variables/) inside `__graphite__` pseudo-label. For example, Grafana expands `{__graphite__=~"foo.$bar.baz"}` into `{__graphite__=~"foo.(x|y).baz"}` if `$bar` template variable contains `x` and `y` values. In this case the query is automatically converted into `{__graphite__=~"foo.{x,y}.baz"}` before execution.
The `__graphite__` pseudo-label supports e.g. alternate regexp filters such as `(value1|...|valueN)`. They are transparently converted to `{value1,...,valueN}` syntax [used in Graphite](https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards). This allows using [multi-value template variables in Grafana](https://grafana.com/docs/grafana/latest/variables/formatting-multi-value-variables/) inside `__graphite__` pseudo-label. For example, Grafana expands `{__graphite__=~"foo.($bar).baz"}` into `{__graphite__=~"foo.(x|y).baz"}` if `$bar` template variable contains `x` and `y` values. In this case the query is automatically converted into `{__graphite__=~"foo.{x,y}.baz"}` before execution.

## How to send data from OpenTSDB-compatible agents
@@ -1175,20 +1175,13 @@ See [these docs](https://docs.victoriametrics.com/guides/guide-vmcluster-multipl

## Downsampling

There is no downsampling support at the moment, but:
[VictoriaMetrics Enterprise](https://victoriametrics.com/enterprise.html) supports multi-level downsampling with `-downsampling.period` command-line flag. For example:

* VictoriaMetrics is optimized for querying big amounts of raw data. See benchmark results for heavy queries
in [this article](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae).
* VictoriaMetrics has good compression for on-disk data. See [this article](https://medium.com/@valyala/victoriametrics-achieving-better-compression-for-time-series-data-than-gorilla-317bc1f95932)
for details.
* The downsampling doesn't improve query performance on a long time range if the time range contains big number of time series due to [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). The query performance depends on the number of unique time series on the selected time range, while downsampling doesn't reduce the number of unique time series in the database - it can reduce only the number of samples per each time series.
* `-downsampling.period=30d:5m` instructs VictoriaMetrics to [deduplicate](#deduplication) samples older than 30 days with 5 minutes interval.

These properties reduce the need of downsampling. We plan to implement downsampling in the future.
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/36) for details.
* `-downsampling.period=30d:5m,180d:1h` instructs VictoriaMetrics to deduplicate samples older than 30 days with 5 minutes interval and to deduplicate samples older than 180 days with 1 hour interval.

It is possible to (ab)use [-dedup.minScrapeInterval](#deduplication) for basic downsampling.
For instance, if interval between the ingested data points is 15s, then `-dedup.minScrapeInterval=5m` will leave
only a single data point out of 20 initial data points per each 5m interval.
Downsampling is applied independently per each time series. It can reduce disk space usage and improve query performance if it is applied to time series with big number of samples per each series. The downsampling doesn't improve query performance if the database contains big number of time series with small number of samples per each series (aka [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate)), since downsampling doesn't reduce the number of time series. So the majority of time is spent on searching for the matching time series.


## Multi-tenancy
@@ -1579,11 +1572,14 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

The maximum size in bytes of a single DataDog POST request to /api/v1/series
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864)
-dedup.minScrapeInterval duration
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication for details
Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
-deleteAuthKey string
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
-denyQueriesOutsideRetention
Whether to deny queries outside of the configured -retentionPeriod. When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. This may be useful when multiple data sources with distinct retentions are hidden behind query-tee
-downsampling.period array
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details
Supports an array of values separated by comma or specified via multiple flags.
-dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse
-enableTCP6
@@ -1592,6 +1588,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

Whether to enable reading flags from environment variables additionally to command line. Command line flag values have priority over values from environment vars. Flags are read only from command line if this flag isn't set. See https://docs.victoriametrics.com/#environment-variables for more details
-envflag.prefix string
Prefix for environment variables if -envflag.enable is set
-eula
By specifying this flag, you confirm that you have an enterprise license and accept the EULA https://victoriametrics.com/assets/VM_EULA.pdf
-finalMergeDelay duration
The delay before starting final merge for per-month partition after no new data is ingested into it. Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. Zero value disables final merge
-forceFlushAuthKey string
@@ -1776,6 +1774,10 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li

Whether to disable automatic response cache reset if a sample with timestamp outside -search.cacheTimestampOffset is inserted into VictoriaMetrics
-search.disableCache
Whether to disable response caching. This may be useful during data backfilling
-search.graphiteMaxPointsPerSeries int
The maximum number of points per series Graphite render API can return (default 1000000)
-search.graphiteStorageStep duration
The interval between datapoints stored in the database. It is used at Graphite Render API handler for normalizing the interval between datapoints in case it isn't normalized. It can be overridden by sending 'storage_step' query arg to /render API or by sending the desired interval via 'Storage-Step' http header during querying /render API (default 10s)
-search.latencyOffset duration
The time when data points become visible in query results after the collection. Too small value can result in incomplete last points for query results (default 30s)
-search.logSlowQueryDuration duration
@@ -148,13 +148,26 @@ func (b *Block) tooBig() bool {

}

func (b *Block) deduplicateSamplesDuringMerge() {
	if len(b.values) == 0 {
		// Nothing to dedup or the data is already marshaled.
	if !isDedupEnabled() {
		// Deduplication is disabled
		return
	}
	// Unmarshal block if it isn't unmarshaled yet in order to apply the de-duplication to unmarshaled samples.
	if err := b.UnmarshalData(); err != nil {
		logger.Panicf("FATAL: cannot unmarshal block: %s", err)
	}
	srcTimestamps := b.timestamps[b.nextIdx:]
	if len(srcTimestamps) < 2 {
		// Nothing to dedup.
		return
	}
	dedupInterval := GetDedupInterval()
	if dedupInterval <= 0 {
		// Deduplication is disabled.
		return
	}
	srcValues := b.values[b.nextIdx:]
	timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues)
	timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues, dedupInterval)
	dedups := len(srcTimestamps) - len(timestamps)
	atomic.AddUint64(&dedupsDuringMerge, uint64(dedups))
	b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)]
@@ -184,11 +184,9 @@ func (bsw *blockStreamWriter) MustClose() {

}

// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) {
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
	atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
	if needDedup {
		b.deduplicateSamplesDuringMerge()
	}
	b.deduplicateSamplesDuringMerge()
	headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
	usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)
	if usePrevTimestamps {
@@ -49,7 +49,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR

	bsw.InitFromInmemoryPart(&mp)
	for i := range ebsCopy {
		bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged, false)
		bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
	}
	bsw.MustClose()
	mp.Reset()
@@ -4,31 +4,37 @@ import (

	"time"
)

// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication.
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// De-duplication is disabled if interval is 0.
// De-duplication is disabled if dedupInterval is 0.
//
// This function must be called before initializing the storage.
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
	minScrapeInterval = interval.Milliseconds()
func SetDedupInterval(dedupInterval time.Duration) {
	globalDedupInterval = dedupInterval.Milliseconds()
}

var minScrapeInterval = int64(0)

// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
func GetDedupInterval() int64 {
	return globalDedupInterval
}

// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
	if minScrapeInterval <= 0 {
		return srcTimestamps, srcValues
	}
	if !needsDedup(srcTimestamps, minScrapeInterval) {

var globalDedupInterval int64

func isDedupEnabled() bool {
	return globalDedupInterval > 0
}

// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
	if !needsDedup(srcTimestamps, dedupInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}
	return deduplicateInternal(minScrapeInterval, srcTimestamps, srcValues)
	return deduplicateInternal(srcTimestamps, srcValues, dedupInterval)
}

func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
	tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
func deduplicateInternal(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
	tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval
	dstTimestamps := srcTimestamps[:1]
	dstValues := srcValues[:1]
	for i := 1; i < len(srcTimestamps); i++ {
@@ -40,28 +46,25 @@ func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []floa

		dstValues = append(dstValues, srcValues[i])

		// Update tsNext
		tsNext += interval
		tsNext += dedupInterval
		if ts >= tsNext {
			// Slow path for updating ts.
			tsNext = (ts - ts%interval) + interval
			tsNext = (ts - ts%dedupInterval) + dedupInterval
		}
	}
	return dstTimestamps, dstValues
}

func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) {
	if minScrapeInterval <= 0 {
		return srcTimestamps, srcValues
	}
	if !needsDedup(srcTimestamps, minScrapeInterval) {
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
	if !needsDedup(srcTimestamps, dedupInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}
	return deduplicateDuringMergeInternal(minScrapeInterval, srcTimestamps, srcValues)
	return deduplicateDuringMergeInternal(srcTimestamps, srcValues, dedupInterval)
}

func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []int64) ([]int64, []int64) {
	tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
func deduplicateDuringMergeInternal(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) {
	tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval
	dstTimestamps := srcTimestamps[:1]
	dstValues := srcValues[:1]
	for i := 1; i < len(srcTimestamps); i++ {
@@ -73,27 +76,27 @@ func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []i

		dstValues = append(dstValues, srcValues[i])

		// Update tsNext
		tsNext += interval
		tsNext += dedupInterval
		if ts >= tsNext {
			// Slow path for updating ts.
			tsNext = (ts - ts%interval) + interval
			tsNext = (ts - ts%dedupInterval) + dedupInterval
		}
	}
	return dstTimestamps, dstValues
}

func needsDedup(timestamps []int64, interval int64) bool {
	if len(timestamps) == 0 || interval <= 0 {
func needsDedup(timestamps []int64, dedupInterval int64) bool {
	if len(timestamps) == 0 || dedupInterval <= 0 {
		return false
	}
	tsNext := (timestamps[0] - timestamps[0]%interval) + interval
	tsNext := (timestamps[0] - timestamps[0]%dedupInterval) + dedupInterval
	for _, ts := range timestamps[1:] {
		if ts < tsNext {
			return true
		}
		tsNext += interval
		tsNext += dedupInterval
		if ts >= tsNext {
			tsNext = (ts - ts%interval) + interval
			tsNext = (ts - ts%dedupInterval) + dedupInterval
		}
	}
	return false
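To make the dedup loop above concrete, here is a standalone restatement of the `deduplicateInternal` logic traced on sample data: with a 60-second interval, only the first sample at or after each interval boundary survives, matching the flag description "Leave only the first sample in every time series per each discrete interval". The helper below is a simplified sketch, not the production code; it allocates a fresh result slice and ignores the values.

```go
package main

import "fmt"

// dedup restates the deduplicateInternal loop from the hunk above in a
// self-contained form for illustration only: it keeps the first sample,
// then the first sample at or after every interval boundary. The real code
// also carries the values slice and reuses the source slices in place.
func dedup(timestamps []int64, interval int64) []int64 {
	if len(timestamps) == 0 || interval <= 0 {
		return timestamps
	}
	tsNext := (timestamps[0] - timestamps[0]%interval) + interval
	dst := []int64{timestamps[0]}
	for _, ts := range timestamps[1:] {
		if ts < tsNext {
			continue
		}
		dst = append(dst, ts)
		tsNext += interval
		if ts >= tsNext {
			// Slow path: the gap is larger than one interval.
			tsNext = (ts - ts%interval) + interval
		}
	}
	return dst
}

func main() {
	// Samples every 15s (timestamps in milliseconds) deduplicated with a 60s interval.
	src := []int64{0, 15000, 30000, 45000, 60000, 75000, 90000, 120000}
	fmt.Println(dedup(src, 60000)) // [0 60000 120000]
}
```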
@@ -30,18 +30,17 @@ func TestNeedsDedup(t *testing.T) {

func TestDeduplicateSamples(t *testing.T) {
	// Disable deduplication before exit, since the rest of tests expect disabled dedup.
	defer SetMinScrapeIntervalForDeduplication(0)

	f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) {
		t.Helper()
		SetMinScrapeIntervalForDeduplication(scrapeInterval)
		timestampsCopy := make([]int64, len(timestamps))
		values := make([]float64, len(timestamps))
		for i, ts := range timestamps {
			timestampsCopy[i] = ts
			values[i] = float64(i)
		}
		timestampsCopy, values = DeduplicateSamples(timestampsCopy, values)
		dedupInterval := scrapeInterval.Milliseconds()
		timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval)
		if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
			t.Fatalf("invalid DeduplicateSamples(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
		}

@@ -69,9 +68,9 @@ func TestDeduplicateSamples(t *testing.T) {

			t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
		}

		// Verify that the second call to DeduplicatSamples doesn't modify samples.
		// Verify that the second call to DeduplicateSamples doesn't modify samples.
		valuesCopy := append([]float64{}, values...)
		timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy)
		timestampsCopy, valuesCopy = DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval)
		if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
			t.Fatalf("invalid DeduplicateSamples(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
		}
@@ -90,18 +89,17 @@ func TestDeduplicateSamples(t *testing.T) {

func TestDeduplicateSamplesDuringMerge(t *testing.T) {
	// Disable deduplication before exit, since the rest of tests expect disabled dedup.
	defer SetMinScrapeIntervalForDeduplication(0)

	f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) {
		t.Helper()
		SetMinScrapeIntervalForDeduplication(scrapeInterval)
		timestampsCopy := make([]int64, len(timestamps))
		values := make([]int64, len(timestamps))
		for i, ts := range timestamps {
			timestampsCopy[i] = ts
			values[i] = int64(i)
		}
		timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values)
		dedupInterval := scrapeInterval.Milliseconds()
		timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval)
		if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
			t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
		}

@@ -129,9 +127,9 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) {

			t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:])
		}

		// Verify that the second call to DeduplicatSamples doesn't modify samples.
		// Verify that the second call to DeduplicateSamples doesn't modify samples.
		valuesCopy := append([]int64{}, values...)
		timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy)
		timestampsCopy, valuesCopy = deduplicateSamplesDuringMerge(timestampsCopy, valuesCopy, dedupInterval)
		if !reflect.DeepEqual(timestampsCopy, timestampsExpected) {
			t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps for the second call;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected)
		}
@@ -15,8 +15,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) {

	}
	for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} {
		b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) {
			SetMinScrapeIntervalForDeduplication(minScrapeInterval)
			defer SetMinScrapeIntervalForDeduplication(0)
			dedupInterval := minScrapeInterval.Milliseconds()
			b.ReportAllocs()
			b.SetBytes(blockSize)
			b.RunParallel(func(pb *testing.PB) {

@@ -25,7 +24,7 @@ func BenchmarkDeduplicateSamples(b *testing.B) {

				for pb.Next() {
					timestampsCopy := append(timestampsCopy[:0], timestamps...)
					valuesCopy := append(valuesCopy[:0], values...)
					ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy)
					ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy, dedupInterval)
					if len(ts) == 0 || len(vs) == 0 {
						panic(fmt.Errorf("expecting non-empty results; got\nts=%v\nvs=%v", ts, vs))
					}
@@ -76,14 +76,14 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc

		if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) {
			logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID)
		}
		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
		pendingBlock.CopyFrom(bsm.Block)
		continue
	}
	if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp {
		// Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block.
		// Write the pendingBlock and then deal with bsm.Block.
		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
		pendingBlock.CopyFrom(bsm.Block)
		continue
	}

@@ -119,13 +119,13 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc

		tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
		tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
		tmpBlock.fixupTimestamps()
		bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged, true)
		bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
	}
	if err := bsm.Error(); err != nil {
		return fmt.Errorf("cannot read block to be merged: %w", err)
	}
	if !pendingBlockIsEmpty {
		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
	}
	return nil
}
@@ -1,11 +1,17 @@

package storage

import (
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/metricsql"
)

// partHeader represents part header.

@@ -21,6 +27,9 @@ type partHeader struct {

	// MaxTimestamp is the maximum timestamp in the part.
	MaxTimestamp int64

	// MinDedupInterval is minimal dedup interval in milliseconds across all the blocks in the part.
	MinDedupInterval int64
}

// String returns string representation of ph.

@@ -104,6 +113,10 @@ func (ph *partHeader) ParseFromPath(path string) error {

		return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount)
	}

	if err := ph.readMinDedupInterval(path); err != nil {
		return fmt.Errorf("cannot read min dedup interval: %w", err)
	}

	return nil
}

@@ -113,4 +126,34 @@ func (ph *partHeader) Reset() {

	ph.BlocksCount = 0
	ph.MinTimestamp = (1 << 63) - 1
	ph.MaxTimestamp = -1 << 63
	ph.MinDedupInterval = 0
}

func (ph *partHeader) readMinDedupInterval(partPath string) error {
	filePath := partPath + "/min_dedup_interval"
	data, err := ioutil.ReadFile(filePath)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// The minimum dedup interval may not exist for old parts.
			ph.MinDedupInterval = 0
			return nil
		}
		return fmt.Errorf("cannot read %q: %w", filePath, err)
	}
	dedupInterval, err := metricsql.DurationValue(string(data), 0)
	if err != nil {
		return fmt.Errorf("cannot parse minimum dedup interval %q at %q: %w", data, filePath, err)
	}
	ph.MinDedupInterval = dedupInterval
	return nil
}

func (ph *partHeader) writeMinDedupInterval(partPath string) error {
	filePath := partPath + "/min_dedup_interval"
	dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
	data := dedupInterval.String()
	if err := fs.WriteFileAtomically(filePath, []byte(data)); err != nil {
		return fmt.Errorf("cannot create %q: %w", filePath, err)
	}
	return nil
}
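For reference, the `min_dedup_interval` file introduced in the part header hunk above stores `ph.MinDedupInterval` as the string produced by `time.Duration.String()` and parses it back with `metricsql.DurationValue`, as shown in the diff. The snippet below only illustrates what those strings look like for a few interval values; the milliseconds values are made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// The part's min_dedup_interval file stores ph.MinDedupInterval (milliseconds)
	// formatted via time.Duration.String(), as in writeMinDedupInterval above.
	for _, ms := range []int64{0, 300000, 3600000} {
		d := time.Duration(ms) * time.Millisecond
		fmt.Printf("%d ms -> %q\n", ms, d.String())
	}
	// Output:
	// 0 ms -> "0s"
	// 300000 ms -> "5m0s"
	// 3600000 ms -> "1h0m0s"
}
```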
@@ -835,7 +835,17 @@ func (pt *partition) ForceMergeAllParts() error {

		// Nothing to merge.
		return nil
	}
	// If len(pws) == 1, then the merge must run anyway, so deleted time series could be removed from the part.

	// Check whether there is enough disk space for merging pws.
	newPartSize := getPartsSize(pws)
	maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath)
	if newPartSize > maxOutBytes {
		freeSpaceNeededBytes := newPartSize - maxOutBytes
		logger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
		return nil
	}

	// If len(pws) == 1, then the merge must run anyway. This allows removing the deleted series and performing de-duplication if needed.
	if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil {
		return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
	}

@@ -1056,6 +1066,31 @@ func atomicSetBool(p *uint64, b bool) {

	atomic.StoreUint64(p, v)
}

func (pt *partition) runFinalDedup() error {
	if !isDedupNeeded(pt) {
		return nil
	}
	t := time.Now()
	logger.Infof("starting final dedup for partition %s", pt.name)
	if err := pt.ForceMergeAllParts(); err != nil {
		return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.name, err)
	}
	logger.Infof("final dedup for partition %s finished in %.3f seconds", pt.name, time.Since(t).Seconds())
	return nil
}

func isDedupNeeded(pt *partition) bool {
	pws := pt.GetParts(nil)
	defer pt.PutParts(pws)
	dedupInterval := GetDedupInterval()
	if dedupInterval <= 0 {
		// The deduplication isn't needed.
		return false
	}
	minDedupInterval := getMinDedupInterval(pws)
	return minDedupInterval < dedupInterval
}

// mergeParts merges pws.
//
// Merging is immediately stopped if stopCh is closed.

@@ -1146,6 +1181,11 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro

	}
	bsrs = nil

	ph.MinDedupInterval = getMinDedupInterval(pws)
	if err := ph.writeMinDedupInterval(tmpPartPath); err != nil {
		return fmt.Errorf("cannot store min dedup interval for part %q: %w", tmpPartPath, err)
	}

	// Create a transaction for atomic deleting old parts and moving
	// new part to its destination place.
	var bb bytesutil.ByteBuffer

@@ -1225,6 +1265,20 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro

	return nil
}

func getMinDedupInterval(pws []*partWrapper) int64 {
	if len(pws) == 0 {
		return 0
	}
	dMin := pws[0].p.ph.MinDedupInterval
	for _, pw := range pws[1:] {
		d := pw.p.ph.MinDedupInterval
		if d < dMin {
			dMin = d
		}
	}
	return dMin
}

func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
	avgRowsPerBlock := rowsCount / blocksCount
	if avgRowsPerBlock <= 200 {
@@ -115,7 +115,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR

		rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
		tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
		rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false)
		rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)

		tsid = &r.TSID
		precisionBits = r.PrecisionBits

@@ -125,7 +125,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR

	rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
	tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
	rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false)
	rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
	if rowsMerged != uint64(len(rows)) {
		logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows))
	}
@@ -31,7 +31,8 @@ type table struct {

	stop chan struct{}

	retentionWatcherWG sync.WaitGroup
	retentionWatcherWG  sync.WaitGroup
	finalDedupWatcherWG sync.WaitGroup
}

// partitionWrapper provides refcounting mechanism for the partition.

@@ -135,6 +136,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention

		tb.addPartitionNolock(pt)
	}
	tb.startRetentionWatcher()
	tb.startFinalDedupWatcher()
	return tb, nil
}

@@ -193,6 +195,7 @@ func (tb *table) addPartitionNolock(pt *partition) {

func (tb *table) MustClose() {
	close(tb.stop)
	tb.retentionWatcherWG.Wait()
	tb.finalDedupWatcherWG.Wait()

	tb.ptwsLock.Lock()
	ptws := tb.ptws

@@ -435,6 +438,47 @@ func (tb *table) retentionWatcher() {

	}
}

func (tb *table) startFinalDedupWatcher() {
	tb.finalDedupWatcherWG.Add(1)
	go func() {
		tb.finalDedupWatcher()
		tb.finalDedupWatcherWG.Done()
	}()
}

func (tb *table) finalDedupWatcher() {
	if !isDedupEnabled() {
		// Deduplication is disabled.
		return
	}
	f := func() {
		ptws := tb.GetPartitions(nil)
		defer tb.PutPartitions(ptws)
		timestamp := timestampFromTime(time.Now())
		currentPartitionName := timestampToPartitionName(timestamp)
		for _, ptw := range ptws {
			if ptw.pt.name == currentPartitionName {
				// Do not run final dedup for the current month.
				continue
			}
			if err := ptw.pt.runFinalDedup(); err != nil {
				logger.Errorf("cannot run final dedup for partition %s: %s", ptw.pt.name, err)
				continue
			}
		}
	}
	t := time.NewTicker(time.Hour)
	defer t.Stop()
	for {
		select {
		case <-tb.stop:
			return
		case <-t.C:
			f()
		}
	}
}

// GetPartitions appends tb's partitions snapshot to dst and returns the result.
//
// The returned partitions must be passed to PutPartitions