mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
![Andrei Baidarov](/assets/img/avatar_default.png)
Previously, staleness markers could be dropped during deduplication because of incorrect logic in the value comparison. When evaluating a read query, vmselect deduplicates samples according to the dedupInterval option: it picks the highest value across all points with the same timestamp next to the dedupInterval border. The problem is that any comparison with NaN via `<` or `>` returns false, so the position of a NaN in srcValues could affect the result. This commit adds an extra step that explicitly checks for staleness markers in the following cases: 1. deduplication in vmselect; 2. deduplication in vmstorage during merges; 3. deduplication in stream aggregation. The check is performed only for staleness markers, because other NaNs are rejected on ingestion by vmstorage and by stream aggregation. Checking for staleness markers changes dedup performance by about 3% (geomean; the benchmark below actually shows -3.56% sec/op, and with n=1 the difference is not statistically significant):
```
benchstat old.txt new.txt
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/storage
cpu: Apple M4 Pro
                                                          │   old.txt    │               new.txt               │
                                                          │    sec/op    │    sec/op     vs base               │
DeduplicateSamples/minScrapeInterval=1s-14                   462.8n ± ∞ ¹   425.2n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=2s-14                   905.6n ± ∞ ¹   903.3n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=5s-14                   710.0n ± ∞ ¹   698.9n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamples/minScrapeInterval=10s-14                  632.7n ± ∞ ¹   638.5n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=1s-14        439.7n ± ∞ ¹   409.9n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=2s-14        908.9n ± ∞ ¹   882.2n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=5s-14        721.2n ± ∞ ¹   684.7n ± ∞ ¹       ~ (p=1.000 n=1) ²
DeduplicateSamplesDuringMerge/minScrapeInterval=10s-14       659.1n ± ∞ ¹   630.6n ± ∞ ¹       ~ (p=1.000 n=1) ²
geomean                                                      659.5n         636.0n        -3.56%
```
Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674
---------
Co-authored-by: hagen1778 <roman@victoriametrics.com>
242 lines
4.9 KiB
Go
242 lines
4.9 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
|
|
"github.com/cespare/xxhash/v2"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
)
|
|
|
|
// dedupAggrShardsCount is the number of shards a dedupAggr splits its state into.
// Each shard has its own mutex, so pushes that hash to different shards do not
// serialize on a single lock.
const dedupAggrShardsCount = 128

type (
	// dedupAggr keeps the most recent deduplicated sample per key, spread over
	// dedupAggrShardsCount independently locked shards.
	dedupAggr struct {
		shards []dedupAggrShard
	}

	// dedupAggrShard is dedupAggrShardNopad followed by trailing padding, which
	// makes the total size of every shard a multiple of 128 bytes.
	dedupAggrShard struct {
		dedupAggrShardNopad

		// The padding prevents false sharing on widespread platforms with
		// 128 mod (cache line size) = 0 .
		_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
	}

	// dedupAggrShardNopad is the actual per-shard state, without padding.
	dedupAggrShardNopad struct {
		// mu guards m and samplesBuf.
		mu sync.Mutex

		// m maps an (interned) sample key to the sample currently retained for it.
		// The pointers refer to elements of samplesBuf.
		m map[string]*dedupAggrSample

		// samplesBuf is the backing storage for the samples referenced from m.
		samplesBuf []dedupAggrSample

		// sizeBytes and itemsCount are approximate memory usage and the number of
		// distinct keys; they are read atomically without taking mu.
		sizeBytes  atomic.Uint64
		itemsCount atomic.Uint64
	}

	// dedupAggrSample is the value and timestamp retained for a single key.
	dedupAggrSample struct {
		value     float64
		timestamp int64
	}
)
|
|
|
|
func newDedupAggr() *dedupAggr {
|
|
shards := make([]dedupAggrShard, dedupAggrShardsCount)
|
|
return &dedupAggr{
|
|
shards: shards,
|
|
}
|
|
}
|
|
|
|
func (da *dedupAggr) sizeBytes() uint64 {
|
|
n := uint64(unsafe.Sizeof(*da))
|
|
for i := range da.shards {
|
|
n += da.shards[i].sizeBytes.Load()
|
|
}
|
|
return n
|
|
}
|
|
|
|
func (da *dedupAggr) itemsCount() uint64 {
|
|
n := uint64(0)
|
|
for i := range da.shards {
|
|
n += da.shards[i].itemsCount.Load()
|
|
}
|
|
return n
|
|
}
|
|
|
|
func (da *dedupAggr) pushSamples(samples []pushSample) {
|
|
pss := getPerShardSamples()
|
|
shards := pss.shards
|
|
for _, sample := range samples {
|
|
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(sample.key))
|
|
idx := h % uint64(len(shards))
|
|
shards[idx] = append(shards[idx], sample)
|
|
}
|
|
for i, shardSamples := range shards {
|
|
if len(shardSamples) == 0 {
|
|
continue
|
|
}
|
|
da.shards[i].pushSamples(shardSamples)
|
|
}
|
|
putPerShardSamples(pss)
|
|
}
|
|
|
|
func getDedupFlushCtx() *dedupFlushCtx {
|
|
v := dedupFlushCtxPool.Get()
|
|
if v == nil {
|
|
return &dedupFlushCtx{}
|
|
}
|
|
return v.(*dedupFlushCtx)
|
|
}
|
|
|
|
func putDedupFlushCtx(ctx *dedupFlushCtx) {
|
|
ctx.reset()
|
|
dedupFlushCtxPool.Put(ctx)
|
|
}
|
|
|
|
var dedupFlushCtxPool sync.Pool
|
|
|
|
type dedupFlushCtx struct {
|
|
samples []pushSample
|
|
}
|
|
|
|
func (ctx *dedupFlushCtx) reset() {
|
|
clear(ctx.samples)
|
|
ctx.samples = ctx.samples[:0]
|
|
}
|
|
|
|
func (da *dedupAggr) flush(f func(samples []pushSample)) {
|
|
var wg sync.WaitGroup
|
|
for i := range da.shards {
|
|
flushConcurrencyCh <- struct{}{}
|
|
wg.Add(1)
|
|
go func(shard *dedupAggrShard) {
|
|
defer func() {
|
|
<-flushConcurrencyCh
|
|
wg.Done()
|
|
}()
|
|
|
|
ctx := getDedupFlushCtx()
|
|
shard.flush(ctx, f)
|
|
putDedupFlushCtx(ctx)
|
|
}(&da.shards[i])
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
type perShardSamples struct {
|
|
shards [][]pushSample
|
|
}
|
|
|
|
func (pss *perShardSamples) reset() {
|
|
shards := pss.shards
|
|
for i, shardSamples := range shards {
|
|
if len(shardSamples) > 0 {
|
|
clear(shardSamples)
|
|
shards[i] = shardSamples[:0]
|
|
}
|
|
}
|
|
}
|
|
|
|
func getPerShardSamples() *perShardSamples {
|
|
v := perShardSamplesPool.Get()
|
|
if v == nil {
|
|
return &perShardSamples{
|
|
shards: make([][]pushSample, dedupAggrShardsCount),
|
|
}
|
|
}
|
|
return v.(*perShardSamples)
|
|
}
|
|
|
|
func putPerShardSamples(pss *perShardSamples) {
|
|
pss.reset()
|
|
perShardSamplesPool.Put(pss)
|
|
}
|
|
|
|
var perShardSamplesPool sync.Pool
|
|
|
|
func (das *dedupAggrShard) pushSamples(samples []pushSample) {
|
|
das.mu.Lock()
|
|
defer das.mu.Unlock()
|
|
|
|
m := das.m
|
|
if m == nil {
|
|
m = make(map[string]*dedupAggrSample, len(samples))
|
|
das.m = m
|
|
}
|
|
samplesBuf := das.samplesBuf
|
|
for _, sample := range samples {
|
|
s, ok := m[sample.key]
|
|
if !ok {
|
|
samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
|
|
s = &samplesBuf[len(samplesBuf)-1]
|
|
s.value = sample.value
|
|
s.timestamp = sample.timestamp
|
|
|
|
key := bytesutil.InternString(sample.key)
|
|
m[key] = s
|
|
|
|
das.itemsCount.Add(1)
|
|
das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
|
continue
|
|
}
|
|
if !isDuplicate(s, sample) {
|
|
s.value = sample.value
|
|
s.timestamp = sample.timestamp
|
|
}
|
|
}
|
|
das.samplesBuf = samplesBuf
|
|
}
|
|
|
|
// isDuplicate returns true if b is duplicate of a
|
|
// See https://docs.victoriametrics.com/#deduplication
|
|
func isDuplicate(a *dedupAggrSample, b pushSample) bool {
|
|
if b.timestamp > a.timestamp {
|
|
return false
|
|
}
|
|
if b.timestamp == a.timestamp {
|
|
if decimal.IsStaleNaN(b.value) {
|
|
return false
|
|
}
|
|
if b.value > a.value {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
|
|
das.mu.Lock()
|
|
|
|
m := das.m
|
|
if len(m) > 0 {
|
|
das.m = make(map[string]*dedupAggrSample, len(m))
|
|
das.sizeBytes.Store(0)
|
|
das.itemsCount.Store(0)
|
|
das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
|
|
}
|
|
|
|
das.mu.Unlock()
|
|
|
|
if len(m) == 0 {
|
|
return
|
|
}
|
|
|
|
dstSamples := ctx.samples
|
|
for key, s := range m {
|
|
dstSamples = append(dstSamples, pushSample{
|
|
key: key,
|
|
value: s.value,
|
|
timestamp: s.timestamp,
|
|
})
|
|
|
|
// Limit the number of samples per each flush in order to limit memory usage.
|
|
if len(dstSamples) >= 10_000 {
|
|
f(dstSamples)
|
|
clear(dstSamples)
|
|
dstSamples = dstSamples[:0]
|
|
}
|
|
}
|
|
f(dstSamples)
|
|
ctx.samples = dstSamples
|
|
}
|