package logstorage import ( "fmt" "sort" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // block represents a block of log entries. type block struct { // timestamps contains timestamps for log entries. timestamps []int64 // columns contains values for fields seen in log entries. columns []column // constColumns contains fields with constant values across all the block entries. constColumns []Field } func (b *block) reset() { b.timestamps = b.timestamps[:0] cs := b.columns for i := range cs { cs[i].reset() } b.columns = cs[:0] ccs := b.constColumns for i := range ccs { ccs[i].Reset() } b.constColumns = ccs[:0] } // uncompressedSizeBytes returns the total size of the origianl log entries stored in b. // // It is supposed that every log entry has the following format: // // 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN func (b *block) uncompressedSizeBytes() uint64 { rowsCount := uint64(b.Len()) // Take into account timestamps n := rowsCount * uint64(len(time.RFC3339Nano)) // Take into account columns cs := b.columns for i := range cs { c := &cs[i] nameLen := uint64(len(c.name)) if nameLen == 0 { nameLen = uint64(len("_msg")) } for _, v := range c.values { if len(v) > 0 { n += nameLen + 2 + uint64(len(v)) } } } // Take into account constColumns ccs := b.constColumns for i := range ccs { cc := &ccs[i] nameLen := uint64(len(cc.Name)) if nameLen == 0 { nameLen = uint64(len("_msg")) } n += rowsCount * (2 + nameLen + uint64(len(cc.Value))) } return n } // uncompressedRowsSizeBytes returns the size of the uncompressed rows. // // It is supposed that every row has the following format: // // 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN func uncompressedRowsSizeBytes(rows [][]Field) uint64 { n := uint64(0) for _, fields := range rows { n += uncompressedRowSizeBytes(fields) } return n } // uncompressedRowSizeBytes returns the size of uncompressed row. // // It is supposed that the row has the following format: // // 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN func uncompressedRowSizeBytes(fields []Field) uint64 { n := uint64(len(time.RFC3339Nano)) // log timestamp for _, f := range fields { nameLen := len(f.Name) if nameLen == 0 { nameLen = len("_msg") } n += uint64(2 + nameLen + len(f.Value)) } return n } // column contains values for the given field name seen in log entries. type column struct { // name is the field name name string // values is the values seen for the given log entries. values []string } func (c *column) reset() { c.name = "" clear(c.values) c.values = c.values[:0] } func (c *column) canStoreInConstColumn() bool { values := c.values if len(values) == 0 { return true } value := values[0] if len(value) > maxConstColumnValueSize { return false } for _, v := range values[1:] { if value != v { return false } } return true } func (c *column) resizeValues(valuesLen int) []string { c.values = slicesutil.SetLength(c.values, valuesLen) return c.values } // mustWriteTo writes c to sw and updates ch accordingly. // // ch is valid until c is changed. func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) { ch.reset() ch.name = c.name bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name) // encode values ve := getValuesEncoder() ch.valueType, ch.minValue, ch.maxValue = ve.encode(c.values, &ch.valuesDict) bb := longTermBufPool.Get() defer longTermBufPool.Put(bb) // marshal values bb.B = marshalStringsBlock(bb.B[:0], ve.values) putValuesEncoder(ve) ch.valuesSize = uint64(len(bb.B)) if ch.valuesSize > maxValuesBlockSize { logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize) } ch.valuesOffset = bloomValuesWriter.values.bytesWritten bloomValuesWriter.values.MustWrite(bb.B) // create and marshal bloom filter for c.values if ch.valueType != valueTypeDict { hashesBuf := encoding.GetUint64s(0) hashesBuf.A = tokenizeHashes(hashesBuf.A[:0], c.values) bb.B = bloomFilterMarshalHashes(bb.B[:0], hashesBuf.A) encoding.PutUint64s(hashesBuf) } else { // there is no need in ecoding bloom filter for dictionary type, // since it isn't used during querying - all the dictionary values are available in ch.valuesDict bb.B = bb.B[:0] } ch.bloomFilterSize = uint64(len(bb.B)) if ch.bloomFilterSize > maxBloomFilterBlockSize { logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize) } ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten bloomValuesWriter.bloom.MustWrite(bb.B) } func (b *block) assertValid() { // Check that timestamps are in ascending order timestamps := b.timestamps for i := 1; i < len(timestamps); i++ { if timestamps[i-1] > timestamps[i] { logger.Panicf("BUG: log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d", timestamps[i-1], timestamps[i]) } } // Check that the number of items in each column matches the number of items in the block. itemsCount := len(timestamps) columns := b.columns for _, c := range columns { if len(c.values) != itemsCount { logger.Panicf("BUG: unexpected number of values for column %q: got %d; want %d", c.name, len(c.values), itemsCount) } } } // MustInitFromRows initializes b from the given timestamps and rows. // // It is expected that timestamps are sorted. // // b is valid until rows are changed. // // Returns the number of the processed timestamps and rows. // If the returned number is smaller than len(rows), then the rest of rows aren't processed. func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) int { b.reset() assertTimestampsSorted(timestamps) rowsProcessed := b.mustInitFromRows(timestamps, rows) b.sortColumnsByName() return rowsProcessed } // mustInitFromRows initializes b from the given timestamps and rows. // // b is valid until rows are changed. // // Returns the number of the processed timestamps and rows. // If the returned number is smaller than len(rows), then the rest of rows aren't processed. func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) int { if len(timestamps) != len(rows) { logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows)) } rowsLen := len(rows) if rowsLen == 0 { // Nothing to do return 0 } if areSameFieldsInRows(rows) { // Fast path - all the log entries have the same fields b.timestamps = append(b.timestamps, timestamps...) fields := rows[0] for i := range fields { f := &fields[i] if canStoreInConstColumn(rows, i) { cc := b.extendConstColumns() cc.Name = f.Name cc.Value = f.Value } else { c := b.extendColumns() c.name = f.Name values := c.resizeValues(rowsLen) for j := range rows { values[j] = rows[j][i].Value } } } return rowsLen } // Slow path - log entries contain different set of fields // Determine indexes for columns columnIdxs := getColumnIdxs() i := 0 for i < len(rows) { fields := rows[i] if len(columnIdxs)+len(fields) > maxColumnsPerBlock { break } for j := range fields { name := fields[j].Name if _, ok := columnIdxs[name]; !ok { columnIdxs[name] = len(columnIdxs) } } i++ } rowsProcessed := i // keep only rows that fit maxColumnsPerBlock limit rows = rows[:rowsProcessed] timestamps = timestamps[:rowsProcessed] b.timestamps = append(b.timestamps, timestamps...) // Initialize columns cs := b.resizeColumns(len(columnIdxs)) for name, idx := range columnIdxs { c := &cs[idx] c.name = name c.resizeValues(len(rows)) } // Write rows to block for i := range rows { for _, f := range rows[i] { idx := columnIdxs[f.Name] cs[idx].values[i] = f.Value } } putColumnIdxs(columnIdxs) // Detect const columns for i := len(cs) - 1; i >= 0; i-- { c := &cs[i] if !c.canStoreInConstColumn() { continue } cc := b.extendConstColumns() cc.Name = c.name cc.Value = c.values[0] c.reset() if i < len(cs)-1 { swapColumns(c, &cs[len(cs)-1]) } cs = cs[:len(cs)-1] } b.columns = cs return rowsProcessed } func swapColumns(a, b *column) { *a, *b = *b, *a } func canStoreInConstColumn(rows [][]Field, colIdx int) bool { if len(rows) == 0 { return true } value := rows[0][colIdx].Value if len(value) > maxConstColumnValueSize { return false } rows = rows[1:] for i := range rows { if value != rows[i][colIdx].Value { return false } } return true } func assertTimestampsSorted(timestamps []int64) { for i := range timestamps { if i > 0 && timestamps[i-1] > timestamps[i] { logger.Panicf("BUG: log entries must be sorted by timestamp; got the previous entry with bigger timestamp %d than the current entry with timestamp %d", timestamps[i-1], timestamps[i]) } } } func (b *block) extendConstColumns() *Field { ccs := b.constColumns if cap(ccs) > len(ccs) { ccs = ccs[:len(ccs)+1] } else { ccs = append(ccs, Field{}) } b.constColumns = ccs return &ccs[len(ccs)-1] } func (b *block) extendColumns() *column { cs := b.columns if cap(cs) > len(cs) { cs = cs[:len(cs)+1] } else { cs = append(cs, column{}) } b.columns = cs return &cs[len(cs)-1] } func (b *block) resizeColumns(columnsLen int) []column { b.columns = slicesutil.SetLength(b.columns, columnsLen) return b.columns } func (b *block) sortColumnsByName() { if len(b.columns)+len(b.constColumns) > maxColumnsPerBlock { logger.Panicf("BUG: too big number of columns detected in the block: %d; the number of columns mustn't exceed %d", len(b.columns)+len(b.constColumns), maxColumnsPerBlock) } cs := getColumnsSorter() cs.columns = b.columns sort.Sort(cs) putColumnsSorter(cs) ccs := getConstColumnsSorter() ccs.columns = b.constColumns sort.Sort(ccs) putConstColumnsSorter(ccs) } // Len returns the number of log entries in b. func (b *block) Len() int { return len(b.timestamps) } // InitFromBlockData unmarshals bd to b. // // sbu and vd are used as a temporary storage for unmarshaled column values. // // The b becomes outdated after sbu or vd is reset. func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, vd *valuesDecoder) error { b.reset() if bd.rowsCount > maxRowsPerBlock { return fmt.Errorf("too many entries found in the block: %d; mustn't exceed %d", bd.rowsCount, maxRowsPerBlock) } rowsCount := int(bd.rowsCount) // unmarshal timestamps td := &bd.timestampsData var err error b.timestamps, err = encoding.UnmarshalTimestamps(b.timestamps[:0], td.data, td.marshalType, td.minTimestamp, rowsCount) if err != nil { return fmt.Errorf("cannot unmarshal timestamps: %w", err) } // unmarshal columns cds := bd.columnsData cs := b.resizeColumns(len(cds)) for i := range cds { cd := &cds[i] c := &cs[i] c.name = sbu.copyString(cd.name) c.values, err = sbu.unmarshal(c.values[:0], cd.valuesData, uint64(rowsCount)) if err != nil { return fmt.Errorf("cannot unmarshal column %d: %w", i, err) } if err = vd.decodeInplace(c.values, cd.valueType, cd.valuesDict.values); err != nil { return fmt.Errorf("cannot decode column values: %w", err) } } // unmarshal constColumns b.constColumns = sbu.appendFields(b.constColumns[:0], bd.constColumns) return nil } // mustWriteTo writes b with the given sid to sw and updates bh accordingly. func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { b.assertValid() bh.reset() bh.streamID = *sid bh.uncompressedSizeBytes = b.uncompressedSizeBytes() bh.rowsCount = uint64(b.Len()) // Marshal timestamps mustWriteTimestampsTo(&bh.timestampsHeader, b.timestamps, sw) // Marshal columns cs := b.columns csh := getColumnsHeader() chs := csh.resizeColumnHeaders(len(cs)) for i := range cs { cs[i].mustWriteTo(&chs[i], sw) } csh.constColumns = append(csh.constColumns[:0], b.constColumns...) csh.mustWriteTo(bh, sw, g) putColumnsHeader(csh) } // appendRowsTo appends log entries from b to dst. func (b *block) appendRowsTo(dst *rows) { // copy timestamps dst.timestamps = append(dst.timestamps, b.timestamps...) // copy columns fieldsBuf := dst.fieldsBuf ccs := b.constColumns cs := b.columns for i := range b.timestamps { fieldsLen := len(fieldsBuf) // copy const columns fieldsBuf = append(fieldsBuf, ccs...) // copy other columns for j := range cs { c := &cs[j] value := c.values[i] if len(value) == 0 { continue } fieldsBuf = append(fieldsBuf, Field{ Name: c.name, Value: value, }) } dst.rows = append(dst.rows, fieldsBuf[fieldsLen:]) } dst.fieldsBuf = fieldsBuf } func areSameFieldsInRows(rows [][]Field) bool { if len(rows) < 2 { return true } fields := rows[0] // Verify that all the field names are unique m := getFieldsSet() for i := range fields { f := &fields[i] if _, ok := m[f.Name]; ok { // Field name isn't unique return false } m[f.Name] = struct{}{} } putFieldsSet(m) // Verify that all the fields are the same across rows rows = rows[1:] for i := range rows { leFields := rows[i] if len(fields) != len(leFields) { return false } for j := range leFields { if leFields[j].Name != fields[j].Name { return false } } } return true } func getFieldsSet() map[string]struct{} { v := fieldsSetPool.Get() if v == nil { return make(map[string]struct{}) } return v.(map[string]struct{}) } func putFieldsSet(m map[string]struct{}) { clear(m) fieldsSetPool.Put(m) } var fieldsSetPool sync.Pool var columnIdxsPool sync.Pool func getColumnIdxs() map[string]int { v := columnIdxsPool.Get() if v == nil { return make(map[string]int) } return v.(map[string]int) } func putColumnIdxs(m map[string]int) { clear(m) columnIdxsPool.Put(m) } func getBlock() *block { v := blockPool.Get() if v == nil { return &block{} } return v.(*block) } func putBlock(b *block) { b.reset() blockPool.Put(b) } var blockPool sync.Pool type columnsSorter struct { columns []column } func (cs *columnsSorter) reset() { cs.columns = nil } func (cs *columnsSorter) Len() int { return len(cs.columns) } func (cs *columnsSorter) Less(i, j int) bool { columns := cs.columns return columns[i].name < columns[j].name } func (cs *columnsSorter) Swap(i, j int) { columns := cs.columns columns[i], columns[j] = columns[j], columns[i] } func getColumnsSorter() *columnsSorter { v := columnsSorterPool.Get() if v == nil { return &columnsSorter{} } return v.(*columnsSorter) } func putColumnsSorter(cs *columnsSorter) { cs.reset() columnsSorterPool.Put(cs) } var columnsSorterPool sync.Pool type constColumnsSorter struct { columns []Field } func (ccs *constColumnsSorter) reset() { ccs.columns = nil } func (ccs *constColumnsSorter) Len() int { return len(ccs.columns) } func (ccs *constColumnsSorter) Less(i, j int) bool { columns := ccs.columns return columns[i].Name < columns[j].Name } func (ccs *constColumnsSorter) Swap(i, j int) { columns := ccs.columns columns[i], columns[j] = columns[j], columns[i] } func getConstColumnsSorter() *constColumnsSorter { v := constColumnsSorterPool.Get() if v == nil { return &constColumnsSorter{} } return v.(*constColumnsSorter) } func putConstColumnsSorter(ccs *constColumnsSorter) { ccs.reset() constColumnsSorterPool.Put(ccs) } var constColumnsSorterPool sync.Pool // mustWriteTimestampsTo writes timestamps to sw and updates th accordingly func mustWriteTimestampsTo(th *timestampsHeader, timestamps []int64, sw *streamWriters) { th.reset() bb := longTermBufPool.Get() bb.B, th.marshalType, th.minTimestamp = encoding.MarshalTimestamps(bb.B[:0], timestamps, 64) if len(bb.B) > maxTimestampsBlockSize { logger.Panicf("BUG: too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.B), maxTimestampsBlockSize) } th.maxTimestamp = timestamps[len(timestamps)-1] th.blockOffset = sw.timestampsWriter.bytesWritten th.blockSize = uint64(len(bb.B)) sw.timestampsWriter.MustWrite(bb.B) longTermBufPool.Put(bb) }