package logstorage

import (
	"path/filepath"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

// writerWithStats writes data to w and tracks the total amount of data written in bytesWritten.
type writerWithStats struct {
	w            filestream.WriteCloser
	bytesWritten uint64
}

func (w *writerWithStats) reset() {
	w.w = nil
	w.bytesWritten = 0
}

func (w *writerWithStats) init(wc filestream.WriteCloser) {
	w.reset()

	w.w = wc
}

func (w *writerWithStats) Path() string {
	return w.w.Path()
}

func (w *writerWithStats) MustWrite(data []byte) {
	fs.MustWriteData(w.w, data)
	w.bytesWritten += uint64(len(data))
}

// MustClose closes the underlying w.
func (w *writerWithStats) MustClose() {
	w.w.MustClose()
}

// streamWriters contains the writers used by blockStreamWriter.
type streamWriters struct {
	metaindexWriter          writerWithStats
	indexWriter              writerWithStats
	columnsHeaderWriter      writerWithStats
	timestampsWriter         writerWithStats
	fieldValuesWriter        writerWithStats
	fieldBloomFilterWriter   writerWithStats
	messageValuesWriter      writerWithStats
	messageBloomFilterWriter writerWithStats
}

func (sw *streamWriters) reset() {
	sw.metaindexWriter.reset()
	sw.indexWriter.reset()
	sw.columnsHeaderWriter.reset()
	sw.timestampsWriter.reset()
	sw.fieldValuesWriter.reset()
	sw.fieldBloomFilterWriter.reset()
	sw.messageValuesWriter.reset()
	sw.messageBloomFilterWriter.reset()
}

func (sw *streamWriters) init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter, fieldValuesWriter, fieldBloomFilterWriter,
	messageValuesWriter, messageBloomFilterWriter filestream.WriteCloser,
) {
	sw.metaindexWriter.init(metaindexWriter)
	sw.indexWriter.init(indexWriter)
	sw.columnsHeaderWriter.init(columnsHeaderWriter)
	sw.timestampsWriter.init(timestampsWriter)
	sw.fieldValuesWriter.init(fieldValuesWriter)
	sw.fieldBloomFilterWriter.init(fieldBloomFilterWriter)
	sw.messageValuesWriter.init(messageValuesWriter)
	sw.messageBloomFilterWriter.init(messageBloomFilterWriter)
}

func (sw *streamWriters) totalBytesWritten() uint64 {
	n := uint64(0)
	n += sw.metaindexWriter.bytesWritten
	n += sw.indexWriter.bytesWritten
	n += sw.columnsHeaderWriter.bytesWritten
	n += sw.timestampsWriter.bytesWritten
	n += sw.fieldValuesWriter.bytesWritten
	n += sw.fieldBloomFilterWriter.bytesWritten
	n += sw.messageValuesWriter.bytesWritten
	n += sw.messageBloomFilterWriter.bytesWritten
	return n
}

func (sw *streamWriters) MustClose() {
	sw.metaindexWriter.MustClose()
	sw.indexWriter.MustClose()
	sw.columnsHeaderWriter.MustClose()
	sw.timestampsWriter.MustClose()
	sw.fieldValuesWriter.MustClose()
	sw.fieldBloomFilterWriter.MustClose()
	sw.messageValuesWriter.MustClose()
	sw.messageBloomFilterWriter.MustClose()
}

// blockStreamWriter is used for writing blocks into the underlying storage in a streaming manner.
type blockStreamWriter struct {
	// streamWriters contains writers for block data
	streamWriters streamWriters

	// sidLast is the streamID for the last written block
	sidLast streamID

	// sidFirst is the streamID for the first block in the current indexBlock
	sidFirst streamID

	// minTimestampLast is the minimum timestamp seen for the last written block
	minTimestampLast int64

	// minTimestamp is the minimum timestamp seen across written blocks for the current indexBlock
	minTimestamp int64

	// maxTimestamp is the maximum timestamp seen across written blocks for the current indexBlock
	maxTimestamp int64

	// hasWrittenBlocks is set to true if at least a single block has been written to the current indexBlock
	hasWrittenBlocks bool

	// globalUncompressedSizeBytes is the total size of all the log entries written via bsw
	globalUncompressedSizeBytes uint64

	// globalRowsCount is the total number of log entries written via bsw
	globalRowsCount uint64

	// globalBlocksCount is the total number of blocks written to bsw
	globalBlocksCount uint64

	// globalMinTimestamp is the minimum timestamp seen across all the blocks written to bsw
	globalMinTimestamp int64

	// globalMaxTimestamp is the maximum timestamp seen across all the blocks written to bsw
	globalMaxTimestamp int64

	// indexBlockData contains marshaled blockHeader data, which hasn't yet been written to indexFilename
	indexBlockData []byte

	// metaindexData contains marshaled indexBlockHeader data, which hasn't yet been written to metaindexFilename
	metaindexData []byte

	// indexBlockHeader is used for marshaling the data to metaindexData
	indexBlockHeader indexBlockHeader
}

// reset resets bsw for subsequent re-use.
func (bsw *blockStreamWriter) reset() {
	bsw.streamWriters.reset()
	bsw.sidLast.reset()
	bsw.sidFirst.reset()
	bsw.minTimestampLast = 0
	bsw.minTimestamp = 0
	bsw.maxTimestamp = 0
	bsw.hasWrittenBlocks = false
	bsw.globalUncompressedSizeBytes = 0
	bsw.globalRowsCount = 0
	bsw.globalBlocksCount = 0
	bsw.globalMinTimestamp = 0
	bsw.globalMaxTimestamp = 0
	bsw.indexBlockData = bsw.indexBlockData[:0]

	if len(bsw.metaindexData) > 1024*1024 {
		// The length of bsw.metaindexData is unbounded, so drop an overly long buffer
		// in order to conserve memory.
		bsw.metaindexData = nil
	} else {
		bsw.metaindexData = bsw.metaindexData[:0]
	}

	bsw.indexBlockHeader.reset()
}

// MustInitForInmemoryPart initializes bsw for writing data to mp.
func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
	bsw.reset()
	bsw.streamWriters.init(&mp.metaindex, &mp.index, &mp.columnsHeader, &mp.timestamps, &mp.fieldValues, &mp.fieldBloomFilter, &mp.messageValues, &mp.messageBloomFilter)
}
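
// Illustrative usage sketch (not part of the original code): initializing bsw
// for an in-memory part. It is assumed that mp is a *inmemoryPart prepared by
// the caller.
//
//	bsw := getBlockStreamWriter()
//	bsw.MustInitForInmemoryPart(mp)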

// MustInitForFilePart initializes bsw for writing data to file part located at path.
//
// If nocache is true, then the written data doesn't go to the OS page cache.
func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
	bsw.reset()

	fs.MustMkdirFailIfExist(path)

	metaindexPath := filepath.Join(path, metaindexFilename)
	indexPath := filepath.Join(path, indexFilename)
	columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
	timestampsPath := filepath.Join(path, timestampsFilename)
	fieldValuesPath := filepath.Join(path, fieldValuesFilename)
	fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
	messageValuesPath := filepath.Join(path, messageValuesFilename)
	messageBloomFilterPath := filepath.Join(path, messageBloomFilename)

	// Always cache the metaindex file, since it is re-read immediately after the part is created
	metaindexWriter := filestream.MustCreate(metaindexPath, false)

	indexWriter := filestream.MustCreate(indexPath, nocache)
	columnsHeaderWriter := filestream.MustCreate(columnsHeaderPath, nocache)
	timestampsWriter := filestream.MustCreate(timestampsPath, nocache)
	fieldValuesWriter := filestream.MustCreate(fieldValuesPath, nocache)
	fieldBloomFilterWriter := filestream.MustCreate(fieldBloomFilterPath, nocache)
	messageValuesWriter := filestream.MustCreate(messageValuesPath, nocache)
	messageBloomFilterWriter := filestream.MustCreate(messageBloomFilterPath, nocache)

	bsw.streamWriters.init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter,
		fieldValuesWriter, fieldBloomFilterWriter, messageValuesWriter, messageBloomFilterWriter)
}
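
// Illustrative usage sketch (not part of the original code): preparing bsw for
// a new file-based part. The partPath value is hypothetical; the directory must
// not exist yet, since fs.MustMkdirFailIfExist panics otherwise. Passing
// nocache=true avoids polluting the OS page cache when writing big parts.
//
//	bsw := getBlockStreamWriter()
//	bsw.MustInitForFilePart(partPath, true)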

// MustWriteRows writes timestamps with rows under the given sid to bsw.
//
// timestamps must be sorted.
// sid must be bigger than or equal to the sid for the previously written rows.
func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, rows [][]Field) {
	if len(timestamps) == 0 {
		return
	}

	b := getBlock()
	b.MustInitFromRows(timestamps, rows)
	bsw.MustWriteBlock(sid, b)
	putBlock(b)
}
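
// Illustrative example with assumed data (not from the original code): writing
// two log entries under a single stream. It is assumed that sid holds a valid
// streamID and that Field is the package's name/value pair type.
//
//	timestamps := []int64{1690000000000000000, 1690000001000000000}
//	rows := [][]Field{
//		{{Name: "_msg", Value: "connection refused"}},
//		{{Name: "_msg", Value: "request timed out"}},
//	}
//	bsw.MustWriteRows(&sid, timestamps, rows)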

// MustWriteBlockData writes bd to bsw.
//
// The bd.streamID must be bigger than or equal to the streamID for the previously written blocks.
func (bsw *blockStreamWriter) MustWriteBlockData(bd *blockData) {
	if bd.rowsCount == 0 {
		return
	}
	bsw.mustWriteBlockInternal(&bd.streamID, nil, bd)
}

// MustWriteBlock writes b under the given sid to bsw.
//
// The sid must be bigger than or equal to the sid for the previously written blocks.
// The minimum timestamp in b must be bigger than or equal to the minimum timestamp already written for the same sid.
func (bsw *blockStreamWriter) MustWriteBlock(sid *streamID, b *block) {
	rowsCount := b.Len()
	if rowsCount == 0 {
		return
	}
	bsw.mustWriteBlockInternal(sid, b, nil)
}

func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd *blockData) {
	if sid.less(&bsw.sidLast) {
		logger.Panicf("BUG: the sid=%s cannot be smaller than the previously written sid=%s", sid, &bsw.sidLast)
	}
	hasWrittenBlocks := bsw.hasWrittenBlocks
	if !hasWrittenBlocks {
		bsw.sidFirst = *sid
		bsw.hasWrittenBlocks = true
	}
	isSeenSid := sid.equal(&bsw.sidLast)
	bsw.sidLast = *sid

	bh := getBlockHeader()
	if b != nil {
		b.mustWriteTo(sid, bh, &bsw.streamWriters)
	} else {
		bd.mustWriteTo(bh, &bsw.streamWriters)
	}
	th := &bh.timestampsHeader
	if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp {
		bsw.globalMinTimestamp = th.minTimestamp
	}
	if bsw.globalRowsCount == 0 || th.maxTimestamp > bsw.globalMaxTimestamp {
		bsw.globalMaxTimestamp = th.maxTimestamp
	}
	if !hasWrittenBlocks || th.minTimestamp < bsw.minTimestamp {
		bsw.minTimestamp = th.minTimestamp
	}
	if !hasWrittenBlocks || th.maxTimestamp > bsw.maxTimestamp {
		bsw.maxTimestamp = th.maxTimestamp
	}
	if isSeenSid && th.minTimestamp < bsw.minTimestampLast {
		logger.Panicf("BUG: the block for sid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", sid, bsw.minTimestampLast, th.minTimestamp)
	}
	bsw.minTimestampLast = th.minTimestamp

	bsw.globalUncompressedSizeBytes += bh.uncompressedSizeBytes
	bsw.globalRowsCount += bh.rowsCount
	bsw.globalBlocksCount++

	// Marshal bh
	bsw.indexBlockData = bh.marshal(bsw.indexBlockData)
	putBlockHeader(bh)
	if len(bsw.indexBlockData) > maxUncompressedIndexBlockSize {
		bsw.mustFlushIndexBlock(bsw.indexBlockData)
		bsw.indexBlockData = bsw.indexBlockData[:0]
	}
}

func (bsw *blockStreamWriter) mustFlushIndexBlock(data []byte) {
	if len(data) > 0 {
		bsw.indexBlockHeader.mustWriteIndexBlock(data, bsw.sidFirst, bsw.minTimestamp, bsw.maxTimestamp, &bsw.streamWriters)
		bsw.metaindexData = bsw.indexBlockHeader.marshal(bsw.metaindexData)
	}
	bsw.hasWrittenBlocks = false
	bsw.minTimestamp = 0
	bsw.maxTimestamp = 0
	bsw.sidFirst.reset()
}

// Finalize finalizes the data write process and updates ph with the final stats.
//
// It closes the writers passed to MustInitForInmemoryPart() or MustInitForFilePart().
//
// bsw can be re-used after calling Finalize().
func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
	ph.UncompressedSizeBytes = bsw.globalUncompressedSizeBytes
	ph.RowsCount = bsw.globalRowsCount
	ph.BlocksCount = bsw.globalBlocksCount
	ph.MinTimestamp = bsw.globalMinTimestamp
	ph.MaxTimestamp = bsw.globalMaxTimestamp

	bsw.mustFlushIndexBlock(bsw.indexBlockData)

	// Write metaindex data
	bb := longTermBufPool.Get()
	bb.B = encoding.CompressZSTDLevel(bb.B[:0], bsw.metaindexData, 1)
	bsw.streamWriters.metaindexWriter.MustWrite(bb.B)
	if len(bb.B) < 1024*1024 {
		longTermBufPool.Put(bb)
	}

	ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten()

	bsw.streamWriters.MustClose()
	bsw.reset()
}
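
// Illustrative usage sketch (not part of the original code): finalizing the
// part and reading the accumulated stats back from the partHeader.
//
//	var ph partHeader
//	bsw.Finalize(&ph)
//	// ph.RowsCount, ph.BlocksCount, ph.MinTimestamp and ph.MaxTimestamp
//	// now describe the part that has just been written.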

var longTermBufPool bytesutil.ByteBufferPool

// getBlockStreamWriter returns a blockStreamWriter from the pool.
//
// Return the blockStreamWriter to the pool by calling putBlockStreamWriter when it is no longer needed.
func getBlockStreamWriter() *blockStreamWriter {
	v := blockStreamWriterPool.Get()
	if v == nil {
		return &blockStreamWriter{}
	}
	return v.(*blockStreamWriter)
}

// putBlockStreamWriter returns bsw to the pool.
func putBlockStreamWriter(bsw *blockStreamWriter) {
	bsw.reset()
	blockStreamWriterPool.Put(bsw)
}

var blockStreamWriterPool sync.Pool
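
// Illustrative end-to-end sketch (assumed identifiers, not part of the original
// code): the typical lifecycle of a pooled blockStreamWriter. partPath, sid,
// timestamps and rows are assumed to be prepared by the caller.
//
//	bsw := getBlockStreamWriter()
//	bsw.MustInitForFilePart(partPath, true)
//	bsw.MustWriteRows(&sid, timestamps, rows)
//	var ph partHeader
//	bsw.Finalize(&ph)
//	putBlockStreamWriter(bsw)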