lib/streamaggr: metrics to track dropped, nan samples and samples lag (#6358)

### Describe Your Changes

Added the following streamaggr metrics:

- `vm_streamaggr_samples_lag_seconds` - lag between the wall clock and the timestamps of aggregated samples
- `vm_streamaggr_ignored_samples_total{reason="nan"}` - samples ignored because of NaN values
- `vm_streamaggr_ignored_samples_total{reason="too_old"}` - samples ignored because their timestamps are too old
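
As a rough, illustrative sketch (not part of this commit), the metrics above can be registered and updated with the `github.com/VictoriaMetrics/metrics` package that streamaggr uses; the names mirror the list above and the values are made up:

```go
package main

import (
	"github.com/VictoriaMetrics/metrics"
)

func main() {
	// streamaggr registers its metrics on a dedicated *metrics.Set.
	ms := metrics.NewSet()

	// Histogram of the average lag between sample timestamps and ingestion time, in seconds.
	samplesLag := ms.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`)

	// Counters for ignored samples, split by the `reason` label.
	ignoredNan := ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`)
	ignoredOld := ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`)

	// Illustrative updates: a 1.5s average lag, one NaN sample, one too old sample.
	samplesLag.Update(1.5)
	ignoredNan.Inc()
	ignoredOld.Inc()
}
```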

(cherry picked from commit 185fac03b3)
Author: Andrii Chubatiuk, 2024-06-06 15:06:11 +03:00 (committed by hagen1778)
Parent: 53382ae837
Commit: 93cd08f15f
2 changed files with 17 additions and 0 deletions

docs/CHANGELOG.md

@@ -59,6 +59,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmrestore](https://docs.victoriametrics.com/vmrestore/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): add `-s3TLSInsecureSkipVerify` command-line flag for skipping TLS certificates verification when connecting to S3 endpoint.
* FEATURE: expose metric `vm_indexdb_items_dropped_total` to track the number of IndexDB records that had to be dropped during ingestion. The reason of dropping the record will be annotated in `reason` label of the exposed metric. This change also comes with a new [alerting rule](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml) to track changes of this metric.
* FEATURE: [alerts-health](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml): add new alerting rules `TooLongLabelValues` and `TooLongLabelNames` to notify about truncation of label values or names respectively.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_ignored_samples_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at the [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting the amount of samples ignored because of NaN values or too old timestamps. Also expose the `vm_streamaggr_samples_lag_seconds` [histogram](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the lag of aggregated samples.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239).
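
For context on the changelog entry above, here is a small sketch (not part of this commit; the values are invented) showing how these series can be registered on a `metrics.Set` and written out in the same text format the `/metrics` page serves:

```go
package main

import (
	"os"

	"github.com/VictoriaMetrics/metrics"
)

func main() {
	// Register the new series on a set and give them made-up values.
	ms := metrics.NewSet()
	ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`).Inc()
	ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`).Inc()
	ms.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`).Update(1.5)

	// Write the set in Prometheus text exposition format, similar to what
	// the /metrics page returns for these series (the histogram is exposed
	// as vmrange buckets plus _sum and _count).
	ms.WritePrometheus(os.Stdout)
}
```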

lib/streamaggr/streamaggr.go

@@ -389,9 +389,12 @@ type aggregator struct {
	flushDuration      *metrics.Histogram
	dedupFlushDuration *metrics.Histogram
	samplesLag         *metrics.Histogram
	flushTimeouts      *metrics.Counter
	dedupFlushTimeouts *metrics.Counter
	ignoredOldSamples  *metrics.Counter
	ignoredNanSamples  *metrics.Counter
}

type aggrState interface {
@@ -610,9 +613,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
		flushDuration:      ms.GetOrCreateHistogram(`vm_streamaggr_flush_duration_seconds`),
		dedupFlushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`),
		samplesLag:         ms.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`),
		flushTimeouts:      ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`),
		dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`),
		ignoredNanSamples:  ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`),
		ignoredOldSamples:  ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`),
	}
	if dedupInterval > 0 {
		a.da = newDedupAggr()
@@ -798,6 +804,7 @@ func (a *aggregator) MustStop() {
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
	now := time.Now().UnixMilli()
	ctx := getPushCtx()
	defer putPushCtx(ctx)
@@ -810,6 +817,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
	dropLabels := a.dropInputLabels
	ignoreOldSamples := a.ignoreOldSamples
	minTimestamp := a.minTimestamp.Load()
	var totalLag int64
	var totalSamples int
	for idx, ts := range tss {
		if !a.match.Match(ts.Labels) {
			continue
@@ -840,13 +849,17 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
		key := bytesutil.InternBytes(buf)
		for _, sample := range ts.Samples {
			if math.IsNaN(sample.Value) {
				a.ignoredNanSamples.Inc()
				// Skip NaN values
				continue
			}
			if ignoreOldSamples && sample.Timestamp < minTimestamp {
				a.ignoredOldSamples.Inc()
				// Skip old samples outside the current aggregation interval
				continue
			}
			totalLag += now - sample.Timestamp
			totalSamples++
			samples = append(samples, pushSample{
				key:   key,
				value: sample.Value,
@@ -854,6 +867,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
			})
		}
	}
	if totalSamples > 0 {
		a.samplesLag.Update(float64(totalLag/int64(totalSamples)) / 1000)
	}
	ctx.samples = samples
	ctx.buf = buf
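
The hunk above updates `vm_streamaggr_samples_lag_seconds` with the average lag per `Push` call: milliseconds of lag summed over accepted samples, divided by their count, then converted to seconds. A tiny self-contained sketch of that arithmetic with made-up timestamps (not part of the commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now().UnixMilli()

	// Hypothetical sample timestamps in milliseconds: 500 ms and 2500 ms behind now.
	sampleTimestamps := []int64{now - 500, now - 2500}

	var totalLag int64
	var totalSamples int
	for _, ts := range sampleTimestamps {
		totalLag += now - ts
		totalSamples++
	}

	// Average lag in milliseconds (1500), converted to seconds (1.5) before
	// being passed to the vm_streamaggr_samples_lag_seconds histogram.
	avgLagSeconds := float64(totalLag/int64(totalSamples)) / 1000
	fmt.Println(avgLagSeconds) // 1.5
}
```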