mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
Merge branch 'public-single-node' into victorialogs-wip
This commit is contained in:
commit
9673da2578
21 changed files with 454 additions and 162 deletions
|
@ -46,11 +46,11 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
|
||||||
}
|
}
|
||||||
rcs.global = global
|
rcs.global = global
|
||||||
}
|
}
|
||||||
if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) {
|
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
|
||||||
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d",
|
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
|
||||||
len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
len(*relabelConfigPaths), (len(*remoteWriteURLs)))
|
||||||
}
|
}
|
||||||
rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
|
||||||
for i, path := range *relabelConfigPaths {
|
for i, path := range *relabelConfigPaths {
|
||||||
if len(path) == 0 {
|
if len(path) == 0 {
|
||||||
// Skip empty relabel config.
|
// Skip empty relabel config.
|
||||||
|
|
|
@ -29,7 +29,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
"github.com/cespare/xxhash/v2"
|
"github.com/cespare/xxhash/v2"
|
||||||
)
|
)
|
||||||
|
@ -39,10 +38,6 @@ var (
|
||||||
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
|
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
|
||||||
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
|
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
|
||||||
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set")
|
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set")
|
||||||
remoteWriteMultitenantURLs = flagutil.NewArrayString("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+
|
|
||||||
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details. Example url: http://<vminsert>:8480 . "+
|
|
||||||
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. "+
|
|
||||||
"This flag is deprecated in favor of -enableMultitenantHandlers . See https://docs.victoriametrics.com/vmagent/#multitenancy")
|
|
||||||
enableMultitenantHandlers = flag.Bool("enableMultitenantHandlers", false, "Whether to process incoming data via multitenant insert handlers according to "+
|
enableMultitenantHandlers = flag.Bool("enableMultitenantHandlers", false, "Whether to process incoming data via multitenant insert handlers according to "+
|
||||||
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
|
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
|
||||||
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
|
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
|
||||||
|
@ -118,14 +113,10 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// rwctxsDefault contains statically populated entries when -remoteWrite.url is specified.
|
// rwctxs contains statically populated entries when -remoteWrite.url is specified.
|
||||||
rwctxsDefault []*remoteWriteCtx
|
rwctxs []*remoteWriteCtx
|
||||||
|
|
||||||
// rwctxsMap contains dynamically populated entries when -remoteWrite.multitenantURL is specified.
|
// Data without tenant id is written to defaultAuthToken if -enableMultitenantHandlers is specified.
|
||||||
rwctxsMap = make(map[tenantmetrics.TenantID][]*remoteWriteCtx)
|
|
||||||
rwctxsMapLock sync.Mutex
|
|
||||||
|
|
||||||
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
|
|
||||||
defaultAuthToken = &auth.Token{}
|
defaultAuthToken = &auth.Token{}
|
||||||
|
|
||||||
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
|
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
|
||||||
|
@ -140,9 +131,9 @@ var (
|
||||||
disableOnDiskQueueAll bool
|
disableOnDiskQueueAll bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// MultitenancyEnabled returns true if -enableMultitenantHandlers or -remoteWrite.multitenantURL is specified.
|
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
|
||||||
func MultitenancyEnabled() bool {
|
func MultitenancyEnabled() bool {
|
||||||
return *enableMultitenantHandlers || len(*remoteWriteMultitenantURLs) > 0
|
return *enableMultitenantHandlers
|
||||||
}
|
}
|
||||||
|
|
||||||
// Contains the current relabelConfigs.
|
// Contains the current relabelConfigs.
|
||||||
|
@ -173,11 +164,8 @@ var (
|
||||||
//
|
//
|
||||||
// Stop must be called for graceful shutdown.
|
// Stop must be called for graceful shutdown.
|
||||||
func Init() {
|
func Init() {
|
||||||
if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) == 0 {
|
if len(*remoteWriteURLs) == 0 {
|
||||||
logger.Fatalf("at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set")
|
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
|
||||||
}
|
|
||||||
if len(*remoteWriteURLs) > 0 && len(*remoteWriteMultitenantURLs) > 0 {
|
|
||||||
logger.Fatalf("cannot set both `-remoteWrite.url` and `-remoteWrite.multitenantURL` command-line flags")
|
|
||||||
}
|
}
|
||||||
if *maxHourlySeries > 0 {
|
if *maxHourlySeries > 0 {
|
||||||
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
|
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
|
||||||
|
@ -228,7 +216,7 @@ func Init() {
|
||||||
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
||||||
|
|
||||||
if len(*remoteWriteURLs) > 0 {
|
if len(*remoteWriteURLs) > 0 {
|
||||||
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
||||||
}
|
}
|
||||||
|
|
||||||
disableOnDiskQueueAll = true
|
disableOnDiskQueueAll = true
|
||||||
|
@ -261,12 +249,6 @@ func dropDanglingQueues() {
|
||||||
if *keepDanglingQueues {
|
if *keepDanglingQueues {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(*remoteWriteMultitenantURLs) > 0 {
|
|
||||||
// Do not drop dangling queues for *remoteWriteMultitenantURLs, since it is impossible to determine
|
|
||||||
// unused queues for multitenant urls - they are created on demand when new sample for the given
|
|
||||||
// tenant is pushed to remote storage.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Remove dangling persistent queues, if any.
|
// Remove dangling persistent queues, if any.
|
||||||
// This is required for the case when the number of queues has been changed or URL have been changed.
|
// This is required for the case when the number of queues has been changed or URL have been changed.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
|
||||||
|
@ -274,8 +256,8 @@ func dropDanglingQueues() {
|
||||||
// In case if there were many persistent queues with identical *remoteWriteURLs
|
// In case if there were many persistent queues with identical *remoteWriteURLs
|
||||||
// the queue with the last index will be dropped.
|
// the queue with the last index will be dropped.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
|
||||||
existingQueues := make(map[string]struct{}, len(rwctxsDefault))
|
existingQueues := make(map[string]struct{}, len(rwctxs))
|
||||||
for _, rwctx := range rwctxsDefault {
|
for _, rwctx := range rwctxs {
|
||||||
existingQueues[rwctx.fq.Dirname()] = struct{}{}
|
existingQueues[rwctx.fq.Dirname()] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +274,7 @@ func dropDanglingQueues() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if removed > 0 {
|
if removed > 0 {
|
||||||
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxsDefault))
|
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,18 +302,6 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func reloadStreamAggrConfigs() {
|
func reloadStreamAggrConfigs() {
|
||||||
if len(*remoteWriteMultitenantURLs) > 0 {
|
|
||||||
rwctxsMapLock.Lock()
|
|
||||||
for _, rwctxs := range rwctxsMap {
|
|
||||||
reinitStreamAggr(rwctxs)
|
|
||||||
}
|
|
||||||
rwctxsMapLock.Unlock()
|
|
||||||
} else {
|
|
||||||
reinitStreamAggr(rwctxsDefault)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func reinitStreamAggr(rwctxs []*remoteWriteCtx) {
|
|
||||||
for _, rwctx := range rwctxs {
|
for _, rwctx := range rwctxs {
|
||||||
rwctx.reinitStreamAggr()
|
rwctx.reinitStreamAggr()
|
||||||
}
|
}
|
||||||
|
@ -411,18 +381,10 @@ func Stop() {
|
||||||
close(configReloaderStopCh)
|
close(configReloaderStopCh)
|
||||||
configReloaderWG.Wait()
|
configReloaderWG.Wait()
|
||||||
|
|
||||||
for _, rwctx := range rwctxsDefault {
|
|
||||||
rwctx.MustStop()
|
|
||||||
}
|
|
||||||
rwctxsDefault = nil
|
|
||||||
|
|
||||||
// There is no need in locking rwctxsMapLock here, since nobody should call TryPush during the Stop call.
|
|
||||||
for _, rwctxs := range rwctxsMap {
|
|
||||||
for _, rwctx := range rwctxs {
|
for _, rwctx := range rwctxs {
|
||||||
rwctx.MustStop()
|
rwctx.MustStop()
|
||||||
}
|
}
|
||||||
}
|
rwctxs = nil
|
||||||
rwctxsMap = nil
|
|
||||||
|
|
||||||
if sl := hourlySeriesLimiter; sl != nil {
|
if sl := hourlySeriesLimiter; sl != nil {
|
||||||
sl.MustStop()
|
sl.MustStop()
|
||||||
|
@ -432,20 +394,14 @@ func Stop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
|
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
|
||||||
//
|
|
||||||
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
|
|
||||||
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
|
|
||||||
//
|
//
|
||||||
// PushDropSamplesOnFailure can modify wr contents.
|
// PushDropSamplesOnFailure can modify wr contents.
|
||||||
func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||||
_ = tryPush(at, wr, true)
|
_ = tryPush(at, wr, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
|
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url
|
||||||
//
|
|
||||||
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
|
|
||||||
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
|
|
||||||
//
|
//
|
||||||
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
|
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
|
||||||
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
|
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
|
||||||
|
@ -464,28 +420,11 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
|
||||||
}
|
}
|
||||||
|
|
||||||
var tenantRctx *relabelCtx
|
var tenantRctx *relabelCtx
|
||||||
var rwctxs []*remoteWriteCtx
|
if at != nil {
|
||||||
if at == nil {
|
|
||||||
rwctxs = rwctxsDefault
|
|
||||||
} else if len(*remoteWriteMultitenantURLs) == 0 {
|
|
||||||
// Convert at to (vm_account_id, vm_project_id) labels.
|
// Convert at to (vm_account_id, vm_project_id) labels.
|
||||||
tenantRctx = getRelabelCtx()
|
tenantRctx = getRelabelCtx()
|
||||||
defer putRelabelCtx(tenantRctx)
|
defer putRelabelCtx(tenantRctx)
|
||||||
rwctxs = rwctxsDefault
|
|
||||||
} else {
|
|
||||||
rwctxsMapLock.Lock()
|
|
||||||
tenantID := tenantmetrics.TenantID{
|
|
||||||
AccountID: at.AccountID,
|
|
||||||
ProjectID: at.ProjectID,
|
|
||||||
}
|
}
|
||||||
rwctxs = rwctxsMap[tenantID]
|
|
||||||
if rwctxs == nil {
|
|
||||||
rwctxs = newRemoteWriteCtxs(at, *remoteWriteMultitenantURLs)
|
|
||||||
rwctxsMap[tenantID] = rwctxs
|
|
||||||
}
|
|
||||||
rwctxsMapLock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
rowsCount := getRowsCount(tss)
|
rowsCount := getRowsCount(tss)
|
||||||
|
|
||||||
// Quick check whether writes to configured remote storage systems are blocked.
|
// Quick check whether writes to configured remote storage systems are blocked.
|
||||||
|
@ -552,14 +491,14 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
|
||||||
}
|
}
|
||||||
sortLabelsIfNeeded(tssBlock)
|
sortLabelsIfNeeded(tssBlock)
|
||||||
tssBlock = limitSeriesCardinality(tssBlock)
|
tssBlock = limitSeriesCardinality(tssBlock)
|
||||||
if !tryPushBlockToRemoteStorages(rwctxs, tssBlock, forceDropSamplesOnFailure) {
|
if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
||||||
if len(tssBlock) == 0 {
|
if len(tssBlock) == 0 {
|
||||||
// Nothing to push
|
// Nothing to push
|
||||||
return true
|
return true
|
||||||
|
@ -578,7 +517,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
|
||||||
if replicas <= 0 {
|
if replicas <= 0 {
|
||||||
replicas = 1
|
replicas = 1
|
||||||
}
|
}
|
||||||
return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas, forceDropSamplesOnFailure)
|
return tryShardingBlockAmongRemoteStorages(tssBlock, replicas, forceDropSamplesOnFailure)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replicate tssBlock samples among rwctxs.
|
// Replicate tssBlock samples among rwctxs.
|
||||||
|
@ -599,7 +538,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
|
||||||
return !anyPushFailed.Load()
|
return !anyPushFailed.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
|
func tryShardingBlockAmongRemoteStorages(tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
|
||||||
x := getTSSShards(len(rwctxs))
|
x := getTSSShards(len(rwctxs))
|
||||||
defer putTSSShards(x)
|
defer putTSSShards(x)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
import React, { FC, useState } from "preact/compat";
|
||||||
|
import Button from "../Main/Button/Button";
|
||||||
|
import TextField from "../Main/TextField/TextField";
|
||||||
|
import Modal from "../Main/Modal/Modal";
|
||||||
|
import Spinner from "../Main/Spinner/Spinner";
|
||||||
|
import { DownloadIcon, ErrorIcon } from "../Main/Icons";
|
||||||
|
import useBoolean from "../../hooks/useBoolean";
|
||||||
|
import useDeviceDetect from "../../hooks/useDeviceDetect";
|
||||||
|
import { useAppState } from "../../state/common/StateContext";
|
||||||
|
import classNames from "classnames";
|
||||||
|
import "./style.scss";
|
||||||
|
|
||||||
|
const AnomalyConfig: FC = () => {
|
||||||
|
const { serverUrl } = useAppState();
|
||||||
|
const { isMobile } = useDeviceDetect();
|
||||||
|
|
||||||
|
const {
|
||||||
|
value: isModalOpen,
|
||||||
|
setTrue: setOpenModal,
|
||||||
|
setFalse: setCloseModal,
|
||||||
|
} = useBoolean(false);
|
||||||
|
|
||||||
|
const [isLoading, setIsLoading] = useState(false);
|
||||||
|
const [textConfig, setTextConfig] = useState<string>("");
|
||||||
|
const [downloadUrl, setDownloadUrl] = useState<string>("");
|
||||||
|
const [error, setError] = useState<string>("");
|
||||||
|
|
||||||
|
const fetchConfig = async () => {
|
||||||
|
setIsLoading(true);
|
||||||
|
try {
|
||||||
|
const url = `${serverUrl}/api/vmanomaly/config.yaml`;
|
||||||
|
const response = await fetch(url);
|
||||||
|
if (!response.ok) {
|
||||||
|
setError(` ${response.status} ${response.statusText}`);
|
||||||
|
} else {
|
||||||
|
const blob = await response.blob();
|
||||||
|
const yamlAsString = await blob.text();
|
||||||
|
setTextConfig(yamlAsString);
|
||||||
|
setDownloadUrl(URL.createObjectURL(blob));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error(error);
|
||||||
|
setError(String(error));
|
||||||
|
}
|
||||||
|
setIsLoading(false);
|
||||||
|
};
|
||||||
|
|
||||||
|
const handleOpenModal = () => {
|
||||||
|
setOpenModal();
|
||||||
|
setError("");
|
||||||
|
URL.revokeObjectURL(downloadUrl);
|
||||||
|
setTextConfig("");
|
||||||
|
setDownloadUrl("");
|
||||||
|
fetchConfig();
|
||||||
|
};
|
||||||
|
|
||||||
|
return (
|
||||||
|
<>
|
||||||
|
<Button
|
||||||
|
color="secondary"
|
||||||
|
variant="outlined"
|
||||||
|
onClick={handleOpenModal}
|
||||||
|
>
|
||||||
|
Open Config
|
||||||
|
</Button>
|
||||||
|
{isModalOpen && (
|
||||||
|
<Modal
|
||||||
|
title="Download config"
|
||||||
|
onClose={setCloseModal}
|
||||||
|
>
|
||||||
|
<div
|
||||||
|
className={classNames({
|
||||||
|
"vm-anomaly-config": true,
|
||||||
|
"vm-anomaly-config_mobile": isMobile,
|
||||||
|
})}
|
||||||
|
>
|
||||||
|
{isLoading && (
|
||||||
|
<Spinner
|
||||||
|
containerStyles={{ position: "relative" }}
|
||||||
|
message={"Loading config..."}
|
||||||
|
/>
|
||||||
|
)}
|
||||||
|
{!isLoading && error && (
|
||||||
|
<div className="vm-anomaly-config-error">
|
||||||
|
<div className="vm-anomaly-config-error__icon"><ErrorIcon/></div>
|
||||||
|
<h3 className="vm-anomaly-config-error__title">Cannot download config</h3>
|
||||||
|
<p className="vm-anomaly-config-error__text">{error}</p>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
{!isLoading && textConfig && (
|
||||||
|
<TextField
|
||||||
|
value={textConfig}
|
||||||
|
label={"config.yaml"}
|
||||||
|
type="textarea"
|
||||||
|
disabled={true}
|
||||||
|
/>
|
||||||
|
)}
|
||||||
|
<div className="vm-anomaly-config-footer">
|
||||||
|
{downloadUrl && (
|
||||||
|
<a
|
||||||
|
href={downloadUrl}
|
||||||
|
download={"config.yaml"}
|
||||||
|
>
|
||||||
|
<Button
|
||||||
|
variant="contained"
|
||||||
|
startIcon={<DownloadIcon/>}
|
||||||
|
>
|
||||||
|
download
|
||||||
|
</Button>
|
||||||
|
</a>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</Modal>
|
||||||
|
)}
|
||||||
|
</>
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
export default AnomalyConfig;
|
|
@ -0,0 +1,61 @@
|
||||||
|
@use "src/styles/variables" as *;
|
||||||
|
|
||||||
|
.vm-anomaly-config {
|
||||||
|
display: grid;
|
||||||
|
grid-template-rows: calc(($vh * 70) - 78px - ($padding-medium*3)) auto;
|
||||||
|
gap: $padding-global;
|
||||||
|
min-width: 400px;
|
||||||
|
max-width: 80vw;
|
||||||
|
min-height: 300px;
|
||||||
|
|
||||||
|
&_mobile {
|
||||||
|
width: 100%;
|
||||||
|
max-width: none;
|
||||||
|
min-height: 100%;
|
||||||
|
grid-template-rows: calc(($vh * 100) - 78px - ($padding-global*3)) auto;
|
||||||
|
}
|
||||||
|
|
||||||
|
textarea {
|
||||||
|
overflow: auto;
|
||||||
|
width: 100%;
|
||||||
|
height: 100%;
|
||||||
|
max-height: 900px;
|
||||||
|
}
|
||||||
|
|
||||||
|
&-error {
|
||||||
|
display: flex;
|
||||||
|
flex-direction: column;
|
||||||
|
align-items: center;
|
||||||
|
justify-content: center;
|
||||||
|
width: 100%;
|
||||||
|
gap: $padding-small;
|
||||||
|
text-align: center;
|
||||||
|
|
||||||
|
&__icon {
|
||||||
|
display: flex;
|
||||||
|
align-items: center;
|
||||||
|
justify-content: center;
|
||||||
|
width: 30px;
|
||||||
|
height: 30px;
|
||||||
|
margin-bottom: $padding-small;
|
||||||
|
color: $color-error;
|
||||||
|
}
|
||||||
|
|
||||||
|
&__title {
|
||||||
|
font-size: $font-size-medium;
|
||||||
|
font-weight: bold;
|
||||||
|
}
|
||||||
|
|
||||||
|
&__text {
|
||||||
|
max-width: 700px;
|
||||||
|
line-height: 1.3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
&-footer {
|
||||||
|
display: flex;
|
||||||
|
align-items: center;
|
||||||
|
justify-content: flex-end;
|
||||||
|
gap: $padding-small;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
import React, { CSSProperties, FC } from "preact/compat";
|
import React, { FC } from "preact/compat";
|
||||||
|
import { CSSProperties } from "react";
|
||||||
import "./style.scss";
|
import "./style.scss";
|
||||||
import classNames from "classnames";
|
import classNames from "classnames";
|
||||||
import { useAppState } from "../../../state/common/StateContext";
|
import { useAppState } from "../../../state/common/StateContext";
|
||||||
|
@ -8,7 +9,7 @@ interface SpinnerProps {
|
||||||
message?: string
|
message?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
const Spinner: FC<SpinnerProps> = ({ containerStyles = {}, message }) => {
|
const Spinner: FC<SpinnerProps> = ({ containerStyles, message }) => {
|
||||||
const { isDarkTheme } = useAppState();
|
const { isDarkTheme } = useAppState();
|
||||||
|
|
||||||
return (
|
return (
|
||||||
|
@ -17,7 +18,7 @@ const Spinner: FC<SpinnerProps> = ({ containerStyles = {}, message }) => {
|
||||||
"vm-spinner": true,
|
"vm-spinner": true,
|
||||||
"vm-spinner_dark": isDarkTheme,
|
"vm-spinner_dark": isDarkTheme,
|
||||||
})}
|
})}
|
||||||
style={containerStyles && {}}
|
style={containerStyles}
|
||||||
>
|
>
|
||||||
<div className="half-circle-spinner">
|
<div className="half-circle-spinner">
|
||||||
<div className="circle circle-1"></div>
|
<div className="circle circle-1"></div>
|
||||||
|
|
|
@ -24,6 +24,7 @@ import useDeviceDetect from "../../../hooks/useDeviceDetect";
|
||||||
import { QueryStats } from "../../../api/types";
|
import { QueryStats } from "../../../api/types";
|
||||||
import { usePrettifyQuery } from "./hooks/usePrettifyQuery";
|
import { usePrettifyQuery } from "./hooks/usePrettifyQuery";
|
||||||
import QueryHistory from "../QueryHistory/QueryHistory";
|
import QueryHistory from "../QueryHistory/QueryHistory";
|
||||||
|
import AnomalyConfig from "../../../components/ExploreAnomaly/AnomalyConfig";
|
||||||
|
|
||||||
export interface QueryConfiguratorProps {
|
export interface QueryConfiguratorProps {
|
||||||
queryErrors: string[];
|
queryErrors: string[];
|
||||||
|
@ -37,6 +38,7 @@ export interface QueryConfiguratorProps {
|
||||||
prettify?: boolean;
|
prettify?: boolean;
|
||||||
autocomplete?: boolean;
|
autocomplete?: boolean;
|
||||||
traceQuery?: boolean;
|
traceQuery?: boolean;
|
||||||
|
anomalyConfig?: boolean;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,6 +255,7 @@ const QueryConfigurator: FC<QueryConfiguratorProps> = ({
|
||||||
<AdditionalSettings hideButtons={hideButtons}/>
|
<AdditionalSettings hideButtons={hideButtons}/>
|
||||||
<div className="vm-query-configurator-settings__buttons">
|
<div className="vm-query-configurator-settings__buttons">
|
||||||
<QueryHistory handleSelectQuery={handleSelectHistory}/>
|
<QueryHistory handleSelectQuery={handleSelectHistory}/>
|
||||||
|
{hideButtons?.anomalyConfig && <AnomalyConfig/>}
|
||||||
{!hideButtons?.addQuery && stateQuery.length < MAX_QUERY_FIELDS && (
|
{!hideButtons?.addQuery && stateQuery.length < MAX_QUERY_FIELDS && (
|
||||||
<Button
|
<Button
|
||||||
variant="outlined"
|
variant="outlined"
|
||||||
|
|
|
@ -87,7 +87,7 @@ const ExploreAnomaly: FC = () => {
|
||||||
setHideError={setHideError}
|
setHideError={setHideError}
|
||||||
stats={queryStats}
|
stats={queryStats}
|
||||||
onRunQuery={handleRunQuery}
|
onRunQuery={handleRunQuery}
|
||||||
hideButtons={{ addQuery: true, prettify: true, autocomplete: true, traceQuery: true }}
|
hideButtons={{ addQuery: true, prettify: true, autocomplete: true, traceQuery: true, anomalyConfig: true }}
|
||||||
/>
|
/>
|
||||||
{isLoading && <Spinner/>}
|
{isLoading && <Spinner/>}
|
||||||
{(!hideError && error) && <Alert variant="error">{error}</Alert>}
|
{(!hideError && error) && <Alert variant="error">{error}</Alert>}
|
||||||
|
|
|
@ -8,7 +8,7 @@ export const getDefaultServer = (tenantId?: string): string => {
|
||||||
const { serverURL } = getAppModeParams();
|
const { serverURL } = getAppModeParams();
|
||||||
const storageURL = getFromStorage("SERVER_URL") as string;
|
const storageURL = getFromStorage("SERVER_URL") as string;
|
||||||
const logsURL = window.location.href.replace(/\/(select\/)?(vmui)\/.*/, "");
|
const logsURL = window.location.href.replace(/\/(select\/)?(vmui)\/.*/, "");
|
||||||
const anomalyURL = window.location.href.replace(/(?:graph|vmui)\/.*/, "");
|
const anomalyURL = `${window.location.origin}${window.location.pathname}`;
|
||||||
const defaultURL = window.location.href.replace(/\/(?:prometheus\/)?(?:graph|vmui)\/.*/, "/prometheus");
|
const defaultURL = window.location.href.replace(/\/(?:prometheus\/)?(?:graph|vmui)\/.*/, "/prometheus");
|
||||||
const url = serverURL || storageURL || defaultURL;
|
const url = serverURL || storageURL || defaultURL;
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ export const getDefaultServer = (tenantId?: string): string => {
|
||||||
case AppType.logs:
|
case AppType.logs:
|
||||||
return logsURL;
|
return logsURL;
|
||||||
case AppType.anomaly:
|
case AppType.anomaly:
|
||||||
return serverURL || storageURL || anomalyURL;
|
return storageURL || anomalyURL;
|
||||||
default:
|
default:
|
||||||
return tenantId ? replaceTenantId(url, tenantId) : url;
|
return tenantId ? replaceTenantId(url, tenantId) : url;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
**Update note 1: the `-remoteWrite.multitenantURL` command-line flag at `vmagent` was removed starting from this release. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).**
|
||||||
|
|
||||||
|
|
||||||
* SECURITY: upgrade Go builder from Go1.22.2 to Go1.22.3. See [the list of issues addressed in Go1.22.3](https://github.com/golang/go/issues?q=milestone%3AGo1.22.3+label%3ACherryPickApproved).
|
* SECURITY: upgrade Go builder from Go1.22.2 to Go1.22.3. See [the list of issues addressed in Go1.22.3](https://github.com/golang/go/issues?q=milestone%3AGo1.22.3+label%3ACherryPickApproved).
|
||||||
|
|
||||||
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details.
|
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details.
|
||||||
|
@ -40,12 +43,15 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion!
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion!
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
|
||||||
|
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
|
||||||
|
|
||||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
|
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
|
||||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `<output>_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix.
|
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `<output>_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix.
|
||||||
* BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830).
|
* BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830).
|
||||||
|
|
||||||
|
* DEPRECATION: [vmagent](https://docs.victoriametrics.com/vmagent/): removed deprecated `-remoteWrite.multitenantURL` flag from vmagent. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).
|
||||||
|
|
||||||
## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0)
|
## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0)
|
||||||
|
|
||||||
Released at 2024-04-26
|
Released at 2024-04-26
|
||||||
|
|
|
@ -513,6 +513,8 @@ Below are aggregation functions that can be put in the `outputs` list at [stream
|
||||||
* [count_series](#count_series)
|
* [count_series](#count_series)
|
||||||
* [increase](#increase)
|
* [increase](#increase)
|
||||||
* [increase_prometheus](#increase_prometheus)
|
* [increase_prometheus](#increase_prometheus)
|
||||||
|
* [rate_sum](#rate_sum)
|
||||||
|
* [rate_avg](#rate_avg)
|
||||||
* [histogram_bucket](#histogram_bucket)
|
* [histogram_bucket](#histogram_bucket)
|
||||||
* [last](#last)
|
* [last](#last)
|
||||||
* [max](#max)
|
* [max](#max)
|
||||||
|
@ -577,7 +579,7 @@ The results of `increase` is equal to the following [MetricsQL](https://docs.vic
|
||||||
sum(increase_pure(some_counter[interval]))
|
sum(increase_pure(some_counter[interval]))
|
||||||
```
|
```
|
||||||
|
|
||||||
`increase` assumes that all the counters start from 0. For example, if the fist seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
|
`increase` assumes that all the counters start from 0. For example, if the first seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
|
||||||
is `10`, then `increase` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series,
|
is `10`, then `increase` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series,
|
||||||
then take a look at [increase_prometheus](#increase_prometheus).
|
then take a look at [increase_prometheus](#increase_prometheus).
|
||||||
|
|
||||||
|
@ -585,21 +587,37 @@ For example, see below time series produced by config with aggregation interval
|
||||||
|
|
||||||
<img alt="increase aggregation" src="stream-aggregation-check-increase.webp">
|
<img alt="increase aggregation" src="stream-aggregation-check-increase.webp">
|
||||||
|
|
||||||
`increase` can be used as an alternative for [rate](https://docs.victoriametrics.com/metricsql/#rate) function.
|
|
||||||
For example, if `increase` is calculated for `some_counter` with `interval: 5m`, then `rate` can be calculated
|
|
||||||
by dividing the resulting aggregation by `5m`:
|
|
||||||
|
|
||||||
```metricsql
|
|
||||||
some_counter:5m_increase / 5m
|
|
||||||
```
|
|
||||||
|
|
||||||
This is similar to `rate(some_counter[5m])`.
|
|
||||||
|
|
||||||
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
|
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
|
||||||
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
|
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#staleness) option.
|
||||||
|
|
||||||
See also [increase_prometheus](#increase_prometheus) and [total](#total).
|
See also [increase_prometheus](#increase_prometheus) and [total](#total).
|
||||||
|
|
||||||
|
### rate_sum
|
||||||
|
|
||||||
|
`rate_sum` returns the sum of average per-second change of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
|
||||||
|
`rate_sum` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter).
|
||||||
|
|
||||||
|
The results of `rate_sum` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
|
||||||
|
|
||||||
|
```metricsql
|
||||||
|
sum(rate(some_counter[interval]))
|
||||||
|
```
|
||||||
|
|
||||||
|
See also [rate_avg](#rate_avg) and [total](#total) outputs.
|
||||||
|
|
||||||
|
### rate_avg
|
||||||
|
|
||||||
|
`rate_avg` returns the average of average per-second of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
|
||||||
|
`rate_avg` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyconcepts/#counter).
|
||||||
|
|
||||||
|
The results of `rate_avg` are equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
|
||||||
|
|
||||||
|
```metricsql
|
||||||
|
avg(rate(some_counter[interval]))
|
||||||
|
```
|
||||||
|
|
||||||
|
See also [rate_sum](#rate_avg) and [total](#total) outputs.
|
||||||
|
|
||||||
### increase_prometheus
|
### increase_prometheus
|
||||||
|
|
||||||
`increase_prometheus` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
|
`increase_prometheus` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
|
||||||
|
@ -741,7 +759,7 @@ The results of `total` is roughly equal to the the following [MetricsQL](https:/
|
||||||
sum(running_sum(increase_pure(some_counter)))
|
sum(running_sum(increase_pure(some_counter)))
|
||||||
```
|
```
|
||||||
|
|
||||||
`total` assumes that all the counters start from 0. For example, if the fist seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
|
`total` assumes that all the counters start from 0. For example, if the first seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
|
||||||
is `10`, then `total` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series,
|
is `10`, then `total` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series,
|
||||||
then take a look at [total_prometheus](#total_prometheus).
|
then take a look at [total_prometheus](#total_prometheus).
|
||||||
|
|
||||||
|
|
|
@ -2099,10 +2099,6 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
|
||||||
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter
|
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter
|
||||||
-remoteWrite.maxRowsPerBlock int
|
-remoteWrite.maxRowsPerBlock int
|
||||||
The maximum number of samples to send in each block to remote storage. Higher number may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxBlockSize (default 10000)
|
The maximum number of samples to send in each block to remote storage. Higher number may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxBlockSize (default 10000)
|
||||||
-remoteWrite.multitenantURL array
|
|
||||||
Base path for multitenant remote storage URL to write data to. See https://docs.victoriametrics.com/vmagent/#multitenancy for details. Example url: http://<vminsert>:8480 . Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. This flag is deprecated in favor of -enableMultitenantHandlers . See https://docs.victoriametrics.com/vmagent/#multitenancy
|
|
||||||
Supports an array of values separated by comma or specified via multiple flags.
|
|
||||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
|
||||||
-remoteWrite.oauth2.clientID array
|
-remoteWrite.oauth2.clientID array
|
||||||
Optional OAuth2 clientID to use for the corresponding -remoteWrite.url
|
Optional OAuth2 clientID to use for the corresponding -remoteWrite.url
|
||||||
Supports an array of values separated by comma or specified via multiple flags.
|
Supports an array of values separated by comma or specified via multiple flags.
|
||||||
|
|
|
@ -1512,7 +1512,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
|
func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 {
|
||||||
if c.isConst {
|
if c.isConst {
|
||||||
v := c.encodedValues[0]
|
v := c.encodedValues[0]
|
||||||
f, ok := tryParseFloat64(v)
|
f, ok := tryParseFloat64(v)
|
||||||
|
@ -1620,7 +1620,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
|
func (c *blockResultColumn) getMinValue(_ *blockResult) float64 {
|
||||||
if c.isConst {
|
if c.isConst {
|
||||||
v := c.encodedValues[0]
|
v := c.encodedValues[0]
|
||||||
f, ok := tryParseFloat64(v)
|
f, ok := tryParseFloat64(v)
|
||||||
|
|
|
@ -19,7 +19,7 @@ func (po *pipeOffset) String() string {
|
||||||
func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) {
|
func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (po *pipeOffset) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
|
func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
|
||||||
return &pipeOffsetProcessor{
|
return &pipeOffsetProcessor{
|
||||||
po: po,
|
po: po,
|
||||||
ppBase: ppBase,
|
ppBase: ppBase,
|
||||||
|
|
|
@ -316,17 +316,17 @@ func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []floa
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psp *pipeSortProcessorShard) Len() int {
|
func (shard *pipeSortProcessorShard) Len() int {
|
||||||
return len(psp.rowRefs)
|
return len(shard.rowRefs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psp *pipeSortProcessorShard) Swap(i, j int) {
|
func (shard *pipeSortProcessorShard) Swap(i, j int) {
|
||||||
rowRefs := psp.rowRefs
|
rowRefs := shard.rowRefs
|
||||||
rowRefs[i], rowRefs[j] = rowRefs[j], rowRefs[i]
|
rowRefs[i], rowRefs[j] = rowRefs[j], rowRefs[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psp *pipeSortProcessorShard) Less(i, j int) bool {
|
func (shard *pipeSortProcessorShard) Less(i, j int) bool {
|
||||||
return sortBlockLess(psp, i, psp, j)
|
return sortBlockLess(shard, i, shard, j)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) {
|
func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
|
|
|
@ -175,7 +175,6 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
|
||||||
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
keyBuf = sup.keyBuf
|
|
||||||
return stateSizeIncrease
|
return stateSizeIncrease
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,7 +307,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
|
||||||
m[string(keyBuf)] = struct{}{}
|
m[string(keyBuf)] = struct{}{}
|
||||||
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
||||||
}
|
}
|
||||||
sup.keyBuf = keyBuf
|
//sup.keyBuf = keyBuf
|
||||||
return stateSizeIncrease
|
return stateSizeIncrease
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,7 +324,6 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
|
||||||
m[string(keyBuf)] = struct{}{}
|
m[string(keyBuf)] = struct{}{}
|
||||||
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
||||||
}
|
}
|
||||||
keyBuf = sup.keyBuf
|
|
||||||
return stateSizeIncrease
|
return stateSizeIncrease
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -164,32 +164,6 @@ func (c *BlockColumn) reset() {
|
||||||
c.Values = nil
|
c.Values = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBlockColumnIndex(columns []BlockColumn, columnName string) int {
|
|
||||||
for i, c := range columns {
|
|
||||||
if c.Name == columnName {
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|
||||||
func getBlockColumnValues(columns []BlockColumn, columnName string, rowsCount int) []string {
|
|
||||||
for _, c := range columns {
|
|
||||||
if c.Name == columnName {
|
|
||||||
return c.Values
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return getEmptyStrings(rowsCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
func appendBlockColumnValues(dst [][]string, columns []BlockColumn, fields []string, rowsCount int) [][]string {
|
|
||||||
for _, f := range fields {
|
|
||||||
values := getBlockColumnValues(columns, f, rowsCount)
|
|
||||||
dst = append(dst, values)
|
|
||||||
}
|
|
||||||
return dst
|
|
||||||
}
|
|
||||||
|
|
||||||
func getEmptyStrings(rowsCount int) []string {
|
func getEmptyStrings(rowsCount int) []string {
|
||||||
p := emptyStrings.Load()
|
p := emptyStrings.Load()
|
||||||
if p == nil {
|
if p == nil {
|
||||||
|
|
|
@ -89,7 +89,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
})
|
})
|
||||||
t.Run("missing-message-text", func(_ *testing.T) {
|
t.Run("missing-message-text", func(_ *testing.T) {
|
||||||
q := mustParseQuery(`foobar`)
|
q := mustParseQuery(`foobar`)
|
||||||
|
@ -101,7 +101,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
})
|
})
|
||||||
t.Run("matching-tenant-id", func(t *testing.T) {
|
t.Run("matching-tenant-id", func(t *testing.T) {
|
||||||
q := mustParseQuery(`tenant.id:*`)
|
q := mustParseQuery(`tenant.id:*`)
|
||||||
|
@ -135,7 +135,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -149,7 +149,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
|
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -162,7 +162,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
|
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -174,7 +174,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
|
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
|
||||||
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
||||||
}
|
}
|
||||||
s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock))
|
||||||
})
|
})
|
||||||
t.Run("matching-stream-id", func(t *testing.T) {
|
t.Run("matching-stream-id", func(t *testing.T) {
|
||||||
for i := 0; i < streamsPerTenant; i++ {
|
for i := 0; i < streamsPerTenant; i++ {
|
||||||
|
@ -208,7 +208,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := blocksPerStream * rowsPerBlock
|
expectedRowsCount := blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -227,7 +227,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -247,7 +247,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream
|
expectedRowsCount := streamsPerTenant * blocksPerStream
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -267,7 +267,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
rowsCountTotal.Add(uint32(len(timestamps)))
|
rowsCountTotal.Add(uint32(len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
|
|
||||||
expectedRowsCount := blocksPerStream
|
expectedRowsCount := blocksPerStream
|
||||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
|
@ -286,7 +286,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
})
|
})
|
||||||
t.Run("missing-time-range", func(_ *testing.T) {
|
t.Run("missing-time-range", func(_ *testing.T) {
|
||||||
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
|
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
|
||||||
|
@ -300,7 +300,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
|
checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock))
|
||||||
})
|
})
|
||||||
|
|
||||||
// Close the storage and delete its data
|
// Close the storage and delete its data
|
||||||
|
@ -308,6 +308,13 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
fs.MustRemoveAll(path)
|
fs.MustRemoveAll(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkErr(t *testing.T, err error) {
|
||||||
|
t.Helper()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func mustParseQuery(query string) *Query {
|
func mustParseQuery(query string) *Query {
|
||||||
q, err := ParseQuery(query)
|
q, err := ParseQuery(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
147
lib/streamaggr/rate.go
Normal file
147
lib/streamaggr/rate.go
Normal file
|
@ -0,0 +1,147 @@
|
||||||
|
package streamaggr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// rateAggrState calculates output=rate, e.g. the counter per-second change.
|
||||||
|
type rateAggrState struct {
|
||||||
|
m sync.Map
|
||||||
|
|
||||||
|
suffix string
|
||||||
|
|
||||||
|
// Time series state is dropped if no new samples are received during stalenessSecs.
|
||||||
|
stalenessSecs uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type rateStateValue struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
lastValues map[string]*rateLastValueState
|
||||||
|
deleteDeadline uint64
|
||||||
|
deleted bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type rateLastValueState struct {
|
||||||
|
value float64
|
||||||
|
timestamp int64
|
||||||
|
deleteDeadline uint64
|
||||||
|
|
||||||
|
// total stores cumulative difference between registered values
|
||||||
|
// in the aggregation interval
|
||||||
|
total float64
|
||||||
|
// prevTimestamp stores timestamp of the last registered value
|
||||||
|
// in the previous aggregation interval
|
||||||
|
prevTimestamp int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState {
|
||||||
|
stalenessSecs := roundDurationToSecs(stalenessInterval)
|
||||||
|
return &rateAggrState{
|
||||||
|
suffix: suffix,
|
||||||
|
stalenessSecs: stalenessSecs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *rateAggrState) pushSamples(samples []pushSample) {
|
||||||
|
currentTime := fasttime.UnixTimestamp()
|
||||||
|
deleteDeadline := currentTime + as.stalenessSecs
|
||||||
|
for i := range samples {
|
||||||
|
s := &samples[i]
|
||||||
|
inputKey, outputKey := getInputOutputKey(s.key)
|
||||||
|
|
||||||
|
again:
|
||||||
|
v, ok := as.m.Load(outputKey)
|
||||||
|
if !ok {
|
||||||
|
// The entry is missing in the map. Try creating it.
|
||||||
|
v = &rateStateValue{
|
||||||
|
lastValues: make(map[string]*rateLastValueState),
|
||||||
|
}
|
||||||
|
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||||
|
if loaded {
|
||||||
|
// Use the entry created by a concurrent goroutine.
|
||||||
|
v = vNew
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sv := v.(*rateStateValue)
|
||||||
|
sv.mu.Lock()
|
||||||
|
deleted := sv.deleted
|
||||||
|
if !deleted {
|
||||||
|
lv, ok := sv.lastValues[inputKey]
|
||||||
|
if ok {
|
||||||
|
if s.timestamp < lv.timestamp {
|
||||||
|
// Skip out of order sample
|
||||||
|
sv.mu.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if lv.prevTimestamp == 0 {
|
||||||
|
lv.prevTimestamp = lv.timestamp
|
||||||
|
}
|
||||||
|
if s.value >= lv.value {
|
||||||
|
lv.total += s.value - lv.value
|
||||||
|
} else {
|
||||||
|
// counter reset
|
||||||
|
lv.total += s.value
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
lv = &rateLastValueState{}
|
||||||
|
}
|
||||||
|
lv.value = s.value
|
||||||
|
lv.timestamp = s.timestamp
|
||||||
|
lv.deleteDeadline = deleteDeadline
|
||||||
|
sv.lastValues[inputKey] = lv
|
||||||
|
sv.deleteDeadline = deleteDeadline
|
||||||
|
}
|
||||||
|
sv.mu.Unlock()
|
||||||
|
if deleted {
|
||||||
|
// The entry has been deleted by the concurrent call to flushState
|
||||||
|
// Try obtaining and updating the entry again.
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||||
|
currentTime := fasttime.UnixTimestamp()
|
||||||
|
currentTimeMsec := int64(currentTime) * 1000
|
||||||
|
|
||||||
|
m := &as.m
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
sv := v.(*rateStateValue)
|
||||||
|
sv.mu.Lock()
|
||||||
|
|
||||||
|
// check for stale entries
|
||||||
|
deleted := currentTime > sv.deleteDeadline
|
||||||
|
if deleted {
|
||||||
|
// Mark the current entry as deleted
|
||||||
|
sv.deleted = deleted
|
||||||
|
sv.mu.Unlock()
|
||||||
|
m.Delete(k)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete outdated entries in sv.lastValues
|
||||||
|
var rate float64
|
||||||
|
m := sv.lastValues
|
||||||
|
for k1, v1 := range m {
|
||||||
|
if currentTime > v1.deleteDeadline {
|
||||||
|
delete(m, k1)
|
||||||
|
} else if v1.prevTimestamp > 0 {
|
||||||
|
rate += v1.total * 1000 / float64(v1.timestamp-v1.prevTimestamp)
|
||||||
|
v1.prevTimestamp = v1.timestamp
|
||||||
|
v1.total = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if as.suffix == "rate_avg" {
|
||||||
|
// note: capture m length after deleted items were removed
|
||||||
|
rate /= float64(len(m))
|
||||||
|
}
|
||||||
|
sv.mu.Unlock()
|
||||||
|
|
||||||
|
key := k.(string)
|
||||||
|
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
|
@ -164,6 +164,8 @@ type Config struct {
|
||||||
//
|
//
|
||||||
// The following names are allowed:
|
// The following names are allowed:
|
||||||
//
|
//
|
||||||
|
// - rate_sum - calculates sum of rate for input counters
|
||||||
|
// - rate_avg - calculates average of rate for input counters
|
||||||
// - total - aggregates input counters
|
// - total - aggregates input counters
|
||||||
// - total_prometheus - aggregates input counters, ignoring the first sample in new time series
|
// - total_prometheus - aggregates input counters, ignoring the first sample in new time series
|
||||||
// - increase - calculates the increase over input series
|
// - increase - calculates the increase over input series
|
||||||
|
@ -530,6 +532,10 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
|
||||||
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
|
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
|
||||||
case "increase_prometheus":
|
case "increase_prometheus":
|
||||||
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
|
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
|
||||||
|
case "rate_sum":
|
||||||
|
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum")
|
||||||
|
case "rate_avg":
|
||||||
|
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg")
|
||||||
case "count_series":
|
case "count_series":
|
||||||
aggrStates[i] = newCountSeriesAggrState()
|
aggrStates[i] = newCountSeriesAggrState()
|
||||||
case "count_samples":
|
case "count_samples":
|
||||||
|
|
|
@ -828,7 +828,7 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
|
||||||
`, `
|
`, `
|
||||||
foo{abc="123"} 4
|
foo{abc="123"} 4
|
||||||
bar 5
|
bar 5
|
||||||
foo{abc="123"} 8.5
|
foo{abc="123"} 8.5 10
|
||||||
foo{abc="456",de="fg"} 8
|
foo{abc="456",de="fg"} 8
|
||||||
`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1
|
`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1
|
||||||
bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
||||||
|
@ -836,6 +836,20 @@ bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5
|
||||||
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
|
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
|
||||||
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
||||||
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
|
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
|
||||||
|
`, "1111")
|
||||||
|
|
||||||
|
// test rate_sum and rate_avg
|
||||||
|
f(`
|
||||||
|
- interval: 1m
|
||||||
|
by: [cde]
|
||||||
|
outputs: [rate_sum, rate_avg]
|
||||||
|
`, `
|
||||||
|
foo{abc="123", cde="1"} 4
|
||||||
|
foo{abc="123", cde="1"} 8.5 10
|
||||||
|
foo{abc="456", cde="1"} 8
|
||||||
|
foo{abc="456", cde="1"} 10 10
|
||||||
|
`, `foo:1m_by_cde_rate_avg{cde="1"} 0.325
|
||||||
|
foo:1m_by_cde_rate_sum{cde="1"} 0.65
|
||||||
`, "1111")
|
`, "1111")
|
||||||
|
|
||||||
// keep_metric_names
|
// keep_metric_names
|
||||||
|
@ -979,6 +993,7 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
||||||
}
|
}
|
||||||
rows.UnmarshalWithErrLogger(s, errLogger)
|
rows.UnmarshalWithErrLogger(s, errLogger)
|
||||||
var tss []prompbmarshal.TimeSeries
|
var tss []prompbmarshal.TimeSeries
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
||||||
for _, row := range rows.Rows {
|
for _, row := range rows.Rows {
|
||||||
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
||||||
|
@ -994,7 +1009,7 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
||||||
}
|
}
|
||||||
samples = append(samples, prompbmarshal.Sample{
|
samples = append(samples, prompbmarshal.Sample{
|
||||||
Value: row.Value,
|
Value: row.Value,
|
||||||
Timestamp: row.Timestamp,
|
Timestamp: now + row.Timestamp,
|
||||||
})
|
})
|
||||||
ts := prompbmarshal.TimeSeries{
|
ts := prompbmarshal.TimeSeries{
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
|
|
|
@ -34,13 +34,13 @@ type totalAggrState struct {
|
||||||
|
|
||||||
type totalStateValue struct {
|
type totalStateValue struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
lastValues map[string]lastValueState
|
lastValues map[string]totalLastValueState
|
||||||
total float64
|
total float64
|
||||||
deleteDeadline uint64
|
deleteDeadline uint64
|
||||||
deleted bool
|
deleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type lastValueState struct {
|
type totalLastValueState struct {
|
||||||
value float64
|
value float64
|
||||||
timestamp int64
|
timestamp int64
|
||||||
deleteDeadline uint64
|
deleteDeadline uint64
|
||||||
|
@ -78,7 +78,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||||
if !ok {
|
if !ok {
|
||||||
// The entry is missing in the map. Try creating it.
|
// The entry is missing in the map. Try creating it.
|
||||||
v = &totalStateValue{
|
v = &totalStateValue{
|
||||||
lastValues: make(map[string]lastValueState),
|
lastValues: make(map[string]totalLastValueState),
|
||||||
}
|
}
|
||||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||||
if loaded {
|
if loaded {
|
||||||
|
@ -97,6 +97,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||||
sv.mu.Unlock()
|
sv.mu.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.value >= lv.value {
|
if s.value >= lv.value {
|
||||||
sv.total += s.value - lv.value
|
sv.total += s.value - lv.value
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue