diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go
index d2fd8d5c57..81d3ff322e 100644
--- a/lib/logstorage/datadb.go
+++ b/lib/logstorage/datadb.go
@@ -228,7 +228,7 @@ func (ddb *datadb) flushInmemoryParts() {
 	}
 	err := ddb.mergePartsFinal(partsToFlush)
 	if err != nil {
-		logger.Errorf("cannot flush inmemory parts to disk: %s", err)
+		logger.Panicf("FATAL: cannot flush inmemory parts to disk: %s", err)
 	}
 
 	select {
@@ -256,7 +256,7 @@ func (ddb *datadb) startMergeWorkerLocked() {
 		err := ddb.mergeExistingParts()
 		<-globalMergeLimitCh
 		if err != nil && !errors.Is(err, errReadOnly) {
-			logger.Errorf("cannot merge parts: %s", err)
+			logger.Panicf("FATAL: background merge failed: %s", err)
 		}
 		ddb.wg.Done()
 	}()
@@ -307,7 +307,6 @@ func (ddb *datadb) mergeExistingParts() error {
 	}
 	err := ddb.mergeParts(pws, false)
 	ddb.releaseDiskSpace(partsSize)
-	ddb.releasePartsToMerge(pws)
 	if err != nil {
 		return err
 	}
@@ -356,15 +355,16 @@ var errReadOnly = errors.New("the storage is in read-only mode")
 //
 // All the parts inside pws must have isInMerge field set to true.
 func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
-	if ddb.IsReadOnly() {
-		return errReadOnly
-	}
-
 	if len(pws) == 0 {
 		// Nothing to merge.
 		return nil
 	}
+
+	if ddb.IsReadOnly() {
+		return errReadOnly
+	}
 	assertIsInMerge(pws)
+	defer ddb.releasePartsToMerge(pws)
 
 	startTime := time.Now()
 
@@ -436,7 +436,6 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
 		fs.MustSyncPath(dstPartPath)
 	}
 	if needStop(stopCh) {
-		ddb.releasePartsToMerge(pws)
 		// Remove incomplete destination part
 		if dstPartType == partFile {
 			fs.MustRemoveAll(dstPartPath)
@@ -548,7 +547,43 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
 	if len(ddb.inmemoryParts) > defaultPartsToMerge {
 		ddb.startMergeWorkerLocked()
 	}
+	needAssistedMerge := ddb.needAssistedMergeForInmemoryPartsLocked()
 	ddb.partsLock.Unlock()
+
+	if needAssistedMerge {
+		ddb.assistedMergeForInmemoryParts()
+	}
+}
+
+func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool {
+	if ddb.IsReadOnly() {
+		return false
+	}
+	if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition {
+		return false
+	}
+	n := 0
+	for _, pw := range ddb.inmemoryParts {
+		if pw.isInMerge {
+			n++
+		}
+	}
+	return n >= defaultPartsToMerge
+}
+
+func (ddb *datadb) assistedMergeForInmemoryParts() {
+	ddb.partsLock.Lock()
+	parts := make([]*partWrapper, 0, len(ddb.inmemoryParts))
+	parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
+	pws := appendPartsToMerge(nil, parts, (1<<64)-1)
+	setInMergeLocked(pws)
+	ddb.partsLock.Unlock()
+
+	err := ddb.mergeParts(pws, false)
+	if err == nil || errors.Is(err, errReadOnly) {
+		return
+	}
+	logger.Panicf("FATAL: cannot perform assisted merge for in-memory parts: %s", err)
 }
 
 // DatadbStats contains various stats for datadb.
@@ -646,18 +681,18 @@ func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
 		if len(pwsChunk) == 0 {
 			pwsChunk = append(pwsChunk[:0], pws...)
 		}
-		err := ddb.mergeParts(pwsChunk, true)
-		if err != nil {
-			ddb.releasePartsToMerge(pwsChunk)
-			return err
-		}
-
 		partsToRemove := partsToMap(pwsChunk)
 		removedParts := 0
 		pws, removedParts = removeParts(pws, partsToRemove)
 		if removedParts != len(pwsChunk) {
 			logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
 		}
+
+		err := ddb.mergeParts(pwsChunk, true)
+		if err != nil {
+			ddb.releasePartsToMerge(pws)
+			return err
+		}
 	}
 	return nil
 }