diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 10f37fd0d..d0ebe237b 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -96,7 +96,9 @@ type Table struct { path string - flushCallback func() + flushCallback func() + flushCallbackWorkerWG sync.WaitGroup + needFlushCallbackCall uint32 prepareBlock PrepareBlockCallback @@ -209,7 +211,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error { } ris.mu.Unlock() - tb.mergeRawItemsBlocks(blocksToFlush) + tb.mergeRawItemsBlocks(blocksToFlush, false) return err } @@ -299,6 +301,27 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb tb.convertersWG.Done() }() + if flushCallback != nil { + tb.flushCallbackWorkerWG.Add(1) + go func() { + // call flushCallback once per 10 seconds in order to improve the effectiveness of caches, + // which are reset by the flushCallback. + tc := time.NewTicker(10 * time.Second) + for { + select { + case <-tb.stopCh: + tb.flushCallback() + tb.flushCallbackWorkerWG.Done() + return + case <-tc.C: + if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) { + tb.flushCallback() + } + } + } + }() + } + return tb, nil } @@ -347,6 +370,11 @@ func (tb *Table) MustClose() { } logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), tb.path) + logger.Infof("waiting for flush callback worker to stop on %q...", tb.path) + startTime = time.Now() + tb.flushCallbackWorkerWG.Wait() + logger.Infof("flush callback worker stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path) + // Remove references to parts from the tb, so they may be eventually closed // after all the searches are done. tb.partsLock.Lock() @@ -587,7 +615,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) { for i := range riss.shards { blocksToFlush = riss.shards[i].appendBlocksToFlush(blocksToFlush, tb, isFinal) } - tb.mergeRawItemsBlocks(blocksToFlush) + tb.mergeRawItemsBlocks(blocksToFlush, isFinal) } func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, isFinal bool) []*inmemoryBlock { @@ -612,7 +640,7 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i return dst } -func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock) { +func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) { if len(ibs) == 0 { return } @@ -647,7 +675,11 @@ func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock) { logger.Panicf("FATAL: cannot merge raw parts: %s", err) } if tb.flushCallback != nil { - tb.flushCallback() + if isFinal { + tb.flushCallback() + } else { + atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1) + } } }