From e094c8e21465f1cc49558f8e1b7c60512dc7fd62 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 28 Mar 2023 17:00:14 -0700 Subject: [PATCH 01/12] docs: mention that VictoriaMetrics rounds time range to UTC days at /api/v1/labels, /api/v1/label/.../values and /api/v1/series handlers Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3107 --- README.md | 41 +++++++++++++++++++++------ docs/README.md | 41 +++++++++++++++++++++------ docs/Single-server-VictoriaMetrics.md | 41 +++++++++++++++++++++------ docs/url-examples.md | 9 ++++-- docs/vmagent.md | 2 +- 5 files changed, 106 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 931aecfb9..ff78de46e 100644 --- a/README.md +++ b/README.md @@ -742,20 +742,45 @@ All the Prometheus querying API handlers can be prepended with `/prometheus` pre ### Prometheus querying API enhancements -VictoriaMetrics accepts optional `extra_label==` query arg, which can be used for enforcing additional label filters for queries. For example, -`/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=` would automatically add `{user_id="123",group_id="456"}` label filters to the given ``. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. +VictoriaMetrics accepts optional `extra_label==` query arg, which can be used +for enforcing additional label filters for queries. For example, `/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=` +would automatically add `{user_id="123",group_id="456"}` label filters to the given ``. +This functionality can be used for limiting the scope of time series visible to the given tenant. 
+It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. +See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. -VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. For example, -`/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=` would automatically add `{env=~"prod|staging",user="xyz"}` label filters to the given ``. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. +VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. +For example, `/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=` would automatically +add `{env=~"prod|staging",user="xyz"}` label filters to the given ``. This functionality can be used for limiting +the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically +set by auth proxy sitting in front of VictoriaMetrics. +See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats). -VictoriaMetrics accepts `round_digits` query arg for `/api/v1/query` and `/api/v1/query_range` handlers. 
It can be used for rounding response values to the given number of digits after the decimal point. For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. +VictoriaMetrics accepts `round_digits` query arg for [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) +and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) handlers. It can be used for rounding response values +to the given number of digits after the decimal point. +For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. -VictoriaMetrics accepts `limit` query arg for `/api/v1/labels` and `/api/v1/label//values` handlers for limiting the number of returned entries. For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, then limits specified in the command-line flags are used. +VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) +and [`/api/v1/label//values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues) handlers for limiting the number of returned entries. +For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. +If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, +then limits specified in the command-line flags are used. -By default, VictoriaMetrics returns time series for the last 5 minutes from `/api/v1/series`, `/api/v1/labels` and `/api/v1/label//values` while the Prometheus API defaults to all time. 
Explicitly set `start` and `end` to select the desired time range. -VictoriaMetrics accepts `limit` query arg for `/api/v1/series` handlers for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest. If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. +By default, VictoriaMetrics returns time series for the last day starting at 00:00 UTC +from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series), +[/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and +[`/api/v1/label//values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues), +while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. +VictoriaMetrics rounds the specified `start..end` time range to day granularity because of performance optimization concerns. +If you need the exact set of label names and label values on the given time range, then send queries +to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) or to [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query). + +VictoriaMetrics accepts `limit` query arg at [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) +for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest of series. +If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. 
Additionally, VictoriaMetrics provides the following handlers: diff --git a/docs/README.md b/docs/README.md index 7817851bd..52ff491d1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -743,20 +743,45 @@ All the Prometheus querying API handlers can be prepended with `/prometheus` pre ### Prometheus querying API enhancements -VictoriaMetrics accepts optional `extra_label==` query arg, which can be used for enforcing additional label filters for queries. For example, -`/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=` would automatically add `{user_id="123",group_id="456"}` label filters to the given ``. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. +VictoriaMetrics accepts optional `extra_label==` query arg, which can be used +for enforcing additional label filters for queries. For example, `/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=` +would automatically add `{user_id="123",group_id="456"}` label filters to the given ``. +This functionality can be used for limiting the scope of time series visible to the given tenant. +It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. +See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. -VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. 
For example, -`/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=` would automatically add `{env=~"prod|staging",user="xyz"}` label filters to the given ``. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. +VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. +For example, `/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=` would automatically +add `{env=~"prod|staging",user="xyz"}` label filters to the given ``. This functionality can be used for limiting +the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically +set by auth proxy sitting in front of VictoriaMetrics. +See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats). -VictoriaMetrics accepts `round_digits` query arg for `/api/v1/query` and `/api/v1/query_range` handlers. It can be used for rounding response values to the given number of digits after the decimal point. For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. +VictoriaMetrics accepts `round_digits` query arg for [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) +and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) handlers. 
It can be used for rounding response values +to the given number of digits after the decimal point. +For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. -VictoriaMetrics accepts `limit` query arg for `/api/v1/labels` and `/api/v1/label//values` handlers for limiting the number of returned entries. For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, then limits specified in the command-line flags are used. +VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) +and [`/api/v1/label//values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues) handlers for limiting the number of returned entries. +For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. +If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, +then limits specified in the command-line flags are used. -By default, VictoriaMetrics returns time series for the last 5 minutes from `/api/v1/series`, `/api/v1/labels` and `/api/v1/label//values` while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. -VictoriaMetrics accepts `limit` query arg for `/api/v1/series` handlers for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest. If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. 
+By default, VictoriaMetrics returns time series for the last day starting at 00:00 UTC +from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series), +[/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and +[`/api/v1/label//values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues), +while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. +VictoriaMetrics rounds the specified `start..end` time range to day granularity because of performance optimization concerns. +If you need the exact set of label names and label values on the given time range, then send queries +to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) or to [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query). + +VictoriaMetrics accepts `limit` query arg at [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) +for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest of series. +If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. Additionally, VictoriaMetrics provides the following handlers: diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 625d67898..a9f13a957 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -746,20 +746,45 @@ All the Prometheus querying API handlers can be prepended with `/prometheus` pre ### Prometheus querying API enhancements -VictoriaMetrics accepts optional `extra_label==` query arg, which can be used for enforcing additional label filters for queries. 
For example, -`/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=` would automatically add `{user_id="123",group_id="456"}` label filters to the given ``. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. +VictoriaMetrics accepts optional `extra_label==` query arg, which can be used +for enforcing additional label filters for queries. For example, `/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=` +would automatically add `{user_id="123",group_id="456"}` label filters to the given ``. +This functionality can be used for limiting the scope of time series visible to the given tenant. +It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. +See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. -VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. For example, -`/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=` would automatically add `{env=~"prod|staging",user="xyz"}` label filters to the given ``. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. 
+VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. +For example, `/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=` would automatically +add `{env=~"prod|staging",user="xyz"}` label filters to the given ``. This functionality can be used for limiting +the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically +set by auth proxy sitting in front of VictoriaMetrics. +See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats). -VictoriaMetrics accepts `round_digits` query arg for `/api/v1/query` and `/api/v1/query_range` handlers. It can be used for rounding response values to the given number of digits after the decimal point. For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. +VictoriaMetrics accepts `round_digits` query arg for [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) +and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) handlers. It can be used for rounding response values +to the given number of digits after the decimal point. +For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. -VictoriaMetrics accepts `limit` query arg for `/api/v1/labels` and `/api/v1/label//values` handlers for limiting the number of returned entries. For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. 
If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, then limits specified in the command-line flags are used. +VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) +and [`/api/v1/label//values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues) handlers for limiting the number of returned entries. +For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. +If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, +then limits specified in the command-line flags are used. -By default, VictoriaMetrics returns time series for the last 5 minutes from `/api/v1/series`, `/api/v1/labels` and `/api/v1/label//values` while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. -VictoriaMetrics accepts `limit` query arg for `/api/v1/series` handlers for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest. If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. +By default, VictoriaMetrics returns time series for the last day starting at 00:00 UTC +from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series), +[/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and +[`/api/v1/label//values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues), +while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. 
+VictoriaMetrics rounds the specified `start..end` time range to day granularity because of performance optimization concerns. +If you need the exact set of label names and label values on the given time range, then send queries +to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) or to [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query). + +VictoriaMetrics accepts `limit` query arg at [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) +for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest of series. +If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. Additionally, VictoriaMetrics provides the following handlers: diff --git a/docs/url-examples.md b/docs/url-examples.md index c1f7fcbc5..37f5a4103 100644 --- a/docs/url-examples.md +++ b/docs/url-examples.md @@ -288,7 +288,8 @@ curl http://:8481/select/0/prometheus/api/v1/labels -By default VictoriaMetrics returns labels seen during the last 5 minutes. An arbitrary time range can be set via `start` and `end` query args. +By default VictoriaMetrics returns labels seen during the last day starting at 00:00 UTC. An arbitrary time range can be set via `start` and `end` query args. +The specified `start..end` time range is rounded to day granularity because of performance optimization concerns. Additional information: * [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage) @@ -317,7 +318,8 @@ curl http://:8481/select/0/prometheus/api/v1/label/job/values -By default VictoriaMetrics returns label values seen during the last 5 minutes. An arbitrary time range can be set via `start` and `end` query args. 
+By default VictoriaMetrics returns label values seen during the last day starting at 00:00 UTC. An arbitrary time range can be set via `start` and `end` query args. +The specified `start..end` time range is rounded to day granularity because of performance optimization concerns. Additional information: * [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage) @@ -402,7 +404,8 @@ curl http://:8481/select/0/prometheus/api/v1/series -d 'match[]=vm_htt -By default VictoriaMetrics returns time series seen during the last 5 minutes. An arbitrary time range can be set via `start` and `end` query args. +By default VictoriaMetrics returns time series seen during the last day starting at 00:00 UTC. An arbitrary time range can be set via `start` and `end` query args. +The specified `start..end` time range is rounded to day granularity because of performance optimization concerns. Additional information: * [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage) diff --git a/docs/vmagent.md b/docs/vmagent.md index d9789fe7b..5f425b3fa 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1523,7 +1523,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -remoteWrite.relabelConfig string Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling -remoteWrite.keepDanglingQueues - Keep persistent queues contents in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on. + Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. 
Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on. -remoteWrite.roundDigits array Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics Supports array of values separated by comma or specified via multiple flags. From 94cabf29b0c4bf154562cac387fdd4667201ac11 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 28 Mar 2023 21:10:16 -0700 Subject: [PATCH 02/12] lib/flagutil: ArrayString: support commas inside quoted strings and inside `[]`, `{}` and `()` braces Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3915 --- docs/CHANGELOG.md | 1 + lib/flagutil/array.go | 119 +++++++++++++++++++++++++++---------- lib/flagutil/array_test.go | 52 ++++++++++++++-- 3 files changed, 133 insertions(+), 39 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 36fcce1f3..9f60f9100 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -41,6 +41,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999). * BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966). 
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly convert [VictoriaMetrics historgram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) to Prometheus histogram buckets when VictoriaMetrics histogram contain zero buckets. Previously these buckets were ignored, and this could lead to missing Prometheus histogram buckets after the conversion. Thanks to @zklapow for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021). +* BUGFIX: properly support comma-separated filters inside [retention filters](https://docs.victoriametrics.com/#retention-filters). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3915). ## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1) diff --git a/lib/flagutil/array.go b/lib/flagutil/array.go index be81f09f7..9db5b6db8 100644 --- a/lib/flagutil/array.go +++ b/lib/flagutil/array.go @@ -59,16 +59,19 @@ func NewArrayBytes(name, description string) *ArrayBytes { // -foo=value1 -foo=value2 // -foo=value1,value2 // -// Flag values may be quoted. For instance, the following arg creates an array of ("a", "b, c") items: +// Each flag value may contain commas inside single quotes, double quotes, [], () or {} braces. +// For example, -foo=[a,b,c] defines a single command-line flag with `[a,b,c]` value. // -// -foo='a,"b, c"' +// Flag values may be quoted. 
For instance, the following arg creates an array of ("a", "b,c") items: +// +// -foo='a,"b,c"' type ArrayString []string // String implements flag.Value interface func (a *ArrayString) String() string { aEscaped := make([]string, len(*a)) for i, v := range *a { - if strings.ContainsAny(v, `", `+"\n") { + if strings.ContainsAny(v, `,'"{[(`+"\n") { v = fmt.Sprintf("%q", v) } aEscaped[i] = v @@ -94,55 +97,105 @@ func parseArrayValues(s string) []string { if len(tail) == 0 { return values } - if tail[0] == ',' { - tail = tail[1:] - } s = tail + if s[0] == ',' { + s = s[1:] + } } } +var closeQuotes = map[byte]byte{ + '"': '"', + '\'': '\'', + '[': ']', + '{': '}', + '(': ')', +} + func getNextArrayValue(s string) (string, string) { - if len(s) == 0 { - return "", "" + v, tail := getNextArrayValueMaybeQuoted(s) + if strings.HasPrefix(v, `"`) && strings.HasSuffix(v, `"`) { + vUnquoted, err := strconv.Unquote(v) + if err == nil { + return vUnquoted, tail + } + v = v[1 : len(v)-1] + v = strings.ReplaceAll(v, `\"`, `"`) + v = strings.ReplaceAll(v, `\\`, `\`) + return v, tail } - if s[0] != '"' { - // Fast path - unquoted string - n := strings.IndexByte(s, ',') + if strings.HasPrefix(v, `'`) && strings.HasSuffix(v, `'`) { + v = v[1 : len(v)-1] + v = strings.ReplaceAll(v, `\'`, "'") + v = strings.ReplaceAll(v, `\\`, `\`) + return v, tail + } + return v, tail +} + +func getNextArrayValueMaybeQuoted(s string) (string, string) { + idx := 0 + for { + n := strings.IndexAny(s[idx:], `,"'[{(`) if n < 0 { // The last item return s, "" } - return s[:n], s[n:] + idx += n + ch := s[idx] + if ch == ',' { + // The next item + return s[:idx], s[idx:] + } + idx++ + m := indexCloseQuote(s[idx:], closeQuotes[ch]) + idx += m } +} - // Find the end of quoted string - end := 1 - ss := s[1:] +func indexCloseQuote(s string, closeQuote byte) int { + if closeQuote == '"' || closeQuote == '\'' { + idx := 0 + for { + n := strings.IndexByte(s[idx:], closeQuote) + if n < 0 { + return 0 + } + idx += n + 
if n := getTrailingBackslashesCount(s[:idx]); n%2 == 1 { + // The quote is escaped with backslash. Skip it + idx++ + continue + } + return idx + 1 + } + } + idx := 0 for { - n := strings.IndexByte(ss, '"') + n := strings.IndexAny(s[idx:], `"'[{()}]`) if n < 0 { - // Cannot find trailing quote. Return the whole string till the end. - return s, "" + return 0 } - end += n + 1 - // Verify whether the trailing quote is escaped with backslash. - backslashes := 0 - for n > backslashes && ss[n-backslashes-1] == '\\' { - backslashes++ + idx += n + ch := s[idx] + if ch == closeQuote { + return idx + 1 } - if backslashes&1 == 0 { - // The trailing quote isn't escaped. - break + idx++ + m := indexCloseQuote(s[idx:], closeQuotes[ch]) + if m == 0 { + return 0 } - // The trailing quote is escaped. Continue searching for the next quote. - ss = ss[n+1:] + idx += m } - v := s[:end] - vUnquoted, err := strconv.Unquote(v) - if err == nil { - v = vUnquoted +} + +func getTrailingBackslashesCount(s string) int { + n := len(s) + for n > 0 && s[n-1] == '\\' { + n-- } - return v, s[end:] + return len(s) - n } // GetOptionalArg returns optional arg under the given argIdx. 
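The splitting rules introduced in this patch can be illustrated with a standalone sketch. This is not the actual `lib/flagutil` implementation — quote stripping and backslash-escape handling are intentionally omitted — it only demonstrates the core rule: a comma separates array values only when it sits outside single quotes, double quotes and `[]`, `{}`, `()` braces.

```go
package main

import (
	"fmt"
	"strings"
)

// splitArrayValues is a simplified sketch of the new ArrayString splitting
// behavior: commas inside quotes or inside [], {}, () braces do not split
// the flag value. Unquoting and backslash escapes are omitted for brevity.
func splitArrayValues(s string) []string {
	var values []string
	var cur strings.Builder
	var stack []byte // currently open quotes/braces, innermost last
	closers := map[byte]byte{'[': ']', '{': '}', '(': ')', '"': '"', '\'': '\''}
	for i := 0; i < len(s); i++ {
		ch := s[i]
		if len(stack) > 0 {
			top := stack[len(stack)-1]
			switch {
			case ch == closers[top]:
				// close the innermost open quote/brace
				stack = stack[:len(stack)-1]
			case top != '"' && top != '\'':
				// braces may nest; quotes swallow everything until their close
				if _, ok := closers[ch]; ok {
					stack = append(stack, ch)
				}
			}
			cur.WriteByte(ch)
			continue
		}
		if ch == ',' {
			// top-level comma: start the next array value
			values = append(values, cur.String())
			cur.Reset()
			continue
		}
		if _, ok := closers[ch]; ok {
			stack = append(stack, ch)
		}
		cur.WriteByte(ch)
	}
	return append(values, cur.String())
}

func main() {
	// A comma inside a label filter no longer splits the flag value:
	fmt.Printf("%q\n", splitArrayValues(`foo{bar="baz",x="y"},up`))
	// Commas inside quoted strings are kept as well:
	fmt.Printf("%q\n", splitArrayValues(`a,"b,c"`))
}
```

Note the precedence this sketch shares with the real parser: braces nest (handled recursively by `indexCloseQuote` above), while quotes swallow everything — including braces and commas — until the matching closing quote.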
diff --git a/lib/flagutil/array_test.go b/lib/flagutil/array_test.go index 0b0811d92..f64321120 100644 --- a/lib/flagutil/array_test.go +++ b/lib/flagutil/array_test.go @@ -53,15 +53,54 @@ func TestArrayString_Set(t *testing.T) { t.Fatalf("unexpected values parsed;\ngot\n%q\nwant\n%q", a, expectedValues) } } + // Zero args f("", nil) + + // Single arg f(`foo`, []string{`foo`}) - f(`foo,b ar,baz`, []string{`foo`, `b ar`, `baz`}) - f(`foo,b\"'ar,"baz,d`, []string{`foo`, `b\"'ar`, `"baz,d`}) - f(`,foo,,ba"r,`, []string{``, `foo`, ``, `ba"r`, ``}) + f(`fo"o`, []string{`fo"o`}) + f(`fo'o`, []string{`fo'o`}) + f(`fo{o`, []string{`fo{o`}) + f(`fo[o`, []string{`fo[o`}) + f(`fo(o`, []string{`fo(o`}) + + // Single arg with Prometheus label filters + f(`foo{bar="baz",x="y"}`, []string{`foo{bar="baz",x="y"}`}) + f(`foo{bar="ba}z",x="y"}`, []string{`foo{bar="ba}z",x="y"}`}) + f(`foo{bar='baz',x="y"}`, []string{`foo{bar='baz',x="y"}`}) + f(`foo{bar='baz',x='y'}`, []string{`foo{bar='baz',x='y'}`}) + f(`foo{bar='ba}z',x='y'}`, []string{`foo{bar='ba}z',x='y'}`}) + f(`{foo="ba[r",baz='a'}`, []string{`{foo="ba[r",baz='a'}`}) + + // Single arg with JSON + f(`[1,2,3]`, []string{`[1,2,3]`}) + f(`{"foo":"ba,r",baz:x}`, []string{`{"foo":"ba,r",baz:x}`}) + + // Single quoted arg + f(`"foo"`, []string{`foo`}) + f(`"fo,'o"`, []string{`fo,'o`}) + f(`"f\\o,\'\"o"`, []string{`f\o,\'"o`}) + f(`"foo{bar='baz',x='y'}"`, []string{`foo{bar='baz',x='y'}`}) + f(`'foo'`, []string{`foo`}) + f(`'fo,"o'`, []string{`fo,"o`}) + f(`'f\\o,\'\"o'`, []string{`f\o,'\"o`}) + f(`'foo{bar="baz",x="y"}'`, []string{`foo{bar="baz",x="y"}`}) + + // Multiple args + f(`foo,bar,baz`, []string{`foo`, `bar`, `baz`}) + f(`"foo",'bar',{[(ba'",z"`, []string{`foo`, `bar`, `{[(ba'",z"`}) + f(`foo,b"'ar,"baz,d`, []string{`foo`, `b"'ar,"baz`, `d`}) + f(`{foo="b,ar"},baz{x="y",z="d"}`, []string{`{foo="b,ar"}`, `baz{x="y",z="d"}`}) + + // Empty args f(`""`, []string{``}) - f(`"foo,b\nar"`, []string{`foo,b` + "\n" + `ar`}) - 
f(`"foo","bar",baz`, []string{`foo`, `bar`, `baz`}) - f(`,fo,"\"b, a'\\",,r,`, []string{``, `fo`, `"b, a'\`, ``, `r`, ``}) + f(`''`, []string{``}) + f(`,`, []string{``, ``}) + f(`,foo,,ba"r,`, []string{``, `foo`, ``, `ba"r`, ``}) + + // Special chars inside double quotes + f(`"foo,b\nar"`, []string{"foo,b\nar"}) + f(`"foo\x23bar"`, []string{"foo\x23bar"}) } func TestArrayString_GetOptionalArg(t *testing.T) { @@ -100,6 +139,7 @@ func TestArrayString_String(t *testing.T) { f(",foo,") f(`", foo","b\"ar",`) f(`,"\nfoo\\",bar`) + f(`"foo{bar=~\"baz\",a!=\"b\"}","{a='b,{[(c'}"`) } func TestArrayDuration(t *testing.T) { From 9199c2372067873dc55a0732b164ebd928dee7c1 Mon Sep 17 00:00:00 2001 From: Eliran Barnoy Date: Wed, 29 Mar 2023 12:41:33 +0300 Subject: [PATCH 03/12] Fix operator links to include VMPrometheusConverter for added visibility --- docs/operator/design.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/operator/design.md b/docs/operator/design.md index 725cd189b..d655793a9 100644 --- a/docs/operator/design.md +++ b/docs/operator/design.md @@ -26,8 +26,9 @@ Operator introduces the following custom resources: * [VMAlertmanager](#vmalertmanager) * [VMAlertmanagerConfig](#vmalertmanagerconfig) * [VMRule](#vmrule) +* [VMPrometheusConverter](#vmprometheusconverter) * [VMProbe](#vmprobe) -* [VMNodeScrape](#vmodescrape) +* [VMNodeScrape](#vmnodescrape) * [VMStaticScrape](#vmstaticscrape) * [VMAuth](#vmauth) * [VMUser](#vmuser) From ff72ca14b9d51323e8014f07f424eb9f72d50eb2 Mon Sep 17 00:00:00 2001 From: Alexander Marshalov <_@marshalov.org> Date: Wed, 29 Mar 2023 18:05:58 +0200 Subject: [PATCH 04/12] added hot reload support for stream aggregation configs (#3969) (#3970) added hot reload support for stream aggregation configs (#3969) Signed-off-by: Alexander Marshalov <_@marshalov.org> --- app/vmagent/main.go | 5 +- app/vmagent/remotewrite/remotewrite.go | 83 +++++++++---- app/vmagent/remotewrite/streamagg.go | 118 ++++++++++++++++++ 
app/vminsert/common/streamaggr.go | 83 ++++++++++++- docs/CHANGELOG.md | 14 +-- docs/stream-aggregation.md | 17 +++ docs/vmagent.md | 2 +- lib/streamaggr/streamaggr.go | 159 ++++++++++++++++++++++--- lib/streamaggr/streamaggr_test.go | 104 +++++++++++++++- 9 files changed, 524 insertions(+), 61 deletions(-) create mode 100644 app/vmagent/remotewrite/streamagg.go diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 9c4a6adda..7c7486935 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -68,7 +68,7 @@ var ( "at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ - "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+ + "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+ "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag") ) @@ -109,6 +109,9 @@ func main() { if err := promscrape.CheckConfig(); err != nil { logger.Fatalf("error when checking -promscrape.config: %s", err) } + if err := remotewrite.CheckStreamAggConfigs(); err != nil { + logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err) + } logger.Infof("all the configs are ok; exiting with 0 status code") return } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 1ffa377b0..84a662d45 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -65,15 +65,6 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series cardinality. 
See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+ "Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") - - streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html . "+ - "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") - streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ - "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ - "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) var ( @@ -96,6 +87,9 @@ func MultitenancyEnabled() bool { // Contains the current relabelConfigs. var allRelabelConfigs atomic.Value +// Contains the loader for stream aggregation configs. +var saCfgLoader *saConfigsLoader + // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. 
var maxQueues = cgroup.AvailableCPUs() * 16 @@ -159,8 +153,13 @@ func Init() { } allRelabelConfigs.Store(rcs) - configSuccess.Set(1) - configTimestamp.Set(fasttime.UnixTimestamp()) + relabelConfigSuccess.Set(1) + relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) + + saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig) + if err != nil { + logger.Fatalf("cannot load stream aggregation config: %s", err) + } if len(*remoteWriteURLs) > 0 { rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs) @@ -176,29 +175,48 @@ func Init() { case <-stopCh: return } - configReloads.Inc() + relabelConfigReloads.Inc() logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") rcs, err := loadRelabelConfigs() if err != nil { - configReloadErrors.Inc() - configSuccess.Set(0) + relabelConfigReloadErrors.Inc() + relabelConfigSuccess.Set(0) logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) continue } - allRelabelConfigs.Store(rcs) - configSuccess.Set(1) - configTimestamp.Set(fasttime.UnixTimestamp()) + relabelConfigSuccess.Set(1) + relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) logger.Infof("Successfully reloaded relabel configs") + + logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config") + err = saCfgLoader.reloadConfigs() + if err != nil { + logger.Errorf("Cannot reload stream aggregation configs: %s", err) + } + if len(*remoteWriteMultitenantURLs) > 0 { + rwctxsMapLock.Lock() + for _, rwctxs := range rwctxsMap { + for _, rwctx := range rwctxs { + rwctx.reinitStreamAggr() + } + } + rwctxsMapLock.Unlock() + } else { + for _, rwctx := range rwctxsDefault { + rwctx.reinitStreamAggr() + } + } + logger.Infof("Successfully reloaded stream aggregation configs") } }() } var ( - configReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`) - configReloadErrors = 
metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`) - configSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`) - configTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) + relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`) + relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`) + relabelConfigSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`) + relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) ) func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { @@ -489,6 +507,7 @@ type remoteWriteCtx struct { c *client sas *streamaggr.Aggregators + saHash uint64 streamAggrKeepInput bool pss []*pendingSeries @@ -548,14 +567,16 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI } // Initialize sas - sasFile := streamAggrConfig.GetOptionalArg(argIdx) - if sasFile != "" { + saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx) + if len(saCfg) > 0 { + sasFile := streamAggrConfig.GetOptionalArg(argIdx) dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0) - sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) + sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval) if err != nil { logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err) } rwctx.sas = sas + rwctx.saHash = saHash rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) } @@ -623,6 +644,20 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { pss[idx].Push(tss) } +func (rwctx *remoteWriteCtx) reinitStreamAggr() { + if rwctx.sas == nil { + return + } + saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx) + if rwctx.saHash == saHash { + return + } + if err := 
rwctx.sas.ReInitConfigs(saCfg); err != nil {
+		logger.Errorf("cannot apply stream aggregation configs for -remoteWrite.url idx=%d: %s", rwctx.idx, err)
+	}
+	rwctx.saHash = saHash
+}
+
 var tssRelabelPool = &sync.Pool{
 	New: func() interface{} {
 		a := []prompbmarshal.TimeSeries{}
diff --git a/app/vmagent/remotewrite/streamagg.go b/app/vmagent/remotewrite/streamagg.go
new file mode 100644
index 000000000..b56091f53
--- /dev/null
+++ b/app/vmagent/remotewrite/streamagg.go
@@ -0,0 +1,118 @@
+package remotewrite
+
+import (
+	"fmt"
+	"sync/atomic"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+var (
+	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html . "+
+		"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
+	streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
+		"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html")
+	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated.
"+ + "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") +) + +var ( + saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`) + saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`) + saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`) + saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`) +) + +// saConfigRules - type alias for unmarshalled stream aggregation config +type saConfigRules = []*streamaggr.Config + +// saConfigsLoader loads stream aggregation configs from the given files. +type saConfigsLoader struct { + files []string + configs atomic.Pointer[[]saConfig] +} + +// newSaConfigsLoader creates new saConfigsLoader for the given config files. +func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) { + result := &saConfigsLoader{ + files: configFiles, + } + // Initial load of configs. + if err := result.reloadConfigs(); err != nil { + return nil, err + } + return result, nil +} + +// reloadConfigs reloads stream aggregation configs from the files given in constructor. +func (r *saConfigsLoader) reloadConfigs() error { + // Increment reloads counter if it is not the initial load. + if r.configs.Load() != nil { + saCfgReloads.Inc() + } + + // Load all configs from files. + var configs = make([]saConfig, len(r.files)) + for i, path := range r.files { + if len(path) == 0 { + // Skip empty stream aggregation config. + continue + } + rules, hash, err := streamaggr.LoadConfigsFromFile(path) + if err != nil { + saCfgSuccess.Set(0) + saCfgReloadErr.Inc() + return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) + } + configs[i] = saConfig{ + path: path, + hash: hash, + rules: rules, + } + } + + // Update configs. 
+ r.configs.Store(&configs) + + saCfgSuccess.Set(1) + saCfgTimestamp.Set(fasttime.UnixTimestamp()) + return nil +} + +// getCurrentConfig returns the current stream aggregation config with the given idx. +func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) { + all := r.configs.Load() + if all == nil { + return nil, 0 + } + cfgs := *all + if len(cfgs) == 0 { + return nil, 0 + } + if idx >= len(cfgs) { + if len(cfgs) == 1 { + cfg := cfgs[0] + return cfg.rules, cfg.hash + } + return nil, 0 + } + cfg := cfgs[idx] + return cfg.rules, cfg.hash +} + +type saConfig struct { + path string + hash uint64 + rules saConfigRules +} + +// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config. +func CheckStreamAggConfigs() error { + _, err := newSaConfigsLoader(*streamAggrConfig) + return err +} diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 066a9cacb..b01132791 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -2,15 +2,19 @@ package common import ( "flag" + "fmt" + "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" + "github.com/VictoriaMetrics/metrics" ) var ( @@ -24,28 +28,69 @@ var ( "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) +var ( + stopCh = make(chan struct{}) + configReloaderWG sync.WaitGroup + + saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`) + saCfgReloadErr = 
metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`) + saCfgSuccess = metrics.NewCounter(`vminsert_streamagg_config_last_reload_successful`) + saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`) + + sa *streamaggr.Aggregators + saHash uint64 +) + // InitStreamAggr must be called after flag.Parse and before using the common package. // // MustStopStreamAggr must be called when stream aggr is no longer needed. func InitStreamAggr() { if *streamAggrConfig == "" { - // Nothing to initialize return } - a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval) + + sighupCh := procutil.NewSighupChan() + + configs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig) if err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } + a, err := streamaggr.NewAggregators(configs, pushAggregateSeries, *streamAggrDedupInterval) + if err != nil { + logger.Fatalf("cannot init -streamAggr.config=%q: %s", *streamAggrConfig, err) + } sa = a + saHash = hash + saCfgSuccess.Set(1) + saCfgTimestamp.Set(fasttime.UnixTimestamp()) + + // Start config reloader. + configReloaderWG.Add(1) + go func() { + defer configReloaderWG.Done() + for { + select { + case <-sighupCh: + case <-stopCh: + return + } + if err := reloadSaConfig(); err != nil { + logger.Errorf("cannot reload -streamAggr.config=%q: %s", *streamAggrConfig, err) + continue + } + } + }() } // MustStopStreamAggr stops stream aggregators. 
func MustStopStreamAggr() { + close(stopCh) + sa.MustStop() sa = nil -} -var sa *streamaggr.Aggregators + configReloaderWG.Wait() +} type streamAggrCtx struct { mn storage.MetricName @@ -119,3 +164,33 @@ func pushAggregateSeries(tss []prompbmarshal.TimeSeries) { logger.Errorf("cannot flush aggregate series: %s", err) } } + +func reloadSaConfig() error { + saCfgReloads.Inc() + + cfgs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig) + if err != nil { + saCfgSuccess.Set(0) + saCfgReloadErr.Inc() + return fmt.Errorf("cannot reload -streamAggr.config=%q: %w", *streamAggrConfig, err) + } + + if saHash == hash { + return nil + } + + if err = sa.ReInitConfigs(cfgs); err != nil { + saCfgSuccess.Set(0) + saCfgReloadErr.Inc() + return fmt.Errorf("cannot apply new -streamAggr.config=%q: %w", *streamAggrConfig, err) + } + + saHash = hash + + saCfgSuccess.Set(1) + saCfgTimestamp.Set(fasttime.UnixTimestamp()) + + logger.Infof("Successfully reloaded stream aggregation config") + + return nil +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9f60f9100..301a6dc52 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -26,6 +26,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). 
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). @@ -135,19 +136,6 @@ Released at 2023-02-24 * BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. 
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816). -## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4) - -Released at 2023-03-25 - -**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes. -The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release** - -* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). -* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error. -* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055). -* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999). -* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966). 
-
 ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
 
 Released at 2023-03-12
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index 4eb6c86a4..df983c6ba 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -545,3 +545,20 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
 
 The file can contain multiple aggregation configs. The aggregation is performed independently
 per each specified config entry.
+
+### Configuration update
+
+[vmagent](https://docs.victoriametrics.com/vmagent.html) and
+[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two
+approaches for reloading stream aggregation configs from the files specified via
+`-remoteWrite.streamAggr.config` and `-streamAggr.config` command-line flags without restart:
+
+* Sending `SIGHUP` signal to `vmagent` process:
+
+  ```console
+  kill -SIGHUP `pidof vmagent`
+  ```
+
+* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`).
+
+The reload resets the aggregation state only for the rules changed in the configuration files.
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 5f425b3fa..36ca3b697 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -108,7 +108,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
 
 `vmagent` should be restarted in order to update config options set via command-line args.
`vmagent` supports multiple approaches for reloading configs from updated config files such as
-`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`:
+`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
 
 * Sending `SIGHUP` signal to `vmagent` process:
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 3e6199350..c177544f3 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -1,12 +1,14 @@
 package streamaggr
 
 import (
+	"encoding/json"
 	"fmt"
 	"math"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -17,6 +19,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/cespare/xxhash/v2"
 	"gopkg.in/yaml.v2"
 )
@@ -36,22 +39,40 @@ var supportedOutputs = []string{
 	"quantiles(phi1, ..., phiN)",
 }
 
-// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
+// ParseConfig parses an array of stream aggregation configs from the given data.
+func ParseConfig(data []byte) ([]*Config, uint64, error) {
+	var cfgs []*Config
+	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
+		return nil, 0, fmt.Errorf("cannot parse stream aggregation config: %w", err)
+	}
+	return cfgs, xxhash.Sum64(data), nil
+}
+
+// LoadConfigsFromFile loads an array of stream aggregation configs from the given path.
+func LoadConfigsFromFile(path string) ([]*Config, uint64, error) { + data, err := fs.ReadFileOrHTTP(path) + if err != nil { + return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) + } + return ParseConfig(data) +} + +// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. // // If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, // e.g. only the last sample per each time series per each dedupInterval is aggregated. // // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { - data, err := fs.ReadFileOrHTTP(path) +func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) { + cfgs, configHash, err := LoadConfigsFromFile(path) if err != nil { - return nil, fmt.Errorf("cannot load aggregators: %w", err) + return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err) } - as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval) + as, err := NewAggregators(cfgs, pushFunc, dedupInterval) if err != nil { - return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) + return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) } - return as, nil + return as, configHash, nil } // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data. 
@@ -127,9 +148,22 @@ type Config struct {
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
 }
 
+func (cfg *Config) hash() (uint64, error) {
+	if cfg == nil {
+		return 0, nil
+	}
+	data, err := json.Marshal(cfg)
+	if err != nil {
+		return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
+	}
+	return xxhash.Sum64(data), nil
+}
+
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
 type Aggregators struct {
-	as []*aggregator
+	as            atomic.Pointer[[]*aggregator]
+	pushFunc      PushFunc
+	dedupInterval time.Duration
 }
 
 // NewAggregators creates Aggregators from the given cfgs.
@@ -152,9 +186,13 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
 		}
 		as[i] = a
 	}
-	return &Aggregators{
-		as: as,
-	}, nil
+	result := &Aggregators{
+		pushFunc:      pushFunc,
+		dedupInterval: dedupInterval,
+	}
+	result.as.Store(&as)
+
+	return result, nil
 }
 
 // MustStop stops a.
@@ -162,7 +200,7 @@ func (a *Aggregators) MustStop() {
 	if a == nil {
 		return
 	}
-	for _, aggr := range a.as {
+	for _, aggr := range *a.as.Load() {
 		aggr.MustStop()
 	}
 }
@@ -172,11 +210,74 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
 	if a == nil {
 		return
 	}
-	for _, aggr := range a.as {
+	for _, aggr := range *a.as.Load() {
 		aggr.Push(tss)
 	}
 }
 
+// ReInitConfigs re-initializes the state of Aggregators a with the given new stream aggregation configs.
+func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
+	if a == nil {
+		return nil
+	}
+
+	keys := make(map[uint64]struct{})        // set of all config hashes (new configs and existing aggregators)
+	cfgsMap := make(map[uint64]*Config)      // config hash => config
+	aggrsMap := make(map[uint64]*aggregator) // config hash => existing aggregator
+
+	for _, cfg := range cfgs {
+		key, err := cfg.hash()
+		if err != nil {
+			return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
+		}
+		keys[key] = struct{}{}
+		cfgsMap[key] = cfg
+	}
+ for _, aggr := range *a.as.Load() { + keys[aggr.cfgHash] = struct{}{} + aggrsMap[aggr.cfgHash] = aggr + } + + asNew := make([]*aggregator, 0, len(aggrsMap)) + asDel := make([]*aggregator, 0, len(aggrsMap)) + for key := range keys { + cfg, hasCfg := cfgsMap[key] + agg, hasAggr := aggrsMap[key] + + // if config for aggregator was changed or removed + // then we need to stop aggregator and remove it + if !hasCfg && hasAggr { + asDel = append(asDel, agg) + continue + } + + // if there is no aggregator for config (new config), + // then we need to create it + if hasCfg && !hasAggr { + newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval) + if err != nil { + return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err) + } + asNew = append(asNew, newAgg) + continue + } + + // if aggregator config was not changed, then we can just keep it + if hasCfg && hasAggr { + asNew = append(asNew, agg) + } + } + + // Atomically replace aggregators array. + a.as.Store(&asNew) + // and stop old aggregators + for _, aggr := range asDel { + aggr.MustStop() + } + + return nil +} + // aggregator aggregates input series according to the config passed to NewAggregator type aggregator struct { match *promrelabel.IfExpression @@ -194,6 +295,7 @@ type aggregator struct { // aggrStates contains aggregate states for the given outputs aggrStates []aggrState + hasState atomic.Bool pushFunc PushFunc @@ -202,7 +304,8 @@ type aggregator struct { // It contains the interval, labels in (by, without), plus output name. 
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job // for `interval: 1m`, `by: [job]` - suffix string + suffix string + cfgHash uint64 wg sync.WaitGroup stopCh chan struct{} @@ -330,6 +433,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) dedupAggr = newLastAggrState() } + cfgHash, err := cfg.hash() + if err != nil { + return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err) + } + // initialize the aggregator a := &aggregator{ match: cfg.Match, @@ -345,7 +453,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) aggrStates: aggrStates, pushFunc: pushFunc, - suffix: suffix, + suffix: suffix, + cfgHash: cfgHash, stopCh: make(chan struct{}), } @@ -411,8 +520,9 @@ func (a *aggregator) dedupFlush() { skipAggrSuffix: true, } a.dedupAggr.appendSeriesForFlush(ctx) - logger.Errorf("series after dedup: %v", ctx.tss) a.push(ctx.tss) + + a.hasState.Store(false) } func (a *aggregator) flush() { @@ -442,6 +552,8 @@ func (a *aggregator) flush() { // Push the output metrics. a.pushFunc(tss) } + + a.hasState.Store(false) } // MustStop stops the aggregator. @@ -449,11 +561,26 @@ func (a *aggregator) flush() { // The aggregator stops pushing the aggregated metrics after this call. func (a *aggregator) MustStop() { close(a.stopCh) + + if a.hasState.Load() { + if a.dedupAggr != nil { + flushConcurrencyCh <- struct{}{} + a.dedupFlush() + <-flushConcurrencyCh + } + + flushConcurrencyCh <- struct{}{} + a.flush() + <-flushConcurrencyCh + } + a.wg.Wait() } // Push pushes tss to a. 
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { + a.hasState.Store(true) + if a.dedupAggr == nil { a.push(tss) return diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index a3c002c8d..096830696 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -146,7 +146,7 @@ func TestAggregatorsSuccess(t *testing.T) { tssInput := mustParsePromMetrics(inputMetrics) a.Push(tssInput) if a != nil { - for _, aggr := range a.as { + for _, aggr := range *a.as.Load() { aggr.flush() } } @@ -671,7 +671,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { tssInput := mustParsePromMetrics(inputMetrics) a.Push(tssInput) if a != nil { - for _, aggr := range a.as { + for _, aggr := range *a.as.Load() { aggr.dedupFlush() aggr.flush() } @@ -719,6 +719,106 @@ foo:1m_sum_samples{baz="qwe"} 10 `) } +func TestAggregatorsReinit(t *testing.T) { + f := func(config, newConfig, inputMetrics, outputMetricsExpected string) { + t.Helper() + + // Initialize Aggregators + var tssOutput []prompbmarshal.TimeSeries + var tssOutputLock sync.Mutex + pushFunc := func(tss []prompbmarshal.TimeSeries) { + tssOutputLock.Lock() + for _, ts := range tss { + labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...) + samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...) 
+ tssOutput = append(tssOutput, prompbmarshal.TimeSeries{ + Labels: labelsCopy, + Samples: samplesCopy, + }) + } + tssOutputLock.Unlock() + } + + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) + if err != nil { + t.Fatalf("cannot initialize aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + tssInput := mustParsePromMetrics(inputMetrics) + a.Push(tssInput) + + // Reinitialize Aggregators + nc, _, err := ParseConfig([]byte(newConfig)) + if err != nil { + t.Fatalf("cannot parse new config: %s", err) + } + err = a.ReInitConfigs(nc) + if err != nil { + t.Fatalf("cannot reinit aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + a.Push(tssInput) + if a != nil { + for _, aggr := range *a.as.Load() { + aggr.flush() + } + } + + a.MustStop() + + // Verify the tssOutput contains the expected metrics + tsStrings := make([]string, len(tssOutput)) + for i, ts := range tssOutput { + tsStrings[i] = timeSeriesToString(ts) + } + sort.Strings(tsStrings) + outputMetrics := strings.Join(tsStrings, "") + if outputMetrics != outputMetricsExpected { + t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) + } + } + + f(` +- interval: 1m + outputs: [count_samples] +`, ` +- interval: 1m + outputs: [sum_samples] +`, ` +foo 123 +bar 567 +foo 234 +`, `bar:1m_count_samples 1 +bar:1m_sum_samples 567 +foo:1m_count_samples 2 +foo:1m_sum_samples 357 +`) + + f(` +- interval: 1m + outputs: [total] +- interval: 2m + outputs: [count_samples] +`, ` +- interval: 1m + outputs: [sum_samples] +- interval: 2m + outputs: [count_samples] +`, ` +foo 123 +bar 567 +foo 234 +`, `bar:1m_sum_samples 567 +bar:1m_total 0 +bar:2m_count_samples 2 +foo:1m_sum_samples 357 +foo:1m_total 111 +foo:2m_count_samples 4 +`) +} + func timeSeriesToString(ts prompbmarshal.TimeSeries) string { labelsString := promrelabel.LabelsToString(ts.Labels) if len(ts.Samples) != 1 { From 0945a0384367326a31d80c0fe451eb247d1ba837 Mon Sep 17 00:00:00 
2001
From: Daria Karavaieva
Date: Wed, 29 Mar 2023 20:24:06 +0200
Subject: [PATCH 05/12] Vmanomaly guide index fix (#4029)

* name and structure change
* fix indexing
* index fix
* name change
* line separator fix
---
 docs/guides/guide-vmanomaly-vmalert.md | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/docs/guides/guide-vmanomaly-vmalert.md b/docs/guides/guide-vmanomaly-vmalert.md
index 498eede0d..a9f443fc4 100644
--- a/docs/guides/guide-vmanomaly-vmalert.md
+++ b/docs/guides/guide-vmanomaly-vmalert.md
@@ -1,4 +1,4 @@
-# vmanomaly Quickstart
+# Getting started with vmanomaly
 **Prerequisites**
 - In the tutorial, we'll be using the following VictoriaMetrics components:
@@ -8,7 +8,7 @@
 If you're unfamiliar with the listed components, please read [QuickStart](https://docs.victoriametrics.com/Quick-Start.html) first.
 - It is assumed that you are familiar with [Grafana](https://grafana.com/)(v.9.3.1) and [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/).
-## What is vmanomaly?
+## 1. What is vmanomaly?
 *VictoriaMetrics Anomaly Detection* ([vmanomaly](https://docs.victoriametrics.com/vmanomaly.html)) is a service that continuously scans time series stored in VictoriaMetrics and detects unexpected changes within data patterns in real-time. It does so by utilizing user-configurable machine learning models.
 All the service parameters are defined in a config file.
@@ -27,11 +27,11 @@ The value is designed to:
 - *exceed 1* if the datapoint is abnormal.
 Then, users can enable alerting rules based on the **anomaly score** with [vmalert](#what-is-vmalert).
-## What is vmalert?
+## 2. What is vmalert?
 [vmalert](https://docs.victoriametrics.com/vmalert.html) is an alerting tool for VictoriaMetrics. It executes a list of the given alerting or recording rules against configured `-datasource.url`.
[Alerting rules](https://docs.victoriametrics.com/vmalert.html#alerting-rules) allow you to define conditions that, when met, will notify the user. The alerting condition is defined in a form of a query expression via [MetricsQL query language](https://docs.victoriametrics.com/MetricsQL.html). For example, in our case, the expression `anomaly_score > 1.0` will notify a user when the calculated anomaly score exceeds a threshold of 1. -## How does vmanomaly works with vmalert? +## 3. How does vmanomaly works with vmalert? Compared to classical alerting rules, anomaly detection is more "hands-off" and data-aware. Instead of thinking of critical conditions to define, user can rely on catching anomalies that were not expected to happen. In other words, by setting up alerting rules, a user must know what to look for, ahead of time, while anomaly detection looks for any deviations from past behavior. Practical use case is to put anomaly score generated by vmanomaly into alerting rules with some threshold. @@ -43,9 +43,10 @@ Practical use case is to put anomaly score generated by vmanomaly into alerting - Explore data for analysis in [Grafana](https://grafana.com/). - Explore vmanomaly results. - Explore vmalert alerts + _____________________________ -## Data to analyze +## 4. Data to analyze Let's talk about data used for anomaly detection in this tutorial. We are going to collect our own CPU usage data with [Node Exporter](https://prometheus.io/docs/guides/node-exporter/) into the VictoriaMetrics database. @@ -73,9 +74,10 @@ Here is how this query may look like in Grafana: ![node_cpu_rate_graph](guide-vmanomaly-node-cpu-rate-graph.png "node_cpu_rate_graph") This query result will generate 8 time series per each cpu, and we will use them as an input for our VM Anomaly Detection. vmanomaly will start learning configured model type separately for each of the time series. + ______________________________ -## vmanomaly configuration and parameter description +## 5. 
vmanomaly configuration and parameter description **Parameter description**: There are 4 main sections in config file: @@ -141,7 +143,7 @@ writer: _____________________________________________ -## vmanomaly output +## 6. vmanomaly output As the result of running vmanomaly, it produces the following metrics: - `anomaly_score` - the main one. Ideally, if it is between 0.0 and 1.0 it is considered to be a non-anomalous value. If it is greater than 1.0, it is considered an anomaly (but you can reconfigure that in alerting config, of course), - `yhat` - predicted expected value, @@ -154,7 +156,7 @@ Here is an example of how output metric will be written into VictoriaMetrics: ____________________________________________ -## vmalert configuration +## 7. vmalert configuration Here we provide an example of the config for vmalert `vmalert_config.yml`.
@@ -176,7 +178,7 @@ groups: In the query expression we need to put a condition on the generated anomaly scores. Usually if the anomaly score is between 0.0 and 1.0, the analyzed value is not abnormal. The more anomaly score exceeded 1 the more our model is sure that value is an anomaly. You can choose your threshold value that you consider reasonable based on the anomaly score metric, generated by vmanomaly. One of the best ways is to estimate it visually, by plotting the `anomaly_score` metric, along with predicted "expected" range of `yhat_lower` and `yhat_upper`. Later in this tutorial we will show an example ____________________________________________ -## Docker Compose configuration +## 8. Docker Compose configuration Now we are going to configure the `docker-compose.yml` file to run all needed services. Here are all services we are going to run: @@ -375,7 +377,7 @@ docker-compose up -d ___________________________________________________________ -## Model results +## 9. Model results To look at model results we need to go to grafana on the `localhost:3000`. Data vmanomaly need some time to generate more data to visualize. Let's investigate model output visualization in Grafana. @@ -410,5 +412,5 @@ On the page `http://localhost:8880/vmalert/groups` you can find our configured A According to the rule configured for vmalert we will see Alert when anomaly score exceed 1. You will see an alert on Alert tab. `http://localhost:8880/vmalert/alerts` ![alerts](guide-vmanomaly-alerts-firing.png "alerts firing") -## Conclusion +## 10. Conclusion Now we know how to set up Victoria Metric Anomaly Detection tool and use it together with vmalert. We also discovered core vmanomaly generated metrics and behaviour. 
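To make the anomaly-score threshold discussed in this guide concrete, here is a minimal standalone sketch. It is illustrative only — `anomalous` is not part of vmanomaly or vmalert; the real mechanism is the `anomaly_score > 1.0` expression configured in the vmalert rule above:

```go
package main

import "fmt"

// anomalous mirrors the alerting rule used in the guide: a datapoint is
// treated as an anomaly once its anomaly_score exceeds the chosen
// threshold (1.0 in the tutorial; tune it against your own data).
func anomalous(score, threshold float64) bool {
	return score > threshold
}

func main() {
	for _, score := range []float64{0.2, 0.95, 1.3} {
		fmt.Printf("anomaly_score=%.2f fires=%v\n", score, anomalous(score, 1.0))
	}
}
```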
From ec45f1bc5fd1e8d4bffee3d599075961bd96d235 Mon Sep 17 00:00:00 2001 From: Zakhar Bessarab Date: Thu, 30 Mar 2023 14:18:00 +0300 Subject: [PATCH 06/12] lib/fs: verify response code when reading configuration over HTTP (#4036) Verifying status code helps to avoid misleading errors caused by attempt to parse unsuccessful response. Related issue: #4034 Signed-off-by: Zakhar Bessarab --- docs/CHANGELOG.md | 1 + lib/fs/fs.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 301a6dc52..051231031 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -43,6 +43,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly convert [VictoriaMetrics historgram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) to Prometheus histogram buckets when VictoriaMetrics histogram contain zero buckets. Previously these buckets were ignored, and this could lead to missing Prometheus histogram buckets after the conversion. Thanks to @zklapow for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021). * BUGFIX: properly support comma-separated filters inside [retention filters](https://docs.victoriametrics.com/#retention-filters). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3915). +* BUGFIX: verify response code when fetching configuration files via HTTP. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4034). 
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)

diff --git a/lib/fs/fs.go b/lib/fs/fs.go
index d438ddc90..7da255887 100644
--- a/lib/fs/fs.go
+++ b/lib/fs/fs.go
@@ -424,6 +424,11 @@ func ReadFileOrHTTP(path string) ([]byte, error) {
 if err != nil {
 return nil, fmt.Errorf("cannot fetch %q: %w", path, err)
 }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("unexpected status code when fetching %q: %d, expecting %d", path, resp.StatusCode, http.StatusOK)
+ }
+
 data, err := io.ReadAll(resp.Body)
 _ = resp.Body.Close()
 if err != nil {

From 4a49577028bd3b8def52d53ee2f75992999bb768 Mon Sep 17 00:00:00 2001
From: Roman Khavronenko
Date: Thu, 30 Mar 2023 14:57:00 +0200
Subject: [PATCH 07/12] vmalert: use `missingkey=zero` for templating (#4040)

Replace empty labels with "" instead of "<no value>" during templating, as Prometheus does.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4012

Signed-off-by: hagen1778
---
 app/vmalert/notifier/alert.go | 2 ++
 docs/CHANGELOG.md | 1 +
 2 files changed, 3 insertions(+)

diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go
index 89dca7e17..9be76aeed 100644
--- a/app/vmalert/notifier/alert.go
+++ b/app/vmalert/notifier/alert.go
@@ -170,6 +170,8 @@ func templateAnnotation(dst io.Writer, text string, data tplData, tmpl *textTpl.
 if err != nil {
 return fmt.Errorf("error cloning template before parse annotation: %w", err)
 }
+ // Clone() doesn't copy tpl Options, so we set them manually
+ tpl = tpl.Option("missingkey=zero")
 tpl, err = tpl.Parse(text)
 if err != nil {
 return fmt.Errorf("error parsing annotation template: %w", err)
 }

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 051231031..ede415cbb 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -44,6 +44,7 @@ created by v1.90.0 or newer versions.
The solution is to upgrade to v1.90.0 or n
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly convert [VictoriaMetrics historgram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) to Prometheus histogram buckets when VictoriaMetrics histogram contain zero buckets. Previously these buckets were ignored, and this could lead to missing Prometheus histogram buckets after the conversion. Thanks to @zklapow for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021).
* BUGFIX: properly support comma-separated filters inside [retention filters](https://docs.victoriametrics.com/#retention-filters). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3915).
* BUGFIX: verify response code when fetching configuration files via HTTP. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4034).
+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): replace empty labels with "" instead of "<no value>" during templating, as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4012).

## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)

From 5e5fc66e3b02772293208bf94a565809e21545d4 Mon Sep 17 00:00:00 2001
From: Zakhar Bessarab
Date: Thu, 30 Mar 2023 18:21:36 +0300
Subject: [PATCH 08/12] docs/vmctl: add examples of URLs used for migration in different modes (#4042)

Signed-off-by: Zakhar Bessarab
---
 app/vmctl/README.md | 20 +++++++++++++++++++-
 docs/vmctl.md | 20 +++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/app/vmctl/README.md b/app/vmctl/README.md
index 645509994..d2a81ed5a 100644
--- a/app/vmctl/README.md
+++ b/app/vmctl/README.md
@@ -781,7 +781,25 @@ To avoid such situation try to filter out VM process metrics via `--vm-native-fi
 4. `vmctl` doesn't provide relabeling or other types of labels management in this mode.
Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/vmctl/issues/4#issuecomment-683424375).
 5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
-and specify `accountID` param.
+and specify `accountID` param. Example formats:
+
+```console
+# Migrating from cluster to single
+--vm-native-src-addr=http://<src-addr>:8481/select/0/prometheus
+--vm-native-dst-addr=http://<dst-addr>:8428
+
+# Migrating from single to cluster
+--vm-native-src-addr=http://<src-addr>:8428
+--vm-native-dst-addr=http://<dst-addr>:8480/insert/0/prometheus
+
+# Migrating single to single
+--vm-native-src-addr=http://<src-addr>:8428
+--vm-native-dst-addr=http://<dst-addr>:8428
+
+# Migrating cluster to cluster
+--vm-native-src-addr=http://<src-addr>:8481/select/0/prometheus
+--vm-native-dst-addr=http://<dst-addr>:8480/insert/0/prometheus
+```
 6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps.
 7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results. Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according

diff --git a/docs/vmctl.md b/docs/vmctl.md
index d0bef4292..c4a4d0ecb 100644
--- a/docs/vmctl.md
+++ b/docs/vmctl.md
@@ -785,7 +785,25 @@ To avoid such situation try to filter out VM process metrics via `--vm-native-fi
 4. `vmctl` doesn't provide relabeling or other types of labels management in this mode.
 Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/vmctl/issues/4#issuecomment-683424375).
 5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
-and specify `accountID` param.
+and specify `accountID` param.
Example formats:
+
+```console
+# Migrating from cluster to single
+--vm-native-src-addr=http://<src-addr>:8481/select/0/prometheus
+--vm-native-dst-addr=http://<dst-addr>:8428
+
+# Migrating from single to cluster
+--vm-native-src-addr=http://<src-addr>:8428
+--vm-native-dst-addr=http://<dst-addr>:8480/insert/0/prometheus
+
+# Migrating single to single
+--vm-native-src-addr=http://<src-addr>:8428
+--vm-native-dst-addr=http://<dst-addr>:8428
+
+# Migrating cluster to cluster
+--vm-native-src-addr=http://<src-addr>:8481/select/0/prometheus
+--vm-native-dst-addr=http://<dst-addr>:8480/insert/0/prometheus
+```
 6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps.
 7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results. Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according

From 59c350d0d280251b1a0a0927cda52411def5c209 Mon Sep 17 00:00:00 2001
From: Max Golionko <8kirk8@gmail.com>
Date: Fri, 31 Mar 2023 22:29:44 +0800
Subject: [PATCH 09/12] fix: app/vmui/Dockerfile-web to reduce vulnerabilities (#4044)

The following vulnerabilities are fixed with an upgrade:
- https://snyk.io/vuln/SNYK-ALPINE317-OPENSSL-3368755
- https://snyk.io/vuln/SNYK-ALPINE317-OPENSSL-3368755
- https://snyk.io/vuln/SNYK-ALPINE317-OPENSSL-5291795
- https://snyk.io/vuln/SNYK-ALPINE317-OPENSSL-5291795

Co-authored-by: snyk-bot
---
 app/vmui/Dockerfile-web | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/app/vmui/Dockerfile-web b/app/vmui/Dockerfile-web
index 282617201..bf19ceeab 100644
--- a/app/vmui/Dockerfile-web
+++ b/app/vmui/Dockerfile-web
@@ -6,7 +6,7 @@ COPY web/ /build/
 RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \
 GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/
-FROM alpine:3.17.2
+FROM alpine:3.17.3
 USER root
 COPY
--from=build-web-stage /build/web-amd64 /app/web

From d577657fb705ac3a6f3a79ab9ac8c33d9712b808 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 31 Mar 2023 21:27:45 -0700
Subject: [PATCH 10/12] lib/streamaggr: follow-up for ff72ca14b9d51323e8014f07f424eb9f72d50eb2

- Make sure that the last successfully loaded config is used on hot-reload failure
- Properly cleanup resources occupied by already initialized aggregators when the current aggregator fails to be initialized
- Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config
  This should simplify monitoring and debugging failed reloads
- Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa could be in use at reloadSaConfig()
- Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability on system with big number of CPU cores at hasState.Store(true) call inside aggregator.Push().
- Remove fine-grained aggregator reload - reload all the aggregators on config change instead. This simplifies the code a bit. The fine-grained aggregator reload may be returned back if there will be demand from real users for it.
- Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag - Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639 --- README.md | 2 +- app/victoria-metrics/main.go | 13 +- app/vmagent/README.md | 4 +- app/vmagent/main.go | 10 +- app/vmagent/remotewrite/remotewrite.go | 166 +++++++++++++--------- app/vmagent/remotewrite/streamagg.go | 118 ---------------- app/vminsert/common/insert_ctx.go | 3 +- app/vminsert/common/streamaggr.go | 108 +++++++------- app/vminsert/relabel/relabel.go | 6 + docs/CHANGELOG.md | 16 ++- docs/README.md | 2 +- docs/Single-server-VictoriaMetrics.md | 2 +- docs/stream-aggregation.md | 14 +- docs/vmagent.md | 2 +- lib/promscrape/scraper.go | 2 +- lib/streamaggr/streamaggr.go | 186 ++++++------------------- lib/streamaggr/streamaggr_test.go | 146 ++++++------------- 17 files changed, 291 insertions(+), 509 deletions(-) delete mode 100644 app/vmagent/remotewrite/streamagg.go diff --git a/README.md b/README.md index ff78de46e..20c3b3a64 100644 --- a/README.md +++ b/README.md @@ -2193,7 +2193,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Supports an array of values separated by comma or specified via multiple flags. -dryRun - Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag + Whether to check config files without running VictoriaMetrics. 
The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag -enableTCP6 Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used -envflag.enable diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index 848d592b5..a37f43b2e 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -8,6 +8,8 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" + vminsertcommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + vminsertrelabel "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" @@ -30,8 +32,9 @@ var ( "With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing") minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+ "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling") - dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+ - "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag") + dryRun = flag.Bool("dryRun", false, "Whether to check config files without running VictoriaMetrics. The following config files are checked: "+ + "-promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. 
"+ + "This can be changed with -promscrape.config.strictParse=false command-line flag") inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+ "The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. "+ "Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+ @@ -54,6 +57,12 @@ func main() { if err := promscrape.CheckConfig(); err != nil { logger.Fatalf("error when checking -promscrape.config: %s", err) } + if err := vminsertrelabel.CheckRelabelConfig(); err != nil { + logger.Fatalf("error when checking -relabelConfig: %s", err) + } + if err := vminsertcommon.CheckStreamAggrConfig(); err != nil { + logger.Fatalf("error when checking -streamAggr.config: %s", err) + } logger.Infof("-promscrape.config is ok; exiting with 0 status code") return } diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 3fa07a66f..5a65edec7 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -104,7 +104,7 @@ additionally to pull-based Prometheus-compatible targets' scraping: `vmagent` should be restarted in order to update config options set via command-line args. `vmagent` supports multiple approaches for reloading configs from updated config files such as -`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`: +`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`: * Sending `SIGHUP` signal to `vmagent` process: @@ -1186,7 +1186,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -denyQueryTracing Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing -dryRun - Whether to check only config files without running vmagent. 
The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag + Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag -enableTCP6 Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used -envflag.enable diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 7c7486935..a0db615b8 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -67,7 +67,7 @@ var ( opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+ "at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") - dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ + dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+ "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+ "Unknown config entries aren't allowed in -promscrape.config by default. 
This can be changed by passing -promscrape.config.strictParse=false command-line flag") ) @@ -103,13 +103,13 @@ func main() { return } if *dryRun { - if err := remotewrite.CheckRelabelConfigs(); err != nil { - logger.Fatalf("error when checking relabel configs: %s", err) - } if err := promscrape.CheckConfig(); err != nil { logger.Fatalf("error when checking -promscrape.config: %s", err) } - if err := remotewrite.CheckStreamAggConfigs(); err != nil { + if err := remotewrite.CheckRelabelConfigs(); err != nil { + logger.Fatalf("error when checking relabel configs: %s", err) + } + if err := remotewrite.CheckStreamAggrConfigs(); err != nil { logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err) } logger.Infof("all the configs are ok; exiting with 0 status code") diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 84a662d45..1b5390fe9 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -65,6 +65,15 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+ "Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") + + streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html . "+ + "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") + streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. 
"+ + "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ + "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) var ( @@ -87,9 +96,6 @@ func MultitenancyEnabled() bool { // Contains the current relabelConfigs. var allRelabelConfigs atomic.Value -// Contains the loader for stream aggregation configs. -var saCfgLoader *saConfigsLoader - // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. var maxQueues = cgroup.AvailableCPUs() * 16 @@ -152,15 +158,9 @@ func Init() { logger.Fatalf("cannot load relabel configs: %s", err) } allRelabelConfigs.Store(rcs) - relabelConfigSuccess.Set(1) relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) - saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig) - if err != nil { - logger.Fatalf("cannot load stream aggregation config: %s", err) - } - if len(*remoteWriteURLs) > 0 { rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs) } @@ -172,46 +172,31 @@ func Init() { for { select { case <-sighupCh: - case <-stopCh: + case <-configReloaderStopCh: return } - relabelConfigReloads.Inc() - logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") - rcs, err := loadRelabelConfigs() - if err != nil { - relabelConfigReloadErrors.Inc() - relabelConfigSuccess.Set(0) - logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) - continue - } - allRelabelConfigs.Store(rcs) - relabelConfigSuccess.Set(1) - 
relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) - logger.Infof("Successfully reloaded relabel configs") - - logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config") - err = saCfgLoader.reloadConfigs() - if err != nil { - logger.Errorf("Cannot reload stream aggregation configs: %s", err) - } - if len(*remoteWriteMultitenantURLs) > 0 { - rwctxsMapLock.Lock() - for _, rwctxs := range rwctxsMap { - for _, rwctx := range rwctxs { - rwctx.reinitStreamAggr() - } - } - rwctxsMapLock.Unlock() - } else { - for _, rwctx := range rwctxsDefault { - rwctx.reinitStreamAggr() - } - } - logger.Infof("Successfully reloaded stream aggregation configs") + reloadRelabelConfigs() + reloadStreamAggrConfigs() } }() } +func reloadRelabelConfigs() { + relabelConfigReloads.Inc() + logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") + rcs, err := loadRelabelConfigs() + if err != nil { + relabelConfigReloadErrors.Inc() + relabelConfigSuccess.Set(0) + logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) + return + } + allRelabelConfigs.Store(rcs) + relabelConfigSuccess.Set(1) + relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) + logger.Infof("successfully reloaded relabel configs") +} + var ( relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`) relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`) @@ -219,6 +204,24 @@ var ( relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) ) +func reloadStreamAggrConfigs() { + if len(*remoteWriteMultitenantURLs) > 0 { + rwctxsMapLock.Lock() + for _, rwctxs := range rwctxsMap { + reinitStreamAggr(rwctxs) + } + rwctxsMapLock.Unlock() + } else { + reinitStreamAggr(rwctxsDefault) + } +} + +func reinitStreamAggr(rwctxs []*remoteWriteCtx) { + for _, rwctx := range rwctxs { + rwctx.reinitStreamAggr() + 
} +} + func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { if len(urls) == 0 { logger.Panicf("BUG: urls must be non-empty") @@ -284,14 +287,14 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { return rwctxs } -var stopCh = make(chan struct{}) +var configReloaderStopCh = make(chan struct{}) var configReloaderWG sync.WaitGroup // Stop stops remotewrite. // // It is expected that nobody calls Push during and after the call to this func. func Stop() { - close(stopCh) + close(configReloaderStopCh) configReloaderWG.Wait() for _, rwctx := range rwctxsDefault { @@ -506,8 +509,7 @@ type remoteWriteCtx struct { fq *persistentqueue.FastQueue c *client - sas *streamaggr.Aggregators - saHash uint64 + sas atomic.Pointer[streamaggr.Aggregators] streamAggrKeepInput bool pss []*pendingSeries @@ -567,17 +569,17 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI } // Initialize sas - saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx) - if len(saCfg) > 0 { - sasFile := streamAggrConfig.GetOptionalArg(argIdx) + sasFile := streamAggrConfig.GetOptionalArg(argIdx) + if sasFile != "" { dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0) - sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval) + sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) if err != nil { - logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err) + logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err) } - rwctx.sas = sas - rwctx.saHash = saHash + rwctx.sas.Store(sas) rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) + 
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) } return rwctx @@ -592,8 +594,10 @@ func (rwctx *remoteWriteCtx) MustStop() { rwctx.fq.UnblockAllReaders() rwctx.c.MustStop() rwctx.c = nil - rwctx.sas.MustStop() - rwctx.sas = nil + + sas := rwctx.sas.Swap(nil) + sas.MustStop() + rwctx.fq.MustClose() rwctx.fq = nil @@ -624,8 +628,9 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { rwctx.rowsPushedAfterRelabel.Add(rowsCount) // Apply stream aggregation if any - rwctx.sas.Push(tss) - if rwctx.sas == nil || rwctx.streamAggrKeepInput { + sas := rwctx.sas.Load() + sas.Push(tss) + if sas == nil || rwctx.streamAggrKeepInput { // Push samples to the remote storage rwctx.pushInternal(tss) } @@ -645,17 +650,33 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { } func (rwctx *remoteWriteCtx) reinitStreamAggr() { - if rwctx.sas == nil { + sas := rwctx.sas.Load() + if sas == nil { + // There is no stream aggregation for rwctx return } - saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx) - if rwctx.saHash == saHash { + + sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx) + logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc() + dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(rwctx.idx, 0) + sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) + if err != nil { + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc() + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0) + logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded 
config; error: %s", sasFile, err) return } - if err := rwctx.sas.ReInitConfigs(saCfg); err != nil { - logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err) + if !sasNew.Equal(sas) { + sasOld := rwctx.sas.Swap(sasNew) + sasOld.MustStop() + logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile) + } else { + sasNew.MustStop() + logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile) } - rwctx.saHash = saHash + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) + metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) } var tssRelabelPool = &sync.Pool{ @@ -672,3 +693,20 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int { } return rowsCount } + +// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config +func CheckStreamAggrConfigs() error { + pushNoop := func(tss []prompbmarshal.TimeSeries) {} + for idx, sasFile := range *streamAggrConfig { + if sasFile == "" { + continue + } + dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(idx, 0) + sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval) + if err != nil { + return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err) + } + sas.MustStop() + } + return nil +} diff --git a/app/vmagent/remotewrite/streamagg.go b/app/vmagent/remotewrite/streamagg.go deleted file mode 100644 index b56091f53..000000000 --- a/app/vmagent/remotewrite/streamagg.go +++ /dev/null @@ -1,118 +0,0 @@ -package remotewrite - -import ( - "fmt" - "sync/atomic" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" - "github.com/VictoriaMetrics/metrics" -) - -var ( - 
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html . "+ - "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") - streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ - "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ - "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") -) - -var ( - saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`) - saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`) - saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`) - saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`) -) - -// saConfigRules - type alias for unmarshalled stream aggregation config -type saConfigRules = []*streamaggr.Config - -// saConfigsLoader loads stream aggregation configs from the given files. -type saConfigsLoader struct { - files []string - configs atomic.Pointer[[]saConfig] -} - -// newSaConfigsLoader creates new saConfigsLoader for the given config files. -func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) { - result := &saConfigsLoader{ - files: configFiles, - } - // Initial load of configs. 
- if err := result.reloadConfigs(); err != nil { - return nil, err - } - return result, nil -} - -// reloadConfigs reloads stream aggregation configs from the files given in constructor. -func (r *saConfigsLoader) reloadConfigs() error { - // Increment reloads counter if it is not the initial load. - if r.configs.Load() != nil { - saCfgReloads.Inc() - } - - // Load all configs from files. - var configs = make([]saConfig, len(r.files)) - for i, path := range r.files { - if len(path) == 0 { - // Skip empty stream aggregation config. - continue - } - rules, hash, err := streamaggr.LoadConfigsFromFile(path) - if err != nil { - saCfgSuccess.Set(0) - saCfgReloadErr.Inc() - return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) - } - configs[i] = saConfig{ - path: path, - hash: hash, - rules: rules, - } - } - - // Update configs. - r.configs.Store(&configs) - - saCfgSuccess.Set(1) - saCfgTimestamp.Set(fasttime.UnixTimestamp()) - return nil -} - -// getCurrentConfig returns the current stream aggregation config with the given idx. -func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) { - all := r.configs.Load() - if all == nil { - return nil, 0 - } - cfgs := *all - if len(cfgs) == 0 { - return nil, 0 - } - if idx >= len(cfgs) { - if len(cfgs) == 1 { - cfg := cfgs[0] - return cfg.rules, cfg.hash - } - return nil, 0 - } - cfg := cfgs[idx] - return cfg.rules, cfg.hash -} - -type saConfig struct { - path string - hash uint64 - rules saConfigRules -} - -// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config. 
-func CheckStreamAggConfigs() error { - _, err := newSaConfigsLoader(*streamAggrConfig) - return err -} diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 67ed92d90..b2acad938 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -137,7 +137,8 @@ func (ctx *InsertCtx) ApplyRelabeling() { // FlushBufs flushes buffered rows to the underlying storage. func (ctx *InsertCtx) FlushBufs() error { - if sa != nil && !ctx.skipStreamAggr { + sas := sasGlobal.Load() + if sas != nil && !ctx.skipStreamAggr { ctx.streamAggrCtx.push(ctx.mrs) if !*streamAggrKeepInput { ctx.Reset(0) diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index b01132791..6a512c81c 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -29,18 +30,31 @@ var ( ) var ( - stopCh = make(chan struct{}) - configReloaderWG sync.WaitGroup + saCfgReloaderStopCh = make(chan struct{}) + saCfgReloaderWG sync.WaitGroup saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`) saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`) saCfgSuccess = metrics.NewCounter(`vminsert_streamagg_config_last_reload_successful`) saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`) - sa *streamaggr.Aggregators - saHash uint64 + sasGlobal atomic.Pointer[streamaggr.Aggregators] ) +// CheckStreamAggrConfig checks the config pointed by -streamAggr.config +func CheckStreamAggrConfig() error { + if *streamAggrConfig == "" { + return nil + } + pushNoop := func(tss []prompbmarshal.TimeSeries) {} + sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, *streamAggrDedupInterval) + if err != nil { + return fmt.Errorf("error when
loading -streamAggr.config=%q: %w", *streamAggrConfig, err) + } + sas.MustStop() + return nil +} + // InitStreamAggr must be called after flag.Parse and before using the common package. // // MustStopStreamAggr must be called when stream aggr is no longer needed. @@ -51,45 +65,60 @@ func InitStreamAggr() { sighupCh := procutil.NewSighupChan() - configs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig) + sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval) if err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } - a, err := streamaggr.NewAggregators(configs, pushAggregateSeries, *streamAggrDedupInterval) - if err != nil { - logger.Fatalf("cannot init -streamAggr.config=%q: %s", *streamAggrConfig, err) - } - sa = a - saHash = hash + sasGlobal.Store(sas) saCfgSuccess.Set(1) saCfgTimestamp.Set(fasttime.UnixTimestamp()) // Start config reloader. - configReloaderWG.Add(1) + saCfgReloaderWG.Add(1) go func() { - defer configReloaderWG.Done() + defer saCfgReloaderWG.Done() for { select { case <-sighupCh: - case <-stopCh: + case <-saCfgReloaderStopCh: return } - if err := reloadSaConfig(); err != nil { - logger.Errorf("cannot reload -streamAggr.config=%q: %s", *streamAggrConfig, err) - continue - } + reloadStreamAggrConfig() } }() } +func reloadStreamAggrConfig() { + logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig) + saCfgReloads.Inc() + + sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval) + if err != nil { + saCfgSuccess.Set(0) + saCfgReloadErr.Inc() + logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err) + return + } + sas := sasGlobal.Load() + if !sasNew.Equal(sas) { + sasOld := sasGlobal.Swap(sasNew) + sasOld.MustStop() + logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig) + } 
else { + logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig) + sasNew.MustStop() + } + saCfgSuccess.Set(1) + saCfgTimestamp.Set(fasttime.UnixTimestamp()) +} + // MustStopStreamAggr stops stream aggregators. func MustStopStreamAggr() { - close(stopCh) + close(saCfgReloaderStopCh) + saCfgReloaderWG.Wait() - sa.MustStop() - sa = nil - - configReloaderWG.Wait() + sas := sasGlobal.Swap(nil) + sas.MustStop() } type streamAggrCtx struct { @@ -109,6 +138,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) { ts := &tss[0] labels := ts.Labels samples := ts.Samples + sas := sasGlobal.Load() for _, mr := range mrs { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err) @@ -133,7 +163,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) { ts.Labels = labels ts.Samples = samples - sa.Push(tss) + sas.Push(tss) } } @@ -164,33 +194,3 @@ func pushAggregateSeries(tss []prompbmarshal.TimeSeries) { logger.Errorf("cannot flush aggregate series: %s", err) } } - -func reloadSaConfig() error { - saCfgReloads.Inc() - - cfgs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig) - if err != nil { - saCfgSuccess.Set(0) - saCfgReloadErr.Inc() - return fmt.Errorf("cannot reload -streamAggr.config=%q: %w", *streamAggrConfig, err) - } - - if saHash == hash { - return nil - } - - if err = sa.ReInitConfigs(cfgs); err != nil { - saCfgSuccess.Set(0) - saCfgReloadErr.Inc() - return fmt.Errorf("cannot apply new -streamAggr.config=%q: %w", *streamAggrConfig, err) - } - - saHash = hash - - saCfgSuccess.Set(1) - saCfgTimestamp.Set(fasttime.UnixTimestamp()) - - logger.Infof("Successfully reloaded stream aggregation config") - - return nil -} diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go index 601eb4968..054b8b67f 100644 --- a/app/vminsert/relabel/relabel.go +++ b/app/vminsert/relabel/relabel.go @@ -71,6 +71,12 @@ var ( var pcsGlobal 
atomic.Value +// CheckRelabelConfig checks config pointed by -relabelConfig +func CheckRelabelConfig() error { + _, err := loadRelabelConfig() + return err +} + func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) { if len(*relabelConfig) == 0 { return nil, nil diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ede415cbb..452ed3364 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -26,7 +26,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add the ability for hot reloading of [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configs. 
See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639). +* FEATURE: check the contents of `-relabelConfig` and `-streamAggr.config` files additionally to `-promscrape.config` when single-node VictoriaMetrics runs with `-dryRun` command-line flag. This aligns the behaviour of single-node VictoriaMetrics with [vmagent](https://docs.victoriametrics.com/vmagent.html) behaviour for `-dryRun` command-line flag. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). @@ -138,6 +139,19 @@ Released at 2023-02-24 * BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. 
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816). +## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4) + +Released at 2023-03-25 + +**v1.87.x is a line of LTS releases (i.e. long-term support). It contains important up-to-date bugfixes. +The v1.87.x line will be supported for at least 12 months since the [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release** + +* BUGFIX: prevent slow [snapshot creation](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). +* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error. +* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055). +* BUGFIX: allow using dashes and dots in environment variable names referenced in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999). +* BUGFIX: restore query performance scalability on hosts with a large number of CPU cores. The scalability was reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
+ ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3) Released at 2023-03-12 diff --git a/docs/README.md b/docs/README.md index 52ff491d1..667a87195 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2194,7 +2194,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Supports an array of values separated by comma or specified via multiple flags. -dryRun - Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag + Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag -enableTCP6 Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used -envflag.enable diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index a9f13a957..c3b3aef06 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -2197,7 +2197,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. 
This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Supports an array of values separated by comma or specified via multiple flags. -dryRun - Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag + Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag -enableTCP6 Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used -envflag.enable diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index df983c6ba..1ad21511e 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -509,7 +509,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # match is an optional filter for incoming samples to aggregate. # It can contain arbitrary Prometheus series selector # according to https://docs.victoriametrics.com/keyConcepts.html#filtering . - # If match is missing, then all the incoming samples are aggregated. + # If match isn't set, then all the incoming samples are aggregated. - match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}' # interval is the interval for the aggregation. @@ -548,17 +548,13 @@ per each specified config entry. ### Configuration update -[vmagent](https://docs.victoriametrics.com/vmagent.html) and -[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two -approaches for reloading stream aggregation configs from updated config files such as -`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart. 
+[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) +support the following approaches for hot reloading stream aggregation configs from `-remoteWrite.streamAggr.config` and `-streamAggr.config`: -* Sending `SIGHUP` signal to `vmagent` process: +* By sending a `SIGHUP` signal to the `vmagent` or `victoria-metrics` process: ```console kill -SIGHUP `pidof vmagent` ``` -* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`). - -It will reset the aggregation state only for changed rules in the configuration files. +* By sending an HTTP request to the `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload`). diff --git a/docs/vmagent.md b/docs/vmagent.md index 36ca3b697..4b640355e 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1190,7 +1190,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -denyQueryTracing Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing -dryRun - Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag + Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag -enableTCP6 Whether to enable IPv6 for listening and dialing.
By default only IPv4 TCP and UDP is used -envflag.enable diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index e191b1a97..661d9ebe5 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -50,7 +50,7 @@ var ( // CheckConfig checks -promscrape.config for errors and unsupported options. func CheckConfig() error { if *promscrapeConfigFile == "" { - return fmt.Errorf("missing -promscrape.config option") + return nil } _, _, err := loadConfig(*promscrapeConfigFile) return err diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c177544f3..03a0e0210 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -8,7 +8,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -19,7 +18,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "github.com/cespare/xxhash/v2" "gopkg.in/yaml.v2" ) @@ -39,40 +37,22 @@ var supportedOutputs = []string{ "quantiles(phi1, ..., phiN)", } -// ParseConfig loads array of stream aggregation configs from the given path. -func ParseConfig(data []byte) ([]*Config, uint64, error) { - var cfgs []*Config - if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { - return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err) - } - return cfgs, xxhash.Sum64(data), nil -} - -// LoadConfigsFromFile loads array of stream aggregation configs from the given path. -func LoadConfigsFromFile(path string) ([]*Config, uint64, error) { - data, err := fs.ReadFileOrHTTP(path) - if err != nil { - return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) - } - return ParseConfig(data) -} - -// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. 
+// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. // // If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, // e.g. only the last sample per each time series per each dedupInterval is aggregated. // // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) { - cfgs, configHash, err := LoadConfigsFromFile(path) +func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { + data, err := fs.ReadFileOrHTTP(path) if err != nil { - return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err) + return nil, fmt.Errorf("cannot load aggregators: %w", err) } - as, err := NewAggregators(cfgs, pushFunc, dedupInterval) + as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval) if err != nil { - return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) + return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) } - return as, configHash, nil + return as, nil } // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data. @@ -84,7 +64,7 @@ func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time. 
 func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
 	var cfgs []*Config
 	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
-		return nil, err
+		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
 	}
 	return NewAggregators(cfgs, pushFunc, dedupInterval)
 }
@@ -148,22 +128,13 @@ type Config struct {
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
 }

-func (cfg *Config) hash() (uint64, error) {
-	if cfg == nil {
-		return 0, nil
-	}
-	data, err := json.Marshal(cfg)
-	if err != nil {
-		return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
-	}
-	return xxhash.Sum64(data), nil
-}
-
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
 type Aggregators struct {
-	as            atomic.Pointer[[]*aggregator]
-	pushFunc      PushFunc
-	dedupInterval time.Duration
+	as []*aggregator
+
+	// configData contains marshaled configs passed to NewAggregators().
+	// It is used in Equal() for comparing Aggregators.
+	configData []byte
 }

 // NewAggregators creates Aggregators from the given cfgs.
@@ -182,17 +153,22 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
 	for i, cfg := range cfgs {
 		a, err := newAggregator(cfg, pushFunc, dedupInterval)
 		if err != nil {
+			// Stop already initialized aggregators before returning the error.
+			for _, a := range as[:i] {
+				a.MustStop()
+			}
 			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
 		}
 		as[i] = a
 	}
-	result := &Aggregators{
-		pushFunc:      pushFunc,
-		dedupInterval: dedupInterval,
+	configData, err := json.Marshal(cfgs)
+	if err != nil {
+		logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
 	}
-	result.as.Store(&as)
-
-	return result, nil
+	return &Aggregators{
+		as:         as,
+		configData: configData,
+	}, nil
 }

 // MustStop stops a.
@@ -200,84 +176,29 @@ func (a *Aggregators) MustStop() {
 	if a == nil {
 		return
 	}
-	for _, aggr := range *a.as.Load() {
+	for _, aggr := range a.as {
 		aggr.MustStop()
 	}
 }

+// Equal returns true if a and b are initialized from identical configs.
+func (a *Aggregators) Equal(b *Aggregators) bool {
+	if a == nil || b == nil {
+		return a == nil && b == nil
+	}
+	return string(a.configData) == string(b.configData)
+}
+
 // Push pushes tss to a.
 func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
 	if a == nil {
 		return
 	}
-	for _, aggr := range *a.as.Load() {
+	for _, aggr := range a.as {
 		aggr.Push(tss)
 	}
 }

-// ReInitConfigs reinits state of Aggregators a with the given new stream aggregation config
-func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
-	if a == nil {
-		return nil
-	}
-
-	keys := make(map[uint64]struct{})        // set of all keys (configs and aggregators)
-	cfgsMap := make(map[uint64]*Config)      // map of config keys to their indices in cfgs
-	aggrsMap := make(map[uint64]*aggregator) // map of aggregator keys to their indices in a.as
-
-	for _, cfg := range cfgs {
-		key, err := cfg.hash()
-		if err != nil {
-			return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
-		}
-		keys[key] = struct{}{}
-		cfgsMap[key] = cfg
-	}
-	for _, aggr := range *a.as.Load() {
-		keys[aggr.cfgHash] = struct{}{}
-		aggrsMap[aggr.cfgHash] = aggr
-	}
-
-	asNew := make([]*aggregator, 0, len(aggrsMap))
-	asDel := make([]*aggregator, 0, len(aggrsMap))
-	for key := range keys {
-		cfg, hasCfg := cfgsMap[key]
-		agg, hasAggr := aggrsMap[key]
-
-		// if config for aggregator was changed or removed
-		// then we need to stop aggregator and remove it
-		if !hasCfg && hasAggr {
-			asDel = append(asDel, agg)
-			continue
-		}
-
-		// if there is no aggregator for config (new config),
-		// then we need to create it
-		if hasCfg && !hasAggr {
-			newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval)
-			if err != nil {
-				return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err)
-			}
-			asNew = append(asNew, newAgg)
-			continue
-		}
-
-		// if aggregator config was not changed, then we can just keep it
-		if hasCfg && hasAggr {
-			asNew = append(asNew, agg)
-		}
-	}
-
-	// Atomically replace aggregators array.
-	a.as.Store(&asNew)
-	// and stop old aggregators
-	for _, aggr := range asDel {
-		aggr.MustStop()
-	}
-
-	return nil
-}
-
 // aggregator aggregates input series according to the config passed to NewAggregator
 type aggregator struct {
 	match *promrelabel.IfExpression
@@ -295,7 +216,6 @@ type aggregator struct {
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState

-	hasState atomic.Bool

 	pushFunc PushFunc

@@ -304,8 +224,7 @@ type aggregator struct {
 	// It contains the interval, labels in (by, without), plus output name.
 	// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
 	// for `interval: 1m`, `by: [job]`
-	suffix  string
-	cfgHash uint64
+	suffix string

 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -433,11 +352,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		dedupAggr = newLastAggrState()
 	}

-	cfgHash, err := cfg.hash()
-	if err != nil {
-		return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err)
-	}
-
 	// initialize the aggregator
 	a := &aggregator{
 		match: cfg.Match,
@@ -453,8 +367,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		aggrStates: aggrStates,
 		pushFunc:   pushFunc,

-		suffix:  suffix,
-		cfgHash: cfgHash,
+		suffix: suffix,

 		stopCh: make(chan struct{}),
 	}
@@ -521,8 +434,6 @@ func (a *aggregator) dedupFlush() {
 	}
 	a.dedupAggr.appendSeriesForFlush(ctx)
 	a.push(ctx.tss)
-
-	a.hasState.Store(false)
 }

 func (a *aggregator) flush() {
@@ -552,8 +463,6 @@
 		// Push the output metrics.
 		a.pushFunc(tss)
 	}
-
-	a.hasState.Store(false)
 }

 // MustStop stops the aggregator.
@@ -561,26 +470,19 @@ func (a *aggregator) flush() {
 // The aggregator stops pushing the aggregated metrics after this call.
 func (a *aggregator) MustStop() {
 	close(a.stopCh)
-
-	if a.hasState.Load() {
-		if a.dedupAggr != nil {
-			flushConcurrencyCh <- struct{}{}
-			a.dedupFlush()
-			<-flushConcurrencyCh
-		}
-
-		flushConcurrencyCh <- struct{}{}
-		a.flush()
-		<-flushConcurrencyCh
-	}
-
 	a.wg.Wait()
+
+	// Flush the remaining data from the last interval if needed.
+	flushConcurrencyCh <- struct{}{}
+	if a.dedupAggr != nil {
+		a.dedupFlush()
+	}
+	a.flush()
+	<-flushConcurrencyCh
 }

 // Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
-	a.hasState.Store(true)
-
 	if a.dedupAggr == nil {
 		a.push(tss)
 		return
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 096830696..57c74aa8e 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -118,6 +118,45 @@ func TestAggregatorsFailure(t *testing.T) {
 `)
 }

+func TestAggregatorsEqual(t *testing.T) {
+	f := func(a, b string, expectedResult bool) {
+		t.Helper()
+
+		pushFunc := func(tss []prompbmarshal.TimeSeries) {}
+		aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+		ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+		result := aa.Equal(ab)
+		if result != expectedResult {
+			t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
+		}
+	}
+	f("", "", true)
+	f(`
+- outputs: [total]
+  interval: 5m
+`, ``, false)
+	f(`
+- outputs: [total]
+  interval: 5m
+`, `
+- outputs: [total]
+  interval: 5m
+`, true)
+	f(`
+- outputs: [total]
+  interval: 3m
+`, `
+- outputs: [total]
+  interval: 5m
+`, false)
+}
+
 func TestAggregatorsSuccess(t *testing.T) {
 	f := func(config, inputMetrics, outputMetricsExpected string) {
 		t.Helper()
@@ -145,11 +184,6 @@ func TestAggregatorsSuccess(t *testing.T) {
 	// Push the inputMetrics to Aggregators
 	tssInput := mustParsePromMetrics(inputMetrics)
 	a.Push(tssInput)
-	if a != nil {
-		for _, aggr := range *a.as.Load() {
-			aggr.flush()
-		}
-	}
 	a.MustStop()

 	// Verify the tssOutput contains the expected metrics
@@ -671,7 +705,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 	tssInput := mustParsePromMetrics(inputMetrics)
 	a.Push(tssInput)
 	if a != nil {
-		for _, aggr := range *a.as.Load() {
+		for _, aggr := range a.as {
 			aggr.dedupFlush()
 			aggr.flush()
 		}
@@ -719,106 +753,6 @@ foo:1m_sum_samples{baz="qwe"} 10
 `)
 }

-func TestAggregatorsReinit(t *testing.T) {
-	f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
-		t.Helper()
-
-		// Initialize Aggregators
-		var tssOutput []prompbmarshal.TimeSeries
-		var tssOutputLock sync.Mutex
-		pushFunc := func(tss []prompbmarshal.TimeSeries) {
-			tssOutputLock.Lock()
-			for _, ts := range tss {
-				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
-				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
-				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
-					Labels:  labelsCopy,
-					Samples: samplesCopy,
-				})
-			}
-			tssOutputLock.Unlock()
-		}
-
-		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
-		if err != nil {
-			t.Fatalf("cannot initialize aggregators: %s", err)
-		}
-
-		// Push the inputMetrics to Aggregators
-		tssInput := mustParsePromMetrics(inputMetrics)
-		a.Push(tssInput)
-
-		// Reinitialize Aggregators
-		nc, _, err := ParseConfig([]byte(newConfig))
-		if err != nil {
-			t.Fatalf("cannot parse new config: %s", err)
-		}
-		err = a.ReInitConfigs(nc)
-		if err != nil {
-			t.Fatalf("cannot reinit aggregators: %s", err)
-		}
-
-		// Push the inputMetrics to Aggregators
-		a.Push(tssInput)
-		if a != nil {
-			for _, aggr := range *a.as.Load() {
-				aggr.flush()
-			}
-		}
-
-		a.MustStop()
-
-		// Verify the tssOutput contains the expected metrics
-		tsStrings := make([]string, len(tssOutput))
-		for i, ts := range tssOutput {
-			tsStrings[i] = timeSeriesToString(ts)
-		}
-		sort.Strings(tsStrings)
-		outputMetrics := strings.Join(tsStrings, "")
-		if outputMetrics != outputMetricsExpected {
-			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
-		}
-	}
-
-	f(`
-- interval: 1m
-  outputs: [count_samples]
-`, `
-- interval: 1m
-  outputs: [sum_samples]
-`, `
-foo 123
-bar 567
-foo 234
-`, `bar:1m_count_samples 1
-bar:1m_sum_samples 567
-foo:1m_count_samples 2
-foo:1m_sum_samples 357
-`)
-
-	f(`
-- interval: 1m
-  outputs: [total]
-- interval: 2m
-  outputs: [count_samples]
-`, `
-- interval: 1m
-  outputs: [sum_samples]
-- interval: 2m
-  outputs: [count_samples]
-`, `
-foo 123
-bar 567
-foo 234
-`, `bar:1m_sum_samples 567
-bar:1m_total 0
-bar:2m_count_samples 2
-foo:1m_sum_samples 357
-foo:1m_total 111
-foo:2m_count_samples 4
-`)
-}
-
 func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
 	labelsString := promrelabel.LabelsToString(ts.Labels)
 	if len(ts.Samples) != 1 {

From 4d00107b92eea04adcd76eeacd12032b04d54aab Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 31 Mar 2023 22:41:02 -0700
Subject: [PATCH 11/12] lib/fs: follow-up for ec45f1bc5fd1e8d4bffee3d599075961bd96d235

Properly close response body before checking for the response code.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4034
---
 lib/fs/fs.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/lib/fs/fs.go b/lib/fs/fs.go
index 7da255887..43fbe04fa 100644
--- a/lib/fs/fs.go
+++ b/lib/fs/fs.go
@@ -424,13 +424,14 @@ func ReadFileOrHTTP(path string) ([]byte, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot fetch %q: %w", path, err)
 	}
-
-	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("unexpected status code when fetching %q: %d, expecting %d", path, resp.StatusCode, http.StatusOK)
-	}
-
 	data, err := io.ReadAll(resp.Body)
 	_ = resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		if len(data) > 4*1024 {
+			data = data[:4*1024]
+		}
+		return nil, fmt.Errorf("unexpected status code when fetching %q: %d, expecting %d; response: %q", path, resp.StatusCode, http.StatusOK, data)
+	}
 	if err != nil {
 		return nil, fmt.Errorf("cannot read %q: %s", path, err)
 	}

From cddfc4d3f8466fd704dc9023edd84d3888a73d52 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 31 Mar 2023 22:46:25 -0700
Subject: [PATCH 12/12] deployment/docker: update base Docker image from Alpine 3.17.2 to Alpine 3.17.3

This fixes security issues from https://alpinelinux.org/posts/Alpine-3.17.3-released.html

This is a follow-up for 59c350d0d280251b1a0a0927cda52411def5c209
---
 deployment/docker/Makefile | 4 ++--
 docs/CHANGELOG.md          | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/deployment/docker/Makefile b/deployment/docker/Makefile
index 3df5df6e9..3d224b7b9 100644
--- a/deployment/docker/Makefile
+++ b/deployment/docker/Makefile
@@ -2,8 +2,8 @@

 DOCKER_NAMESPACE := victoriametrics

-ROOT_IMAGE ?= alpine:3.17.2
-CERTS_IMAGE := alpine:3.17.2
+ROOT_IMAGE ?= alpine:3.17.3
+CERTS_IMAGE := alpine:3.17.3
 GO_BUILDER_IMAGE := golang:1.20.2-alpine
 BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1
 BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __)
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 452ed3364..d7927ce2d 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -19,6 +19,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
   so the previous versions of VictoriaMetrics will exit with the `unexpected number of substrings in the part name` error when trying to run them on the data created by v1.90.0 or newer versions.
   The solution is to upgrade to v1.90.0 or newer releases**

+* SECURITY: upgrade base docker image (alpine) from 3.17.2 to 3.17.3. See [alpine 3.17.3 release notes](https://alpinelinux.org/posts/Alpine-3.17.3-released.html).
+
 * FEATURE: release Windows binaries for [single-node VictoriaMetrics](https://docs.victoriametrics.com/), [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), [vmbackup](https://docs.victoriametrics.com/vmbackup.html) and [vmrestore](https://docs.victoriametrics.com/vmrestore.html). See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) issues.
 * FEATURE: log metrics with truncated labels if the length of label value in the ingested metric exceeds `-maxLabelValueLen`. This should simplify debugging for this case.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show target URL when debugging [target relabeling](https://docs.victoriametrics.com/vmagent.html#relabel-debug). This should simplify target relabel debugging a bit. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3882).