package logstorage

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)

// blockData contains packed data for a single block.
//
// The main purpose of this struct is to reduce the work needed during background merge of parts.
// If the block is full, then the blockData can be written to the destination part
// without the need to unpack it.
type blockData struct {
	// streamID is id of the stream for the data
	streamID streamID

	// uncompressedSizeBytes is the original (uncompressed) size of log entries stored in the block
	uncompressedSizeBytes uint64

	// rowsCount is the number of log entries in the block
	rowsCount uint64

	// timestampsData contains the encoded timestamps data for the block
	timestampsData timestampsData

	// columnsData contains packed per-column data
	columnsData []columnData

	// constColumns contains data for const columns across the block
	constColumns []Field
}

// reset resets bd for subsequent re-use
func (bd *blockData) reset() {
	bd.streamID.reset()
	bd.uncompressedSizeBytes = 0
	bd.rowsCount = 0
	bd.timestampsData.reset()

	cds := bd.columnsData
	for i := range cds {
		cds[i].reset()
	}
	bd.columnsData = cds[:0]

	ccs := bd.constColumns
	for i := range ccs {
		ccs[i].Reset()
	}
	bd.constColumns = ccs[:0]
}

func (bd *blockData) resizeColumnsData(columnsDataLen int) []columnData {
	bd.columnsData = slicesutil.SetLength(bd.columnsData, columnsDataLen)
	return bd.columnsData
}

// copyFrom copies src to bd.
//
// bd is valid until a.reset() is called.
func (bd *blockData) copyFrom(a *arena, src *blockData) {
	bd.reset()

	bd.streamID = src.streamID
	bd.uncompressedSizeBytes = src.uncompressedSizeBytes
	bd.rowsCount = src.rowsCount
	bd.timestampsData.copyFrom(a, &src.timestampsData)

	cdsSrc := src.columnsData
	cds := bd.resizeColumnsData(len(cdsSrc))
	for i := range cds {
		cds[i].copyFrom(a, &cdsSrc[i])
	}
	bd.columnsData = cds

	bd.constColumns = appendFields(a, bd.constColumns[:0], src.constColumns)
}

// unmarshalRows appends unmarshaled from bd log entries to dst.
//
// The unmarshaled log entries are valid until sbu and vd are reset.
func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *valuesDecoder) error {
	b := getBlock()
	defer putBlock(b)

	if err := b.InitFromBlockData(bd, sbu, vd); err != nil {
		return err
	}
	b.appendRowsTo(dst)
	return nil
}

// mustWriteTo writes bd to sw and updates bh accordingly
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
	// Do not store the version used for encoding directly in the block data, since:
	// - all the blocks in the same part use the same encoding
	// - the block encoding version can be put in metadata file for the part (aka metadataFilename)

	bh.reset()

	bh.streamID = bd.streamID
	bh.uncompressedSizeBytes = bd.uncompressedSizeBytes
	bh.rowsCount = bd.rowsCount

	// Marshal timestamps
	bd.timestampsData.mustWriteTo(&bh.timestampsHeader, sw)

	// Marshal columns
	cds := bd.columnsData

	csh := getColumnsHeader()

	chs := csh.resizeColumnHeaders(len(cds))
	for i := range cds {
		cds[i].mustWriteToNoArena(&chs[i], sw)
	}
	csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)

	bb := longTermBufPool.Get()
	bb.B = csh.marshal(bb.B)

	putColumnsHeader(csh)

	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
	bh.columnsHeaderSize = uint64(len(bb.B))
	if bh.columnsHeaderSize > maxColumnsHeaderSize {
		logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
	}
	sw.columnsHeaderWriter.MustWrite(bb.B)
	longTermBufPool.Put(bb)
}

// mustReadFrom reads block data associated with bh from sr to bd.
//
// The bd is valid until a.reset() is called.
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) {
	bd.reset()

	bd.streamID = bh.streamID
	bd.uncompressedSizeBytes = bh.uncompressedSizeBytes
	bd.rowsCount = bh.rowsCount

	// Read timestamps
	bd.timestampsData.mustReadFrom(a, &bh.timestampsHeader, sr)

	// Read columns
	if bh.columnsHeaderOffset != sr.columnsHeaderReader.bytesRead {
		logger.Panicf("FATAL: %s: unexpected columnsHeaderOffset=%d; must equal to the number of bytes read: %d",
			sr.columnsHeaderReader.Path(), bh.columnsHeaderOffset, sr.columnsHeaderReader.bytesRead)
	}
	columnsHeaderSize := bh.columnsHeaderSize
	if columnsHeaderSize > maxColumnsHeaderSize {
		logger.Panicf("BUG: %s: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", sr.columnsHeaderReader.Path(), columnsHeaderSize, maxColumnsHeaderSize)
	}
	bb := longTermBufPool.Get()
	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
	sr.columnsHeaderReader.MustReadFull(bb.B)

	cshA := getArena()
	csh := getColumnsHeader()
	if err := csh.unmarshal(cshA, bb.B); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
	}
	longTermBufPool.Put(bb)
	chs := csh.columnHeaders
	cds := bd.resizeColumnsData(len(chs))
	for i := range chs {
		cds[i].mustReadFrom(a, &chs[i], sr)
	}
	bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
	putColumnsHeader(csh)
	putArena(cshA)
}

// timestampsData contains the encoded timestamps data.
type timestampsData struct {
	// data contains packed timestamps data.
	data []byte

	// marshalType is the marshal type for timestamps
	marshalType encoding.MarshalType

	// minTimestamp is the minimum timestamp in the timestamps data
	minTimestamp int64

	// maxTimestamp is the maximum timestamp in the timestamps data
	maxTimestamp int64
}

// reset resets td for subsequent re-use
func (td *timestampsData) reset() {
	td.data = nil
	td.marshalType = 0
	td.minTimestamp = 0
	td.maxTimestamp = 0
}

// copyFrom copies src to td.
//
// td is valid until a.reset() is called.
func (td *timestampsData) copyFrom(a *arena, src *timestampsData) {
	td.reset()

	td.data = a.copyBytes(src.data)
	td.marshalType = src.marshalType
	td.minTimestamp = src.minTimestamp
	td.maxTimestamp = src.maxTimestamp
}

// mustWriteTo writes td to sw and updates th accordingly
func (td *timestampsData) mustWriteTo(th *timestampsHeader, sw *streamWriters) {
	th.reset()

	th.marshalType = td.marshalType
	th.minTimestamp = td.minTimestamp
	th.maxTimestamp = td.maxTimestamp
	th.blockOffset = sw.timestampsWriter.bytesWritten
	th.blockSize = uint64(len(td.data))
	if th.blockSize > maxTimestampsBlockSize {
		logger.Panicf("BUG: too big timestampsHeader.blockSize: %d bytes; mustn't exceed %d bytes", th.blockSize, maxTimestampsBlockSize)
	}
	sw.timestampsWriter.MustWrite(td.data)
}

// mustReadFrom reads timestamps data associated with th from sr to td.
//
// td is valid until a.reset() is called.
func (td *timestampsData) mustReadFrom(a *arena, th *timestampsHeader, sr *streamReaders) {
	td.reset()

	td.marshalType = th.marshalType
	td.minTimestamp = th.minTimestamp
	td.maxTimestamp = th.maxTimestamp

	timestampsReader := &sr.timestampsReader
	if th.blockOffset != timestampsReader.bytesRead {
		logger.Panicf("FATAL: %s: unexpected timestampsHeader.blockOffset=%d; must equal to the number of bytes read: %d",
			timestampsReader.Path(), th.blockOffset, timestampsReader.bytesRead)
	}
	timestampsBlockSize := th.blockSize
	if timestampsBlockSize > maxTimestampsBlockSize {
		logger.Panicf("FATAL: %s: too big timestamps block with %d bytes; the maximum supported block size is %d bytes",
			timestampsReader.Path(), timestampsBlockSize, maxTimestampsBlockSize)
	}
	td.data = a.newBytes(int(timestampsBlockSize))
	timestampsReader.MustReadFull(td.data)
}

// columnData contains packed data for a single column.
type columnData struct {
	// name is the column name
	name string

	// valueType is the type of values stored in valuesData
	valueType valueType

	// minValue is the minimum encoded uint* or float64 value in the columnHeader
	//
	// It is used for fast detection of whether the given columnHeader contains values in the given range
	minValue uint64

	// maxValue is the maximum encoded uint* or float64 value in the columnHeader
	//
	// It is used for fast detection of whether the given columnHeader contains values in the given range
	maxValue uint64

	// valuesDict contains unique values for valueType = valueTypeDict
	valuesDict valuesDict

	// valuesData contains packed values data for the given column
	valuesData []byte

	// bloomFilterData contains packed bloomFilter data for the given column
	bloomFilterData []byte
}

// reset rests cd for subsequent re-use
func (cd *columnData) reset() {
	cd.name = ""
	cd.valueType = 0

	cd.minValue = 0
	cd.maxValue = 0
	cd.valuesDict.reset()

	cd.valuesData = nil
	cd.bloomFilterData = nil
}

// copyFrom copies src to cd.
//
// cd is valid until a.reset() is called.
func (cd *columnData) copyFrom(a *arena, src *columnData) {
	cd.reset()

	cd.name = a.copyString(src.name)
	cd.valueType = src.valueType

	cd.minValue = src.minValue
	cd.maxValue = src.maxValue
	cd.valuesDict.copyFrom(a, &src.valuesDict)

	cd.valuesData = a.copyBytes(src.valuesData)
	cd.bloomFilterData = a.copyBytes(src.bloomFilterData)
}

// mustWriteTo writes cd to sw and updates ch accordingly.
//
// ch is valid until cd is changed.
func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
	ch.reset()

	valuesWriter := &sw.fieldValuesWriter
	bloomFilterWriter := &sw.fieldBloomFilterWriter
	if cd.name == "" {
		valuesWriter = &sw.messageValuesWriter
		bloomFilterWriter = &sw.messageBloomFilterWriter
	}

	ch.name = cd.name
	ch.valueType = cd.valueType

	ch.minValue = cd.minValue
	ch.maxValue = cd.maxValue
	ch.valuesDict.copyFromNoArena(&cd.valuesDict)

	// marshal values
	ch.valuesSize = uint64(len(cd.valuesData))
	if ch.valuesSize > maxValuesBlockSize {
		logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
	}
	ch.valuesOffset = valuesWriter.bytesWritten
	valuesWriter.MustWrite(cd.valuesData)

	// marshal bloom filter
	ch.bloomFilterSize = uint64(len(cd.bloomFilterData))
	if ch.bloomFilterSize > maxBloomFilterBlockSize {
		logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
	}
	ch.bloomFilterOffset = bloomFilterWriter.bytesWritten
	bloomFilterWriter.MustWrite(cd.bloomFilterData)
}

// mustReadFrom reads columns data associated with ch from sr to cd.
//
// cd is valid until a.reset() is called.
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) {
	cd.reset()

	valuesReader := &sr.fieldValuesReader
	bloomFilterReader := &sr.fieldBloomFilterReader
	if ch.name == "" {
		valuesReader = &sr.messageValuesReader
		bloomFilterReader = &sr.messageBloomFilterReader
	}

	cd.name = a.copyString(ch.name)
	cd.valueType = ch.valueType

	cd.minValue = ch.minValue
	cd.maxValue = ch.maxValue
	cd.valuesDict.copyFrom(a, &ch.valuesDict)

	// read values
	if ch.valuesOffset != valuesReader.bytesRead {
		logger.Panicf("FATAL: %s: unexpected columnHeader.valuesOffset=%d; must equal to the number of bytes read: %d",
			valuesReader.Path(), ch.valuesOffset, valuesReader.bytesRead)
	}
	valuesSize := ch.valuesSize
	if valuesSize > maxValuesBlockSize {
		logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", valuesReader.Path(), maxValuesBlockSize, valuesSize)
	}
	cd.valuesData = a.newBytes(int(valuesSize))
	valuesReader.MustReadFull(cd.valuesData)

	// read bloom filter
	// bloom filter is missing in valueTypeDict.
	if ch.valueType != valueTypeDict {
		if ch.bloomFilterOffset != bloomFilterReader.bytesRead {
			logger.Panicf("FATAL: %s: unexpected columnHeader.bloomFilterOffset=%d; must equal to the number of bytes read: %d",
				bloomFilterReader.Path(), ch.bloomFilterOffset, bloomFilterReader.bytesRead)
		}
		bloomFilterSize := ch.bloomFilterSize
		if bloomFilterSize > maxBloomFilterBlockSize {
			logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomFilterReader.Path(), maxBloomFilterBlockSize, bloomFilterSize)
		}
		cd.bloomFilterData = a.newBytes(int(bloomFilterSize))
		bloomFilterReader.MustReadFull(cd.bloomFilterData)
	}
}