VictoriaMetrics/lib/streamaggr/streamaggr.go
Aliaksandr Valialkin 7acc54025e
Revert "lib/streamaggr: discard samples with timestamps outside of aggregation interval (#4199)"
This reverts commit 9e99f2f5b3.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4068

Reason for revert: this breaks valid use cases:

- If timestamps aren't specified in the incoming samples on purpose. For example, if stream aggregation is used
  as StatsD replacement. StatsD protocol has no timestamp concept for incoming samples.
  See https://github.com/b/statsd_spec

- If all the samples must be aggregated, even if they contain stale timestamps.
  For example, if the stream aggregation produces some counter of some events,
  it may be better to count all the events even if they were delayed before
  being ingested into VictoriaMetrics.

It is also unclear how to determine whether the sample becomes stale.
For example, if the aggregation interval equals to 1h, and the previous
aggregation cycle just finished 10 minutes ago, what to do with the newly
incoming sample with the timestamp 30 minutes older than the current time?
The answer highly depends on the context, so it is unsafe to unconditionally
use a single logic for dropping the old samples here.
2023-05-08 21:50:19 -07:00

776 lines
22 KiB
Go

package streamaggr
import (
"encoding/json"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"gopkg.in/yaml.v2"
)
// supportedOutputs lists the human-readable names of the outputs, which can be put into Config.Outputs.
//
// The list is embedded into the error messages returned by newAggregator.
// The actual dispatch on the output name is performed by newAggregator itself,
// so the list must be kept in sync with it.
var supportedOutputs = []string{
"total",
"increase",
"count_series",
"count_samples",
"sum_samples",
"last",
"min",
"max",
"avg",
"stddev",
"stdvar",
"histogram_bucket",
"quantiles(phi1, ..., phiN)",
}
// LoadFromFile reads the stream aggregation config from the given path and builds Aggregators from it.
//
// The config is read via fs.ReadFileOrHTTP and is parsed by NewAggregatorsFromData.
// The aggregated data is passed to pushFunc.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
	cfgData, readErr := fs.ReadFileOrHTTP(path)
	if readErr != nil {
		return nil, fmt.Errorf("cannot load aggregators: %w", readErr)
	}

	aggrs, initErr := NewAggregatorsFromData(cfgData, pushFunc, dedupInterval)
	if initErr != nil {
		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, initErr)
	}
	return aggrs, nil
}
// NewAggregatorsFromData parses the YAML-encoded list of stream aggregation configs from data
// and builds Aggregators from them. The aggregated data is passed to pushFunc.
//
// The data is parsed in strict mode via yaml.UnmarshalStrict.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
	var parsed []*Config
	err := yaml.UnmarshalStrict(data, &parsed)
	if err != nil {
		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
	}
	return NewAggregators(parsed, pushFunc, dedupInterval)
}
// Config is a configuration for a single stream aggregation.
//
// A list of such configs is parsed from YAML by NewAggregatorsFromData
// and is converted into running aggregators by NewAggregators.
type Config struct {
// Match is a label selector for filtering time series for the given selector.
//
// If the match isn't set, then all the input time series are processed.
Match *promrelabel.IfExpression `yaml:"match,omitempty"`
// Interval is the interval between aggregations.
//
// The value is parsed with time.ParseDuration, e.g. `30s`, `1m` or `1h`.
// See newAggregator for the supported range of values.
Interval string `yaml:"interval"`
// Outputs is a list of output aggregate functions to produce.
//
// The list must contain at least a single entry.
//
// The following names are allowed:
//
// - total - aggregates input counters
// - increase - counts the increase over input counters
// - count_series - counts the input series
// - count_samples - counts the input samples
// - sum_samples - sums the input samples
// - last - the last biggest sample value
// - min - the minimum sample value
// - max - the maximum sample value
// - avg - the average value across all the samples
// - stddev - standard deviation across all the samples
// - stdvar - standard variance across all the samples
// - histogram_bucket - creates VictoriaMetrics histogram for input samples
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
//
// The output time series will have the following names:
//
// input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
//
// where <interval> is the Interval value as written in the config, while <by_labels>
// and <without_labels> are the By and Without labels (except of __name__) joined with `_`.
// The trailing <output> part is chosen by the particular output implementation.
//
Outputs []string `yaml:"outputs"`
// By is an optional list of labels for grouping input series.
//
// See also Without.
//
// By and Without cannot be set simultaneously.
// The __name__ label is implicitly added to a non-empty By list.
//
// If neither By nor Without are set, then the Outputs are calculated
// individually per each input time series.
By []string `yaml:"by,omitempty"`
// Without is an optional list of labels, which must be excluded when grouping input series.
//
// See also By.
//
// By and Without cannot be set simultaneously.
//
// If neither By nor Without are set, then the Outputs are calculated
// individually per each input time series.
Without []string `yaml:"without,omitempty"`
// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
// before aggregation.
//
// Input series with no labels left after the relabeling are dropped.
InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`
// OutputRelabelConfigs is an optional relabeling rules, which are applied
// on the aggregated output before being sent to remote storage.
//
// Output series with no labels left after the relabeling are dropped.
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
//
// A nil *Aggregators is valid: MustStop, Equal and Push explicitly handle the nil receiver.
// NewAggregators returns nil Aggregators for an empty list of configs.
type Aggregators struct {
// as contains one aggregator per each Config passed to NewAggregators().
as []*aggregator
// configData contains marshaled configs passed to NewAggregators().
// It is used in Equal() for comparing Aggregators.
configData []byte
}
// NewAggregators builds Aggregators with one aggregator per each entry in cfgs.
//
// pushFunc is called when the aggregated data must be flushed.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// Nil Aggregators and nil error are returned for empty cfgs.
// If some config cannot be initialized, then the aggregators created so far are stopped
// and the error is returned.
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
	if len(cfgs) == 0 {
		return nil, nil
	}

	aggrs := make([]*aggregator, 0, len(cfgs))
	for idx := range cfgs {
		aggr, err := newAggregator(cfgs[idx], pushFunc, dedupInterval)
		if err == nil {
			aggrs = append(aggrs, aggr)
			continue
		}
		// Do not leak the goroutines of the aggregators, which have been started so far.
		for _, started := range aggrs {
			started.MustStop()
		}
		return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", idx, err)
	}

	// The marshaled configs serve as the identity of the Aggregators in Equal().
	marshaledCfgs, err := json.Marshal(cfgs)
	if err != nil {
		logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
	}

	result := &Aggregators{
		as:         aggrs,
		configData: marshaledCfgs,
	}
	return result, nil
}
// MustStop stops every aggregator owned by a.
//
// It is safe to call MustStop on nil Aggregators.
func (a *Aggregators) MustStop() {
	if a == nil {
		return
	}
	for i := range a.as {
		a.as[i].MustStop()
	}
}
// Equal reports whether a and b have been created from identical configs.
//
// Two nil Aggregators are equal; nil Aggregators are never equal to non-nil ones.
func (a *Aggregators) Equal(b *Aggregators) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil, b == nil:
		return false
	}
	return string(a.configData) == string(b.configData)
}
// Push passes tss to every aggregator owned by a.
//
// It is a no-op for nil Aggregators.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
	if a == nil {
		return
	}
	for i := range a.as {
		a.as[i].Push(tss)
	}
}
// aggregator aggregates input series according to the config passed to newAggregator
//
// It owns the background goroutines started by newAggregator.
// These goroutines are stopped by MustStop.
type aggregator struct {
// match is the optional filter for the input series. See Config.Match.
match *promrelabel.IfExpression
// inputRelabeling is applied to the labels of every matching input series before aggregation.
inputRelabeling *promrelabel.ParsedConfigs
// outputRelabeling is applied to the labels of every output series before it is passed to pushFunc.
outputRelabeling *promrelabel.ParsedConfigs
// by and without are the sorted de-duplicated Config.By and Config.Without lists.
// At most one of them is non-empty. A non-empty by list always contains __name__.
by []string
without []string
// aggregateOnlyByTime is true when neither by nor without are set,
// e.g. when the outputs are calculated individually per each input series.
aggregateOnlyByTime bool
// dedupAggr is set to non-nil if input samples must be de-duplicated according
// to the dedupInterval passed to newAggregator().
dedupAggr *lastAggrState
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
// pushFunc receives the aggregated series on every flush.
pushFunc PushFunc
// suffix contains a suffix, which should be added to aggregate metric names
//
// It contains the interval, labels in (by, without), plus output name.
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
// for `interval: 1m`, `by: [job]`
suffix string
// wg tracks the background flusher goroutines started by newAggregator.
wg sync.WaitGroup
// stopCh is closed by MustStop in order to stop the background flusher goroutines.
stopCh chan struct{}
}
// aggrState is the state of a single aggregation output (see Config.Outputs).
type aggrState interface {
// pushSample registers the sample value in the state.
//
// outputKey contains the marshaled labels of the output series the sample belongs to.
// inputKey contains the marshaled labels, which have been removed from the input series
// by the `by` / `without` grouping. It is empty when the grouping isn't used.
pushSample(inputKey, outputKey string, value float64)
// appendSeriesForFlush appends the output series for the state to ctx.
// NOTE(review): presumably the implementations also reset the flushed state - confirm.
appendSeriesForFlush(ctx *flushCtx)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
//
// The passed tss may be backed by buffers, which are re-used after the call returns,
// so the implementation shouldn't hold references to tss after returning.
// NOTE(review): this follows from the flushCtx buffer re-use in aggregator.flush - confirm against the callers.
type PushFunc func(tss []prompbmarshal.TimeSeries)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// The cfg must satisfy the following requirements, otherwise an error is returned:
//
//   - cfg.Interval must be a valid duration for time.ParseDuration, which isn't smaller than 1s;
//   - cfg.InputRelabelConfigs and cfg.OutputRelabelConfigs must be valid relabeling rules;
//   - cfg.By and cfg.Without cannot be set simultaneously;
//   - cfg.Outputs must contain at least a single supported output.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// On success a background goroutine is started, which flushes the aggregated data every cfg.Interval.
// Additional goroutine is started for flushing the de-duplicated samples if dedupInterval > 0.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
	// check cfg.Interval
	interval, err := time.ParseDuration(cfg.Interval)
	if err != nil {
		return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
	}
	// 1s is the minimum supported interval, so it must be accepted - only smaller intervals are rejected.
	if interval < time.Second {
		return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
	}

	// initialize input_relabel_configs and output_relabel_configs
	inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
	if err != nil {
		return nil, fmt.Errorf("cannot parse input_relabel_configs: %w", err)
	}
	outputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.OutputRelabelConfigs)
	if err != nil {
		return nil, fmt.Errorf("cannot parse output_relabel_configs: %w", err)
	}

	// check by and without lists
	by := sortAndRemoveDuplicates(cfg.By)
	without := sortAndRemoveDuplicates(cfg.Without)
	if len(by) > 0 && len(without) > 0 {
		return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without)
	}
	aggregateOnlyByTime := (len(by) == 0 && len(without) == 0)
	if !aggregateOnlyByTime && len(without) == 0 {
		// The metric name must be always preserved when grouping by the given labels.
		by = addMissingUnderscoreName(by)
	}

	// initialize outputs list
	if len(cfg.Outputs) == 0 {
		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
			"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", supportedOutputs)
	}
	aggrStates := make([]aggrState, len(cfg.Outputs))
	for i, output := range cfg.Outputs {
		if strings.HasPrefix(output, "quantiles(") {
			// Parse the comma-separated list of phi values inside `quantiles(...)`.
			if !strings.HasSuffix(output, ")") {
				return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
			}
			argsStr := output[len("quantiles(") : len(output)-1]
			if len(argsStr) == 0 {
				return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
			}
			args := strings.Split(argsStr, ",")
			phis := make([]float64, len(args))
			for j, arg := range args {
				arg = strings.TrimSpace(arg)
				phi, err := strconv.ParseFloat(arg, 64)
				if err != nil {
					return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
				}
				if phi < 0 || phi > 1 {
					return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
				}
				phis[j] = phi
			}
			aggrStates[i] = newQuantilesAggrState(phis)
			continue
		}
		switch output {
		case "total":
			aggrStates[i] = newTotalAggrState(interval)
		case "increase":
			aggrStates[i] = newIncreaseAggrState(interval)
		case "count_series":
			aggrStates[i] = newCountSeriesAggrState()
		case "count_samples":
			aggrStates[i] = newCountSamplesAggrState()
		case "sum_samples":
			aggrStates[i] = newSumSamplesAggrState()
		case "last":
			aggrStates[i] = newLastAggrState()
		case "min":
			aggrStates[i] = newMinAggrState()
		case "max":
			aggrStates[i] = newMaxAggrState()
		case "avg":
			aggrStates[i] = newAvgAggrState()
		case "stddev":
			aggrStates[i] = newStddevAggrState()
		case "stdvar":
			aggrStates[i] = newStdvarAggrState()
		case "histogram_bucket":
			aggrStates[i] = newHistogramBucketAggrState(interval)
		default:
			return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
				"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
		}
	}

	// initialize suffix to add to metric names after aggregation
	suffix := ":" + cfg.Interval
	if labels := removeUnderscoreName(by); len(labels) > 0 {
		suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_"))
	}
	if labels := removeUnderscoreName(without); len(labels) > 0 {
		suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_"))
	}
	suffix += "_"

	var dedupAggr *lastAggrState
	if dedupInterval > 0 {
		dedupAggr = newLastAggrState()
	}

	// initialize the aggregator
	a := &aggregator{
		match: cfg.Match,

		inputRelabeling:  inputRelabeling,
		outputRelabeling: outputRelabeling,

		by:                  by,
		without:             without,
		aggregateOnlyByTime: aggregateOnlyByTime,

		dedupAggr:  dedupAggr,
		aggrStates: aggrStates,
		pushFunc:   pushFunc,

		suffix: suffix,

		stopCh: make(chan struct{}),
	}

	// Start the background flushers. They are stopped and awaited in MustStop().
	if dedupAggr != nil {
		a.wg.Add(1)
		go func() {
			a.runDedupFlusher(dedupInterval)
			a.wg.Done()
		}()
	}
	a.wg.Add(1)
	go func() {
		a.runFlusher(interval)
		a.wg.Done()
	}()

	return a, nil
}
// runDedupFlusher calls a.dedupFlush() every interval until a.stopCh is closed.
func (a *aggregator) runDedupFlusher(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-a.stopCh:
			return
		case <-ticker.C:
			// Globally limit the concurrency for metrics' flush
			// in order to limit memory usage when big number of aggregators
			// are flushed at the same time.
			flushConcurrencyCh <- struct{}{}
			a.dedupFlush()
			<-flushConcurrencyCh
		}
	}
}
// runFlusher calls a.flush() every interval until a.stopCh is closed.
func (a *aggregator) runFlusher(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-a.stopCh:
			return
		case <-ticker.C:
			// Globally limit the concurrency for metrics' flush
			// in order to limit memory usage when big number of aggregators
			// are flushed at the same time.
			flushConcurrencyCh <- struct{}{}
			a.flush()
			<-flushConcurrencyCh
		}
	}
}
// flushConcurrencyCh limits the number of concurrently executed flushes across all the aggregators.
//
// A flusher sends a value to the channel before the flush and receives it back after the flush,
// so at most cgroup.AvailableCPUs() flushes can run at the same time.
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
// dedupFlush feeds the samples collected in a.dedupAggr into the configured aggregate states.
//
// It must be called only if a.dedupAggr isn't nil.
func (a *aggregator) dedupFlush() {
	var ctx flushCtx
	// The de-duplicated series must keep their original names,
	// since they are used as the input for the aggregation.
	ctx.skipAggrSuffix = true

	a.dedupAggr.appendSeriesForFlush(&ctx)
	a.push(ctx.tss)
}
// flush collects the output series from every aggregate state, applies the output relabeling to them
// and passes the result to a.pushFunc.
//
// pushFunc is called once per each aggregate state. The flush context is re-used between
// the states, so the series passed to pushFunc are valid only until pushFunc returns.
func (a *aggregator) flush() {
	ctx := &flushCtx{
		suffix: a.suffix,
	}
	for _, as := range a.aggrStates {
		ctx.reset()
		as.appendSeriesForFlush(ctx)

		tss := ctx.tss

		// Apply output relabeling
		if a.outputRelabeling != nil {
			// Filter tss in place: the dropped series are just skipped.
			dst := tss[:0]
			for _, ts := range tss {
				// ts.Labels is a window into the ctx.labels buffer shared by all the series,
				// and the labels for the next series are located right after it.
				// Limit the capacity of the window, so labels appended by the relabeling
				// go to a new array instead of overwriting the labels of the next series.
				labels := ts.Labels[:len(ts.Labels):len(ts.Labels)]
				ts.Labels = a.outputRelabeling.Apply(labels, 0)
				if len(ts.Labels) == 0 {
					// The metric has been deleted by the relabeling
					continue
				}
				dst = append(dst, ts)
			}
			tss = dst
		}

		// Push the output metrics.
		a.pushFunc(tss)
	}
}
// MustStop stops the aggregator.
//
// It closes stopCh, waits until the background flusher goroutines exit
// and then performs the final flush of the collected data.
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
// Notify runFlusher and runDedupFlusher that they must exit and wait until they are done,
// so the final flush below doesn't run concurrently with them.
close(a.stopCh)
a.wg.Wait()
// Flush the remaining data from the last interval if needed.
// The final flush obeys the global flush concurrency limit as well.
flushConcurrencyCh <- struct{}{}
if a.dedupAggr != nil {
// dedupFlush pushes the de-duplicated samples into aggrStates,
// so it must be called before flushing aggrStates.
a.dedupFlush()
}
a.flush()
<-flushConcurrencyCh
}
// Push registers the samples from tss in a.
//
// If the de-duplication is disabled, then the samples are aggregated immediately.
// Otherwise they are stored in a.dedupAggr and are aggregated later by dedupFlush.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
	dedup := a.dedupAggr
	if dedup == nil {
		a.push(tss)
		return
	}

	// deduplication is enabled.
	// push samples to dedupAggr, so later they will be pushed to the configured aggregators.
	buf := bbPool.Get()
	for i := range tss {
		ts := &tss[i]
		buf.B = marshalLabelsFast(buf.B[:0], ts.Labels)
		seriesKey := bytesutil.InternBytes(buf.B)
		for j := range ts.Samples {
			// The input key isn't used during the de-duplication.
			dedup.pushSample("", seriesKey, ts.Samples[j].Value)
		}
	}
	bbPool.Put(buf)
}
// push aggregates the samples from tss.
//
// The following steps are performed per each series in tss:
//
//   - the series is skipped if it doesn't match a.match;
//   - the input relabeling is applied to a copy of the series labels; the series is skipped if no labels are left;
//   - the output key (the labels of the output series) and the input key (the labels removed
//     by the `by` / `without` grouping) are built from the sorted labels;
//   - every sample of the series is passed to a.pushSample together with these keys.
func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
	labels := promutils.GetLabels()
	tmpLabels := promutils.GetLabels()
	buf := bbPool.Get()

	for i := range tss {
		ts := &tss[i]
		if !a.match.Match(ts.Labels) {
			continue
		}

		// Work on a copy of the labels, since the relabeling modifies them.
		labels.Labels = append(labels.Labels[:0], ts.Labels...)
		labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
		if len(labels.Labels) == 0 {
			// The metric has been deleted by the relabeling
			continue
		}
		labels.Sort()

		var inputKey, outputKey string
		if a.aggregateOnlyByTime {
			// Every input series is mapped to its own output series, so the input key stays empty.
			buf.B = marshalLabelsFast(buf.B[:0], labels.Labels)
			outputKey = bytesutil.InternBytes(buf.B)
		} else {
			tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
			buf.B = marshalLabelsFast(buf.B[:0], tmpLabels.Labels)
			outputKey = bytesutil.InternBytes(buf.B)

			tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
			buf.B = marshalLabelsFast(buf.B[:0], tmpLabels.Labels)
			inputKey = bytesutil.InternBytes(buf.B)
		}

		for j := range ts.Samples {
			a.pushSample(inputKey, outputKey, ts.Samples[j].Value)
		}
	}

	bbPool.Put(buf)
	promutils.PutLabels(tmpLabels)
	promutils.PutLabels(labels)
}
// bbPool is a pool of byte buffers, which are used as a scratch space
// for marshaling labels and for building metric names.
var bbPool bytesutil.ByteBufferPool
// pushSample passes the sample value with the given keys to every aggregate state of a.
//
// NaN values are ignored.
func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
	if math.IsNaN(value) {
		// Skip nan samples
		return
	}
	states := a.aggrStates
	for i := range states {
		states[i].pushSample(inputKey, outputKey, value)
	}
}
// extractUnneededLabels appends to dst the labels, which are dropped by the grouping, and returns the result.
//
// If without isn't empty, then the labels listed in without are appended.
// Otherwise the labels missing in by are appended.
//
// It is the complement of removeUnneededLabels.
func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
	useWithout := len(without) > 0
	for i := range labels {
		name := labels[i].Name
		var dropped bool
		if useWithout {
			dropped = hasInArray(name, without)
		} else {
			dropped = !hasInArray(name, by)
		}
		if dropped {
			dst = append(dst, labels[i])
		}
	}
	return dst
}
// removeUnneededLabels appends to dst the labels, which are kept by the grouping, and returns the result.
//
// If without isn't empty, then the labels missing in without are appended.
// Otherwise the labels listed in by are appended.
//
// It is the complement of extractUnneededLabels.
func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
	useWithout := len(without) > 0
	for i := range labels {
		name := labels[i].Name
		var kept bool
		if useWithout {
			kept = !hasInArray(name, without)
		} else {
			kept = hasInArray(name, by)
		}
		if kept {
			dst = append(dst, labels[i])
		}
	}
	return dst
}
// hasInArray reports whether a contains name.
func hasInArray(name string, a []string) bool {
	for i := 0; i < len(a); i++ {
		if a[i] == name {
			return true
		}
	}
	return false
}
// marshalLabelsFast appends the marshaled labels to dst and returns the result.
//
// The format is: the number of labels, followed by the (name, value) pair per each label.
// Every name and value is prefixed by its length. The numbers are written with encoding.MarshalUint32.
//
// The marshaled labels can be unmarshaled with unmarshalLabelsFast.
func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte {
	dst = encoding.MarshalUint32(dst, uint32(len(labels)))
	for i := range labels {
		name, value := labels[i].Name, labels[i].Value

		dst = encoding.MarshalUint32(dst, uint32(len(name)))
		dst = append(dst, name...)

		dst = encoding.MarshalUint32(dst, uint32(len(value)))
		dst = append(dst, value...)
	}
	return dst
}
// unmarshalLabelsFast unmarshals the labels marshaled by marshalLabelsFast from src,
// appends them to dst and returns the result.
//
// An error is returned if src is truncated or if it contains unexpected tail after the labels.
// The returned slice contains the labels unmarshaled before the error in this case.
func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) {
	if len(src) < 4 {
		return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src))
	}
	remaining := encoding.UnmarshalUint32(src)
	src = src[4:]

	for ; remaining > 0; remaining-- {
		var label prompbmarshal.Label

		// Unmarshal label name
		if len(src) < 4 {
			return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(src))
		}
		nameLen := encoding.UnmarshalUint32(src)
		src = src[4:]
		if uint32(len(src)) < nameLen {
			return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(src), nameLen)
		}
		label.Name = bytesutil.InternBytes(src[:nameLen])
		src = src[nameLen:]

		// Unmarshal label value
		if len(src) < 4 {
			return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(src))
		}
		valueLen := encoding.UnmarshalUint32(src)
		src = src[4:]
		if uint32(len(src)) < valueLen {
			return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(src), valueLen)
		}
		label.Value = bytesutil.InternBytes(src[:valueLen])
		src = src[valueLen:]

		dst = append(dst, label)
	}

	if len(src) > 0 {
		return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(src))
	}
	return dst, nil
}
// flushCtx accumulates the series generated during a single flush.
type flushCtx struct {
// skipAggrSuffix disables adding the suffix to metric names in appendSeries.
// It is set by dedupFlush, since the de-duplicated series must keep their original names.
skipAggrSuffix bool
// suffix is the first part of the suffix added to metric names. See aggregator.suffix.
suffix string
// tss contains the accumulated series.
// Labels and Samples of every series point to the labels and samples buffers below.
tss []prompbmarshal.TimeSeries
// labels is the shared buffer with labels for all the series in tss.
labels []prompbmarshal.Label
// samples is the shared buffer with samples for all the series in tss.
samples []prompbmarshal.Sample
}
// reset empties ctx, so it can be re-used for the next flush.
//
// The underlying buffers are retained in order to reduce memory allocations.
func (ctx *flushCtx) reset() {
	// Clean the labels before truncating the buffer.
	promrelabel.CleanLabels(ctx.labels)
	ctx.labels = ctx.labels[:0]

	ctx.samples = ctx.samples[:0]
	ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)
}
// appendSeries appends to ctx a series with the labels unmarshaled from labelsMarshaled
// and with a single sample (timestamp, value).
//
// The metric name of the series is extended with ctx.suffix followed by suffix
// unless ctx.skipAggrSuffix is set.
//
// labelsMarshaled must be generated by marshalLabelsFast, otherwise the function panics.
func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) {
	labelsStart, samplesStart := len(ctx.labels), len(ctx.samples)

	unmarshaled, err := unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
	ctx.labels = unmarshaled
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
	}
	if !ctx.skipAggrSuffix {
		ctx.labels = addMetricSuffix(ctx.labels, labelsStart, ctx.suffix, suffix)
	}

	sample := prompbmarshal.Sample{
		Timestamp: timestamp,
		Value:     value,
	}
	ctx.samples = append(ctx.samples, sample)

	// The series refers to the tails of the shared buffers, which have been just filled.
	series := prompbmarshal.TimeSeries{
		Labels:  ctx.labels[labelsStart:],
		Samples: ctx.samples[samplesStart:],
	}
	ctx.tss = append(ctx.tss, series)
}
// appendSeriesWithExtraLabel appends to ctx a series with the labels unmarshaled from labelsMarshaled
// plus the (extraName, extraValue) label, and with a single sample (timestamp, value).
//
// The metric name of the series is always extended with ctx.suffix followed by suffix.
//
// labelsMarshaled must be generated by marshalLabelsFast, otherwise the function panics.
func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) {
	labelsStart, samplesStart := len(ctx.labels), len(ctx.samples)

	unmarshaled, err := unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
	ctx.labels = unmarshaled
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
	}

	extraLabel := prompbmarshal.Label{
		Name:  extraName,
		Value: extraValue,
	}
	ctx.labels = append(addMetricSuffix(ctx.labels, labelsStart, ctx.suffix, suffix), extraLabel)

	sample := prompbmarshal.Sample{
		Timestamp: timestamp,
		Value:     value,
	}
	ctx.samples = append(ctx.samples, sample)

	// The series refers to the tails of the shared buffers, which have been just filled.
	series := prompbmarshal.TimeSeries{
		Labels:  ctx.labels[labelsStart:],
		Samples: ctx.samples[samplesStart:],
	}
	ctx.tss = append(ctx.tss, series)
}
// addMetricSuffix appends firstSuffix followed by lastSuffix to the value of the __name__ label
// found in labels[offset:], and returns the resulting labels.
//
// Only the first __name__ label at labels[offset:] is updated; labels before the offset are left untouched.
// If labels[offset:] has no __name__ label, then a new __name__ label
// with the value firstSuffix+lastSuffix is appended to labels.
func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label {
	src := labels[offset:]
	for i := range src {
		label := &src[i]
		if label.Name != "__name__" {
			continue
		}
		bb := bbPool.Get()
		// Truncate the buffer before use, in the same way as the other bbPool users in this file do.
		bb.B = append(bb.B[:0], label.Value...)
		bb.B = append(bb.B, firstSuffix...)
		bb.B = append(bb.B, lastSuffix...)
		label.Value = bytesutil.InternBytes(bb.B)
		bbPool.Put(bb)
		return labels
	}

	// The __name__ isn't found. Add it
	bb := bbPool.Get()
	bb.B = append(bb.B[:0], firstSuffix...)
	bb.B = append(bb.B, lastSuffix...)
	labelValue := bytesutil.InternBytes(bb.B)
	// Return the buffer to the pool, so it can be re-used by subsequent calls.
	bbPool.Put(bb)
	labels = append(labels, prompbmarshal.Label{
		Name:  "__name__",
		Value: labelValue,
	})
	return labels
}
// addMissingUnderscoreName returns a new list, which starts with "__name__"
// followed by the remaining entries from labels in their original order.
//
// All the "__name__" entries from labels are dropped, so the result contains "__name__" exactly once.
// The labels list isn't modified.
func addMissingUnderscoreName(labels []string) []string {
	result := make([]string, 1, len(labels)+1)
	result[0] = "__name__"
	for i := range labels {
		if labels[i] != "__name__" {
			result = append(result, labels[i])
		}
	}
	return result
}
// removeUnderscoreName returns a copy of labels without "__name__" entries.
//
// The order of the remaining entries is preserved. Nil is returned if no entries are left.
// The labels list isn't modified.
func removeUnderscoreName(labels []string) []string {
	var kept []string
	for i := range labels {
		if name := labels[i]; name != "__name__" {
			kept = append(kept, name)
		}
	}
	return kept
}
// sortAndRemoveDuplicates returns a sorted copy of a with duplicate entries removed.
//
// Nil is returned for empty a. The original a isn't modified.
func sortAndRemoveDuplicates(a []string) []string {
	if len(a) == 0 {
		return nil
	}

	sorted := make([]string, len(a))
	copy(sorted, a)
	sort.Strings(sorted)

	// Compact the sorted items in place: sorted[:n] holds the unique items seen so far.
	n := 1
	for i := 1; i < len(sorted); i++ {
		if sorted[i] == sorted[n-1] {
			continue
		}
		sorted[n] = sorted[i]
		n++
	}
	return sorted[:n]
}