diff --git a/lib/encoding/float.go b/lib/encoding/float.go new file mode 100644 index 000000000..c6f1786bc --- /dev/null +++ b/lib/encoding/float.go @@ -0,0 +1,37 @@ +package encoding + +import ( + "sync" +) + +// GetFloat64s returns a slice of float64 values with the given size. +// +// When the returned slice is no longer needed, it is advised calling PutFloat64s() on it, +// so it could be re-used. +func GetFloat64s(size int) *Float64s { + v := float64sPool.Get() + if v == nil { + v = &Float64s{} + } + a := v.(*Float64s) + if n := size - cap(a.A); n > 0 { + a.A = append(a.A[:cap(a.A)], make([]float64, n)...) + } + a.A = a.A[:size] + return a +} + +// PutFloat64s returns a to the pool, so it can be re-used via GetFloat64s. +// +// The a cannot be used after returning to the pull. +func PutFloat64s(a *Float64s) { + a.A = a.A[:0] + float64sPool.Put(a) +} + +var float64sPool sync.Pool + +// Float64s holds an array of float64 values. +type Float64s struct { + A []float64 +} diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 0b678be24..94fc1ad07 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -1,6 +1,7 @@ package logstorage import ( + "math" "strconv" "sync" "time" @@ -274,25 +275,26 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b } type blockResult struct { - buf []byte + // buf holds all the bytes behind the requested column values in the block. + buf []byte + + // values holds all the requested column values in the block. valuesBuf []string - // streamID is streamID for the given blockResult + // streamID is streamID for the given blockResult. streamID streamID - // cs contain values for the requested columns. - // - // The corresponding requested column names are stored at columnsNames. - cs []blockResultColumn - - // timestamps contain timestamps for the selected log entries + // timestamps contain timestamps for the selected log entries in the block. timestamps []int64 - // columnNamesBuf is used only if all the columns must be fetched. - columnNamesBuf []string + // csOffset contains cs offset for the requested columns. + // + // columns with indexes below csOffset are ignored. + // This is needed for simplifying data transformations at pipe stages. + csOffset int - // columnNames references the list of names for cs columns. - columnNames []string + // cs contains requested columns. + cs []blockResultColumn } func (br *blockResult) reset() { @@ -303,65 +305,60 @@ func (br *blockResult) reset() { br.streamID.reset() + br.timestamps = br.timestamps[:0] + + br.csOffset = 0 + cs := br.cs for i := range cs { cs[i].reset() } br.cs = cs[:0] - - br.timestamps = br.timestamps[:0] - - clear(br.columnNamesBuf) - br.columnNamesBuf = br.columnNamesBuf[:0] - - br.columnNames = nil } func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { - // Add _stream column - br.columnNamesBuf = append(br.columnNamesBuf, "_stream") if !br.addStreamColumn(bs) { // Skip the current block, since the associated stream tags are missing. br.reset() return } - // Add _time column - br.columnNamesBuf = append(br.columnNamesBuf, "_time") br.addTimeColumn() // Add _msg column v := bs.csh.getConstColumnValue("_msg") if v != "" { - br.columnNamesBuf = append(br.columnNamesBuf, "_msg") - br.addConstColumn(v) + br.addConstColumn("_msg", v) } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { - br.columnNamesBuf = append(br.columnNamesBuf, "_msg") br.addColumn(bs, ch, bm) + } else { + br.addConstColumn("_msg", "") } + // Add other const columns for _, cc := range bs.csh.constColumns { if isMsgFieldName(cc.Name) { continue } - br.columnNamesBuf = append(br.columnNamesBuf, cc.Name) - br.addConstColumn(cc.Value) + br.addConstColumn(cc.Name, cc.Value) } + // Add other non-const columns chs := bs.csh.columnHeaders for i := range chs { ch := &chs[i] if isMsgFieldName(ch.name) { continue } - br.columnNamesBuf = append(br.columnNamesBuf, ch.name) br.addColumn(bs, ch, bm) } - br.columnNames = br.columnNamesBuf } func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *bitmap) { for _, columnName := range bs.bsw.so.resultColumnNames { + if columnName == "" { + columnName = "_msg" + } switch columnName { case "_stream": if !br.addStreamColumn(bs) { @@ -374,22 +371,14 @@ func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *bitmap) { default: v := bs.csh.getConstColumnValue(columnName) if v != "" { - br.addConstColumn(v) - continue - } - ch := bs.csh.getColumnHeader(columnName) - if ch == nil { - br.addConstColumn("") - } else { + br.addConstColumn(columnName, v) + } else if ch := bs.csh.getColumnHeader(columnName); ch != nil { br.addColumn(bs, ch, bm) + } else { + br.addConstColumn(columnName, "") } } } - br.columnNames = bs.bsw.so.resultColumnNames -} - -func (br *blockResult) RowsCount() int { - return len(br.timestamps) } func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { @@ -401,13 +390,17 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { // Nothing to initialize for zero matching log entries in the block. return } - // Initialize timestamps, since they are used for determining the number of rows in br.RowsCount() + + // Initialize timestamps, since they are required for all the further work with br. + srcTimestamps := bs.getTimestamps() if bm.areAllBitsSet() { + // Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering. br.timestamps = append(br.timestamps[:0], srcTimestamps...) return } + // Slow path - copy only the needed timestamps to br according to filter results. dstTimestamps := br.timestamps[:0] bm.forEachSetBit(func(idx int) bool { ts := srcTimestamps[idx] @@ -517,7 +510,12 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap) } dictValues = valuesBuf[valuesBufLen:] + name := ch.name + if name == "" { + name = "_msg" + } br.cs = append(br.cs, blockResultColumn{ + name: name, valueType: ch.valueType, dictValues: dictValues, encodedValues: encodedValues, @@ -528,6 +526,7 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap) func (br *blockResult) addTimeColumn() { br.cs = append(br.cs, blockResultColumn{ + name: "_time", isTime: true, }) } @@ -551,11 +550,11 @@ func (br *blockResult) addStreamColumn(bs *blockSearch) bool { PutStreamTags(st) s := bytesutil.ToUnsafeString(bb.B) - br.addConstColumn(s) + br.addConstColumn("_stream", s) return true } -func (br *blockResult) addConstColumn(value string) { +func (br *blockResult) addConstColumn(name, value string) { buf := br.buf bufLen := len(buf) buf = append(buf, value...) @@ -568,17 +567,198 @@ func (br *blockResult) addConstColumn(value string) { br.valuesBuf = valuesBuf br.cs = append(br.cs, blockResultColumn{ + name: name, isConst: true, - valueType: valueTypeUnknown, encodedValues: valuesBuf[valuesBufLen:], }) } -// getColumnValues returns values for the column with the given idx. +func (br *blockResult) updateColumns(columnNames []string) { + if br.areSameColumns(columnNames) { + // Fast path - nothing to change. + return + } + + // Slow path - construct the requested columns + cs := br.cs + csOffset := len(cs) + for _, columnName := range columnNames { + c := br.getColumnByName(columnName) + cs = append(cs, c) + } + br.csOffset = csOffset + br.cs = cs +} + +func (br *blockResult) areSameColumns(columnNames []string) bool { + cs := br.getColumns() + if len(cs) != len(columnNames) { + return false + } + for i := range cs { + if cs[i].name != columnNames[i] { + return false + } + } + return true +} + +func (br *blockResult) getColumnByName(columnName string) blockResultColumn { + if columnName == "" { + columnName = "_msg" + } + + cs := br.getColumns() + for i := range cs { + if cs[i].name == columnName { + return cs[i] + } + } + + return blockResultColumn{ + name: columnName, + isConst: true, + encodedValues: getEmptyStrings(1), + } +} + +func (br *blockResult) getColumns() []blockResultColumn { + return br.cs[br.csOffset:] +} + +func (br *blockResult) skipRows(skipRows int) { + br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...) + cs := br.getColumns() + for i := range cs { + c := &cs[i] + if c.values != nil { + c.values = append(c.values[:0], c.values[skipRows:]...) + } + if c.isConst { + continue + } + if c.encodedValues != nil { + c.encodedValues = append(c.encodedValues[:0], c.encodedValues[skipRows:]...) + } + } +} + +func (br *blockResult) truncateRows(keepRows int) { + br.timestamps = br.timestamps[:keepRows] + cs := br.getColumns() + for i := range cs { + c := &cs[i] + if c.values != nil { + c.values = c.values[:keepRows] + } + if c.isConst { + continue + } + if c.encodedValues != nil { + c.encodedValues = c.encodedValues[:keepRows] + } + } +} + +func (br *blockResult) appendColumnValues(dst [][]string, columnNames []string) [][]string { + for _, columnName := range columnNames { + c := br.getColumnByName(columnName) + values := c.getValues(br) + dst = append(dst, values) + } + return dst +} + +type blockResultColumn struct { + // name is column name. + name string + + // isConst is set to true if the column is const. + // + // The column value is stored in encodedValues[0] + isConst bool + + // isTime is set to true if the column contains _time values. + // + // The column values are stored in blockResult.timestamps + isTime bool + + // valueType is the type of non-cost value + valueType valueType + + // dictValues contain dictionary values for valueTypeDict column + dictValues []string + + // encodedValues contain encoded values for non-const column + encodedValues []string + + // values contain decoded values after getValues() call for the given column + values []string +} + +func (c *blockResultColumn) reset() { + c.name = "" + c.isConst = false + c.isTime = false + c.valueType = valueTypeUnknown + c.dictValues = nil + c.encodedValues = nil + c.values = nil +} + +// getEncodedValues returns encoded values for the given column. +// +// The returned encoded values are valid until br.reset() is called. +func (c *blockResultColumn) getEncodedValues(br *blockResult) []string { + if c.encodedValues != nil { + return c.encodedValues + } + + if !c.isTime { + logger.Panicf("BUG: encodedValues may be missing only for _time column; got %q column", c.name) + } + + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + for _, timestamp := range br.timestamps { + bufLen := len(buf) + buf = encoding.MarshalInt64(buf, timestamp) + s := bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + + c.encodedValues = valuesBuf[valuesBufLen:] + + br.valuesBuf = valuesBuf + br.buf = buf + + return c.encodedValues +} + +// getValueAtRow returns value for the value at the given rowIdx. +// +// The returned value is valid until br.reset() is called. +func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string { + if c.isConst { + // Fast path for const column. + return c.encodedValues[0] + } + if c.values != nil { + // Fast path, which avoids call overhead for getValues(). + return c.values[rowIdx] + } + + // Slow path - decode all the values and return the given value. + values := c.getValues(br) + return values[rowIdx] +} + +// getValues returns values for the given column. // // The returned values are valid until br.reset() is called. -func (br *blockResult) getColumnValues(idx int) []string { - c := &br.cs[idx] +func (c *blockResultColumn) getValues(br *blockResult) []string { if c.values != nil { return c.values } @@ -589,6 +769,13 @@ func (br *blockResult) getColumnValues(idx int) []string { if c.isConst { v := c.encodedValues[0] + if v == "" { + // Fast path - return a slice of empty strings without constructing it. + c.values = getEmptyStrings(len(br.timestamps)) + return c.values + } + + // Slower path - construct slice of identical values with the len(br.timestamps) for range br.timestamps { valuesBuf = append(valuesBuf, v) } @@ -694,35 +881,137 @@ func (br *blockResult) getColumnValues(idx int) []string { return c.values } -type blockResultColumn struct { - // isConst is set to true if the column is const. - // - // The column value is stored in encodedValues[0] - isConst bool +func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { + if c.isConst { + v := c.encodedValues[0] + f, _ := tryParseFloat64(v) + return f + } + if c.isTime { + return 0 + } - // isTime is set to true if the column contains _time values. - // - // The column values are stored in blockResult.timestamps - isTime bool - - // valueType is the type of non-cost value - valueType valueType - - // dictValues contain dictionary values for valueTypeDict column - dictValues []string - - // encodedValues contain encoded values for non-const column - encodedValues []string - - // values contain decoded values after getColumnValues() call for the given column - values []string + switch c.valueType { + case valueTypeString: + f, _ := tryParseFloat64(c.encodedValues[rowIdx]) + return f + case valueTypeDict: + dictIdx := c.encodedValues[rowIdx][0] + f, _ := tryParseFloat64(c.dictValues[dictIdx]) + return f + case valueTypeUint8: + return float64(c.encodedValues[rowIdx][0]) + case valueTypeUint16: + b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) + return float64(encoding.UnmarshalUint16(b)) + case valueTypeUint32: + b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) + return float64(encoding.UnmarshalUint32(b)) + case valueTypeUint64: + b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) + return float64(encoding.UnmarshalUint64(b)) + case valueTypeFloat64: + b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) + n := encoding.UnmarshalUint64(b) + return math.Float64frombits(n) + case valueTypeIPv4: + return 0 + case valueTypeTimestampISO8601: + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } } -func (c *blockResultColumn) reset() { - c.isConst = false - c.isTime = false - c.valueType = valueTypeUnknown - c.dictValues = nil - c.encodedValues = nil - c.values = nil +func (c *blockResultColumn) sumValues(br *blockResult) float64 { + if c.isConst { + v := c.encodedValues[0] + f, _ := tryParseFloat64(v) + if f == 0 || math.IsNaN(f) { + return 0 + } + return f * float64(len(br.timestamps)) + } + if c.isTime { + return 0 + } + + switch c.valueType { + case valueTypeString: + sum := float64(0) + f := float64(0) + values := c.encodedValues + for i := range values { + if i == 0 || values[i-1] != values[i] { + f, _ = tryParseFloat64(values[i]) + } + if !math.IsNaN(f) { + sum += f + } + } + return sum + case valueTypeDict: + a := encoding.GetFloat64s(len(c.dictValues)) + dictValuesFloat := a.A + for i, v := range c.dictValues { + f, _ := tryParseFloat64(v) + if math.IsNaN(f) { + f = 0 + } + dictValuesFloat[i] = f + } + sum := float64(0) + for _, v := range c.encodedValues { + dictIdx := v[0] + sum += dictValuesFloat[dictIdx] + } + encoding.PutFloat64s(a) + return sum + case valueTypeUint8: + sum := uint64(0) + for _, v := range c.encodedValues { + sum += uint64(v[0]) + } + return float64(sum) + case valueTypeUint16: + sum := uint64(0) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + sum += uint64(encoding.UnmarshalUint16(b)) + } + return float64(sum) + case valueTypeUint32: + sum := uint64(0) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + sum += uint64(encoding.UnmarshalUint32(b)) + } + return float64(sum) + case valueTypeUint64: + sum := float64(0) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + sum += float64(encoding.UnmarshalUint64(b)) + } + return sum + case valueTypeFloat64: + sum := float64(0) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + f := math.Float64frombits(n) + if !math.IsNaN(f) { + sum += f + } + } + return sum + case valueTypeIPv4: + return 0 + case valueTypeTimestampISO8601: + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } } diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index 585099f5a..e05792d11 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -203,10 +203,11 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi } // Verify columns - if len(br.cs) != 1 { - t.Fatalf("unexpected number of columns in blockResult; got %d; want 1", len(br.cs)) + cs := br.getColumns() + if len(cs) != 1 { + t.Fatalf("unexpected number of columns in blockResult; got %d; want 1", len(cs)) } - results := br.getColumnValues(0) + results := cs[0].getValues(br) if !reflect.DeepEqual(results, expectedResults) { t.Fatalf("unexpected results matched;\ngot\n%q\nwant\n%q", results, expectedResults) } diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index e08e57734..a286e1c1c 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -27,13 +27,13 @@ type pipeProcessor interface { // The workerID is the id of the worker goroutine, which calls the writeBlock. // It is in the range 0 ... workersCount-1 . // - // It is forbidden to hold references to columns after returning from writeBlock, since the caller re-uses columns. + // It is forbidden to hold references br after returning from writeBlock, since the caller re-uses it. // // If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines // to stop sending new data. The occurred error must be returned from flush(). // // cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error. - writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) + writeBlock(workerID uint, br *blockResult) // flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor. // @@ -43,14 +43,14 @@ type pipeProcessor interface { flush() error } -type defaultPipeProcessor func(workerID uint, timestamps []int64, columns []BlockColumn) +type defaultPipeProcessor func(workerID uint, br *blockResult) -func newDefaultPipeProcessor(writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) pipeProcessor { +func newDefaultPipeProcessor(writeBlock func(workerID uint, br *blockResult)) pipeProcessor { return defaultPipeProcessor(writeBlock) } -func (dpp defaultPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { - dpp(workerID, timestamps, columns) +func (dpp defaultPipeProcessor) writeBlock(workerID uint, br *blockResult) { + dpp(workerID, br) } func (dpp defaultPipeProcessor) flush() error { diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index 3d9c339c2..3822e2bff 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -34,26 +34,11 @@ type pipeFieldsProcessor struct { ppBase pipeProcessor } -func (fpp *pipeFieldsProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { - if fpp.pf.containsStar || areSameBlockColumns(columns, fpp.pf.fields) { - // Fast path - there is no need in additional transformations before writing the block to ppBase. - fpp.ppBase.writeBlock(workerID, timestamps, columns) - return +func (fpp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) { + if !fpp.pf.containsStar { + br.updateColumns(fpp.pf.fields) } - - // Slow path - construct columns for fpp.pf.fields before writing them to ppBase. - brs := getBlockRows() - cs := brs.cs - for _, f := range fpp.pf.fields { - values := getBlockColumnValues(columns, f, len(timestamps)) - cs = append(cs, BlockColumn{ - Name: f, - Values: values, - }) - } - fpp.ppBase.writeBlock(workerID, timestamps, cs) - brs.cs = cs - putBlockRows(brs) + fpp.ppBase.writeBlock(workerID, br) } func (fpp *pipeFieldsProcessor) flush() error { diff --git a/lib/logstorage/pipe_head.go b/lib/logstorage/pipe_head.go index 698c75f33..3eda2ca83 100644 --- a/lib/logstorage/pipe_head.go +++ b/lib/logstorage/pipe_head.go @@ -33,31 +33,25 @@ type pipeHeadProcessor struct { rowsProcessed atomic.Uint64 } -func (hpp *pipeHeadProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { - rowsProcessed := hpp.rowsProcessed.Add(uint64(len(timestamps))) +func (hpp *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) { + rowsProcessed := hpp.rowsProcessed.Add(uint64(len(br.timestamps))) if rowsProcessed <= hpp.ph.n { // Fast path - write all the rows to ppBase. - hpp.ppBase.writeBlock(workerID, timestamps, columns) + hpp.ppBase.writeBlock(workerID, br) return } // Slow path - overflow. Write the remaining rows if needed. - rowsProcessed -= uint64(len(timestamps)) + rowsProcessed -= uint64(len(br.timestamps)) if rowsProcessed >= hpp.ph.n { // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. return } // Write remaining rows. - rowsRemaining := hpp.ph.n - rowsProcessed - cs := make([]BlockColumn, len(columns)) - for i, c := range columns { - cDst := &cs[i] - cDst.Name = c.Name - cDst.Values = c.Values[:rowsRemaining] - } - timestamps = timestamps[:rowsRemaining] - hpp.ppBase.writeBlock(workerID, timestamps, cs) + keepRows := hpp.ph.n - rowsProcessed + br.truncateRows(int(keepRows)) + hpp.ppBase.writeBlock(workerID, br) // Notify the caller that it should stop passing more data to writeBlock(). hpp.cancel() diff --git a/lib/logstorage/pipe_skip.go b/lib/logstorage/pipe_skip.go index d2cf71495..7df995376 100644 --- a/lib/logstorage/pipe_skip.go +++ b/lib/logstorage/pipe_skip.go @@ -27,27 +27,21 @@ type pipeSkipProcessor struct { rowsProcessed atomic.Uint64 } -func (spp *pipeSkipProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { - rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps))) +func (spp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) { + rowsProcessed := spp.rowsProcessed.Add(uint64(len(br.timestamps))) if rowsProcessed <= spp.ps.n { return } - rowsProcessed -= uint64(len(timestamps)) + rowsProcessed -= uint64(len(br.timestamps)) if rowsProcessed >= spp.ps.n { - spp.ppBase.writeBlock(workerID, timestamps, columns) + spp.ppBase.writeBlock(workerID, br) return } - rowsRemaining := spp.ps.n - rowsProcessed - cs := make([]BlockColumn, len(columns)) - for i, c := range columns { - cDst := &cs[i] - cDst.Name = c.Name - cDst.Values = c.Values[rowsRemaining:] - } - timestamps = timestamps[rowsRemaining:] - spp.ppBase.writeBlock(workerID, timestamps, cs) + rowsSkip := spp.ps.n - rowsProcessed + br.skipRows(int(rowsSkip)) + spp.ppBase.writeBlock(workerID, br) } func (spp *pipeSkipProcessor) flush() error { diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 25dfc6620..b42d1031b 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -35,15 +35,15 @@ type statsFunc interface { // All the statsProcessor methods are called from a single goroutine at a time, // so there is no need in the internal synchronization. type statsProcessor interface { - // updateStatsForAllRows must update statsProcessor stats from all the rows. + // updateStatsForAllRows must update statsProcessor stats for all the rows in br. // - // It must return the increase of internal state size in bytes for the statsProcessor. - updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int + // It must return the change of internal state size in bytes for the statsProcessor. + updateStatsForAllRows(br *blockResult) int - // updateStatsForRow must update statsProcessor stats from the row at rowIndex. + // updateStatsForRow must update statsProcessor stats for the row at rowIndex in br. // - // It must return the increase of internal state size in bytes for the statsProcessor. - updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) int + // It must return the change of internal state size in bytes for the statsProcessor. + updateStatsForRow(br *blockResult, rowIndex int) int // mergeState must merge sfp state into statsProcessor state. mergeState(sfp statsProcessor) @@ -149,7 +149,7 @@ type pipeStatsGroup struct { sfps []statsProcessor } -func (spp *pipeStatsProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { +func (spp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { shard := &spp.shards[workerID] for shard.stateSizeBudget < 0 { @@ -170,60 +170,69 @@ func (spp *pipeStatsProcessor) writeBlock(workerID uint, timestamps []int64, col if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. for _, sfp := range shard.getStatsProcessors(nil) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) + shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) } return } if len(byFields) == 1 { // Special case for grouping by a single column. - values := getBlockColumnValues(columns, byFields[0], len(timestamps)) - if isConstValue(values) { + c := br.getColumnByName(byFields[0]) + if c.isConst { // Fast path for column with constant value. - shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) + shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(c.encodedValues[0])) for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) + shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) } return } // Slower path for column with different values. + values := c.getValues(br) var sfps []statsProcessor - keyBuf := shard.keyBuf - for i := range timestamps { + keyBuf := shard.keyBuf[:0] + for i := range br.timestamps { if i <= 0 || values[i-1] != values[i] { keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) sfps = shard.getStatsProcessors(keyBuf) } for _, sfp := range sfps { - shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i) + shard.stateSizeBudget -= sfp.updateStatsForRow(br, i) } } shard.keyBuf = keyBuf return } - // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, byFields, len(timestamps)) - columnValues := shard.columnValues + // Verify whether all the 'by (...)' columns are constant. + areAllConstColumns := true + keyBuf := shard.keyBuf[:0] + for _, f := range byFields { + c := br.getColumnByName(f) + if !c.isConst { + areAllConstColumns = false + break + } + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.encodedValues[0])) + } + shard.keyBuf = keyBuf - if areConstValues(columnValues) { - // Fast path for columns with constant values. - keyBuf := shard.keyBuf[:0] - for _, values := range columnValues { - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) - } + if areAllConstColumns { + // Fast path for constant 'by (...)' columns. for _, sfp := range shard.getStatsProcessors(keyBuf) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) + shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) } - shard.keyBuf = keyBuf return } - // The slowest path - group by multiple columns. + // The slowest path - group by multiple columns with different values across rows. + + // Pre-calculate column values for byFields in order to speed up building group key in the loop below. + shard.columnValues = br.appendColumnValues(shard.columnValues[:0], byFields) + columnValues := shard.columnValues + var sfps []statsProcessor - keyBuf := shard.keyBuf - for i := range timestamps { - // verify whether the key for 'by (...)' fields equals the previous key + for i := range br.timestamps { + // Verify whether the key for 'by (...)' fields equals the previous key sameValue := sfps != nil for _, values := range columnValues { if i <= 0 || values[i-1] != values[i] { @@ -240,35 +249,12 @@ func (spp *pipeStatsProcessor) writeBlock(workerID uint, timestamps []int64, col sfps = shard.getStatsProcessors(keyBuf) } for _, sfp := range sfps { - shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i) + shard.stateSizeBudget -= sfp.updateStatsForRow(br, i) } } shard.keyBuf = keyBuf } -func areConstValues(valuess [][]string) bool { - for _, values := range valuess { - if !isConstValue(values) { - return false - } - } - return true -} - -func isConstValue(values []string) bool { - if len(values) == 0 { - // Return false, since it is impossible to get values[0] value from empty values. - return false - } - vFirst := values[0] - for _, v := range values[1:] { - if v != vFirst { - return false - } - } - return true -} - func (spp *pipeStatsProcessor) flush() error { if n := spp.stateSizeBudget.Load(); n <= 0 { return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.ps.String(), spp.maxStateSize/(1<<20)) @@ -282,7 +268,7 @@ func (spp *pipeStatsProcessor) flush() error { shard := &shards[i] for key, spg := range shard.m { // shard.m may be quite big, so this loop can take a lot of time and CPU. - // Stop processing data as soon as stopCh is closed without wasting CPU time. + // Stop processing data as soon as stopCh is closed without wasting additional CPU time. select { case <-spp.stopCh: return nil @@ -309,10 +295,10 @@ func (spp *pipeStatsProcessor) flush() error { } var values []string - var columns []BlockColumn + var br blockResult for key, spg := range m { // m may be quite big, so this loop can take a lot of time and CPU. - // Stop processing data as soon as stopCh is closed without wasting CPU time. + // Stop processing data as soon as stopCh is closed without wasting additional CPU time. select { case <-spp.stopCh: return nil @@ -334,24 +320,20 @@ func (spp *pipeStatsProcessor) flush() error { logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields)) } + br.reset() + // construct columns for byFields - columns = columns[:0] for i, f := range byFields { - columns = append(columns, BlockColumn{ - Name: f, - Values: values[i : i+1], - }) + br.addConstColumn(f, values[i]) } // construct columns for stats functions for _, sfp := range spg.sfps { name, value := sfp.finalizeStats() - columns = append(columns, BlockColumn{ - Name: name, - Values: []string{value}, - }) + br.addConstColumn(name, value) } - spp.ppBase.writeBlock(0, []int64{0}, columns) + + spp.ppBase.writeBlock(0, &br) } return nil diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index 7c96cd4b0..04469c5bb 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -5,6 +5,8 @@ import ( "slices" "strconv" "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type statsCount struct { @@ -35,49 +37,149 @@ type statsCountProcessor struct { rowsCount uint64 } -func (scp *statsCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { +func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields if len(fields) == 0 || scp.sc.containsStar { - // Fast path - count all the columns. - scp.rowsCount += uint64(len(timestamps)) + // Fast path - unconditionally count all the columns. + scp.rowsCount += uint64(len(br.timestamps)) return 0 } + if len(fields) == 1 { + // Fast path for count(single_column) + c := br.getColumnByName(fields[0]) + if c.isConst { + if c.encodedValues[0] != "" { + scp.rowsCount += uint64(len(br.timestamps)) + } + return 0 + } + if c.isTime { + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + } + switch c.valueType { + case valueTypeString: + for _, v := range c.encodedValues { + if v != "" { + scp.rowsCount++ + } + } + return 0 + case valueTypeDict: + zeroDictIdx := slices.Index(c.dictValues, "") + if zeroDictIdx < 0 { + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + } + for _, v := range c.encodedValues { + if int(v[0]) != zeroDictIdx { + scp.rowsCount++ + } + } + return 0 + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } + } // Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count(). - bm := getBitmap(len(timestamps)) + bm := getBitmap(len(br.timestamps)) defer putBitmap(bm) bm.setBits() for _, f := range fields { - if idx := getBlockColumnIndex(columns, f); idx >= 0 { - values := columns[idx].Values + c := br.getColumnByName(f) + if c.isConst { + if c.encodedValues[0] != "" { + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + } + continue + } + if c.isTime { + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + } + switch c.valueType { + case valueTypeString: bm.forEachSetBit(func(i int) bool { - return values[i] == "" + return c.encodedValues[i] == "" }) + case valueTypeDict: + if !slices.Contains(c.dictValues, "") { + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + } + bm.forEachSetBit(func(i int) bool { + dictIdx := c.encodedValues[i][0] + return c.dictValues[dictIdx] == "" + }) + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 } } - emptyValues := 0 + scp.rowsCount += uint64(len(br.timestamps)) bm.forEachSetBit(func(i int) bool { - emptyValues++ + scp.rowsCount-- return true }) - - scp.rowsCount += uint64(len(timestamps) - emptyValues) return 0 } -func (scp *statsCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { +func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { fields := scp.sc.fields if len(fields) == 0 || scp.sc.containsStar { - // Fast path - count the given column + // Fast path - unconditionally count the given column scp.rowsCount++ return 0 } + if len(fields) == 1 { + // Fast path for count(single_column) + c := br.getColumnByName(fields[0]) + if c.isConst { + if c.encodedValues[0] != "" { + scp.rowsCount++ + } + return 0 + } + if c.isTime { + scp.rowsCount++ + return 0 + } + switch c.valueType { + case valueTypeString: + if v := c.encodedValues[rowIdx]; v != "" { + scp.rowsCount++ + } + return 0 + case valueTypeDict: + dictIdx := c.encodedValues[rowIdx][0] + if v := c.dictValues[dictIdx]; v != "" { + scp.rowsCount++ + } + return 0 + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: + scp.rowsCount++ + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } + } // Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty for _, f := range fields { - if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" { + c := br.getColumnByName(f) + if v := c.getValueAtRow(br, rowIdx); v != "" { scp.rowsCount++ return 0 } diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index 0ee7b32e6..4ab3dcb64 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -35,46 +35,28 @@ type statsSumProcessor struct { sum float64 } -func (ssp *statsSumProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { +func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { if ssp.ss.containsStar { // Sum all the columns - for _, c := range columns { - ssp.sum += sumValues(c.Values) + for _, c := range br.getColumns() { + ssp.sum += c.sumValues(br) } return 0 } // Sum the requested columns for _, field := range ssp.ss.fields { - if idx := getBlockColumnIndex(columns, field); idx >= 0 { - ssp.sum += sumValues(columns[idx].Values) - } + c := br.getColumnByName(field) + ssp.sum += c.sumValues(br) } return 0 } -func sumValues(values []string) float64 { - sum := float64(0) - f := float64(0) - for i, v := range values { - if i == 0 || values[i-1] != v { - f, _ = tryParseFloat64(v) - if math.IsNaN(f) { - // Ignore NaN values, since this is the expected behaviour by most users. - f = 0 - } - } - sum += f - } - return sum -} - -func (ssp *statsSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { +func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { if ssp.ss.containsStar { // Sum all the fields for the given row - for _, c := range columns { - v := c.Values[rowIdx] - f, _ := tryParseFloat64(v) + for _, c := range br.getColumns() { + f := c.getFloatValueAtRow(rowIdx) if !math.IsNaN(f) { ssp.sum += f } @@ -84,12 +66,10 @@ func (ssp *statsSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn // Sum only the given fields for the given row for _, field := range ssp.ss.fields { - if idx := getBlockColumnIndex(columns, field); idx >= 0 { - v := columns[idx].Values[rowIdx] - f, _ := tryParseFloat64(v) - if !math.IsNaN(f) { - ssp.sum += f - } + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + ssp.sum += f } } return 0 diff --git a/lib/logstorage/stats_unique.go b/lib/logstorage/stats_unique.go index 265841f5d..721cbd53a 100644 --- a/lib/logstorage/stats_unique.go +++ b/lib/logstorage/stats_unique.go @@ -4,7 +4,6 @@ import ( "fmt" "slices" "strconv" - "strings" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -43,36 +42,38 @@ type statsUniqProcessor struct { keyBuf []byte } -func (sup *statsUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { +func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int { fields := sup.su.fields m := sup.m stateSizeIncrease := 0 if len(fields) == 0 || sup.su.containsStar { // Count unique rows - keyBuf := sup.keyBuf - for i := range timestamps { + columns := br.getColumns() + keyBuf := sup.keyBuf[:0] + for i := range br.timestamps { seenKey := true for _, c := range columns { - values := c.Values + values := c.getValues(br) if i == 0 || values[i-1] != values[i] { seenKey = false break } } if seenKey { + // This key has been already counted. continue } allEmptyValues := true keyBuf = keyBuf[:0] for _, c := range columns { - v := c.Values[i] + v := c.getValueAtRow(br, i) if v != "" { allEmptyValues = false } // Put column name into key, since every block can contain different set of columns for '*' selector. - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } if allEmptyValues { @@ -88,39 +89,103 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(timestamps []int64, columns return stateSizeIncrease } if len(fields) == 1 { - // Fast path for a single column - if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { - values := columns[idx].Values - for i, v := range values { + // Fast path for a single column. + // The unique key is formed as " ? ", + // where is skipped if == 1. + // This guarantees that keys do not clash for different column types acorss blocks. + c := br.getColumnByName(fields[0]) + if c.isTime { + // Count unique br.timestamps + timestamps := br.timestamps + keyBuf := sup.keyBuf[:0] + for i, timestamp := range timestamps { + if i > 0 && timestamps[i-1] == timestamps[i] { + // This timestamp has been already counted. + continue + } + keyBuf = append(keyBuf[:0], 1) + keyBuf = encoding.MarshalInt64(keyBuf, timestamp) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + } + sup.keyBuf = keyBuf + return stateSizeIncrease + } + if c.isConst { + // count unique const values + v := c.encodedValues[0] + if v == "" { + // Do not count empty values + return stateSizeIncrease + } + keyBuf := sup.keyBuf[:0] + keyBuf = append(keyBuf[:0], 0, byte(valueTypeString)) + keyBuf = append(keyBuf, v...) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + sup.keyBuf = keyBuf + return stateSizeIncrease + } + if c.valueType == valueTypeDict { + // count unique non-zero c.dictValues + keyBuf := sup.keyBuf[:0] + for i, v := range c.dictValues { if v == "" { // Do not count empty values continue } - if i > 0 && values[i-1] == v { - continue - } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) + keyBuf = append(keyBuf[:0], 0, byte(valueTypeDict)) + keyBuf = append(keyBuf, byte(i)) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } } + sup.keyBuf = keyBuf + return stateSizeIncrease } + + // Count unique values across encodedValues + encodedValues := c.getEncodedValues(br) + isStringValueType := c.valueType == valueTypeString + keyBuf := sup.keyBuf[:0] + for i, v := range encodedValues { + if isStringValueType && v == "" { + // Do not count empty values + continue + } + if i > 0 && encodedValues[i-1] == v { + // This value has been already counted. + continue + } + keyBuf = append(keyBuf[:0], 0, byte(c.valueType)) + keyBuf = append(keyBuf, v...) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + } + keyBuf = sup.keyBuf return stateSizeIncrease } // Slow path for multiple columns. // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - sup.columnValues = appendBlockColumnValues(sup.columnValues[:0], columns, fields, len(timestamps)) + sup.columnValues = br.appendColumnValues(sup.columnValues[:0], fields) columnValues := sup.columnValues - keyBuf := sup.keyBuf - for i := range timestamps { + keyBuf := sup.keyBuf[:0] + for i := range br.timestamps { seenKey := true for _, values := range columnValues { if i == 0 || values[i-1] != values[i] { seenKey = false + break } } if seenKey { @@ -149,7 +214,7 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(timestamps []int64, columns return stateSizeIncrease } -func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int { +func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { fields := sup.su.fields m := sup.m @@ -158,13 +223,13 @@ func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []B // Count unique rows allEmptyValues := true keyBuf := sup.keyBuf[:0] - for _, c := range columns { - v := c.Values[rowIdx] + for _, c := range br.getColumns() { + v := c.getValueAtRow(br, rowIdx) if v != "" { allEmptyValues = false } // Put column name into key, since every block can contain different set of columns for '*' selector. - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } sup.keyBuf = keyBuf @@ -180,19 +245,73 @@ func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []B return stateSizeIncrease } if len(fields) == 1 { - // Fast path for a single column - if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { - v := columns[idx].Values[rowIdx] + // Fast path for a single column. + // The unique key is formed as " ? ", + // where is skipped if == 1. + // This guarantees that keys do not clash for different column types acorss blocks. + c := br.getColumnByName(fields[0]) + if c.isTime { + // Count unique br.timestamps + keyBuf := sup.keyBuf[:0] + keyBuf = append(keyBuf[:0], 1) + keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx]) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + sup.keyBuf = keyBuf + return stateSizeIncrease + } + if c.isConst { + // count unique const values + v := c.encodedValues[0] if v == "" { // Do not count empty values return stateSizeIncrease } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) + keyBuf := sup.keyBuf[:0] + keyBuf = append(keyBuf[:0], 0, byte(valueTypeString)) + keyBuf = append(keyBuf, v...) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } + sup.keyBuf = keyBuf + return stateSizeIncrease } + if c.valueType == valueTypeDict { + // count unique non-zero c.dictValues + dictIdx := c.encodedValues[rowIdx][0] + if c.dictValues[dictIdx] == "" { + // Do not count empty values + return stateSizeIncrease + } + keyBuf := sup.keyBuf[:0] + keyBuf = append(keyBuf[:0], 0, byte(valueTypeDict)) + keyBuf = append(keyBuf, dictIdx) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + sup.keyBuf = keyBuf + return stateSizeIncrease + } + + // Count unique values across encodedValues + encodedValues := c.getEncodedValues(br) + v := encodedValues[rowIdx] + if c.valueType == valueTypeString && v == "" { + // Do not count empty values + return stateSizeIncrease + } + keyBuf := sup.keyBuf[:0] + keyBuf = append(keyBuf[:0], 0, byte(c.valueType)) + keyBuf = append(keyBuf, v...) + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + keyBuf = sup.keyBuf return stateSizeIncrease } @@ -200,10 +319,8 @@ func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []B allEmptyValues := true keyBuf := sup.keyBuf[:0] for _, f := range fields { - v := "" - if idx := getBlockColumnIndex(columns, f); idx >= 0 { - v = columns[idx].Values[rowIdx] - } + c := br.getColumnByName(f) + v := c.getValueAtRow(br, rowIdx) if v != "" { allEmptyValues = false } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index d70dcfc32..1cb3ebb41 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -63,7 +63,24 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, workersCount := cgroup.AvailableCPUs() - pp := newDefaultPipeProcessor(writeBlock) + pp := newDefaultPipeProcessor(func(workerID uint, br *blockResult) { + brs := getBlockRows() + csDst := brs.cs + + csSrc := br.getColumns() + for _, c := range csSrc { + values := c.getValues(br) + csDst = append(csDst, BlockColumn{ + Name: c.name, + Values: values, + }) + } + writeBlock(workerID, br.timestamps, csDst) + + brs.cs = csDst + putBlockRows(brs) + }) + ppMain := pp stopCh := ctx.Done() cancels := make([]func(), len(q.pipes)) @@ -79,21 +96,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, pps[i] = pp } - s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) { - brs := getBlockRows() - cs := brs.cs - - for i, columnName := range br.columnNames { - cs = append(cs, BlockColumn{ - Name: columnName, - Values: br.getColumnValues(i), - }) - } - pp.writeBlock(workerID, br.timestamps, cs) - - brs.cs = cs - putBlockRows(brs) - }) + s.search(workersCount, so, stopCh, pp.writeBlock) var errFlush error for i, pp := range pps { @@ -150,18 +153,6 @@ func (c *BlockColumn) reset() { c.Values = nil } -func areSameBlockColumns(columns []BlockColumn, columnNames []string) bool { - if len(columnNames) != len(columns) { - return false - } - for i, name := range columnNames { - if columns[i].Name != name { - return false - } - } - return true -} - func getBlockColumnIndex(columns []BlockColumn, columnName string) int { for i, c := range columns { if c.Name == columnName { @@ -237,7 +228,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } bs.search(bsw) - if bs.br.RowsCount() > 0 { + if len(bs.br.timestamps) > 0 { processBlockResult(workerID, &bs.br) } } diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 349b3b2f6..5c1c00bbf 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -466,7 +466,7 @@ func TestStorageSearch(t *testing.T) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCountTotal.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -487,7 +487,7 @@ func TestStorageSearch(t *testing.T) { } var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -531,7 +531,7 @@ func TestStorageSearch(t *testing.T) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCountTotal.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -560,7 +560,7 @@ func TestStorageSearch(t *testing.T) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCountTotal.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -597,7 +597,7 @@ func TestStorageSearch(t *testing.T) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCountTotal.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock) @@ -622,7 +622,7 @@ func TestStorageSearch(t *testing.T) { } var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(len(br.timestamps))) } s.search(workersCount, so, nil, processBlock)