lib/storage: remove duplicate MetricIDs in tag->metricIDs items before writing them into inverted index

This commit is contained in:
Aliaksandr Valialkin 2019-09-25 17:55:13 +03:00
parent adc18c3ee6
commit 4468f9f966
2 changed files with 46 additions and 0 deletions

View file

@ -2391,7 +2391,9 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
} }
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations. // Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
sort.Sort(&tmm.pendingMetricIDs) sort.Sort(&tmm.pendingMetricIDs)
tmm.pendingMetricIDs = removeDuplicateMetricIDs(tmm.pendingMetricIDs)
// Marshal pendingMetricIDs
dstDataLen := len(dstData) dstDataLen := len(dstData)
dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs, mp.AccountID, mp.ProjectID) dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs, mp.AccountID, mp.ProjectID)
dstData = mp.Tag.Marshal(dstData) dstData = mp.Tag.Marshal(dstData)
@ -2403,6 +2405,33 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt
return dstData, dstItems return dstData, dstItems
} }
func removeDuplicateMetricIDs(sortedMetricIDs []uint64) []uint64 {
if len(sortedMetricIDs) < 2 {
return sortedMetricIDs
}
prevMetricID := sortedMetricIDs[0]
hasDuplicates := false
for _, metricID := range sortedMetricIDs[1:] {
if prevMetricID == metricID {
hasDuplicates = true
}
prevMetricID = metricID
}
if !hasDuplicates {
return sortedMetricIDs
}
dstMetricIDs := sortedMetricIDs[:1]
prevMetricID = sortedMetricIDs[0]
for _, metricID := range sortedMetricIDs[1:] {
if prevMetricID == metricID {
continue
}
dstMetricIDs = append(dstMetricIDs, metricID)
prevMetricID = metricID
}
return dstMetricIDs
}
func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger { func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger {
v := tmmPool.Get() v := tmmPool.Get()
if v == nil { if v == nil {

View file

@ -15,6 +15,23 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
) )
func TestRemoveDuplicateMetricIDs(t *testing.T) {
f := func(metricIDs, expectedMetricIDs []uint64) {
a := removeDuplicateMetricIDs(metricIDs)
if !reflect.DeepEqual(a, expectedMetricIDs) {
t.Fatalf("unexpected result from removeDuplicateMetricIDs:\ngot\n%d\nwant\n%d", a, expectedMetricIDs)
}
}
f(nil, nil)
f([]uint64{123}, []uint64{123})
f([]uint64{123, 123}, []uint64{123})
f([]uint64{123, 123, 123}, []uint64{123})
f([]uint64{0, 1, 1, 2}, []uint64{0, 1, 2})
f([]uint64{0, 0, 0, 1, 1, 2}, []uint64{0, 1, 2})
f([]uint64{0, 1, 1, 2, 2}, []uint64{0, 1, 2})
f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2})
}
func TestMarshalUnmarshalTSIDs(t *testing.T) { func TestMarshalUnmarshalTSIDs(t *testing.T) {
f := func(tsids []TSID) { f := func(tsids []TSID) {
t.Helper() t.Helper()