mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: compress metricIDs, which match the given filters, before storing them in tagFiltersToMetricIDsCache
This allows reducing the indexdb/tagFiltersToMetricIDs cache size by 8 on average. The cache size can be checked via vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"} metric exposed at /metrics page.
This commit is contained in:
parent
4d78954158
commit
6c214397ed
2 changed files with 80 additions and 41 deletions
|
@ -146,7 +146,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
|
|||
|
||||
tb := mergeset.MustOpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
|
||||
|
||||
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
|
||||
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile because of tagFiltersKeyGen.
|
||||
mem := memory.Allowed()
|
||||
tagFiltersCacheSize := getTagFiltersCacheSize()
|
||||
|
||||
|
@ -320,10 +320,7 @@ func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key [
|
|||
return nil, false
|
||||
}
|
||||
qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
|
||||
metricIDs, err := unmarshalMetricIDs(nil, buf.B)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
|
||||
}
|
||||
metricIDs := mustUnmarshalMetricIDs(nil, buf.B)
|
||||
qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
|
||||
return metricIDs, true
|
||||
}
|
||||
|
@ -374,6 +371,8 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
|
|||
}
|
||||
|
||||
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
|
||||
// There is no need in versioning the tagFilters key, since the tagFiltersToMetricIDsCache
|
||||
// isn't persisted to disk (it is very volatile because of tagFiltersKeyGen).
|
||||
prefix := ^uint64(0)
|
||||
if versioned {
|
||||
prefix = atomic.LoadUint64(&tagFiltersKeyGen)
|
||||
|
@ -394,52 +393,60 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
|
|||
}
|
||||
|
||||
func invalidateTagFiltersCache() {
|
||||
// This function must be fast, since it is called each
|
||||
// time new timeseries is added.
|
||||
// This function must be fast, since it is called each time new timeseries is added.
|
||||
atomic.AddUint64(&tagFiltersKeyGen, 1)
|
||||
}
|
||||
|
||||
var tagFiltersKeyGen uint64
|
||||
|
||||
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
|
||||
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
|
||||
if len(metricIDs) == 0 {
|
||||
return dst
|
||||
// Compress metricIDs, so they occupy less space in the cache.
|
||||
//
|
||||
// The srcBuf is a []byte cast of metricIDs.
|
||||
var srcBuf []byte
|
||||
if len(metricIDs) > 0 {
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&srcBuf))
|
||||
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
|
||||
sh.Cap = 8 * len(metricIDs)
|
||||
sh.Len = 8 * len(metricIDs)
|
||||
}
|
||||
var buf []byte
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
|
||||
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
|
||||
sh.Cap = 8 * len(metricIDs)
|
||||
sh.Len = sh.Cap
|
||||
dst = append(dst, buf...)
|
||||
|
||||
dst = encoding.CompressZSTDLevel(dst, srcBuf, 1)
|
||||
return dst
|
||||
}
|
||||
|
||||
func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
|
||||
if len(src)%8 != 0 {
|
||||
return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src))
|
||||
func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 {
|
||||
// Decompress src into dstBuf.
|
||||
//
|
||||
// dstBuf is a []byte cast of dst.
|
||||
var dstBuf []byte
|
||||
if len(dst) > 0 {
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&dstBuf))
|
||||
sh.Data = uintptr(unsafe.Pointer(&dst[0]))
|
||||
sh.Cap = 8 * cap(dst)
|
||||
sh.Len = 8 * len(dst)
|
||||
}
|
||||
if len(src) < 8 {
|
||||
return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src))
|
||||
dstBufLen := len(dstBuf)
|
||||
var err error
|
||||
dstBuf, err = encoding.DecompressZSTD(dstBuf, src)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot decompress metricIDs: %s", err)
|
||||
}
|
||||
n := encoding.UnmarshalUint64(src)
|
||||
if n > ((1<<64)-1)/8 {
|
||||
return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, uint64(((1<<64)-1)/8))
|
||||
if len(dstBuf) == dstBufLen {
|
||||
// Zero metricIDs
|
||||
return dst
|
||||
}
|
||||
src = src[8:]
|
||||
if n*8 != uint64(len(src)) {
|
||||
return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
|
||||
if (len(dstBuf)-dstBufLen)%8 != 0 {
|
||||
logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen)
|
||||
}
|
||||
if n == 0 {
|
||||
return dst, nil
|
||||
}
|
||||
var metricIDs []uint64
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs))
|
||||
sh.Data = uintptr(unsafe.Pointer(&src[0]))
|
||||
sh.Cap = sh.Len
|
||||
sh.Len = len(src) / 8
|
||||
dst = append(dst, metricIDs...)
|
||||
return dst, nil
|
||||
|
||||
// Convert dstBuf back to dst
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&dst))
|
||||
sh.Data = uintptr(unsafe.Pointer(&dstBuf[0]))
|
||||
sh.Cap = cap(dstBuf) / 8
|
||||
sh.Len = len(dstBuf) / 8
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
|
||||
|
@ -1582,6 +1589,9 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
|||
return dmis, nil
|
||||
}
|
||||
|
||||
// searchMetricIDs returns metricIDs for the given tfss and tr.
|
||||
//
|
||||
// The returned metricIDs are sorted.
|
||||
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
|
||||
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
|
||||
defer qt.Done()
|
||||
|
@ -2117,6 +2127,9 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer,
|
|||
return metricIDs, nil
|
||||
}
|
||||
|
||||
// searchMetricIDs returns metricIDs for the given tfss and tr.
|
||||
//
|
||||
// The returned metricIDs are sorted.
|
||||
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
|
||||
ok, err := is.containsTimeRange(tr)
|
||||
if err != nil {
|
||||
|
|
|
@ -24,18 +24,44 @@ import (
|
|||
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
|
||||
f := func(metricIDs []uint64) {
|
||||
t.Helper()
|
||||
|
||||
// Try marshaling and unmarshaling to an empty dst
|
||||
data := marshalMetricIDs(nil, metricIDs)
|
||||
result, err := unmarshalMetricIDs(nil, data)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
result := mustUnmarshalMetricIDs(nil, data)
|
||||
if !reflect.DeepEqual(result, metricIDs) {
|
||||
t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs)
|
||||
}
|
||||
|
||||
// Try marshaling and unmarshaling to non-empty dst
|
||||
dataPrefix := []byte("prefix")
|
||||
data = marshalMetricIDs(dataPrefix, metricIDs)
|
||||
if len(data) < len(dataPrefix) {
|
||||
t.Fatalf("too short len(data)=%d; must be at least len(dataPrefix)=%d", len(data), len(dataPrefix))
|
||||
}
|
||||
if string(data[:len(dataPrefix)]) != string(dataPrefix) {
|
||||
t.Fatalf("unexpected prefix; got %q; want %q", data[:len(dataPrefix)], dataPrefix)
|
||||
}
|
||||
data = data[len(dataPrefix):]
|
||||
|
||||
resultPrefix := []uint64{889432422, 89243, 9823}
|
||||
result = mustUnmarshalMetricIDs(resultPrefix, data)
|
||||
if len(result) < len(resultPrefix) {
|
||||
t.Fatalf("too short result returned; len(result)=%d; must be at least len(resultPrefix)=%d", len(result), len(resultPrefix))
|
||||
}
|
||||
if !reflect.DeepEqual(result[:len(resultPrefix)], resultPrefix) {
|
||||
t.Fatalf("unexpected result prefix; got %d; want %d", result[:len(resultPrefix)], resultPrefix)
|
||||
}
|
||||
result = result[len(resultPrefix):]
|
||||
if (len(metricIDs) > 0 || len(result) > 0) && !reflect.DeepEqual(result, metricIDs) {
|
||||
t.Fatalf("unexpected metricIDs after unmarshaling from prefix;\ngot\n%d\nwant\n%d", result, metricIDs)
|
||||
}
|
||||
}
|
||||
|
||||
f(nil)
|
||||
f([]uint64{0})
|
||||
f([]uint64{1})
|
||||
f([]uint64{1234, 678932943, 843289893843})
|
||||
f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0})
|
||||
}
|
||||
|
||||
func TestMergeSortedMetricIDs(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue