mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
review comments
This commit is contained in:
parent
c8685741b3
commit
129b2236ef
11 changed files with 38 additions and 44 deletions
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ require (
|
||||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
|
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
|
||||||
github.com/VictoriaMetrics/easyproto v0.1.4
|
github.com/VictoriaMetrics/easyproto v0.1.4
|
||||||
github.com/VictoriaMetrics/fastcache v1.12.2
|
github.com/VictoriaMetrics/fastcache v1.12.2
|
||||||
github.com/VictoriaMetrics/metrics v1.34.0
|
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
|
||||||
github.com/VictoriaMetrics/metricsql v0.76.0
|
github.com/VictoriaMetrics/metricsql v0.76.0
|
||||||
github.com/aws/aws-sdk-go-v2 v1.30.1
|
github.com/aws/aws-sdk-go-v2 v1.30.1
|
||||||
github.com/aws/aws-sdk-go-v2/config v1.27.23
|
github.com/aws/aws-sdk-go-v2/config v1.27.23
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -73,6 +73,10 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC
|
||||||
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
|
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
|
||||||
github.com/VictoriaMetrics/metrics v1.34.0 h1:0i8k/gdOJdSoZB4Z9pikVnVQXfhcIvnG7M7h2WaQW2w=
|
github.com/VictoriaMetrics/metrics v1.34.0 h1:0i8k/gdOJdSoZB4Z9pikVnVQXfhcIvnG7M7h2WaQW2w=
|
||||||
github.com/VictoriaMetrics/metrics v1.34.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
github.com/VictoriaMetrics/metrics v1.34.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||||
|
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e h1:IgNJoIYb2IhknxOLEAAG0ktj0f1609jpgmXjpPVrJ7s=
|
||||||
|
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||||
|
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0 h1:qP+3SX4eslXLPmsJpGjnMv+9UbmyrSj/Yf5CqPm6bLE=
|
||||||
|
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||||
github.com/VictoriaMetrics/metricsql v0.76.0 h1:hl7vqJqyH2d8zKImzalkFrkFiD5q4ACF8gl3s86DqKA=
|
github.com/VictoriaMetrics/metricsql v0.76.0 h1:hl7vqJqyH2d8zKImzalkFrkFiD5q4ACF8gl3s86DqKA=
|
||||||
github.com/VictoriaMetrics/metricsql v0.76.0/go.mod h1:1g4hdCwlbJZ851PU9VN65xy9Rdlzupo6fx3SNZ8Z64U=
|
github.com/VictoriaMetrics/metricsql v0.76.0/go.mod h1:1g4hdCwlbJZ851PU9VN65xy9Rdlzupo6fx3SNZ8Z64U=
|
||||||
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
||||||
|
|
|
@ -72,7 +72,7 @@ func (da *dedupAggr) itemsCount() uint64 {
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
func (da *dedupAggr) pushSamples(samples []pushSample, dedupIdx int) {
|
func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) {
|
||||||
pss := getPerShardSamples()
|
pss := getPerShardSamples()
|
||||||
shards := pss.shards
|
shards := pss.shards
|
||||||
for _, sample := range samples {
|
for _, sample := range samples {
|
||||||
|
|
|
@ -21,7 +21,7 @@ func TestDedupAggrSerial(t *testing.T) {
|
||||||
sample.value = float64(i + j)
|
sample.value = float64(i + j)
|
||||||
expectedSamplesMap[sample.key] = *sample
|
expectedSamplesMap[sample.key] = *sample
|
||||||
}
|
}
|
||||||
da.pushSamples(samples, 0)
|
da.pushSamples(samples, 0, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n := da.sizeBytes(); n > 5_000_000 {
|
if n := da.sizeBytes(); n > 5_000_000 {
|
||||||
|
@ -73,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
|
||||||
sample.key = fmt.Sprintf("key_%d", j)
|
sample.key = fmt.Sprintf("key_%d", j)
|
||||||
sample.value = float64(i + j)
|
sample.value = float64(i + j)
|
||||||
}
|
}
|
||||||
da.pushSamples(samples, 0)
|
da.pushSamples(samples, 0, 0)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
for i := 0; i < loops; i++ {
|
for i := 0; i < loops; i++ {
|
||||||
da.pushSamples(benchSamples, 0)
|
da.pushSamples(benchSamples, 0, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -115,7 +115,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx, ps := range pss {
|
for idx, ps := range pss {
|
||||||
d.da.pushSamples(ps, idx)
|
d.da.pushSamples(ps, 0, idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.pss = pss
|
ctx.pss = pss
|
||||||
|
|
|
@ -129,7 +129,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int
|
||||||
|
|
||||||
// Delete outdated entries in state
|
// Delete outdated entries in state
|
||||||
var rate float64
|
var rate float64
|
||||||
totalItems := len(sv.state)
|
var totalItems int
|
||||||
for k1, state := range sv.state {
|
for k1, state := range sv.state {
|
||||||
if flushTimestamp > state.deleteDeadline {
|
if flushTimestamp > state.deleteDeadline {
|
||||||
delete(sv.state, k1)
|
delete(sv.state, k1)
|
||||||
|
@ -149,10 +149,8 @@ func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int
|
||||||
rate += (v1.total) * 1000 / float64(rateInterval)
|
rate += (v1.total) * 1000 / float64(rateInterval)
|
||||||
state.prevTimestamp = v1.timestamp
|
state.prevTimestamp = v1.timestamp
|
||||||
state.prevValue = v1.value
|
state.prevValue = v1.value
|
||||||
} else {
|
totalItems++
|
||||||
totalItems--
|
|
||||||
}
|
}
|
||||||
totalItems -= staleInputSamples
|
|
||||||
state.lastValues[idx] = rateLastValueState{}
|
state.lastValues[idx] = rateLastValueState{}
|
||||||
sv.state[k1] = state
|
sv.state[k1] = state
|
||||||
}
|
}
|
||||||
|
|
|
@ -742,7 +742,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
|
||||||
|
|
||||||
dedupTime := ct.Truncate(tickInterval)
|
dedupTime := ct.Truncate(tickInterval)
|
||||||
if a.ignoreOldSamples {
|
if a.ignoreOldSamples {
|
||||||
dedupIdx, flushIdx = a.getFlushIndices(dedupInterval, interval, dedupTime, flushDeadline)
|
dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
|
||||||
}
|
}
|
||||||
pf := pushFunc
|
pf := pushFunc
|
||||||
|
|
||||||
|
@ -789,28 +789,24 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
|
||||||
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
|
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
|
||||||
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
|
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
|
||||||
if a.ignoreOldSamples {
|
if a.ignoreOldSamples {
|
||||||
dedupIdx, flushIdx = a.getFlushIndices(dedupInterval, interval, dedupTime, flushDeadline)
|
dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
|
||||||
}
|
}
|
||||||
a.dedupFlush(dedupInterval, flushDeadline.UnixMilli(), dedupIdx, flushIdx)
|
a.dedupFlush(dedupInterval, flushDeadline.UnixMilli(), dedupIdx, flushIdx)
|
||||||
a.flush(pushFunc, interval, flushDeadline.UnixMilli(), flushIdx)
|
a.flush(pushFunc, interval, flushDeadline.UnixMilli(), flushIdx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *aggregator) getFlushIndices(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
|
func getAggrIdxs(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
|
||||||
flushTimestamp := flushTime.UnixMilli()
|
flushIdx := getStateIdx(interval.Milliseconds(), flushTime.Add(-interval).UnixMilli())
|
||||||
flushIntervals := int(flushTimestamp / int64(interval/time.Millisecond))
|
dedupIdx := flushIdx
|
||||||
var dedupIndex, flushIndex int
|
|
||||||
if dedupInterval > 0 {
|
if dedupInterval > 0 {
|
||||||
dedupTimestamp := dedupTime.UnixMilli()
|
dedupIdx = getStateIdx(dedupInterval.Milliseconds(), dedupTime.Add(-dedupInterval).UnixMilli())
|
||||||
dedupIntervals := int(dedupTimestamp / int64(dedupInterval/time.Millisecond))
|
|
||||||
intervalsRatio := int(interval / dedupInterval)
|
|
||||||
dedupIndex = dedupIntervals % aggrStateSize
|
|
||||||
flushIndex = flushIntervals % (aggrStateSize / intervalsRatio)
|
|
||||||
} else {
|
|
||||||
flushIndex = flushIntervals % aggrStateSize
|
|
||||||
dedupIndex = flushIndex
|
|
||||||
}
|
}
|
||||||
return dedupIndex, flushIndex
|
return dedupIdx, flushIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
func getStateIdx(interval int64, ts int64) int {
|
||||||
|
return int(ts/interval) % aggrStateSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *aggregator) dedupFlush(dedupInterval time.Duration, deleteDeadline int64, dedupIdx, flushIdx int) {
|
func (a *aggregator) dedupFlush(dedupInterval time.Duration, deleteDeadline int64, dedupIdx, flushIdx int) {
|
||||||
|
@ -933,13 +929,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
||||||
a.flushAfter.Update(float64(currentTimestamp - sample.Timestamp))
|
a.flushAfter.Update(float64(currentTimestamp - sample.Timestamp))
|
||||||
a.muFlushAfter.Unlock()
|
a.muFlushAfter.Unlock()
|
||||||
if math.IsNaN(sample.Value) {
|
if math.IsNaN(sample.Value) {
|
||||||
a.ignoredNanSamples.Inc()
|
|
||||||
// Skip NaN values
|
// Skip NaN values
|
||||||
a.ignoredNanSamples.Inc()
|
a.ignoredNanSamples.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ignoreOldSamples && sample.Timestamp < minTimestamp {
|
if ignoreOldSamples && sample.Timestamp < minTimestamp {
|
||||||
a.ignoredOldSamples.Inc()
|
|
||||||
// Skip old samples outside the current aggregation interval
|
// Skip old samples outside the current aggregation interval
|
||||||
a.ignoredOldSamples.Inc()
|
a.ignoredOldSamples.Inc()
|
||||||
continue
|
continue
|
||||||
|
@ -948,7 +942,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
||||||
maxLag = now - sample.Timestamp
|
maxLag = now - sample.Timestamp
|
||||||
}
|
}
|
||||||
if ignoreOldSamples {
|
if ignoreOldSamples {
|
||||||
flushIdx = int((sample.Timestamp)/a.tickInterval+1) % aggrStateSize
|
flushIdx = getStateIdx(a.tickInterval, sample.Timestamp)
|
||||||
}
|
}
|
||||||
samples[flushIdx] = append(samples[flushIdx], pushSample{
|
samples[flushIdx] = append(samples[flushIdx], pushSample{
|
||||||
key: key,
|
key: key,
|
||||||
|
@ -957,20 +951,20 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(samples) > 0 {
|
|
||||||
a.matchedSamples.Add(len(samples))
|
|
||||||
a.samplesLag.Update(float64(maxLag) / 1_000)
|
|
||||||
}
|
|
||||||
ctx.samples = samples
|
ctx.samples = samples
|
||||||
ctx.buf = buf
|
ctx.buf = buf
|
||||||
|
|
||||||
|
pushSamples := a.pushSamples
|
||||||
if a.da != nil {
|
if a.da != nil {
|
||||||
for idx, s := range samples {
|
pushSamples = a.da.pushSamples
|
||||||
a.da.pushSamples(s, idx)
|
}
|
||||||
}
|
|
||||||
} else {
|
for idx, s := range samples {
|
||||||
for idx, s := range samples {
|
if len(s) > 0 {
|
||||||
a.pushSamples(s, deleteDeadlineMilli, idx)
|
a.samplesLag.Update(float64(maxLag) / 1_000)
|
||||||
|
a.matchedSamples.Add(len(s))
|
||||||
|
pushSamples(s, deleteDeadlineMilli, idx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,10 +79,8 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64
|
||||||
state := len(sv.state[idx])
|
state := len(sv.state[idx])
|
||||||
sv.state[idx] = make(map[float64]struct{})
|
sv.state[idx] = make(map[float64]struct{})
|
||||||
sv.mu.Unlock()
|
sv.mu.Unlock()
|
||||||
if state > 0 {
|
key := k.(string)
|
||||||
key := k.(string)
|
ctx.appendSeries(key, "unique_samples", flushTimestamp, float64(state))
|
||||||
ctx.appendSeries(key, "unique_series", flushTimestamp, float64(state))
|
|
||||||
}
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
2
vendor/github.com/VictoriaMetrics/metrics/histogram.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/metrics/histogram.go
generated
vendored
|
@ -130,7 +130,7 @@ func (h *Histogram) Merge(b *Histogram) {
|
||||||
h.decimalBuckets[i] = &b
|
h.decimalBuckets[i] = &b
|
||||||
}
|
}
|
||||||
for j := range db {
|
for j := range db {
|
||||||
h.decimalBuckets[i][j] = db[j]
|
h.decimalBuckets[i][j] += db[j]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -115,7 +115,7 @@ github.com/VictoriaMetrics/easyproto
|
||||||
# github.com/VictoriaMetrics/fastcache v1.12.2
|
# github.com/VictoriaMetrics/fastcache v1.12.2
|
||||||
## explicit; go 1.13
|
## explicit; go 1.13
|
||||||
github.com/VictoriaMetrics/fastcache
|
github.com/VictoriaMetrics/fastcache
|
||||||
# github.com/VictoriaMetrics/metrics v1.34.0
|
# github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
|
||||||
## explicit; go 1.17
|
## explicit; go 1.17
|
||||||
github.com/VictoriaMetrics/metrics
|
github.com/VictoriaMetrics/metrics
|
||||||
# github.com/VictoriaMetrics/metricsql v0.76.0
|
# github.com/VictoriaMetrics/metricsql v0.76.0
|
||||||
|
|
Loading…
Reference in a new issue