lib/mergeset: limit the frequency for flushCallback calls to once per 10 seconds

This should improve hit ratio for tagFiltersCache when big number of new time series are constantly registered
(aka high churn rate). This, in turn, should reduce CPU usage for queries over such time series.
This commit is contained in:
Aliaksandr Valialkin 2021-07-06 12:17:15 +03:00
parent 22c6e64bbc
commit fd32855a6c

View file

@ -97,6 +97,8 @@ type Table struct {
path string path string
flushCallback func() flushCallback func()
flushCallbackWorkerWG sync.WaitGroup
needFlushCallbackCall uint32
prepareBlock PrepareBlockCallback prepareBlock PrepareBlockCallback
@ -209,7 +211,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
} }
ris.mu.Unlock() ris.mu.Unlock()
tb.mergeRawItemsBlocks(blocksToFlush) tb.mergeRawItemsBlocks(blocksToFlush, false)
return err return err
} }
@ -299,6 +301,27 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
tb.convertersWG.Done() tb.convertersWG.Done()
}() }()
if flushCallback != nil {
tb.flushCallbackWorkerWG.Add(1)
go func() {
// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
// which are reset by the flushCallback.
tc := time.NewTicker(10 * time.Second)
for {
select {
case <-tb.stopCh:
tb.flushCallback()
tb.flushCallbackWorkerWG.Done()
return
case <-tc.C:
if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
tb.flushCallback()
}
}
}
}()
}
return tb, nil return tb, nil
} }
@ -347,6 +370,11 @@ func (tb *Table) MustClose() {
} }
logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), tb.path) logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), tb.path)
logger.Infof("waiting for flush callback worker to stop on %q...", tb.path)
startTime = time.Now()
tb.flushCallbackWorkerWG.Wait()
logger.Infof("flush callback worker stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path)
// Remove references to parts from the tb, so they may be eventually closed // Remove references to parts from the tb, so they may be eventually closed
// after all the searches are done. // after all the searches are done.
tb.partsLock.Lock() tb.partsLock.Lock()
@ -587,7 +615,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
for i := range riss.shards { for i := range riss.shards {
blocksToFlush = riss.shards[i].appendBlocksToFlush(blocksToFlush, tb, isFinal) blocksToFlush = riss.shards[i].appendBlocksToFlush(blocksToFlush, tb, isFinal)
} }
tb.mergeRawItemsBlocks(blocksToFlush) tb.mergeRawItemsBlocks(blocksToFlush, isFinal)
} }
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, isFinal bool) []*inmemoryBlock { func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, isFinal bool) []*inmemoryBlock {
@ -612,7 +640,7 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i
return dst return dst
} }
func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock) { func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {
if len(ibs) == 0 { if len(ibs) == 0 {
return return
} }
@ -647,7 +675,11 @@ func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock) {
logger.Panicf("FATAL: cannot merge raw parts: %s", err) logger.Panicf("FATAL: cannot merge raw parts: %s", err)
} }
if tb.flushCallback != nil { if tb.flushCallback != nil {
if isFinal {
tb.flushCallback() tb.flushCallback()
} else {
atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
}
} }
} }