VictoriaMetrics/lib/streamaggr/streamaggr_test.go
Aliaksandr Valialkin d577657fb7
lib/streamaggr: follow-up for ff72ca14b9
- Make sure that the last successfully loaded config is used on hot-reload failure
- Properly cleanup resources occupied by already initialized aggregators
  when the current aggregator fails to be initialized
- Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config
  This should simplify monitoring and debugging failed reloads
- Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa
  could be in use at reloadSaConfig()
- Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability
  on systems with a big number of CPU cores at hasState.Store(true) call inside aggregator.Push().
- Remove fine-grained aggregator reload - reload all the aggregators on config change instead.
  This simplifies the code a bit. The fine-grained aggregator reload may be brought back
  if real users ask for it.
- Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag
- Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
2023-03-31 22:30:38 -07:00

795 lines
17 KiB
Go

package streamaggr
import (
"fmt"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)
// TestAggregatorsFailure verifies that NewAggregatorsFromData rejects invalid
// stream aggregation configs: for every config below it must return a non-nil
// error together with a nil result.
//
// Every option of an aggregation rule is nested under its "- " list item.
// This keeps each config syntactically valid YAML, so a case fails because of
// the option named in its comment and not because of a generic YAML parse error.
func TestAggregatorsFailure(t *testing.T) {
	f := func(config string) {
		t.Helper()

		// Nothing is pushed to the aggregators in this test, so any call
		// to pushFunc is unexpected.
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			panic(fmt.Errorf("pushFunc shouldn't be called"))
		}
		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
		if err == nil {
			t.Fatalf("expecting non-nil error")
		}
		if a != nil {
			t.Fatalf("expecting nil a")
		}
	}

	// Invalid config
	f(`foobar`)

	// Unknown option
	f(`
- interval: 1m
  outputs: [total]
  foobar: baz
`)

	// missing interval
	f(`
- outputs: [total]
`)

	// missing outputs
	f(`
- interval: 1m
`)

	// Invalid output
	f(`
- interval: 1m
  outputs: [foobar]
`)

	// Negative interval
	f(`- interval: -5m`)
	// Too small interval
	f(`- interval: 10ms`)

	// Invalid input_relabel_configs
	f(`
- interval: 1m
  outputs: [total]
  input_relabel_configs:
  - foo: bar
`)
	f(`
- interval: 1m
  outputs: [total]
  input_relabel_configs:
  - action: replace
`)

	// Invalid output_relabel_configs
	f(`
- interval: 1m
  outputs: [total]
  output_relabel_configs:
  - foo: bar
`)
	f(`
- interval: 1m
  outputs: [total]
  output_relabel_configs:
  - action: replace
`)

	// Both by and without are non-empty
	f(`
- interval: 1m
  outputs: [total]
  by: [foo]
  without: [bar]
`)

	// Invalid quantiles()
	f(`
- interval: 1m
  outputs: ["quantiles("]
`)
	f(`
- interval: 1m
  outputs: ["quantiles()"]
`)
	f(`
- interval: 1m
  outputs: ["quantiles(foo)"]
`)
	f(`
- interval: 1m
  outputs: ["quantiles(-0.5)"]
`)
	f(`
- interval: 1m
  outputs: ["quantiles(1.5)"]
`)
}
// TestAggregatorsEqual verifies Aggregators.Equal: aggregators built from the
// configs a and b must compare as expectedResult.
//
// The options of every aggregation rule are nested under its "- " list item,
// so the configs are valid YAML and both aggregators can be initialized.
func TestAggregatorsEqual(t *testing.T) {
	f := func(a, b string, expectedResult bool) {
		t.Helper()

		// The aggregated output is irrelevant for the comparison, so drop it.
		pushFunc := func(tss []prompbmarshal.TimeSeries) {}

		aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}
		// Release the resources held by the aggregators once the comparison is done.
		// MustStop is also invoked for the empty config in TestAggregatorsSuccess,
		// so it is expected to be safe for the result built from "".
		defer aa.MustStop()

		ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}
		defer ab.MustStop()

		result := aa.Equal(ab)
		if result != expectedResult {
			t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
		}
	}

	// Two empty configs are equal.
	f("", "", true)

	// Non-empty config vs empty config.
	f(`
- outputs: [total]
  interval: 5m
`, ``, false)

	// Identical configs.
	f(`
- outputs: [total]
  interval: 5m
`, `
- outputs: [total]
  interval: 5m
`, true)

	// Configs differing by interval.
	f(`
- outputs: [total]
  interval: 3m
`, `
- outputs: [total]
  interval: 5m
`, false)
}
// TestAggregatorsSuccess pushes inputMetrics (Prometheus text exposition format)
// through aggregators built from config and compares the aggregated output
// with outputMetricsExpected.
//
// The output series are converted to text lines and sorted before the comparison,
// so the expected lines must be listed in sorted order.
//
// The options of every aggregation rule are nested under its "- " list item
// (and relabeling rule options under their own "- " item), so the configs are
// valid YAML and the aggregators can be initialized.
func TestAggregatorsSuccess(t *testing.T) {
	f := func(config, inputMetrics, outputMetricsExpected string) {
		t.Helper()

		// Initialize Aggregators
		var tssOutput []prompbmarshal.TimeSeries
		var tssOutputLock sync.Mutex
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			tssOutputLock.Lock()
			for _, ts := range tss {
				// Store copies of labels and samples, so the collected series
				// do not alias the slices passed to pushFunc.
				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
					Labels:  labelsCopy,
					Samples: samplesCopy,
				})
			}
			tssOutputLock.Unlock()
		}
		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		tssInput := mustParsePromMetrics(inputMetrics)
		a.Push(tssInput)
		// The test relies on MustStop passing the pending aggregation results
		// to pushFunc, since the output is verified right after it.
		a.MustStop()

		// Verify the tssOutput contains the expected metrics
		tsStrings := make([]string, len(tssOutput))
		for i, ts := range tssOutput {
			tsStrings[i] = timeSeriesToString(ts)
		}
		sort.Strings(tsStrings)
		outputMetrics := strings.Join(tsStrings, "")
		if outputMetrics != outputMetricsExpected {
			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
		}
	}

	// Empty config
	f(``, ``, ``)
	f(``, `foo{bar="baz"} 1`, ``)
	f(``, "foo 1\nbaz 2", ``)

	// Empty by list - aggregate only by time
	f(`
- interval: 1m
  outputs: [count_samples, sum_samples, count_series, last]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
bar:1m_count_series 1
bar:1m_last 5
bar:1m_sum_samples 5
foo:1m_count_samples{abc="123"} 2
foo:1m_count_samples{abc="456",de="fg"} 1
foo:1m_count_series{abc="123"} 1
foo:1m_count_series{abc="456",de="fg"} 1
foo:1m_last{abc="123"} 8.5
foo:1m_last{abc="456",de="fg"} 8
foo:1m_sum_samples{abc="123"} 12.5
foo:1m_sum_samples{abc="456",de="fg"} 8
`)

	// Special case: __name__ in `by` list - this is the same as empty `by` list
	f(`
- interval: 1m
  by: [__name__]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
bar:1m_count_series 1
bar:1m_sum_samples 5
foo:1m_count_samples 3
foo:1m_count_series 2
foo:1m_sum_samples 20.5
`)

	// Non-empty `by` list with non-existing labels
	f(`
- interval: 1m
  by: [foo, bar]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_bar_foo_count_samples 1
bar:1m_by_bar_foo_count_series 1
bar:1m_by_bar_foo_sum_samples 5
foo:1m_by_bar_foo_count_samples 3
foo:1m_by_bar_foo_count_series 2
foo:1m_by_bar_foo_sum_samples 20.5
`)

	// Non-empty `by` list with existing label
	f(`
- interval: 1m
  by: [abc]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_abc_count_samples 1
bar:1m_by_abc_count_series 1
bar:1m_by_abc_sum_samples 5
foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)

	// Non-empty `by` list with duplicate existing label
	f(`
- interval: 1m
  by: [abc, abc]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_abc_count_samples 1
bar:1m_by_abc_count_series 1
bar:1m_by_abc_sum_samples 5
foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)

	// Non-empty `without` list with non-existing labels
	f(`
- interval: 1m
  without: [foo]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_foo_count_samples 1
bar:1m_without_foo_count_series 1
bar:1m_without_foo_sum_samples 5
foo:1m_without_foo_count_samples{abc="123"} 2
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
foo:1m_without_foo_count_series{abc="123"} 1
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
foo:1m_without_foo_sum_samples{abc="123"} 12.5
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
`)

	// Non-empty `without` list with existing labels
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
foo:1m_without_abc_count_samples 2
foo:1m_without_abc_count_samples{de="fg"} 1
foo:1m_without_abc_count_series 1
foo:1m_without_abc_count_series{de="fg"} 1
foo:1m_without_abc_sum_samples 12.5
foo:1m_without_abc_sum_samples{de="fg"} 8
`)

	// Special case: __name__ in `without` list
	f(`
- interval: 1m
  without: [__name__]
  outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `:1m_count_samples 1
:1m_count_samples{abc="123"} 2
:1m_count_samples{abc="456",de="fg"} 1
:1m_count_series 1
:1m_count_series{abc="123"} 1
:1m_count_series{abc="456",de="fg"} 1
:1m_sum_samples 5
:1m_sum_samples{abc="123"} 12.5
:1m_sum_samples{abc="456",de="fg"} 8
`)

	// drop some input metrics
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
  input_relabel_configs:
  - if: 'foo'
    action: drop
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
`)

	// rename output metrics
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
  output_relabel_configs:
  - action: replace_all
    source_labels: [__name__]
    regex: ":|_"
    replacement: "-"
    target_label: __name__
  - action: drop
    source_labels: [de]
    regex: fg
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar-1m-without-abc-count-samples 1
bar-1m-without-abc-count-series 1
bar-1m-without-abc-sum-samples 5
foo-1m-without-abc-count-samples 2
foo-1m-without-abc-count-series 1
foo-1m-without-abc-sum-samples 12.5
`)

	// match doesn't match anything
	f(`
- interval: 1m
  without: [abc]
  outputs: [count_samples, sum_samples, count_series]
  match: '{non_existing_label!=""}'
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, ``)

	// match matches foo series with non-empty abc label
	f(`
- interval: 1m
  by: [abc]
  outputs: [count_samples, sum_samples, count_series]
  match: 'foo{abc=~".+"}'
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)

	// total output for non-repeated series
	f(`
- interval: 1m
  outputs: [total]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`)

	// total output for repeated series
	f(`
- interval: 1m
  outputs: [total]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 5.02
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`)

	// total output for repeated series with group by __name__
	f(`
- interval: 1m
  by: [__name__]
  outputs: [total]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`)

	// increase output for non-repeated series
	f(`
- interval: 1m
  outputs: [increase]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`)

	// increase output for repeated series
	f(`
- interval: 1m
  outputs: [increase]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`)

	// multiple aggregate configs
	f(`
- interval: 1m
  outputs: [count_series, sum_samples]
- interval: 5m
  by: [bar]
  outputs: [sum_samples]
`, `
foo 1
foo{bar="baz"} 2
foo 3.3
`, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
`)

	// min and max outputs
	f(`
- interval: 1m
  outputs: [min, max]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_max 5
bar:1m_min 5
foo:1m_max{abc="123"} 8.5
foo:1m_max{abc="456",de="fg"} 8
foo:1m_min{abc="123"} 4
foo:1m_min{abc="456",de="fg"} 8
`)

	// avg output
	f(`
- interval: 1m
  outputs: [avg]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_avg 5
foo:1m_avg{abc="123"} 6.25
foo:1m_avg{abc="456",de="fg"} 8
`)

	// stddev output
	f(`
- interval: 1m
  outputs: [stddev]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_stddev 0
foo:1m_stddev{abc="123"} 2.25
foo:1m_stddev{abc="456",de="fg"} 0
`)

	// stdvar output
	f(`
- interval: 1m
  outputs: [stdvar]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_stdvar 0
foo:1m_stdvar{abc="123"} 5.0625
foo:1m_stdvar{abc="456",de="fg"} 0
`)

	// histogram_bucket output
	f(`
- interval: 1m
  outputs: [histogram_bucket]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
`)

	// histogram_bucket output without cpu
	f(`
- interval: 1m
  without: [cpu]
  outputs: [histogram_bucket]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
`)

	// quantiles output
	f(`
- interval: 1m
  outputs: ["quantiles(0, 0.5, 1)"]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
`)

	// quantiles output without cpu
	f(`
- interval: 1m
  without: [cpu]
  outputs: ["quantiles(0, 0.5, 1)"]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`)
}
// TestAggregatorsWithDedupInterval checks aggregation results when aggregators
// are created with a non-zero dedupInterval.
//
// As the expected results below show, only the last pushed sample per input
// series is expected to contribute to the aggregated output.
//
// The options of every aggregation rule are nested under its "- " list item,
// so the configs are valid YAML and the aggregators can be initialized.
func TestAggregatorsWithDedupInterval(t *testing.T) {
	f := func(config, inputMetrics, outputMetricsExpected string) {
		t.Helper()

		// Initialize Aggregators
		var tssOutput []prompbmarshal.TimeSeries
		var tssOutputLock sync.Mutex
		pushFunc := func(tss []prompbmarshal.TimeSeries) {
			tssOutputLock.Lock()
			for _, ts := range tss {
				// Store copies of labels and samples, so the collected series
				// do not alias the slices passed to pushFunc.
				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
					Labels:  labelsCopy,
					Samples: samplesCopy,
				})
			}
			tssOutputLock.Unlock()
		}
		const dedupInterval = time.Hour
		a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
		if err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		tssInput := mustParsePromMetrics(inputMetrics)
		a.Push(tssInput)

		// Run the dedup flush and the regular flush by hand for every aggregator
		// instead of waiting for dedupInterval (1h) and the aggregation interval.
		// The nil guard protects the a.as field access.
		if a != nil {
			for _, aggr := range a.as {
				aggr.dedupFlush()
				aggr.flush()
			}
		}
		a.MustStop()

		// Verify the tssOutput contains the expected metrics
		tsStrings := make([]string, len(tssOutput))
		for i, ts := range tssOutput {
			tsStrings[i] = timeSeriesToString(ts)
		}
		// Sort the lines, so the comparison doesn't depend on the output order.
		sort.Strings(tsStrings)
		outputMetrics := strings.Join(tsStrings, "")
		if outputMetrics != outputMetricsExpected {
			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
		}
	}

	// Every series is pushed once.
	f(`
- interval: 1m
  outputs: [sum_samples]
`, `
foo 123
bar 567
`, `bar:1m_sum_samples 567
foo:1m_sum_samples 123
`)

	// Repeated series: the last sample per series is expected in the output.
	f(`
- interval: 1m
  outputs: [sum_samples]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_sum_samples{baz="qwe"} 2
bar:1m_sum_samples{baz="qwer"} 344
foo:1m_sum_samples 123
foo:1m_sum_samples{baz="qwe"} 10
`)
}
// timeSeriesToString renders ts as a single "<labels> <value>\n" line,
// where <labels> is produced by promrelabel.LabelsToString.
//
// It panics if ts doesn't contain exactly one sample.
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
	name := promrelabel.LabelsToString(ts.Labels)
	if n := len(ts.Samples); n != 1 {
		panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", name, n))
	}
	sample := ts.Samples[0]
	return fmt.Sprintf("%s %v\n", name, sample.Value)
}
// mustParsePromMetrics parses s with prometheus.Rows.UnmarshalWithErrLogger
// and converts every parsed row into a time series with a single sample.
//
// The metric name is stored in the leading "__name__" label, followed by
// the row tags in their original order.
// It panics if the parser reports an error via the error logger.
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
	var rows prometheus.Rows
	rows.UnmarshalWithErrLogger(s, func(msg string) {
		panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", msg))
	})

	var result []prompbmarshal.TimeSeries
	// All the samples live in a single pre-sized buffer;
	// every time series references its own one-element window of it.
	sampleBuf := make([]prompbmarshal.Sample, 0, len(rows.Rows))
	for _, r := range rows.Rows {
		lbls := make([]prompbmarshal.Label, 1, len(r.Tags)+1)
		lbls[0] = prompbmarshal.Label{
			Name:  "__name__",
			Value: r.Metric,
		}
		for _, t := range r.Tags {
			lbls = append(lbls, prompbmarshal.Label{
				Name:  t.Key,
				Value: t.Value,
			})
		}

		pos := len(sampleBuf)
		sampleBuf = append(sampleBuf, prompbmarshal.Sample{
			Value:     r.Value,
			Timestamp: r.Timestamp,
		})
		result = append(result, prompbmarshal.TimeSeries{
			Labels:  lbls,
			Samples: sampleBuf[pos:],
		})
	}
	return result
}