streamaggr: made labels compressor shared (#6173)

The labels compressor is quite resource-intensive, yet each aggregator and
deduplicator instance had its own compressor. Make the compressor shared
across all aggregators and deduplicators, so that running multiple
aggregators consumes fewer resources.

Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
Author: Andrii Chubatiuk, 2024-05-08 14:10:53 +03:00, committed by GitHub
Parent: 02851d7800
Commit: a9283e06a3
4 changed files with 23 additions and 38 deletions
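
At its core, the change hoists the `promutils.LabelsCompressor` from a per-instance struct field to a single package-level variable. Below is a minimal sketch of the resulting pattern, using only the `LabelsCompressor` calls that appear in the diffs that follow (the `main` wrapper and the sample labels are illustrative, not repo code):

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)

// One process-wide compressor. Every aggregator and deduplicator compresses
// series labels through it, so identical label names and values are interned
// once instead of once per instance.
var lc promutils.LabelsCompressor

func main() {
	labels := []prompbmarshal.Label{
		{Name: "__name__", Value: "http_requests_total"},
		{Name: "job", Value: "vmagent"},
	}

	// Compress the label set into a compact binary key...
	key := lc.Compress(nil, labels)

	// ...and decompress it back into the original labels on flush.
	restored := lc.Decompress(nil, key)
	fmt.Printf("key: %d bytes, labels: %v\n", len(key), restored)
}
```

Since the compression state is now process-wide, `lc.SizeBytes()` and `lc.ItemsCount()` describe the whole process as well, which is why the per-instance `vm_streamaggr_labels_compressor_*` gauges are replaced with two global ones in the diffs below.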


@@ -52,6 +52,7 @@ Released at 2024-04-26
 * FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/): support disable or log [implicit conversions](https://docs.victoriametrics.com/metricsql/#implicit-query-conversions) for subquery with cmd-line flags `-search.disableImplicitConversion` and `-search.logImplicitConversion`. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4338).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support data replication additionally to sharding among remote storage systems if `-remoteWrite.shardByURLReplicas=N` command-line flag is set additionally to `-remoteWrite.shardByURL` command-line flag, where `N` is desired replication factor. This allows setting up data replication among failure domains when the replication factor is smaller than the number of failure domains. See [these docs](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce CPU usage when [sharding among remote storage systems](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) is enabled.
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage during stream aggregation if multiple aggregation configs are used for the same set of data.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053).
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow skipping first N aggregation intervals via cmd-line flag `-streamAggr.ignoreFirstIntervals` for [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) or `-remoteWrite.streamAggr.ignoreFirstIntervals` for [vmagent](https://docs.victoriametrics.com/vmagent/). See more details [here](https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details.


@@ -7,7 +7,6 @@ import (
 	"time"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )

 func BenchmarkDedupAggr(b *testing.B) {
@@ -50,7 +49,6 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 }

 func newBenchSamples(count int) []pushSample {
-	var lc promutils.LabelsCompressor
 	labels := []prompbmarshal.Label{
 		{
 			Name: "app",
@@ -82,7 +80,7 @@ func newBenchSamples(count int) []pushSample {
 			Name:  "app",
 			Value: fmt.Sprintf("instance-%d", i),
 		})
-		keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:])
+		keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
 		sample.key = string(keyBuf)
 		sample.value = float64(i)
 	}


@@ -15,7 +15,6 @@ import (
 // Deduplicator deduplicates samples per each time series.
 type Deduplicator struct {
 	da *dedupAggr
-	lc promutils.LabelsCompressor

 	dropLabels []string
@@ -38,8 +37,7 @@ type Deduplicator struct {
 // MustStop must be called on the returned deduplicator in order to free up occupied resources.
 func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator {
 	d := &Deduplicator{
 		da:         newDedupAggr(),
 		dropLabels: dropLabels,
 		stopCh:     make(chan struct{}),
@@ -54,13 +52,6 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator {
 		return float64(d.da.itemsCount())
 	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
-		return float64(d.lc.SizeBytes())
-	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
-		return float64(d.lc.ItemsCount())
-	})

 	d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`)
 	d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`)
@@ -103,7 +94,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		}
 		labels.Sort()

-		buf = d.lc.Compress(buf[:0], labels.Labels)
+		buf = lc.Compress(buf[:0], labels.Labels)
 		key := bytesutil.InternBytes(buf)
 		for _, s := range ts.Samples {
 			pss = append(pss, pushSample{
@@ -155,7 +146,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		samples := ctx.samples
 		for _, ps := range pss {
 			labelsLen := len(labels)
-			labels = decompressLabels(labels, &d.lc, ps.key)
+			labels = decompressLabels(labels, ps.key)
 			samplesLen := len(samples)
 			samples = append(samples, prompbmarshal.Sample{
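
The memory saving comes from interning: the shared compressor stores each distinct label name and value once, no matter how many aggregators and deduplicators compress it. Continuing the sketch above under the same API assumptions (`demoSharedInterning` is an illustrative helper, not repo code):

```go
// demoSharedInterning shows that a second compression of identical labels adds
// no new entries to the shared compressor, regardless of which component
// (a Deduplicator or an aggregator) performs it.
func demoSharedInterning() {
	labels := []prompbmarshal.Label{{Name: "job", Value: "vmagent"}}

	_ = lc.Compress(nil, labels) // e.g. a Deduplicator compressing a series key
	n := lc.ItemsCount()

	_ = lc.Compress(nil, labels) // e.g. an aggregator pushing the same series
	fmt.Println(lc.ItemsCount() == n) // true: the labels were already interned
}
```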


@@ -45,6 +45,19 @@ var supportedOutputs = []string{
 	"quantiles(phi1, ..., phiN)",
 }

+var (
+	// lc contains information about all compressed labels for streaming aggregation
+	lc promutils.LabelsCompressor
+
+	_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
+		return float64(lc.SizeBytes())
+	})
+	_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
+		return float64(lc.ItemsCount())
+	})
+)
+
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
 // opts can contain additional options. If opts is nil, then default options are used.
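
A note on the design choice above: because the gauges are created in a package-level `var` block, they are registered exactly once, at package init time, in the default set of `github.com/VictoriaMetrics/metrics`. The removed code below instead registered them on every config load into a dedicated `metrics.Set` and summed across all aggregators. A standalone illustration of the init-time pattern (the metric name and value here are made up):

```go
package main

import (
	"os"

	"github.com/VictoriaMetrics/metrics"
)

// Registered once at package init into the default metrics set. Creating a
// second metric with the same name in the same set panics, which is why
// per-reload registration (as in the removed code below) needs its own
// metrics.Set instead.
var _ = metrics.NewGauge(`example_compressor_items_count`, func() float64 {
	return 42 // made-up value; the real gauges return lc.ItemsCount() etc.
})

func main() {
	// Writes all metrics from the default set in Prometheus text format:
	//   example_compressor_items_count 42
	metrics.WritePrometheus(os.Stdout, false)
}
```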
@@ -269,21 +282,6 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
 		return float64(n)
 	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
-		n := uint64(0)
-		for _, aggr := range as {
-			n += aggr.lc.SizeBytes()
-		}
-		return float64(n)
-	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
-		n := uint64(0)
-		for _, aggr := range as {
-			n += aggr.lc.ItemsCount()
-		}
-		return float64(n)
-	})

 	metrics.RegisterSet(ms)
 	return &Aggregators{
 		as: as,
@@ -361,9 +359,6 @@ type aggregator struct {
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState

-	// lc is used for compressing series keys before passing them to dedupAggr and aggrState
-	lc promutils.LabelsCompressor
-
 	// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
 	minTimestamp atomic.Int64
@@ -822,7 +817,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 			outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
 		}

-		buf = compressLabels(buf[:0], &a.lc, inputLabels.Labels, outputLabels.Labels)
+		buf = compressLabels(buf[:0], inputLabels.Labels, outputLabels.Labels)
 		key := bytesutil.InternBytes(buf)
 		for _, sample := range ts.Samples {
 			if math.IsNaN(sample.Value) {
@@ -850,7 +845,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	}
 }

-func compressLabels(dst []byte, lc *promutils.LabelsCompressor, inputLabels, outputLabels []prompbmarshal.Label) []byte {
+func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
 	bb := bbPool.Get()
 	bb.B = lc.Compress(bb.B, inputLabels)
 	dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
@@ -860,7 +855,7 @@ func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
 	return dst
 }

-func decompressLabels(dst []prompbmarshal.Label, lc *promutils.LabelsCompressor, key string) []prompbmarshal.Label {
+func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label {
 	return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
 }
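
As the hunks above show, `compressLabels` frames the aggregation key as `varint(len(compressed input labels))` followed by the compressed input labels and, judging by the signature and return value, the compressed output labels, so later stages can split the two halves without decompressing anything. Below is a self-contained sketch of that framing; `splitCompressedKey` is a hypothetical helper, and `encoding/binary` stands in for the repo's `encoding.MarshalVarUint64`, which writes the same varint format:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// splitCompressedKey illustrates the key layout produced by compressLabels:
// varint(len(input)) | compressed input labels | compressed output labels.
func splitCompressedKey(key []byte) (inputKey, outputKey []byte, err error) {
	inputLen, n := binary.Uvarint(key)
	if n <= 0 || uint64(len(key)-n) < inputLen {
		return nil, nil, fmt.Errorf("invalid compressed key")
	}
	inputKey = key[n : n+int(inputLen)]
	outputKey = key[n+int(inputLen):]
	return inputKey, outputKey, nil
}

func main() {
	input := []byte{1, 2, 3} // stands in for lc.Compress(nil, inputLabels)
	output := []byte{4, 5}   // stands in for compressed output labels

	// Build a key with the same framing: varint(len(input)) | input | output.
	key := binary.AppendUvarint(nil, uint64(len(input)))
	key = append(key, input...)
	key = append(key, output...)

	in, out, _ := splitCompressedKey(key)
	fmt.Println(in, out) // [1 2 3] [4 5]
}
```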
@@ -1035,7 +1030,7 @@ func (ctx *flushCtx) flushSeries() {
 func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
-	ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key)
+	ctx.labels = decompressLabels(ctx.labels, key)
 	if !ctx.a.keepMetricNames {
 		ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
 	}
@@ -1058,7 +1053,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
 func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {
 	labelsLen := len(ctx.labels)
 	samplesLen := len(ctx.samples)
-	ctx.labels = decompressLabels(ctx.labels, &ctx.a.lc, key)
+	ctx.labels = decompressLabels(ctx.labels, key)
 	if !ctx.a.keepMetricNames {
 		ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
 	}