package logstorage

import (
	"encoding/json"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)

// The maximum size of big part.
//
// This number limits the maximum time required for building big part.
// This time shouldn't exceed a few days.
const maxBigPartSize = 1e12

// The maximum number of inmemory parts in the partition.
//
// The actual number of inmemory parts may exceed this value if in-memory mergers
// cannot keep up with the rate of creating new in-memory parts.
const maxInmemoryPartsPerPartition = 20

// The interval for guaranteed flush of recently ingested data from memory to on-disk parts,
// so they survive process crash.
var dataFlushInterval = 5 * time.Second

// Default number of parts to merge at once.
//
// This number has been obtained empirically - it gives the lowest possible overhead.
// See appendPartsToMerge tests for details.
const defaultPartsToMerge = 15

// minMergeMultiplier is the minimum multiplier for the size of the output part
// compared to the size of the maximum input part for the merge.
//
// Higher value reduces write amplification (disk write IO induced by the merge),
// while increases the number of unmerged parts.
// The 1.7 is good enough for production workloads.
const minMergeMultiplier = 1.7

// datadb represents a database with log data
type datadb struct {
	// mergeIdx is used for generating unique directory names for parts
	mergeIdx atomic.Uint64

	inmemoryMergesTotal  atomic.Uint64
	inmemoryActiveMerges atomic.Int64

	smallPartMergesTotal  atomic.Uint64
	smallPartActiveMerges atomic.Int64

	bigPartMergesTotal  atomic.Uint64
	bigPartActiveMerges atomic.Int64

	// pt is the partition the datadb belongs to
	pt *partition

	// path is the path to the directory with log data
	path string

	// flushInterval is interval for flushing the inmemory parts to disk
	flushInterval time.Duration

	// inmemoryParts contains a list of inmemory parts
	inmemoryParts []*partWrapper

	// smallParts contains a list of file-based small parts
	smallParts []*partWrapper

	// bigParts contains a list of file-based big parts
	bigParts []*partWrapper

	// partsLock protects parts from concurrent access
	partsLock sync.Mutex

	// wg is used for determining when background workers stop
	//
	// wg.Add() must be called under partsLock after checking whether stopCh isn't closed.
	// This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called.
	wg sync.WaitGroup

	// stopCh is used for notifying background workers to stop
	//
	// It must be closed under partsLock in order to prevent from calling wg.Add()
	// after stopCh is closed.
	stopCh chan struct{}
}

// partWrapper is a wrapper for opened part.
type partWrapper struct {
	// refCount is the number of references to p.
	//
	// When the number of references reaches zero, then p is closed.
	refCount atomic.Int32

	// The flag, which is set when the part must be deleted after refCount reaches zero.
	mustDrop atomic.Bool

	// p is an opened part
	p *part

	// mp references inmemory part used for initializing p.
	mp *inmemoryPart

	// isInMerge is set to true if the part takes part in merge.
	isInMerge bool

	// The deadline when in-memory part must be flushed to disk.
	flushDeadline time.Time
}

func (pw *partWrapper) incRef() {
	pw.refCount.Add(1)
}

func (pw *partWrapper) decRef() {
	n := pw.refCount.Add(-1)
	if n > 0 {
		return
	}

	deletePath := ""
	if pw.mp == nil {
		if pw.mustDrop.Load() {
			deletePath = pw.p.path
		}
	} else {
		putInmemoryPart(pw.mp)
		pw.mp = nil
	}

	mustClosePart(pw.p)
	pw.p = nil

	if deletePath != "" {
		fs.MustRemoveAll(deletePath)
	}
}

func mustCreateDatadb(path string) {
	fs.MustMkdirFailIfExist(path)
	mustWritePartNames(path, nil, nil)
}

// mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data.
func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *datadb {
	// Remove temporary directories, which may be left after unclean shutdown.
	fs.MustRemoveTemporaryDirs(path)

	partNames := mustReadPartNames(path)
	mustRemoveUnusedDirs(path, partNames)

	var smallParts []*partWrapper
	var bigParts []*partWrapper
	for _, partName := range partNames {
		// Make sure the partName exists on disk.
		// If it is missing, then manual action from the user is needed,
		// since this is unexpected state, which cannot occur under normal operation,
		// including unclean shutdown.
		partPath := filepath.Join(path, partName)
		if !fs.IsPathExist(partPath) {
			partsFile := filepath.Join(path, partsFilename)
			logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; "+
				"ensure %q contents is not corrupted; remove %q to rebuild its' content from the list of existing parts",
				partPath, partsFile, partsFile, partsFile)
		}

		p := mustOpenFilePart(pt, partPath)
		pw := newPartWrapper(p, nil, time.Time{})
		if p.ph.CompressedSizeBytes > getMaxInmemoryPartSize() {
			bigParts = append(bigParts, pw)
		} else {
			smallParts = append(smallParts, pw)
		}
	}

	ddb := &datadb{
		pt:            pt,
		flushInterval: flushInterval,
		path:          path,
		smallParts:    smallParts,
		bigParts:      bigParts,
		stopCh:        make(chan struct{}),
	}
	ddb.mergeIdx.Store(uint64(time.Now().UnixNano()))

	ddb.startBackgroundWorkers()

	return ddb
}

func (ddb *datadb) startBackgroundWorkers() {
	// Start file parts mergers, so they could start merging unmerged parts if needed.
	// There is no need in starting in-memory parts mergers, since there are no in-memory parts yet.
	ddb.startSmallPartsMergers()
	ddb.startBigPartsMergers()

	ddb.startInmemoryPartsFlusher()
}

var (
	inmemoryPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
	smallPartsConcurrencyCh    = make(chan struct{}, cgroup.AvailableCPUs())
	bigPartsConcurrencyCh      = make(chan struct{}, cgroup.AvailableCPUs())
)

func (ddb *datadb) startSmallPartsMergers() {
	ddb.partsLock.Lock()
	for i := 0; i < cap(smallPartsConcurrencyCh); i++ {
		ddb.startSmallPartsMergerLocked()
	}
	ddb.partsLock.Unlock()
}

func (ddb *datadb) startBigPartsMergers() {
	ddb.partsLock.Lock()
	for i := 0; i < cap(bigPartsConcurrencyCh); i++ {
		ddb.startBigPartsMergerLocked()
	}
	ddb.partsLock.Unlock()
}

func (ddb *datadb) startInmemoryPartsMergerLocked() {
	if needStop(ddb.stopCh) {
		return
	}
	ddb.wg.Add(1)
	go func() {
		ddb.inmemoryPartsMerger()
		ddb.wg.Done()
	}()
}

func (ddb *datadb) startSmallPartsMergerLocked() {
	if needStop(ddb.stopCh) {
		return
	}
	ddb.wg.Add(1)
	go func() {
		ddb.smallPartsMerger()
		ddb.wg.Done()
	}()
}

func (ddb *datadb) startBigPartsMergerLocked() {
	if needStop(ddb.stopCh) {
		return
	}
	ddb.wg.Add(1)
	go func() {
		ddb.bigPartsMerger()
		ddb.wg.Done()
	}()
}

func (ddb *datadb) startInmemoryPartsFlusher() {
	ddb.wg.Add(1)
	go func() {
		ddb.inmemoryPartsFlusher()
		ddb.wg.Done()
	}()
}

func (ddb *datadb) inmemoryPartsFlusher() {
	// Do not add jitter to d in order to guarantee the flush interval
	ticker := time.NewTicker(dataFlushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ddb.stopCh:
			return
		case <-ticker.C:
			ddb.mustFlushInmemoryPartsToFiles(false)
		}
	}
}

func (ddb *datadb) mustFlushInmemoryPartsToFiles(isFinal bool) {
	currentTime := time.Now()
	var pws []*partWrapper

	ddb.partsLock.Lock()
	for _, pw := range ddb.inmemoryParts {
		if !pw.isInMerge && (isFinal || pw.flushDeadline.Before(currentTime)) {
			pw.isInMerge = true
			pws = append(pws, pw)
		}
	}
	ddb.partsLock.Unlock()

	ddb.mustMergePartsToFiles(pws)
}

func (ddb *datadb) mustMergePartsToFiles(pws []*partWrapper) {
	wg := getWaitGroup()
	for len(pws) > 0 {
		pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
		wg.Add(1)
		inmemoryPartsConcurrencyCh <- struct{}{}
		go func(pwsChunk []*partWrapper) {
			defer func() {
				<-inmemoryPartsConcurrencyCh
				wg.Done()
			}()

			ddb.mustMergeParts(pwsChunk, true)
		}(pwsToMerge)
		pws = pwsRemaining
	}
	wg.Wait()
	putWaitGroup(wg)
}

// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts.
//
// the pws items are replaced by nil after the call. This is needed for helping Go GC to reclaim the referenced items.
func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) {
	pwsToMerge := appendPartsToMerge(nil, pws, math.MaxUint64)
	if len(pwsToMerge) == 0 {
		return pws, nil
	}

	m := partsToMap(pwsToMerge)
	pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge))
	for _, pw := range pws {
		if _, ok := m[pw]; !ok {
			pwsRemaining = append(pwsRemaining, pw)
		}
	}

	// Clear references to pws items, so they could be reclaimed faster by Go GC.
	for i := range pws {
		pws[i] = nil
	}

	return pwsToMerge, pwsRemaining
}

func getWaitGroup() *sync.WaitGroup {
	v := wgPool.Get()
	if v == nil {
		return &sync.WaitGroup{}
	}
	return v.(*sync.WaitGroup)
}

func putWaitGroup(wg *sync.WaitGroup) {
	wgPool.Put(wg)
}

var wgPool sync.Pool

func (ddb *datadb) inmemoryPartsMerger() {
	for {
		if needStop(ddb.stopCh) {
			return
		}
		maxOutBytes := ddb.getMaxBigPartSize()

		ddb.partsLock.Lock()
		pws := getPartsToMergeLocked(ddb.inmemoryParts, maxOutBytes)
		ddb.partsLock.Unlock()

		if len(pws) == 0 {
			// Nothing to merge
			return
		}

		inmemoryPartsConcurrencyCh <- struct{}{}
		ddb.mustMergeParts(pws, false)
		<-inmemoryPartsConcurrencyCh
	}
}

func (ddb *datadb) smallPartsMerger() {
	for {
		if needStop(ddb.stopCh) {
			return
		}
		maxOutBytes := ddb.getMaxBigPartSize()

		ddb.partsLock.Lock()
		pws := getPartsToMergeLocked(ddb.smallParts, maxOutBytes)
		ddb.partsLock.Unlock()

		if len(pws) == 0 {
			// Nothing to merge
			return
		}

		smallPartsConcurrencyCh <- struct{}{}
		ddb.mustMergeParts(pws, false)
		<-smallPartsConcurrencyCh
	}
}

func (ddb *datadb) bigPartsMerger() {
	for {
		if needStop(ddb.stopCh) {
			return
		}
		maxOutBytes := ddb.getMaxBigPartSize()

		ddb.partsLock.Lock()
		pws := getPartsToMergeLocked(ddb.bigParts, maxOutBytes)
		ddb.partsLock.Unlock()

		if len(pws) == 0 {
			// Nothing to merge
			return
		}

		bigPartsConcurrencyCh <- struct{}{}
		ddb.mustMergeParts(pws, false)
		<-bigPartsConcurrencyCh
	}
}

// getPartsToMergeLocked returns optimal parts to merge from pws.
//
// The summary size of the returned parts must be smaller than maxOutBytes.
func getPartsToMergeLocked(pws []*partWrapper, maxOutBytes uint64) []*partWrapper {
	pwsRemaining := make([]*partWrapper, 0, len(pws))
	for _, pw := range pws {
		if !pw.isInMerge {
			pwsRemaining = append(pwsRemaining, pw)
		}
	}

	pwsToMerge := appendPartsToMerge(nil, pwsRemaining, maxOutBytes)

	for _, pw := range pwsToMerge {
		if pw.isInMerge {
			logger.Panicf("BUG: partWrapper.isInMerge cannot be set")
		}
		pw.isInMerge = true
	}

	return pwsToMerge
}

func assertIsInMerge(pws []*partWrapper) {
	for _, pw := range pws {
		if !pw.isInMerge {
			logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
		}
	}
}

// mustMergeParts merges pws to a single resulting part.
//
// if isFinal is set, then the resulting part is guaranteed to be saved to disk.
// The pws may remain unmerged after returning from the function if there is no enough disk space.
//
// All the parts inside pws must have isInMerge field set to true.
// The isInMerge field inside pws parts is set to false before returning from the function.
func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
	if len(pws) == 0 {
		// Nothing to merge.
		return
	}

	assertIsInMerge(pws)
	defer ddb.releasePartsToMerge(pws)

	startTime := time.Now()

	dstPartType := ddb.getDstPartType(pws, isFinal)
	if dstPartType != partInmemory {
		// Make sure there is enough disk space for performing the merge
		partsSize := getCompressedSize(pws)
		needReleaseDiskSpace := tryReserveDiskSpace(ddb.path, partsSize)
		if needReleaseDiskSpace {
			defer releaseDiskSpace(partsSize)
		} else {
			if !isFinal {
				// There is no enough disk space for performing the non-final merge.
				return
			}
			// Try performing final merge even if there is no enough disk space
			// in order to persist in-memory data to disk.
			// It is better to crash on out of memory error in this case.
		}
	}

	switch dstPartType {
	case partInmemory:
		ddb.inmemoryMergesTotal.Add(1)
		ddb.inmemoryActiveMerges.Add(1)
		defer ddb.inmemoryActiveMerges.Add(-1)
	case partSmall:
		ddb.smallPartMergesTotal.Add(1)
		ddb.smallPartActiveMerges.Add(1)
		defer ddb.smallPartActiveMerges.Add(-1)
	case partBig:
		ddb.bigPartMergesTotal.Add(1)
		ddb.bigPartActiveMerges.Add(1)
		defer ddb.bigPartActiveMerges.Add(-1)
	default:
		logger.Panicf("BUG: unknown partType=%d", dstPartType)
	}

	// Initialize destination paths.
	mergeIdx := ddb.nextMergeIdx()
	dstPartPath := ddb.getDstPartPath(dstPartType, mergeIdx)

	if isFinal && len(pws) == 1 && pws[0].mp != nil {
		// Fast path: flush a single in-memory part to disk.
		mp := pws[0].mp
		mp.MustStoreToDisk(dstPartPath)
		pwNew := ddb.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
		ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
		return
	}

	// Prepare blockStreamReaders for source parts.
	bsrs := mustOpenBlockStreamReaders(pws)

	// Prepare BlockStreamWriter for destination part.
	srcSize := uint64(0)
	srcRowsCount := uint64(0)
	srcBlocksCount := uint64(0)
	for _, pw := range pws {
		srcSize += pw.p.ph.CompressedSizeBytes
		srcRowsCount += pw.p.ph.RowsCount
		srcBlocksCount += pw.p.ph.BlocksCount
	}
	bsw := getBlockStreamWriter()
	var mpNew *inmemoryPart
	if dstPartType == partInmemory {
		mpNew = getInmemoryPart()
		bsw.MustInitForInmemoryPart(mpNew)
	} else {
		nocache := dstPartType == partBig
		bsw.MustInitForFilePart(dstPartPath, nocache)
	}

	// Merge source parts to destination part.
	var ph partHeader
	stopCh := ddb.stopCh
	if isFinal {
		// The final merge shouldn't be stopped even if ddb.stopCh is closed.
		stopCh = nil
	}
	mustMergeBlockStreams(&ph, bsw, bsrs, stopCh)
	putBlockStreamWriter(bsw)
	for _, bsr := range bsrs {
		putBlockStreamReader(bsr)
	}

	// Persist partHeader for destination part after the merge.
	if mpNew != nil {
		mpNew.ph = ph
	} else {
		ph.mustWriteMetadata(dstPartPath)
		// Make sure the created part directory listing is synced.
		fs.MustSyncPath(dstPartPath)
	}
	if needStop(stopCh) {
		// Remove incomplete destination part
		if dstPartType != partInmemory {
			fs.MustRemoveAll(dstPartPath)
		}
		return
	}

	// Atomically swap the source parts with the newly created part.
	pwNew := ddb.openCreatedPart(&ph, pws, mpNew, dstPartPath)

	dstSize := uint64(0)
	dstRowsCount := uint64(0)
	dstBlocksCount := uint64(0)
	if pwNew != nil {
		pDst := pwNew.p
		dstSize = pDst.ph.CompressedSizeBytes
		dstRowsCount = pDst.ph.RowsCount
		dstBlocksCount = pDst.ph.BlocksCount
	}

	ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)

	d := time.Since(startTime)
	if d <= time.Minute {
		return
	}

	// Log stats for long merges.
	durationSecs := d.Seconds()
	rowsPerSec := int(float64(srcRowsCount) / durationSecs)
	logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",
		len(pws), srcRowsCount, srcBlocksCount, srcSize, dstRowsCount, dstBlocksCount, dstSize, durationSecs, rowsPerSec, dstPartPath)
}

func (ddb *datadb) nextMergeIdx() uint64 {
	return ddb.mergeIdx.Add(1)
}

type partType int

var (
	partInmemory = partType(0)
	partSmall    = partType(1)
	partBig      = partType(2)
)

func (ddb *datadb) getDstPartType(pws []*partWrapper, isFinal bool) partType {
	dstPartSize := getCompressedSize(pws)
	if dstPartSize > ddb.getMaxSmallPartSize() {
		return partBig
	}
	if isFinal || dstPartSize > getMaxInmemoryPartSize() {
		return partSmall
	}
	if !areAllInmemoryParts(pws) {
		// If at least a single source part is located in file,
		// then the destination part must be in file for durability reasons.
		return partSmall
	}
	return partInmemory
}

func (ddb *datadb) getDstPartPath(dstPartType partType, mergeIdx uint64) string {
	ptPath := ddb.path
	dstPartPath := ""
	if dstPartType != partInmemory {
		dstPartPath = filepath.Join(ptPath, fmt.Sprintf("%016X", mergeIdx))
	}
	return dstPartPath
}

func (ddb *datadb) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper {
	// Open the created part.
	if ph.RowsCount == 0 {
		// The created part is empty. Remove it
		if mpNew == nil {
			fs.MustRemoveAll(dstPartPath)
		}
		return nil
	}
	var p *part
	var flushDeadline time.Time
	if mpNew != nil {
		// Open the created part from memory.
		p = mustOpenInmemoryPart(ddb.pt, mpNew)
		flushDeadline = ddb.getFlushToDiskDeadline(pws)
	} else {
		// Open the created part from disk.
		p = mustOpenFilePart(ddb.pt, dstPartPath)
	}
	return newPartWrapper(p, mpNew, flushDeadline)
}

func (ddb *datadb) mustAddRows(lr *LogRows) {
	if len(lr.streamIDs) == 0 {
		return
	}

	inmemoryPartsConcurrencyCh <- struct{}{}
	mp := getInmemoryPart()
	mp.mustInitFromRows(lr)
	p := mustOpenInmemoryPart(ddb.pt, mp)
	<-inmemoryPartsConcurrencyCh

	flushDeadline := time.Now().Add(ddb.flushInterval)
	pw := newPartWrapper(p, mp, flushDeadline)

	ddb.partsLock.Lock()
	ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
	ddb.startInmemoryPartsMergerLocked()
	ddb.partsLock.Unlock()
}

// DatadbStats contains various stats for datadb.
type DatadbStats struct {
	// InmemoryMergesTotal is the number of inmemory merges performed in the given datadb.
	InmemoryMergesTotal uint64

	// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
	InmemoryActiveMerges uint64

	// SmallPartMergesTotal is the number of small file merges performed in the given datadb.
	SmallPartMergesTotal uint64

	// SmallPartActiveMerges is the number of currently active small file merges performed by the given datadb.
	SmallPartActiveMerges uint64

	// BigPartMergesTotal is the number of big file merges performed in the given datadb.
	BigPartMergesTotal uint64

	// BigPartActiveMerges is the number of currently active big file merges performed by the given datadb.
	BigPartActiveMerges uint64

	// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
	InmemoryRowsCount uint64

	// SmallPartRowsCount is the number of rows stored on disk in small parts.
	SmallPartRowsCount uint64

	// BigPartRowsCount is the number of rows stored on disk in big parts.
	BigPartRowsCount uint64

	// InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet.
	InmemoryParts uint64

	// SmallParts is the number of file-based small parts stored on disk.
	SmallParts uint64

	// BigParts is the number of file-based big parts stored on disk.
	BigParts uint64

	// InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet.
	InmemoryBlocks uint64

	// SmallPartBlocks is the number of file-based small blocks stored on disk.
	SmallPartBlocks uint64

	// BigPartBlocks is the number of file-based big blocks stored on disk.
	BigPartBlocks uint64

	// CompressedInmemorySize is the size of compressed data stored in memory.
	CompressedInmemorySize uint64

	// CompressedSmallPartSize is the size of compressed small parts data stored on disk.
	CompressedSmallPartSize uint64

	// CompressedBigPartSize is the size of compressed big data stored on disk.
	CompressedBigPartSize uint64

	// UncompressedInmemorySize is the size of uncompressed data stored in memory.
	UncompressedInmemorySize uint64

	// UncompressedSmallPartSize is the size of uncompressed small data stored on disk.
	UncompressedSmallPartSize uint64

	// UncompressedBigPartSize is the size of uncompressed big data stored on disk.
	UncompressedBigPartSize uint64
}

func (s *DatadbStats) reset() {
	*s = DatadbStats{}
}

// RowsCount returns the number of rows stored in datadb.
func (s *DatadbStats) RowsCount() uint64 {
	return s.InmemoryRowsCount + s.SmallPartRowsCount + s.BigPartRowsCount
}

// updateStats updates s with ddb stats.
func (ddb *datadb) updateStats(s *DatadbStats) {
	s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load()
	s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load())
	s.SmallPartMergesTotal += ddb.smallPartMergesTotal.Load()
	s.SmallPartActiveMerges += uint64(ddb.smallPartActiveMerges.Load())
	s.BigPartMergesTotal += ddb.bigPartMergesTotal.Load()
	s.BigPartActiveMerges += uint64(ddb.bigPartActiveMerges.Load())

	ddb.partsLock.Lock()

	s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts)
	s.SmallPartRowsCount += getRowsCount(ddb.smallParts)
	s.BigPartRowsCount += getRowsCount(ddb.bigParts)

	s.InmemoryParts += uint64(len(ddb.inmemoryParts))
	s.SmallParts += uint64(len(ddb.smallParts))
	s.BigParts += uint64(len(ddb.bigParts))

	s.InmemoryBlocks += getBlocksCount(ddb.inmemoryParts)
	s.SmallPartBlocks += getBlocksCount(ddb.smallParts)
	s.BigPartBlocks += getBlocksCount(ddb.bigParts)

	s.CompressedInmemorySize += getCompressedSize(ddb.inmemoryParts)
	s.CompressedSmallPartSize += getCompressedSize(ddb.smallParts)
	s.CompressedBigPartSize += getCompressedSize(ddb.bigParts)

	s.UncompressedInmemorySize += getUncompressedSize(ddb.inmemoryParts)
	s.UncompressedSmallPartSize += getUncompressedSize(ddb.smallParts)
	s.UncompressedBigPartSize += getUncompressedSize(ddb.bigParts)

	ddb.partsLock.Unlock()
}

// debugFlush() makes sure that the recently ingested data is availalbe for search.
func (ddb *datadb) debugFlush() {
	// Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts.
}

func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
	// Atomically unregister old parts and add new part to pt.
	partsToRemove := partsToMap(pws)

	removedInmemoryParts := 0
	removedSmallParts := 0
	removedBigParts := 0

	ddb.partsLock.Lock()

	ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove)
	ddb.smallParts, removedSmallParts = removeParts(ddb.smallParts, partsToRemove)
	ddb.bigParts, removedBigParts = removeParts(ddb.bigParts, partsToRemove)

	if pwNew != nil {
		switch dstPartType {
		case partInmemory:
			ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
			ddb.startInmemoryPartsMergerLocked()
		case partSmall:
			ddb.smallParts = append(ddb.smallParts, pwNew)
			ddb.startSmallPartsMergerLocked()
		case partBig:
			ddb.bigParts = append(ddb.bigParts, pwNew)
			ddb.startBigPartsMergerLocked()
		default:
			logger.Panicf("BUG: unknown partType=%d", dstPartType)
		}
	}

	// Atomically store the updated list of file-based parts on disk.
	// This must be performed under partsLock in order to prevent from races
	// when multiple concurrently running goroutines update the list.
	if removedSmallParts > 0 || removedBigParts > 0 || pwNew != nil && dstPartType != partInmemory {
		smallPartNames := getPartNames(ddb.smallParts)
		bigPartNames := getPartNames(ddb.bigParts)
		mustWritePartNames(ddb.path, smallPartNames, bigPartNames)
	}

	ddb.partsLock.Unlock()

	removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
	if removedParts != len(partsToRemove) {
		logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(partsToRemove))
	}

	// Mark old parts as must be deleted and decrement reference count, so they are eventually closed and deleted.
	for _, pw := range pws {
		pw.mustDrop.Store(true)
		pw.decRef()
	}
}

func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} {
	m := make(map[*partWrapper]struct{}, len(pws))
	for _, pw := range pws {
		m[pw] = struct{}{}
	}
	if len(m) != len(pws) {
		logger.Panicf("BUG: %d duplicate parts found out of %d parts", len(pws)-len(m), len(pws))
	}
	return m
}

func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
	dst := pws[:0]
	for _, pw := range pws {
		if _, ok := partsToRemove[pw]; !ok {
			dst = append(dst, pw)
		}
	}
	for i := len(dst); i < len(pws); i++ {
		pws[i] = nil
	}
	return dst, len(pws) - len(dst)
}

func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
	bsrs := make([]*blockStreamReader, 0, len(pws))
	for _, pw := range pws {
		bsr := getBlockStreamReader()
		if pw.mp != nil {
			bsr.MustInitFromInmemoryPart(pw.mp)
		} else {
			bsr.MustInitFromFilePart(pw.p.path)
		}
		bsrs = append(bsrs, bsr)
	}
	return bsrs
}

func newPartWrapper(p *part, mp *inmemoryPart, flushDeadline time.Time) *partWrapper {
	pw := &partWrapper{
		p:  p,
		mp: mp,

		flushDeadline: flushDeadline,
	}

	// Increase reference counter for newly created part - it is decreased when the part
	// is removed from the list of open parts.
	pw.incRef()

	return pw
}

func (ddb *datadb) getFlushToDiskDeadline(pws []*partWrapper) time.Time {
	d := time.Now().Add(ddb.flushInterval)
	for _, pw := range pws {
		if pw.mp != nil && pw.flushDeadline.Before(d) {
			d = pw.flushDeadline
		}
	}
	return d
}

func getMaxInmemoryPartSize() uint64 {
	// Allocate 10% of allowed memory for in-memory parts.
	n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryPartsPerPartition)
	if n < 1e6 {
		n = 1e6
	}
	return n
}

func areAllInmemoryParts(pws []*partWrapper) bool {
	for _, pw := range pws {
		if pw.mp == nil {
			return false
		}
	}
	return true
}

func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) {
	ddb.partsLock.Lock()
	for _, pw := range pws {
		if !pw.isInMerge {
			logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
		}
		pw.isInMerge = false
	}
	ddb.partsLock.Unlock()
}

func (ddb *datadb) getMaxBigPartSize() uint64 {
	return getMaxOutBytes(ddb.path)
}

func (ddb *datadb) getMaxSmallPartSize() uint64 {
	// Small parts are cached in the OS page cache,
	// so limit their size by the remaining free RAM.
	mem := memory.Remaining()
	n := uint64(mem) / defaultPartsToMerge
	if n < 10e6 {
		n = 10e6
	}
	// Make sure the output part fits available disk space for small parts.
	sizeLimit := getMaxOutBytes(ddb.path)
	if n > sizeLimit {
		n = sizeLimit
	}
	return n
}

func getMaxOutBytes(path string) uint64 {
	n := availableDiskSpace(path)
	if n > maxBigPartSize {
		n = maxBigPartSize
	}
	return n
}

func availableDiskSpace(path string) uint64 {
	available := fs.MustGetFreeSpace(path)
	reserved := reservedDiskSpace.Load()
	if available < reserved {
		return 0
	}
	return available - reserved
}

func tryReserveDiskSpace(path string, n uint64) bool {
	available := fs.MustGetFreeSpace(path)
	reserved := reserveDiskSpace(n)
	if available >= reserved {
		return true
	}
	releaseDiskSpace(n)
	return false
}

func reserveDiskSpace(n uint64) uint64 {
	return reservedDiskSpace.Add(n)
}

func releaseDiskSpace(n uint64) {
	reservedDiskSpace.Add(^(n - 1))
}

// reservedDiskSpace tracks global reserved disk space for currently executed
// background merges across all the partitions.
//
// It should allow avoiding background merges when there is no free disk space.
var reservedDiskSpace atomic.Uint64

func needStop(stopCh <-chan struct{}) bool {
	select {
	case <-stopCh:
		return true
	default:
		return false
	}
}

// mustCloseDatadb can be called only when nobody accesses ddb.
func mustCloseDatadb(ddb *datadb) {
	// Notify background workers to stop.
	// Make it under ddb.partsLock in order to prevent from calling ddb.wg.Add()
	// after ddb.stopCh is closed and ddb.wg.Wait() is called.
	ddb.partsLock.Lock()
	close(ddb.stopCh)
	ddb.partsLock.Unlock()

	// Wait for background workers to stop.
	ddb.wg.Wait()

	// flush in-memory data to disk
	ddb.mustFlushInmemoryPartsToFiles(true)
	if len(ddb.inmemoryParts) > 0 {
		logger.Panicf("BUG: the number of in-memory parts must be zero after flushing them to disk; got %d", len(ddb.inmemoryParts))
	}
	ddb.inmemoryParts = nil

	// close small parts
	for _, pw := range ddb.smallParts {
		pw.decRef()
		if n := pw.refCount.Load(); n != 0 {
			logger.Panicf("BUG: there are %d references to smallPart", n)
		}
	}
	ddb.smallParts = nil

	// close big parts
	for _, pw := range ddb.bigParts {
		pw.decRef()
		if n := pw.refCount.Load(); n != 0 {
			logger.Panicf("BUG: there are %d references to bigPart", n)
		}
	}
	ddb.bigParts = nil

	ddb.path = ""
	ddb.pt = nil
}

func getPartNames(pws []*partWrapper) []string {
	partNames := make([]string, 0, len(pws))
	for _, pw := range pws {
		if pw.mp != nil {
			// Skip in-memory parts
			continue
		}
		partName := filepath.Base(pw.p.path)
		partNames = append(partNames, partName)
	}
	sort.Strings(partNames)
	return partNames
}

func mustWritePartNames(path string, smallPartNames, bigPartNames []string) {
	partNames := append([]string{}, smallPartNames...)
	partNames = append(partNames, bigPartNames...)
	data, err := json.Marshal(partNames)
	if err != nil {
		logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
	}
	partNamesPath := filepath.Join(path, partsFilename)
	fs.MustWriteAtomic(partNamesPath, data, true)
}

func mustReadPartNames(path string) []string {
	partNamesPath := filepath.Join(path, partsFilename)
	data, err := os.ReadFile(partNamesPath)
	if err != nil {
		logger.Panicf("FATAL: cannot read %s: %s", partNamesPath, err)
	}
	var partNames []string
	if err := json.Unmarshal(data, &partNames); err != nil {
		logger.Panicf("FATAL: cannot parse %s: %s", partNamesPath, err)
	}
	return partNames
}

// mustRemoveUnusedDirs removes dirs at path, which are missing in partNames.
//
// These dirs may be left after unclean shutdown.
func mustRemoveUnusedDirs(path string, partNames []string) {
	des := fs.MustReadDir(path)
	m := make(map[string]struct{}, len(partNames))
	for _, partName := range partNames {
		m[partName] = struct{}{}
	}
	removedDirs := 0
	for _, de := range des {
		if !fs.IsDirOrSymlink(de) {
			// Skip non-directories.
			continue
		}
		fn := de.Name()
		if _, ok := m[fn]; !ok {
			deletePath := filepath.Join(path, fn)
			fs.MustRemoveAll(deletePath)
			removedDirs++
		}
	}
	if removedDirs > 0 {
		fs.MustSyncPath(path)
	}
}

// appendPartsToMerge finds optimal parts to merge from src,
// appends them to dst and returns the result.
func appendPartsToMerge(dst, src []*partWrapper, maxOutBytes uint64) []*partWrapper {
	if len(src) < 2 {
		// There is no need in merging zero or one part :)
		return dst
	}

	// Filter out too big parts.
	// This should reduce N for O(N^2) algorithm below.
	maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
	tmp := make([]*partWrapper, 0, len(src))
	for _, pw := range src {
		if pw.p.ph.CompressedSizeBytes > maxInPartBytes {
			continue
		}
		tmp = append(tmp, pw)
	}
	src = tmp

	sortPartsForOptimalMerge(src)

	maxSrcParts := defaultPartsToMerge
	if maxSrcParts > len(src) {
		maxSrcParts = len(src)
	}
	minSrcParts := (maxSrcParts + 1) / 2
	if minSrcParts < 2 {
		minSrcParts = 2
	}

	// Exhaustive search for parts giving the lowest write amplification when merged.
	var pws []*partWrapper
	maxM := float64(0)
	for i := minSrcParts; i <= maxSrcParts; i++ {
		for j := 0; j <= len(src)-i; j++ {
			a := src[j : j+i]
			if a[0].p.ph.CompressedSizeBytes*uint64(len(a)) < a[len(a)-1].p.ph.CompressedSizeBytes {
				// Do not merge parts with too big difference in size,
				// since this results in unbalanced merges.
				continue
			}
			outSize := getCompressedSize(a)
			if outSize > maxOutBytes {
				// There is no need in verifying remaining parts with bigger sizes.
				break
			}
			m := float64(outSize) / float64(a[len(a)-1].p.ph.CompressedSizeBytes)
			if m < maxM {
				continue
			}
			maxM = m
			pws = a
		}
	}

	minM := float64(defaultPartsToMerge) / 2
	if minM < minMergeMultiplier {
		minM = minMergeMultiplier
	}
	if maxM < minM {
		// There is no sense in merging parts with too small m,
		// since this leads to high disk write IO.
		return dst
	}
	return append(dst, pws...)
}

func sortPartsForOptimalMerge(pws []*partWrapper) {
	// Sort src parts by size and backwards timestamp.
	// This should improve adjanced points' locality in the merged parts.
	sort.Slice(pws, func(i, j int) bool {
		a := &pws[i].p.ph
		b := &pws[j].p.ph
		if a.CompressedSizeBytes == b.CompressedSizeBytes {
			return a.MinTimestamp > b.MinTimestamp
		}
		return a.CompressedSizeBytes < b.CompressedSizeBytes
	})
}

func getCompressedSize(pws []*partWrapper) uint64 {
	n := uint64(0)
	for _, pw := range pws {
		n += pw.p.ph.CompressedSizeBytes
	}
	return n
}

func getUncompressedSize(pws []*partWrapper) uint64 {
	n := uint64(0)
	for _, pw := range pws {
		n += pw.p.ph.UncompressedSizeBytes
	}
	return n
}

func getRowsCount(pws []*partWrapper) uint64 {
	n := uint64(0)
	for _, pw := range pws {
		n += pw.p.ph.RowsCount
	}
	return n
}

func getBlocksCount(pws []*partWrapper) uint64 {
	n := uint64(0)
	for _, pw := range pws {
		n += pw.p.ph.BlocksCount
	}
	return n
}