lib/storage: compress metricIDs, which match the given filters, before storing them in tagFiltersToMetricIDsCache

This allows reducing the indexdb/tagFiltersToMetricIDs cache size by 8 on average.
The cache size can be checked via vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"} metric exposed at /metrics page.
This commit is contained in:
Aliaksandr Valialkin 2024-01-23 16:09:52 +02:00
parent 4d78954158
commit 6c214397ed
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 80 additions and 41 deletions

View file

@ -146,7 +146,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
tb := mergeset.MustOpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly) tb := mergeset.MustOpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile. // Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile because of tagFiltersKeyGen.
mem := memory.Allowed() mem := memory.Allowed()
tagFiltersCacheSize := getTagFiltersCacheSize() tagFiltersCacheSize := getTagFiltersCacheSize()
@ -320,10 +320,7 @@ func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key [
return nil, false return nil, false
} }
qt.Printf("found metricIDs with size: %d bytes", len(buf.B)) qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
metricIDs, err := unmarshalMetricIDs(nil, buf.B) metricIDs := mustUnmarshalMetricIDs(nil, buf.B)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
}
qt.Printf("unmarshaled %d metricIDs", len(metricIDs)) qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
return metricIDs, true return metricIDs, true
} }
@ -374,6 +371,8 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
} }
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte { func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
// There is no need in versioning the tagFilters key, since the tagFiltersToMetricIDsCache
// isn't persisted to disk (it is very volatile because of tagFiltersKeyGen).
prefix := ^uint64(0) prefix := ^uint64(0)
if versioned { if versioned {
prefix = atomic.LoadUint64(&tagFiltersKeyGen) prefix = atomic.LoadUint64(&tagFiltersKeyGen)
@ -394,52 +393,60 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
} }
func invalidateTagFiltersCache() { func invalidateTagFiltersCache() {
// This function must be fast, since it is called each // This function must be fast, since it is called each time new timeseries is added.
// time new timeseries is added.
atomic.AddUint64(&tagFiltersKeyGen, 1) atomic.AddUint64(&tagFiltersKeyGen, 1)
} }
var tagFiltersKeyGen uint64 var tagFiltersKeyGen uint64
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs))) // Compress metricIDs, so they occupy less space in the cache.
if len(metricIDs) == 0 { //
return dst // The srcBuf is a []byte cast of metricIDs.
} var srcBuf []byte
var buf []byte if len(metricIDs) > 0 {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf)) sh := (*reflect.SliceHeader)(unsafe.Pointer(&srcBuf))
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0])) sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
sh.Cap = 8 * len(metricIDs) sh.Cap = 8 * len(metricIDs)
sh.Len = sh.Cap sh.Len = 8 * len(metricIDs)
dst = append(dst, buf...) }
dst = encoding.CompressZSTDLevel(dst, srcBuf, 1)
return dst return dst
} }
func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) { func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 {
if len(src)%8 != 0 { // Decompress src into dstBuf.
return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src)) //
// dstBuf is a []byte cast of dst.
var dstBuf []byte
if len(dst) > 0 {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&dstBuf))
sh.Data = uintptr(unsafe.Pointer(&dst[0]))
sh.Cap = 8 * cap(dst)
sh.Len = 8 * len(dst)
} }
if len(src) < 8 { dstBufLen := len(dstBuf)
return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src)) var err error
dstBuf, err = encoding.DecompressZSTD(dstBuf, src)
if err != nil {
logger.Panicf("FATAL: cannot decompress metricIDs: %s", err)
} }
n := encoding.UnmarshalUint64(src) if len(dstBuf) == dstBufLen {
if n > ((1<<64)-1)/8 { // Zero metricIDs
return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, uint64(((1<<64)-1)/8)) return dst
} }
src = src[8:] if (len(dstBuf)-dstBufLen)%8 != 0 {
if n*8 != uint64(len(src)) { logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen)
return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
} }
if n == 0 {
return dst, nil // Convert dstBuf back to dst
} sh := (*reflect.SliceHeader)(unsafe.Pointer(&dst))
var metricIDs []uint64 sh.Data = uintptr(unsafe.Pointer(&dstBuf[0]))
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs)) sh.Cap = cap(dstBuf) / 8
sh.Data = uintptr(unsafe.Pointer(&src[0])) sh.Len = len(dstBuf) / 8
sh.Cap = sh.Len
sh.Len = len(src) / 8 return dst
dst = append(dst, metricIDs...)
return dst, nil
} }
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date. // getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
@ -1582,6 +1589,9 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
return dmis, nil return dmis, nil
} }
// searchMetricIDs returns metricIDs for the given tfss and tr.
//
// The returned metricIDs are sorted.
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) { func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr) qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done() defer qt.Done()
@ -2117,6 +2127,9 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer,
return metricIDs, nil return metricIDs, nil
} }
// searchMetricIDs returns metricIDs for the given tfss and tr.
//
// The returned metricIDs are sorted.
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
ok, err := is.containsTimeRange(tr) ok, err := is.containsTimeRange(tr)
if err != nil { if err != nil {

View file

@ -24,18 +24,44 @@ import (
func TestMarshalUnmarshalMetricIDs(t *testing.T) { func TestMarshalUnmarshalMetricIDs(t *testing.T) {
f := func(metricIDs []uint64) { f := func(metricIDs []uint64) {
t.Helper() t.Helper()
// Try marshaling and unmarshaling to an empty dst
data := marshalMetricIDs(nil, metricIDs) data := marshalMetricIDs(nil, metricIDs)
result, err := unmarshalMetricIDs(nil, data) result := mustUnmarshalMetricIDs(nil, data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(result, metricIDs) { if !reflect.DeepEqual(result, metricIDs) {
t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs) t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs)
} }
// Try marshaling and unmarshaling to non-empty dst
dataPrefix := []byte("prefix")
data = marshalMetricIDs(dataPrefix, metricIDs)
if len(data) < len(dataPrefix) {
t.Fatalf("too short len(data)=%d; must be at least len(dataPrefix)=%d", len(data), len(dataPrefix))
} }
if string(data[:len(dataPrefix)]) != string(dataPrefix) {
t.Fatalf("unexpected prefix; got %q; want %q", data[:len(dataPrefix)], dataPrefix)
}
data = data[len(dataPrefix):]
resultPrefix := []uint64{889432422, 89243, 9823}
result = mustUnmarshalMetricIDs(resultPrefix, data)
if len(result) < len(resultPrefix) {
t.Fatalf("too short result returned; len(result)=%d; must be at least len(resultPrefix)=%d", len(result), len(resultPrefix))
}
if !reflect.DeepEqual(result[:len(resultPrefix)], resultPrefix) {
t.Fatalf("unexpected result prefix; got %d; want %d", result[:len(resultPrefix)], resultPrefix)
}
result = result[len(resultPrefix):]
if (len(metricIDs) > 0 || len(result) > 0) && !reflect.DeepEqual(result, metricIDs) {
t.Fatalf("unexpected metricIDs after unmarshaling from prefix;\ngot\n%d\nwant\n%d", result, metricIDs)
}
}
f(nil) f(nil)
f([]uint64{0})
f([]uint64{1}) f([]uint64{1})
f([]uint64{1234, 678932943, 843289893843}) f([]uint64{1234, 678932943, 843289893843})
f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0})
} }
func TestMergeSortedMetricIDs(t *testing.T) { func TestMergeSortedMetricIDs(t *testing.T) {