apptest/tests: add test to verify sparse cache usage

Sparse cache is only used for "final merges" - merge of data of previous months, so tests verify that.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2025-01-29 17:19:40 +04:00
parent 0e08a7a125
commit 3e5527ae57
No known key found for this signature in database
GPG key ID: 932B34D6FE062023
3 changed files with 93 additions and 0 deletions

View file

@ -0,0 +1,76 @@
package tests
import (
"fmt"
"math/rand/v2"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
func TestStorageUsesSparseCacheForFinalMerge(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartVmsingle("sparse-cache-final-merge", []string{`-retentionPeriod=100y`, `-downsampling.period={__name__=~"metric.*"}:5m:1s`})
// insert metrics daily over past 2 months
const metricsPerStep = 10
const steps = 35
const stepSize = int64(1 * 24 * 60 * 60 * 1000)
records := make([]string, metricsPerStep)
ts := time.Now().Add(-2 * 30 * 24 * time.Hour).UnixMilli()
for range steps {
for i := range metricsPerStep {
name := fmt.Sprintf("metric_%d", i)
records[i] = fmt.Sprintf("%s %d %d", name, rand.IntN(1000), ts)
}
sut.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{})
ts += stepSize
}
sut.ForceFlush(t)
sut.ForceMerge(t)
// todo: replace with a more reliable way to check if the merge is completed
// wait for merge to be completed
time.Sleep(5 * time.Second)
v := sut.GetIntMetric(t, `vm_cache_requests_total{type="indexdb/dataBlocksSparse"}`)
if v <= 0 {
t.Fatalf(`unexpected vm_cache_requests_total{type="indexdb/dataBlocksSparse"} value: %d`, v)
}
}
func TestStorageDoesNotUseSparseCacheForRegularMerge(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartVmsingle("sparse-cache-regular-merge", []string{`-retentionPeriod=100y`, `-downsampling.period={__name__=~"metric.*"}:5m:1s`})
// insert metrics into current month only
const metricsPerStep = 10
const steps = 2
const stepSize = 60 * 1000
records := make([]string, metricsPerStep)
ts := time.Now().Add(-1 * time.Hour).UnixMilli()
for range steps {
for i := range metricsPerStep {
name := fmt.Sprintf("metric_%d", i)
records[i] = fmt.Sprintf("%s %d %d", name, rand.IntN(1000), ts)
}
sut.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{})
ts += stepSize
}
sut.ForceFlush(t)
sut.ForceMerge(t)
// todo: replace with a more reliable way to check if the merge is completed
// wait for merge to be completed
time.Sleep(5 * time.Second)
v := sut.GetIntMetric(t, `vm_cache_requests_total{type="indexdb/dataBlocksSparse"}`)
if v == 0 {
t.Fatalf(`unexpected vm_cache_requests_total{type="indexdb/dataBlocksSparse"} value: %d`, v)
}
}

View file

@ -24,6 +24,7 @@ type Vmsingle struct {
// vmstorage URLs.
forceFlushURL string
forceMergeURL string
// vminsert URLs.
influxLineWriteURL string
@ -65,6 +66,7 @@ func StartVmsingle(instance string, flags []string, cli *Client) (*Vmsingle, err
httpListenAddr: stderrExtracts[1],
forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]),
forceMergeURL: fmt.Sprintf("http://%s/internal/force_merge", stderrExtracts[1]),
influxLineWriteURL: fmt.Sprintf("http://%s/influx/write", stderrExtracts[1]),
prometheusAPIV1ImportPrometheusURL: fmt.Sprintf("http://%s/prometheus/api/v1/import/prometheus", stderrExtracts[1]),
prometheusAPIV1WriteURL: fmt.Sprintf("http://%s/prometheus/api/v1/write", stderrExtracts[1]),
@ -83,6 +85,13 @@ func (app *Vmsingle) ForceFlush(t *testing.T) {
app.cli.Get(t, app.forceFlushURL, http.StatusOK)
}
// ForceMerge is a test helper function that forces the merging of parts.
func (app *Vmsingle) ForceMerge(t *testing.T) {
t.Helper()
app.cli.Get(t, app.forceMergeURL, http.StatusOK)
}
// InfluxWrite is a test helper function that inserts a
// collection of records in Influx line format by sending a HTTP
// POST request to /influx/write vmsingle endpoint.

View file

@ -21,6 +21,7 @@ type Vmstorage struct {
vmselectAddr string
forceFlushURL string
forceMergeURL string
}
// StartVmstorage starts an instance of vmstorage with the given flags. It also
@ -57,6 +58,7 @@ func StartVmstorage(instance string, flags []string, cli *Client) (*Vmstorage, e
vmselectAddr: stderrExtracts[3],
forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]),
forceMergeURL: fmt.Sprintf("http://%s/internal/force_merge", stderrExtracts[1]),
}, nil
}
@ -80,6 +82,12 @@ func (app *Vmstorage) ForceFlush(t *testing.T) {
app.cli.Get(t, app.forceFlushURL, http.StatusOK)
}
// ForceMerge is a test helper function that forces the merging of parts.
func (app *Vmstorage) ForceMerge(t *testing.T) {
t.Helper()
app.cli.Get(t, app.forceMergeURL, http.StatusOK)
}
// String returns the string representation of the vmstorage app state.
func (app *Vmstorage) String() string {
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q vminsertAddr: %q vmselectAddr: %q}", []any{