lib/storage: fix collect downsampling metrics (#489)

* lib/storage: fix downsampling

* lib/storage: update logic

* lib/storage: fix comments, removed unneeded check
This commit is contained in:
Dmytro Kozlov 2022-12-20 20:11:38 +02:00 committed by Aliaksandr Valialkin
parent 1ca79bae1a
commit 78f8e3a2c3
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 6 additions and 5 deletions

View file

@ -1090,10 +1090,6 @@ func atomicSetBool(p *uint64, b bool) {
func (pt *partition) runFinalDedup() error {
requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval()
if requiredDedupInterval <= actualDedupInterval {
// Deduplication isn't needed.
return nil
}
t := time.Now()
logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms",
pt.bigPartsPath, requiredDedupInterval, actualDedupInterval)
@ -1104,6 +1100,11 @@ func (pt *partition) runFinalDedup() error {
return nil
}
func (pt *partition) isFinalDedupNeeded() bool {
requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval()
return requiredDedupInterval > actualDedupInterval
}
func (pt *partition) getRequiredDedupInterval() (int64, int64) {
pws := pt.GetParts(nil)
defer pt.PutParts(pws)

View file

@ -455,7 +455,7 @@ func (tb *table) finalDedupWatcher() {
timestamp := timestampFromTime(time.Now())
currentPartitionName := timestampToPartitionName(timestamp)
for _, ptw := range ptws {
if ptw.pt.name == currentPartitionName {
if ptw.pt.name == currentPartitionName || !ptw.pt.isFinalDedupNeeded() {
// Do not run final dedup for the current month.
continue
}