VictoriaMetrics/lib/storage/search_test.go
Artem Fetishev e17ca8bcc0
Allow disabling per-day index (#6976)
Allow disabling the per-day index using the `-disablePerDayIndex` flag.
This should significantly improve the ingestion rate and decrease the
disk space usage for the use cases that assume small or no churn rate.
See the docs added to `docs/README.md` for details.

Both improvements are due to no data written to the per-day index.
Benchmark results:

```shell
rm -Rf ./lib/storage/Benchmark*; go test ./lib/storage -run=NONE -bench=BenchmarkStorageInsertWithAndWithoutPerDayIndex --loggerLevel=ERROR
goos: linux
goarch: amd64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/storage
cpu: 13th Gen Intel(R) Core(TM) i7-1355U
BenchmarkStorageInsertWithAndWithoutPerDayIndex/HighChurnRate/perDayIndexes-12                 1        3850268120 ns/op                39.56 data-MiB          28.20 indexdb-MiB           259722 rows/s
BenchmarkStorageInsertWithAndWithoutPerDayIndex/HighChurnRate/noPerDayIndexes-12               1        2916865725 ns/op                39.57 data-MiB          25.73 indexdb-MiB           342834 rows/s
BenchmarkStorageInsertWithAndWithoutPerDayIndex/NoChurnRate/perDayIndexes-12                   1        2218073474 ns/op                 9.772 data-MiB         13.73 indexdb-MiB           450842 rows/s
BenchmarkStorageInsertWithAndWithoutPerDayIndex/NoChurnRate/noPerDayIndexes-12                 1        1295140898 ns/op                 9.771 data-MiB          0.3566 indexdb-MiB         772119 rows/s
PASS
ok      github.com/VictoriaMetrics/VictoriaMetrics/lib/storage  11.421s
```

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2025-02-17 15:36:23 +01:00

279 lines
7.6 KiB
Go

package storage
import (
"bytes"
"fmt"
"math/rand"
"os"
"reflect"
"regexp"
"sort"
"testing"
"testing/quick"
"time"
)
// TestSearchQueryMarshalUnmarshal verifies that randomly generated SearchQuery
// values survive a marshal/unmarshal round trip.
//
// On each of the 1000 iterations it generates a *SearchQuery via testing/quick,
// marshals the tenant token followed by the tenant-less query into a reused
// buffer, unmarshals the buffer into sq2 and compares the tenant, the time range
// and every tag filter of the two queries.
func TestSearchQueryMarshalUnmarshal(t *testing.T) {
	// A fixed seed keeps the generated queries reproducible between runs.
	rnd := rand.New(rand.NewSource(0))
	typ := reflect.TypeOf(&SearchQuery{})
	var buf []byte
	var sq2 SearchQuery
	for i := 0; i < 1000; i++ {
		v, ok := quick.Value(typ, rnd)
		if !ok {
			t.Fatalf("cannot create random SearchQuery via testing/quick.Value")
		}
		sq1 := v.Interface().(*SearchQuery)
		if sq1 == nil {
			// Skip nil sq1.
			continue
		}

		// The tenant is marshaled separately and goes in front of the query body.
		tt := TenantToken{
			AccountID: sq1.AccountID,
			ProjectID: sq1.ProjectID,
		}
		buf = tt.Marshal(buf[:0])
		buf = sq1.MarshaWithoutTenant(buf)

		tail, err := sq2.Unmarshal(buf)
		if err != nil {
			t.Fatalf("cannot unmarshal SearchQuery: %s", err)
		}
		if len(tail) > 0 {
			t.Fatalf("unexpected tail left after SearchQuery unmarshaling; tail (len=%d): %q", len(tail), tail)
		}
		if sq2.AccountID != sq1.AccountID {
			t.Fatalf("unexpected AccountID; got %d; want %d", sq2.AccountID, sq1.AccountID)
		}
		if sq2.ProjectID != sq1.ProjectID {
			t.Fatalf("unexpected ProjectID; got %d; want %d", sq2.ProjectID, sq1.ProjectID)
		}
		if sq1.MinTimestamp != sq2.MinTimestamp {
			t.Fatalf("unexpected MinTimestamp; got %d; want %d", sq2.MinTimestamp, sq1.MinTimestamp)
		}
		if sq1.MaxTimestamp != sq2.MaxTimestamp {
			t.Fatalf("unexpected MaxTimestamp; got %d; want %d", sq2.MaxTimestamp, sq1.MaxTimestamp)
		}
		if len(sq1.TagFilterss) != len(sq2.TagFilterss) {
			t.Fatalf("unexpected TagFilterss len; got %d; want %d", len(sq2.TagFilterss), len(sq1.TagFilterss))
		}
		for ii := range sq1.TagFilterss {
			tagFilters1 := sq1.TagFilterss[ii]
			tagFilters2 := sq2.TagFilterss[ii]
			// Compare the inner lengths before indexing: without this check a
			// shorter tagFilters2 panics with an index-out-of-range error and
			// a longer one passes unnoticed.
			if len(tagFilters1) != len(tagFilters2) {
				t.Fatalf("unexpected TagFilterss[%d] len on iteration %d; got %d; want %d", ii, i, len(tagFilters2), len(tagFilters1))
			}
			for j := range tagFilters1 {
				tf1 := &tagFilters1[j]
				tf2 := &tagFilters2[j]
				if string(tf1.Key) != string(tf2.Key) {
					t.Fatalf("unexpected Key on iteration %d,%d; got %X; want %X", i, j, tf2.Key, tf1.Key)
				}
				if string(tf1.Value) != string(tf2.Value) {
					t.Fatalf("unexpected Value on iteration %d,%d; got %X; want %X", i, j, tf2.Value, tf1.Value)
				}
				if tf1.IsNegative != tf2.IsNegative {
					t.Fatalf("unexpected IsNegative on iteration %d,%d; got %v; want %v", i, j, tf2.IsNegative, tf1.IsNegative)
				}
				if tf1.IsRegexp != tf2.IsRegexp {
					t.Fatalf("unexpected IsRegexp on iteration %d,%d; got %v; want %v", i, j, tf2.IsRegexp, tf1.IsRegexp)
				}
			}
		}
	}
}
// TestSearch fills a fresh storage with generated rows, re-opens it and checks
// that searches over the middle part of the ingested time range return exactly
// the expected rows, both from a single goroutine and from several concurrent
// goroutines.
//
// The storage directory is removed when the test finishes.
func TestSearch(t *testing.T) {
	const (
		rowsCount         = 2e4
		rowsPerBlock      = 1e3
		metricGroupsCount = rowsCount / 5
		accountsCount     = 2
	)

	path := "TestSearch"
	st := MustOpenStorage(path, OpenOptions{})
	defer func() {
		// The closure reads st at exit time, so it closes the re-opened storage.
		st.MustClose()
		if err := os.RemoveAll(path); err != nil {
			t.Fatalf("cannot remove storage %q: %s", path, err)
		}
	}()

	// Add rows to storage.
	mn := MetricName{
		Tags: []Tag{
			{[]byte("job"), []byte("super-service")},
			{[]byte("instance"), []byte("8.8.8.8:1234")},
		},
	}
	// Align the first timestamp down to a 30-minute boundary (in milliseconds).
	startTimestamp := timestampFromTime(time.Now())
	startTimestamp -= startTimestamp % (1e3 * 60 * 30)

	mrs := make([]MetricRow, rowsCount)
	// pendingFrom is the index of the first generated row not yet passed to AddRows.
	pendingFrom := 0
	for i := range mrs {
		mn.AccountID = uint32(i % accountsCount)
		mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i%metricGroupsCount))
		mrs[i] = MetricRow{
			MetricNameRaw: mn.marshalRaw(nil),
			Timestamp:     startTimestamp + int64(i),
			Value:         float64(i),
		}
		// Hand the rows over to the storage in chunks of rowsPerBlock.
		if i+1-pendingFrom == rowsPerBlock {
			st.AddRows(mrs[pendingFrom:i+1], defaultPrecisionBits)
			pendingFrom = i + 1
		}
	}
	// Add whatever is left after the last full chunk (possibly nothing).
	st.AddRows(mrs[pendingFrom:], defaultPrecisionBits)
	endTimestamp := mrs[len(mrs)-1].Timestamp

	// Re-open the storage in order to flush all the pending cached data.
	st.MustClose()
	st = MustOpenStorage(path, OpenOptions{})

	// Run search over the middle of the ingested time range.
	tr := TimeRange{
		MinTimestamp: startTimestamp + int64(rowsCount)/3,
		MaxTimestamp: endTimestamp - int64(rowsCount)/3,
	}

	t.Run("serial", func(t *testing.T) {
		err := testSearchInternal(st, tr, mrs, accountsCount)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
	})

	t.Run("concurrent", func(t *testing.T) {
		const workers = 3
		errCh := make(chan error, workers)
		for w := 0; w < workers; w++ {
			go func() {
				errCh <- testSearchInternal(st, tr, mrs, accountsCount)
			}()
		}

		// Collect one result per worker; each receive waits at most 10 seconds.
		var firstError error
		for w := 0; w < workers; w++ {
			select {
			case err := <-errCh:
				if firstError == nil && err != nil {
					firstError = err
				}
			case <-time.After(10 * time.Second):
				t.Fatalf("timeout")
			}
		}
		if firstError != nil {
			t.Fatalf("unexpected error: %s", firstError)
		}
	})
}
// testSearchInternal runs 10 searches against s and verifies that each of them
// returns exactly the rows from mrs that are expected to match.
//
// On iteration i the search is issued for tenant (i%accountsCount, 0) with
// three tag filters: a regexp on the metric group (the nil key), a negative
// filter on a "job" value that no row has, and a match-all regexp on
// "instance". The expected rows are the rows of mrs that fall into tr, belong
// to the searched account and whose metric group fully matches the regexp.
//
// It returns the first error encountered, or nil if all the searches returned
// the expected rows.
func testSearchInternal(s *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) error {
	for i := 0; i < 10; i++ {
		accountID := uint32(i % accountsCount)

		// Prepare TagFilters for search.
		tfs := NewTagFilters(accountID, 0)
		metricGroupRe := fmt.Sprintf(`metric_\d*%d%d`, i, i)
		if err := tfs.Add(nil, []byte(metricGroupRe), false, true); err != nil {
			return fmt.Errorf("cannot add metricGroupRe=%q: %w", metricGroupRe, err)
		}
		if err := tfs.Add([]byte("job"), []byte("nonexisting-service"), true, false); err != nil {
			return fmt.Errorf("cannot add tag filter %q=%q: %w", "job", "nonexisting-service", err)
		}
		if err := tfs.Add([]byte("instance"), []byte(".*"), false, true); err != nil {
			return fmt.Errorf("cannot add tag filter %q=%q: %w", "instance", ".*", err)
		}

		// Build expectedMrs.
		var expectedMrs []MetricRow
		// The pattern is anchored, so the whole metric group must match it.
		metricGroupRegexp := regexp.MustCompile(fmt.Sprintf("^%s$", metricGroupRe))
		var mn MetricName
		for j := range mrs {
			mr := &mrs[j]
			if mr.Timestamp < tr.MinTimestamp || mr.Timestamp > tr.MaxTimestamp {
				continue
			}
			if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
				return fmt.Errorf("cannot unmarshal MetricName: %w", err)
			}
			// Rows of other accounts must not be expected. The rows generated
			// by TestSearch happen to satisfy this already (the last digit of
			// the metric group has the same parity as the account), but do
			// not rely on that coincidence.
			if mn.AccountID != accountID {
				continue
			}
			if !metricGroupRegexp.Match(mn.MetricGroup) {
				continue
			}
			expectedMrs = append(expectedMrs, *mr)
		}

		if err := testAssertSearchResult(s, tr, tfs, expectedMrs); err != nil {
			return err
		}
	}
	return nil
}
// testAssertSearchResult searches st for tfs on the time range tr and compares
// the rows found with want.
//
// Both the found rows and want are sorted by (MetricNameRaw, Timestamp) before
// the comparison, so want is reordered in place. It returns an error if the
// search fails, if a found metric name cannot be unmarshaled or if the found
// rows differ from want.
func testAssertSearchResult(st *Storage, tr TimeRange, tfs *TagFilters, want []MetricRow) error {
	type metricBlock struct {
		MetricName []byte
		Block      *Block
	}

	var s Search
	s.Init(nil, st, []*TagFilters{tfs}, tr, 1e5, noDeadline)
	var mbs []metricBlock
	for s.NextMetricBlock() {
		var b Block
		s.MetricBlockRef.BlockRef.MustReadBlock(&b)

		var mb metricBlock
		// Copy the metric name instead of keeping a reference to the search's
		// own buffer.
		mb.MetricName = append(mb.MetricName, s.MetricBlockRef.MetricName...)
		mb.Block = &b
		mbs = append(mbs, mb)
	}
	// Read the error before closing and close the search on both the success
	// and the failure path, so that a failed search is not left open.
	err := s.Error()
	s.MustClose()
	if err != nil {
		return fmt.Errorf("search error: %w", err)
	}

	// Convert the found blocks into individual rows.
	var got []MetricRow
	var mn MetricName
	for _, mb := range mbs {
		rb := newTestRawBlock(mb.Block, tr)
		if err := mn.Unmarshal(mb.MetricName); err != nil {
			return fmt.Errorf("cannot unmarshal MetricName: %w", err)
		}
		// Re-marshal into the raw form, since want holds raw metric names.
		metricNameRaw := mn.marshalRaw(nil)
		for i, timestamp := range rb.Timestamps {
			mr := MetricRow{
				MetricNameRaw: metricNameRaw,
				Timestamp:     timestamp,
				Value:         rb.Values[i],
			}
			got = append(got, mr)
		}
	}

	// Sort both sides so that the comparison does not depend on the order in
	// which the rows were found.
	testSortMetricRows(got)
	testSortMetricRows(want)
	if !reflect.DeepEqual(got, want) {
		return fmt.Errorf("unexpected rows found;\ngot\n%s\nwant\n%s", mrsToString(got), mrsToString(want))
	}
	return nil
}
// testSortMetricRows sorts mrs in place by MetricNameRaw (bytewise) and then by
// Timestamp in ascending order.
func testSortMetricRows(mrs []MetricRow) {
	less := func(i, j int) bool {
		x, y := &mrs[i], &mrs[j]
		if c := bytes.Compare(x.MetricNameRaw, y.MetricNameRaw); c != 0 {
			return c < 0
		}
		// Equal names: the earlier timestamp goes first.
		return x.Timestamp < y.Timestamp
	}
	sort.Slice(mrs, less)
}
// mrsToString formats mrs for use in test failure messages: a "len=N" header
// line followed by one line per row with the quoted raw metric name, the
// timestamp and the value.
func mrsToString(mrs []MetricRow) string {
	var out bytes.Buffer
	fmt.Fprintf(&out, "len=%d\n", len(mrs))
	for _, row := range mrs {
		fmt.Fprintf(&out, "[%q, Timestamp=%d, Value=%f]\n", row.MetricNameRaw, row.Timestamp, row.Value)
	}
	return out.String()
}