package logstorage

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

// blockData contains packed data for a single block.
//
// The main purpose of this struct is to reduce the work needed during background merge of parts.
// If the block is full, then the blockData can be written to the destination part
// without the need to unpack it.
type blockData struct {
	// streamID is id of the stream for the data
	streamID streamID

	// uncompressedSizeBytes is the original (uncompressed) size of log entries stored in the block
	uncompressedSizeBytes uint64

	// rowsCount is the number of log entries in the block
	rowsCount uint64

	// timestampsData contains the encoded timestamps data for the block
	timestampsData timestampsData

	// columnsData contains packed per-column data.
	columnsData []columnData

	// constColumns contains data for const columns across the block.
	constColumns []Field

	// a is used for storing byte slices for timestamps and columns.
	//
	// It reduces fragmentation for them.
	a arena
}

// reset resets bd for subsequent re-use
func (bd *blockData) reset() {
	bd.streamID.reset()
	bd.uncompressedSizeBytes = 0
	bd.rowsCount = 0
	bd.timestampsData.reset()

	cds := bd.columnsData
	for i := range cds {
		cds[i].reset()
	}
	bd.columnsData = cds[:0]

	ccs := bd.constColumns
	for i := range ccs {
		ccs[i].Reset()
	}
	bd.constColumns = ccs[:0]

	bd.a.reset()
}

func (bd *blockData) resizeColumnsData(columnsDataLen int) []columnData {
	cds := bd.columnsData
	if n := columnsDataLen - cap(cds); n > 0 {
		cds = append(cds[:cap(cds)], make([]columnData, n)...)
	}
	cds = cds[:columnsDataLen]
	bd.columnsData = cds
	return cds
}

// copyFrom copies src to bd.
func (bd *blockData) copyFrom(src *blockData) {
	bd.reset()

	bd.streamID = src.streamID
	bd.uncompressedSizeBytes = src.uncompressedSizeBytes
	bd.rowsCount = src.rowsCount
	bd.timestampsData.copyFrom(&src.timestampsData, &bd.a)

	cdsSrc := src.columnsData
	cds := bd.resizeColumnsData(len(cdsSrc))
	for i := range cds {
		cds[i].copyFrom(&cdsSrc[i], &bd.a)
	}
	bd.columnsData = cds

	bd.constColumns = append(bd.constColumns[:0], src.constColumns...)
}

// unmarshalRows appends unmarshaled from bd log entries to dst.
//
// The returned log entries are valid until sbu and vd are valid.
func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *valuesDecoder) error {
	b := getBlock()
	defer putBlock(b)

	if err := b.InitFromBlockData(bd, sbu, vd); err != nil {
		return err
	}
	b.appendRows(dst)
	return nil
}

// mustWriteTo writes bd with the given sid to sw and updates bh accordingly
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
	// Do not store the version used for encoding directly in the block data, since:
	// - all the blocks in the same part use the same encoding
	// - the block encoding version can be put in metadata file for the part (aka metadataFilename)

	bh.reset()

	bh.streamID = bd.streamID
	bh.uncompressedSizeBytes = bd.uncompressedSizeBytes
	bh.rowsCount = bd.rowsCount

	// Marshal timestamps
	bd.timestampsData.mustWriteTo(&bh.timestampsHeader, sw)

	// Marshal columns
	cds := bd.columnsData
	csh := getColumnsHeader()
	chs := csh.resizeColumnHeaders(len(cds))
	for i := range cds {
		cds[i].mustWriteTo(&chs[i], sw)
	}
	csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)

	bb := longTermBufPool.Get()
	bb.B = csh.marshal(bb.B)
	putColumnsHeader(csh)
	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
	bh.columnsHeaderSize = uint64(len(bb.B))
	if bh.columnsHeaderSize > maxColumnsHeaderSize {
		logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
	}
	sw.columnsHeaderWriter.MustWrite(bb.B)
	longTermBufPool.Put(bb)
}

// mustReadFrom reads block data associated with bh from sr to bd.
func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
	bd.reset()

	bd.streamID = bh.streamID
	bd.uncompressedSizeBytes = bh.uncompressedSizeBytes
	bd.rowsCount = bh.rowsCount

	// Read timestamps
	bd.timestampsData.mustReadFrom(&bh.timestampsHeader, sr, &bd.a)

	// Read columns
	if bh.columnsHeaderOffset != sr.columnsHeaderReader.bytesRead {
		logger.Panicf("FATAL: %s: unexpected columnsHeaderOffset=%d; must equal to the number of bytes read: %d",
			sr.columnsHeaderReader.Path(), bh.columnsHeaderOffset, sr.columnsHeaderReader.bytesRead)
	}
	columnsHeaderSize := bh.columnsHeaderSize
	if columnsHeaderSize > maxColumnsHeaderSize {
		logger.Panicf("BUG: %s: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", sr.columnsHeaderReader.Path(), columnsHeaderSize, maxColumnsHeaderSize)
	}
	bb := longTermBufPool.Get()
	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
	sr.columnsHeaderReader.MustReadFull(bb.B)

	csh := getColumnsHeader()
	if err := csh.unmarshal(bb.B); err != nil {
		logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
	}
	longTermBufPool.Put(bb)
	chs := csh.columnHeaders
	cds := bd.resizeColumnsData(len(chs))
	for i := range chs {
		cds[i].mustReadFrom(&chs[i], sr, &bd.a)
	}
	bd.constColumns = append(bd.constColumns[:0], csh.constColumns...)
	putColumnsHeader(csh)
}

// timestampsData contains the encoded timestamps data.
type timestampsData struct {
	// data contains packed timestamps data.
	data []byte

	// marshalType is the marshal type for timestamps
	marshalType encoding.MarshalType

	// minTimestamp is the minimum timestamp in the timestamps data
	minTimestamp int64

	// maxTimestamp is the maximum timestamp in the timestamps data
	maxTimestamp int64
}

// reset resets td for subsequent re-use
func (td *timestampsData) reset() {
	td.data = nil
	td.marshalType = 0
	td.minTimestamp = 0
	td.maxTimestamp = 0
}

// copyFrom copies src to td.
func (td *timestampsData) copyFrom(src *timestampsData, a *arena) {
	td.reset()

	td.data = a.copyBytes(src.data)
	td.marshalType = src.marshalType
	td.minTimestamp = src.minTimestamp
	td.maxTimestamp = src.maxTimestamp
}

// mustWriteTo writes td to sw and updates th accordingly
func (td *timestampsData) mustWriteTo(th *timestampsHeader, sw *streamWriters) {
	th.reset()

	th.marshalType = td.marshalType
	th.minTimestamp = td.minTimestamp
	th.maxTimestamp = td.maxTimestamp
	th.blockOffset = sw.timestampsWriter.bytesWritten
	th.blockSize = uint64(len(td.data))
	if th.blockSize > maxTimestampsBlockSize {
		logger.Panicf("BUG: too big timestampsHeader.blockSize: %d bytes; mustn't exceed %d bytes", th.blockSize, maxTimestampsBlockSize)
	}
	sw.timestampsWriter.MustWrite(td.data)
}

// mustReadFrom reads timestamps data associated with th from sr to td.
func (td *timestampsData) mustReadFrom(th *timestampsHeader, sr *streamReaders, a *arena) {
	td.reset()

	td.marshalType = th.marshalType
	td.minTimestamp = th.minTimestamp
	td.maxTimestamp = th.maxTimestamp

	timestampsReader := &sr.timestampsReader
	if th.blockOffset != timestampsReader.bytesRead {
		logger.Panicf("FATAL: %s: unexpected timestampsHeader.blockOffset=%d; must equal to the number of bytes read: %d",
			timestampsReader.Path(), th.blockOffset, timestampsReader.bytesRead)
	}
	timestampsBlockSize := th.blockSize
	if timestampsBlockSize > maxTimestampsBlockSize {
		logger.Panicf("FATAL: %s: too big timestamps block with %d bytes; the maximum supported block size is %d bytes",
			timestampsReader.Path(), timestampsBlockSize, maxTimestampsBlockSize)
	}
	td.data = a.newBytes(int(timestampsBlockSize))
	timestampsReader.MustReadFull(td.data)
}

// columnData contains packed data for a single column.
type columnData struct {
	// name is the column name
	name string

	// valueType is the type of values stored in valuesData
	valueType valueType

	// minValue is the minimum encoded uint* or float64 value in the columnHeader
	//
	// It is used for fast detection of whether the given columnHeader contains values in the given range
	minValue uint64

	// maxValue is the maximum encoded uint* or float64 value in the columnHeader
	//
	// It is used for fast detection of whether the given columnHeader contains values in the given range
	maxValue uint64

	// valuesDict contains unique values for valueType = valueTypeDict
	valuesDict valuesDict

	// valuesData contains packed values data for the given column
	valuesData []byte

	// bloomFilterData contains packed bloomFilter data for the given column
	bloomFilterData []byte
}

// reset rests cd for subsequent re-use
func (cd *columnData) reset() {
	cd.name = ""
	cd.valueType = 0

	cd.minValue = 0
	cd.maxValue = 0
	cd.valuesDict.reset()

	cd.valuesData = nil
	cd.bloomFilterData = nil
}

// copyFrom copies src to cd.
func (cd *columnData) copyFrom(src *columnData, a *arena) {
	cd.reset()

	cd.name = src.name
	cd.valueType = src.valueType

	cd.minValue = src.minValue
	cd.maxValue = src.maxValue
	cd.valuesDict.copyFrom(&src.valuesDict)

	cd.valuesData = a.copyBytes(src.valuesData)
	cd.bloomFilterData = a.copyBytes(src.bloomFilterData)
}

// mustWriteTo writes cd to sw and updates ch accordingly.
func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
	ch.reset()

	valuesWriter := &sw.fieldValuesWriter
	bloomFilterWriter := &sw.fieldBloomFilterWriter
	if cd.name == "" {
		valuesWriter = &sw.messageValuesWriter
		bloomFilterWriter = &sw.messageBloomFilterWriter
	}

	ch.name = cd.name
	ch.valueType = cd.valueType

	ch.minValue = cd.minValue
	ch.maxValue = cd.maxValue
	ch.valuesDict.copyFrom(&cd.valuesDict)

	// marshal values
	ch.valuesSize = uint64(len(cd.valuesData))
	if ch.valuesSize > maxValuesBlockSize {
		logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
	}
	ch.valuesOffset = valuesWriter.bytesWritten
	valuesWriter.MustWrite(cd.valuesData)

	// marshal bloom filter
	ch.bloomFilterSize = uint64(len(cd.bloomFilterData))
	if ch.bloomFilterSize > maxBloomFilterBlockSize {
		logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
	}
	ch.bloomFilterOffset = bloomFilterWriter.bytesWritten
	bloomFilterWriter.MustWrite(cd.bloomFilterData)
}

// mustReadFrom reads columns data associated with ch from sr to cd.
func (cd *columnData) mustReadFrom(ch *columnHeader, sr *streamReaders, a *arena) {
	cd.reset()

	valuesReader := &sr.fieldValuesReader
	bloomFilterReader := &sr.fieldBloomFilterReader
	if ch.name == "" {
		valuesReader = &sr.messageValuesReader
		bloomFilterReader = &sr.messageBloomFilterReader
	}

	cd.name = ch.name
	cd.valueType = ch.valueType

	cd.minValue = ch.minValue
	cd.maxValue = ch.maxValue
	cd.valuesDict.copyFrom(&ch.valuesDict)

	// read values
	if ch.valuesOffset != valuesReader.bytesRead {
		logger.Panicf("FATAL: %s: unexpected columnHeader.valuesOffset=%d; must equal to the number of bytes read: %d",
			valuesReader.Path(), ch.valuesOffset, valuesReader.bytesRead)
	}
	valuesSize := ch.valuesSize
	if valuesSize > maxValuesBlockSize {
		logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", valuesReader.Path(), maxValuesBlockSize, valuesSize)
	}
	cd.valuesData = a.newBytes(int(valuesSize))
	valuesReader.MustReadFull(cd.valuesData)

	// read bloom filter
	// bloom filter is missing in valueTypeDict.
	if ch.valueType != valueTypeDict {
		if ch.bloomFilterOffset != bloomFilterReader.bytesRead {
			logger.Panicf("FATAL: %s: unexpected columnHeader.bloomFilterOffset=%d; must equal to the number of bytes read: %d",
				bloomFilterReader.Path(), ch.bloomFilterOffset, bloomFilterReader.bytesRead)
		}
		bloomFilterSize := ch.bloomFilterSize
		if bloomFilterSize > maxBloomFilterBlockSize {
			logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomFilterReader.Path(), maxBloomFilterBlockSize, bloomFilterSize)
		}
		cd.bloomFilterData = a.newBytes(int(bloomFilterSize))
		bloomFilterReader.MustReadFull(cd.bloomFilterData)
	}
}