diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d280fa65d..d27e92ea1 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -436,15 +436,34 @@ func invalidateTagFiltersCache() { var tagFiltersKeyGen atomic.Uint64 func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { + if len(metricIDs) == 0 { + // Add one zero byte to indicate an empty metricID list and skip + // compression to save CPU cycles. + // + // An empty slice passed to zstd won't be compressed and therefore + // nothing will be added to dst and if dst is empty the record won't be + // added to the cache. As a result, the search for a given filter will + // be performed again and again. This may lead to cases like this: + // https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7009 + return append(dst, 0) + } + // Compress metricIDs, so they occupy less space in the cache. // // The srcBuf is a []byte cast of metricIDs. srcBuf := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(metricIDs))), 8*len(metricIDs)) + dst = encoding.CompressZSTDLevel(dst, srcBuf, 1) return dst } func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 { + if len(src) == 1 && src[0] == 0 { + // One zero byte indicates an empty metricID list. + // See marshalMetricIDs(). + return dst + } + // Decompress src into dstBuf. // // dstBuf is a []byte cast of dst. 
@@ -456,10 +475,6 @@ func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 { if err != nil { logger.Panicf("FATAL: cannot decompress metricIDs: %s", err) } - if len(dstBuf) == dstBufLen { - // Zero metricIDs - return dst - } if (len(dstBuf)-dstBufLen)%8 != 0 { logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen) } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 7d20ca6b2..d8f7dcfd3 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -64,6 +64,54 @@ func TestMarshalUnmarshalMetricIDs(t *testing.T) { f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0}) } +func TestTagFiltersToMetricIDsCache(t *testing.T) { + f := func(want []uint64) { + t.Helper() + + path := t.Name() + defer fs.MustRemoveAll(path) + + s := MustOpenStorage(path, 0, 0, 0) + defer s.MustClose() + + idb := s.idb() + key := []byte("key") + idb.putMetricIDsToTagFiltersCache(nil, want, key) + got, ok := idb.getMetricIDsFromTagFiltersCache(nil, key) + if !ok { + t.Fatalf("expected metricIDs to be found in cache but they weren't: %v", want) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected metricIDs in cache: got %v, want %v", got, want) + } + } + + f([]uint64{0}) + f([]uint64{1}) + f([]uint64{1234, 678932943, 843289893843}) + f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0}) +} + +func TestTagFiltersToMetricIDsCache_EmptyMetricIDList(t *testing.T) { + path := t.Name() + defer fs.MustRemoveAll(path) + s := MustOpenStorage(path, 0, 0, 0) + defer s.MustClose() + idb := s.idb() + + key := []byte("key") + emptyMetricIDs := []uint64(nil) + idb.putMetricIDsToTagFiltersCache(nil, emptyMetricIDs, key) + got, ok := idb.getMetricIDsFromTagFiltersCache(nil, key) + if !ok { + t.Fatalf("expected empty metricID list to be found in cache but it wasn't") + } + if len(got) > 0 { + t.Fatalf("unexpected found metricID 
list to be empty but got %v", got) + } + +} + func TestMergeSortedMetricIDs(t *testing.T) { f := func(a, b []uint64) { t.Helper() diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index d67bebaa7..cabf735ad 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "math/rand" "regexp" "strconv" "testing" @@ -314,3 +315,33 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { s.MustClose() fs.MustRemoveAll(path) } + +func BenchmarkMarshalUnmarshalMetricIDs(b *testing.B) { + rng := rand.New(rand.NewSource(1)) + + f := func(b *testing.B, numMetricIDs int) { + metricIDs := make([]uint64, numMetricIDs) + // metric IDs need to be sorted. + ts := uint64(time.Now().UnixNano()) + for i := range numMetricIDs { + metricIDs[i] = ts + uint64(rng.Intn(100)) + } + + var marshalledLen int + b.ResetTimer() + for range b.N { + marshalled := marshalMetricIDs(nil, metricIDs) + marshalledLen = len(marshalled) + _ = mustUnmarshalMetricIDs(nil, marshalled) + } + b.StopTimer() + compressionRate := float64(numMetricIDs*8) / float64(marshalledLen) + b.ReportMetric(compressionRate, "compression-rate") + } + + for _, n := range []int{0, 1, 10, 100, 1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("numMetricIDs-%d", n), func(b *testing.B) { + f(b, n) + }) + } +}