From 185fac03b3a6a8a98e60cd2c8eca7d96e500ebf8 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Thu, 6 Jun 2024 15:06:11 +0300 Subject: [PATCH] lib/streamaggr: metrics to track dropped, nan samples and samples lag (#6358) ### Describe Your Changes Added streamaggr metrics to: - `vm_streamaggr_samples_lag_seconds` - samples lag - `vm_streamaggr_ignored_samples_total{reason="nan"}` - ignored NaN samples - `vm_streamaggr_ignored_samples_total{reason="too_old"}` - ignored old samples --- docs/CHANGELOG.md | 1 + lib/streamaggr/streamaggr.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e858090ba..257493fd7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -59,6 +59,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmrestore](https://docs.victoriametrics.com/vmrestore/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): add `-s3TLSInsecureSkipVerify` command-line flag for skipping TLS certificates verification when connecting to S3 endpoint. * FEATURE: expose metric `vm_indexdb_items_dropped_total` to track the number of IndexDB records that had to be dropped during ingestion. The reason of dropping the record will be annotated in `reason` label of the exposed metric. This change also comes with a new [alerting rule](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml) to track changes of this metric. * FEATURE: [alerts-health](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml): add new alerting rules `TooLongLabelValues` and `TooLongLabelNames` to notify about truncation of label values or names respectively. 
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_ignored_samples_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting the number of samples ignored because they are too old or have NaN values. Also expose `vm_streamaggr_samples_lag_seconds` [histogram](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the lag of aggregated samples. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239). 
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 6657b2e04..625a0b570 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -389,9 +389,12 @@ type aggregator struct { flushDuration *metrics.Histogram dedupFlushDuration *metrics.Histogram + samplesLag *metrics.Histogram flushTimeouts *metrics.Counter dedupFlushTimeouts *metrics.Counter + ignoredOldSamples *metrics.Counter + ignoredNanSamples *metrics.Counter } type aggrState interface { @@ -610,9 +613,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option flushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_flush_duration_seconds`), dedupFlushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`), + samplesLag: ms.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`), flushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`), dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`), + ignoredNanSamples: ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`), + ignoredOldSamples: ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`), } if dedupInterval > 0 { a.da = newDedupAggr() @@ -798,6 +804,7 @@ func (a *aggregator) MustStop() { // Push pushes tss to a. 
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { + now := time.Now().UnixMilli() ctx := getPushCtx() defer putPushCtx(ctx) @@ -810,6 +817,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { dropLabels := a.dropInputLabels ignoreOldSamples := a.ignoreOldSamples minTimestamp := a.minTimestamp.Load() + var totalLag int64 + var totalSamples int for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue @@ -840,13 +849,17 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { key := bytesutil.InternBytes(buf) for _, sample := range ts.Samples { if math.IsNaN(sample.Value) { + a.ignoredNanSamples.Inc() // Skip NaN values continue } if ignoreOldSamples && sample.Timestamp < minTimestamp { + a.ignoredOldSamples.Inc() // Skip old samples outside the current aggregation interval continue } + totalLag += now - sample.Timestamp + totalSamples++ samples = append(samples, pushSample{ key: key, value: sample.Value, @@ -854,6 +867,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { }) } } + if totalSamples > 0 { + a.samplesLag.Update(float64(totalLag/int64(totalSamples)) / 1000) + } ctx.samples = samples ctx.buf = buf