lib/storage: de-duplicate tags in MetricName.sortTags

Leave only the last tag among tags with duplicate keys. This is needed for reliable addition of extra_labels
during data ingestion. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1007 for details.
This commit is contained in:
Aliaksandr Valialkin 2021-01-12 14:57:54 +02:00
parent 2140ccbdcc
commit c8ea697db8
2 changed files with 29 additions and 11 deletions

View file

@ -358,7 +358,8 @@ func (mn *MetricName) String() string {
// Marshal appends marshaled mn to dst and returns the result.
//
// Tags must be sorted before calling this function.
// mn.sortTags must be called before calling this function
// in order to sort and de-duplicate tags.
func (mn *MetricName) Marshal(dst []byte) []byte {
// Calculate the required size and pre-allocate space in dst
dstLen := len(dst)
@ -398,7 +399,7 @@ func (mn *MetricName) Unmarshal(src []byte) error {
}
// There is no need to check for identical tag keys,
// since they must be handled in MetricName.Marshal inside marshalTags.
// since they must be handled by MetricName.sortTags before calling MetricName.Marshal.
return nil
}
@ -550,7 +551,10 @@ func unmarshalBytesFast(src []byte) ([]byte, []byte, error) {
return src[n:], src[:n], nil
}
// sortTags sorts tags in mn.
// sortTags sorts tags in mn to canonical form needed for storing in the index.
//
// The function also de-duplicates tags with identical keys in mn. The last tag value
// for duplicate tags wins.
//
// Tag sorting is quite slow, so try to avoid it by caching mn
// with sorted tags.
@ -572,12 +576,25 @@ func (mn *MetricName) sortTags() {
}
cts.tags = dst
// Use sort.Sort instead of sort.Slice, since sort.Slice allocates a lot.
sort.Sort(&cts.tags)
// Use sort.Stable instead of sort.Sort in order to preserve the order of tags with duplicate keys.
// The last tag value wins for tags with duplicate keys.
// Use sort.Stable instead of sort.SliceStable, since sort.SliceStable allocates a lot.
sort.Stable(&cts.tags)
j := 0
var prevKey []byte
for i := range cts.tags {
mn.Tags[i].copyFrom(&cts.tags[i].tag)
tag := &cts.tags[i].tag
if j > 0 && bytes.Equal(tag.Key, prevKey) {
// Overwrite the previous tag with duplicate key.
j--
} else {
prevKey = tag.Key
}
mn.Tags[j].copyFrom(tag)
j++
}
mn.Tags = mn.Tags[:j]
putCanonicalTags(cts)
}

View file

@ -64,15 +64,16 @@ func TestMetricNameMarshalDuplicateKeys(t *testing.T) {
var mn MetricName
mn.MetricGroup = []byte("xxx")
mn.AddTag("foo", "bar")
mn.AddTag("duplicate", "tag")
mn.AddTag("duplicate", "tag")
mn.AddTag("tt", "xx")
mn.AddTag("duplicate", "tag1")
mn.AddTag("duplicate", "tag2")
mn.AddTag("tt", "xx")
mn.AddTag("foo", "abc")
mn.AddTag("duplicate", "tag3")
var mnExpected MetricName
mnExpected.MetricGroup = []byte("xxx")
mnExpected.AddTag("duplicate", "tag")
mnExpected.AddTag("foo", "bar")
mnExpected.AddTag("duplicate", "tag3")
mnExpected.AddTag("foo", "abc")
mnExpected.AddTag("tt", "xx")
mn.sortTags()