lib/streamaggr: metrics to track dropped, nan samples and samples lag (#6358)
### Describe Your Changes

Added the following streamaggr metrics:

- `vm_streamaggr_samples_lag_seconds` - samples lag
- `vm_streamaggr_ignored_samples_total{reason="nan"}` - ignored NaN samples
- `vm_streamaggr_ignored_samples_total{reason="too_old"}` - ignored old samples
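The diff below wires these metrics into the aggregator itself via a per-aggregator `metrics.Set`. As a standalone illustration of the same pattern, here is a minimal sketch that registers the three metric names with the package-level API of `github.com/VictoriaMetrics/metrics` and records one simulated batch. The `simulate` helper and all sample values are made up for this example and are not part of the commit.

```go
// Minimal sketch only: registers the metric names added by this commit and
// records one simulated batch. Not the aggregator code itself.
package main

import (
	"math"
	"os"
	"time"

	"github.com/VictoriaMetrics/metrics"
)

var (
	samplesLag        = metrics.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`)
	ignoredNanSamples = metrics.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`)
	ignoredOldSamples = metrics.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`)
)

// simulate mimics how aggregator.Push treats a batch: NaN and too-old samples
// are counted and skipped, while accepted samples contribute to the lag histogram.
func simulate(values []float64, timestamps []int64, minTimestamp int64) {
	now := time.Now().UnixMilli()
	var totalLag int64
	var totalSamples int
	for i, v := range values {
		if math.IsNaN(v) {
			ignoredNanSamples.Inc()
			continue
		}
		if timestamps[i] < minTimestamp {
			ignoredOldSamples.Inc()
			continue
		}
		totalLag += now - timestamps[i]
		totalSamples++
	}
	if totalSamples > 0 {
		// Average lag over the batch, converted from milliseconds to seconds.
		samplesLag.Update(float64(totalLag/int64(totalSamples)) / 1000)
	}
}

func main() {
	now := time.Now().UnixMilli()
	// One accepted sample (1.5s behind), one NaN sample, and one sample older
	// than the hypothetical start of the current aggregation interval.
	simulate(
		[]float64{1, math.NaN(), 3},
		[]int64{now - 1500, now, now - 120000},
		now-60000,
	)
	metrics.WritePrometheus(os.Stdout, false)
}
```

Scraping this output shows the two `reason` values as separate series under the single `vm_streamaggr_ignored_samples_total` name, which is what the changelog entry below refers to.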
parent 55d8379ae6
commit 185fac03b3

2 changed files with 17 additions and 0 deletions
docs/CHANGELOG.md
@@ -59,6 +59,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmrestore](https://docs.victoriametrics.com/vmrestore/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): add `-s3TLSInsecureSkipVerify` command-line flag for skipping TLS certificates verification when connecting to S3 endpoint.
 * FEATURE: expose metric `vm_indexdb_items_dropped_total` to track the number of IndexDB records that had to be dropped during ingestion. The reason of dropping the record will be annotated in `reason` label of the exposed metric. This change also comes with a new [alerting rule](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml) to track changes of this metric.
 * FEATURE: [alerts-health](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml): add new alerting rules `TooLongLabelValues` and `TooLongLabelNames` to notify about truncation of label values or names respectively.
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_ignored_samples_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting amount of too old or NaN valued ignored samples. Expose also `vm_streamaggr_samples_lag_seconds` [histogram](https://docs.victoriametrics.com/keyconcepts/#histogram) to monitor aggregated samples lag.
 
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239).
lib/streamaggr/streamaggr.go
@@ -389,9 +389,12 @@ type aggregator struct {
 
 	flushDuration      *metrics.Histogram
 	dedupFlushDuration *metrics.Histogram
+	samplesLag         *metrics.Histogram
 
 	flushTimeouts      *metrics.Counter
 	dedupFlushTimeouts *metrics.Counter
+	ignoredOldSamples  *metrics.Counter
+	ignoredNanSamples  *metrics.Counter
 }
 
 type aggrState interface {
@@ -610,9 +613,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
 
 		flushDuration:      ms.GetOrCreateHistogram(`vm_streamaggr_flush_duration_seconds`),
 		dedupFlushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`),
+		samplesLag:         ms.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`),
 
 		flushTimeouts:      ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`),
 		dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`),
+		ignoredNanSamples:  ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`),
+		ignoredOldSamples:  ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`),
 	}
 	if dedupInterval > 0 {
 		a.da = newDedupAggr()
@@ -798,6 +804,7 @@ func (a *aggregator) MustStop() {
 
 // Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
+	now := time.Now().UnixMilli()
 	ctx := getPushCtx()
 	defer putPushCtx(ctx)
 
@@ -810,6 +817,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	dropLabels := a.dropInputLabels
 	ignoreOldSamples := a.ignoreOldSamples
 	minTimestamp := a.minTimestamp.Load()
+	var totalLag int64
+	var totalSamples int
 	for idx, ts := range tss {
 		if !a.match.Match(ts.Labels) {
 			continue
@@ -840,13 +849,17 @@
 		key := bytesutil.InternBytes(buf)
 		for _, sample := range ts.Samples {
 			if math.IsNaN(sample.Value) {
+				a.ignoredNanSamples.Inc()
 				// Skip NaN values
 				continue
 			}
 			if ignoreOldSamples && sample.Timestamp < minTimestamp {
+				a.ignoredOldSamples.Inc()
 				// Skip old samples outside the current aggregation interval
 				continue
 			}
+			totalLag += now - sample.Timestamp
+			totalSamples++
 			samples = append(samples, pushSample{
 				key:   key,
 				value: sample.Value,
@@ -854,6 +867,9 @@
 			})
 		}
 	}
+	if totalSamples > 0 {
+		a.samplesLag.Update(float64(totalLag/int64(totalSamples)) / 1000)
+	}
 	ctx.samples = samples
 	ctx.buf = buf
 
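A note on the averaging in the last hunk: the lag is accumulated in milliseconds, averaged with integer division across the accepted samples of the batch, and only then converted to seconds. A quick worked check with made-up numbers:

```go
package main

import "fmt"

func main() {
	// Two accepted samples trailing "now" by 1500ms and 2500ms (hypothetical values).
	var totalLag int64 = 1500 + 2500
	totalSamples := 2
	// Integer division first (as in the new code), then conversion to seconds.
	lagSeconds := float64(totalLag/int64(totalSamples)) / 1000
	fmt.Println(lagSeconds) // prints 2 (seconds): the single observation recorded in vm_streamaggr_samples_lag_seconds
}
```

So each Push call contributes at most one observation to the histogram, representing the mean lag of the samples it accepted.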