diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index e2c09cc5c..7c40abde5 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add button for enabling auto refresh, similarly to VictoriaMetrics vmui. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7017). +* FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070). * BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for itentifying the root cause of the issue. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 2dc30a3fb..a4e14f695 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -21,14 +21,29 @@ import ( // // It is expected that its contents is accessed only from a single goroutine at a time. type blockResult struct { + // rowsLen is the number of rows in the given blockResult. + rowsLen int + + // bs is the associated blockSearch for the given blockResult. + // + // bs is nil for the blockResult constructed by pipes. + bs *blockSearch + + // bm is the associated bitmap for the given blockResult. + // + // bm is nil for the blockResult constructed by pipes. + bm *bitmap + // a holds all the bytes behind the requested column values in the block. a arena - // values holds all the requested column values in the block. 
+ // valuesBuf holds all the requested column values in the block. valuesBuf []string - // timestamps contain timestamps for the selected log entries in the block. - timestamps []int64 + // timestampsBuf contains cached timestamps for the selected log entries in the block. + // + // timestamps must be obtained via blockResult.getTimestamps() call. + timestampsBuf []int64 // csBuf contains requested columns. csBuf []blockResultColumn @@ -47,12 +62,17 @@ type blockResult struct { } func (br *blockResult) reset() { + br.rowsLen = 0 + + br.bs = nil + br.bm = nil + br.a.reset() clear(br.valuesBuf) br.valuesBuf = br.valuesBuf[:0] - br.timestamps = br.timestamps[:0] + br.timestampsBuf = br.timestampsBuf[:0] clear(br.csBuf) br.csBuf = br.csBuf[:0] @@ -76,6 +96,11 @@ func (br *blockResult) reset() { func (br *blockResult) clone() *blockResult { brNew := &blockResult{} + brNew.rowsLen = br.rowsLen + + // do not clone br.bs, since it may be updated at any time. + // do not clone br.bm, since it may be updated at any time. + cs := br.getColumns() // Pre-populate values in every column in order to properly calculate the needed backing buffer size below. @@ -96,8 +121,10 @@ func (br *blockResult) clone() *blockResult { } brNew.valuesBuf = make([]string, 0, valuesBufLen) - brNew.timestamps = make([]int64, len(br.timestamps)) - copy(brNew.timestamps, br.timestamps) + srcTimestamps := br.getTimestamps() + brNew.timestampsBuf = make([]int64, len(srcTimestamps)) + copy(brNew.timestampsBuf, srcTimestamps) + brNew.checkTimestampsLen() csNew := make([]blockResultColumn, len(cs)) for i, c := range cs { @@ -112,18 +139,19 @@ func (br *blockResult) clone() *blockResult { return brNew } -// initFromFilterAllColumns initializes br from brSrc by copying rows identified by set bets at bm. +// initFromFilterAllColumns initializes br from brSrc by copying rows identified by set bits at bm. // // The br is valid until brSrc or bm is updated. 
func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) { br.reset() - srcTimestamps := brSrc.timestamps - dstTimestamps := br.timestamps[:0] + srcTimestamps := brSrc.getTimestamps() + dstTimestamps := br.timestampsBuf[:0] bm.forEachSetBitReadonly(func(idx int) { dstTimestamps = append(dstTimestamps, srcTimestamps[idx]) }) - br.timestamps = dstTimestamps + br.timestampsBuf = dstTimestamps + br.rowsLen = len(br.timestampsBuf) for _, cSrc := range brSrc.getColumns() { br.appendFilteredColumn(brSrc, cSrc, bm) @@ -134,7 +162,7 @@ func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) // // the br is valid until brSrc, cSrc or bm is updated. func (br *blockResult) appendFilteredColumn(brSrc *blockResult, cSrc *blockResultColumn, bm *bitmap) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } cDst := blockResultColumn{ @@ -211,7 +239,7 @@ func (br *blockResult) sizeBytes() int { n += br.a.sizeBytes() n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0])) - n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0])) + n += cap(br.timestampsBuf) * int(unsafe.Sizeof(br.timestampsBuf[0])) n += cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0])) n += cap(br.cs) * int(unsafe.Sizeof(br.cs[0])) @@ -221,10 +249,10 @@ func (br *blockResult) sizeBytes() int { // setResultColumns sets the given rcs as br columns. // // The br is valid only until rcs are modified. 
-func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) { +func (br *blockResult) setResultColumns(rcs []resultColumn, rowsLen int) { br.reset() - br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsCount) + br.rowsLen = rowsLen for i := range rcs { br.addResultColumn(&rcs[i]) @@ -232,8 +260,8 @@ func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) { } func (br *blockResult) addResultColumn(rc *resultColumn) { - if len(rc.values) != len(br.timestamps) { - logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, len(br.timestamps), len(rc.values)) + if len(rc.values) != br.rowsLen { + logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, br.rowsLen, len(rc.values)) } if areConstValues(rc.values) { // This optimization allows reducing memory usage after br cloning @@ -252,11 +280,9 @@ func (br *blockResult) addResultColumn(rc *resultColumn) { br.csInitialized = false } -// initAllColumns initializes all the columns in br according to bs and bm. -// -// The initialized columns are valid until bs and bm are changed. -func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { - unneededColumnNames := bs.bsw.so.unneededColumnNames +// initAllColumns initializes all the columns in br. 
+func (br *blockResult) initAllColumns() { + unneededColumnNames := br.bs.bsw.so.unneededColumnNames if !slices.Contains(unneededColumnNames, "_time") { // Add _time column @@ -265,12 +291,12 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { if !slices.Contains(unneededColumnNames, "_stream_id") { // Add _stream_id column - br.addStreamIDColumn(bs) + br.addStreamIDColumn() } if !slices.Contains(unneededColumnNames, "_stream") { // Add _stream column - if !br.addStreamColumn(bs) { + if !br.addStreamColumn() { // Skip the current block, since the associated stream tags are missing br.reset() return @@ -279,18 +305,18 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { if !slices.Contains(unneededColumnNames, "_msg") { // Add _msg column - v := bs.csh.getConstColumnValue("_msg") + v := br.bs.csh.getConstColumnValue("_msg") if v != "" { br.addConstColumn("_msg", v) - } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { - br.addColumn(bs, bm, ch) + } else if ch := br.bs.csh.getColumnHeader("_msg"); ch != nil { + br.addColumn(ch) } else { br.addConstColumn("_msg", "") } } // Add other const columns - for _, cc := range bs.csh.constColumns { + for _, cc := range br.bs.csh.constColumns { if isMsgFieldName(cc.Name) { continue } @@ -300,30 +326,28 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { } // Add other non-const columns - chs := bs.csh.columnHeaders + chs := br.bs.csh.columnHeaders for i := range chs { ch := &chs[i] if isMsgFieldName(ch.name) { continue } if !slices.Contains(unneededColumnNames, ch.name) { - br.addColumn(bs, bm, ch) + br.addColumn(ch) } } br.csInitFast() } -// initRequestedColumns initialized only requested columns in br according to bs and bm. -// -// The initialized columns are valid until bs and bm are changed. 
-func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) { - for _, columnName := range bs.bsw.so.neededColumnNames { +// initRequestedColumns initializes only requested columns in br. +func (br *blockResult) initRequestedColumns() { + for _, columnName := range br.bs.bsw.so.neededColumnNames { switch columnName { case "_stream_id": - br.addStreamIDColumn(bs) + br.addStreamIDColumn() case "_stream": - if !br.addStreamColumn(bs) { + if !br.addStreamColumn() { // Skip the current block, since the associated stream tags are missing. br.reset() return @@ -331,11 +355,11 @@ case "_time": br.addTimeColumn() default: - v := bs.csh.getConstColumnValue(columnName) + v := br.bs.csh.getConstColumnValue(columnName) if v != "" { br.addConstColumn(columnName, v) - } else if ch := bs.csh.getColumnHeader(columnName); ch != nil { - br.addColumn(bs, bm, ch) + } else if ch := br.bs.csh.getColumnHeader(columnName); ch != nil { + br.addColumn(ch) } else { br.addConstColumn(columnName, "") } @@ -345,38 +369,92 @@ br.csInitFast() } +// mustInit initializes br with the given bs and bm. +// +// br is valid until bs or bm changes. func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.reset() - if bm.isZero() { - // Nothing to initialize for zero matching log entries in the block. + br.rowsLen = bm.onesCount() + if br.rowsLen == 0 { return } - // Initialize timestamps, since they are required for all the further work with br. - so := bs.bsw.so - if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") { - // The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes. 
- rowsLen := bm.onesCount() - br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen) + br.bs = bs + br.bm = bm +} + +func (br *blockResult) getMinTimestamp() int64 { + if br.bm != nil && br.bm.bitsLen == br.rowsLen { + return br.bs.bsw.bh.timestampsHeader.minTimestamp + } + + timestamps := br.getTimestamps() + if len(timestamps) == 0 { + return -1 << 63 + } + minTimestamp := timestamps[0] + for i := 1; i < len(timestamps); i++ { + if timestamps[i] < minTimestamp { + minTimestamp = timestamps[i] + } + } + return minTimestamp +} + +func (br *blockResult) getMaxTimestamp() int64 { + if br.bm != nil && br.bm.bitsLen == br.rowsLen { + return br.bs.bsw.bh.timestampsHeader.maxTimestamp + } + + timestamps := br.getTimestamps() + if len(timestamps) == 0 { + return (1 << 63) - 1 + } + maxTimestamp := timestamps[len(timestamps)-1] + for i := len(timestamps) - 2; i >= 0; i-- { + if timestamps[i] > maxTimestamp { + maxTimestamp = timestamps[i] + } + } + return maxTimestamp +} + +func (br *blockResult) getTimestamps() []int64 { + if br.rowsLen > 0 && len(br.timestampsBuf) == 0 { + br.initTimestamps() + } + return br.timestampsBuf +} + +func (br *blockResult) initTimestamps() { + if br.bs == nil { + br.timestampsBuf = fastnum.AppendInt64Zeros(br.timestampsBuf[:0], br.rowsLen) return } - // Slow path - the _time column is requested, so we need to initialize br.timestamps with real timestamps. - srcTimestamps := bs.getTimestamps() - if bm.areAllBitsSet() { + srcTimestamps := br.bs.getTimestamps() + if br.bm.areAllBitsSet() { // Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering. - br.timestamps = append(br.timestamps[:0], srcTimestamps...) + br.timestampsBuf = append(br.timestampsBuf[:0], srcTimestamps...) + br.checkTimestampsLen() return } // Slow path - copy only the needed timestamps to br according to filter results. 
- dstTimestamps := br.timestamps[:0] - bm.forEachSetBitReadonly(func(idx int) { + dstTimestamps := br.timestampsBuf[:0] + br.bm.forEachSetBitReadonly(func(idx int) { ts := srcTimestamps[idx] dstTimestamps = append(dstTimestamps, ts) }) - br.timestamps = dstTimestamps + br.timestampsBuf = dstTimestamps + br.checkTimestampsLen() +} + +func (br *blockResult) checkTimestampsLen() { + if len(br.timestampsBuf) != br.rowsLen { + logger.Panicf("BUG: unexpected number of timestamps; got %d; want %d", len(br.timestampsBuf), br.rowsLen) + } } func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bitmap, ch *columnHeader) []string { @@ -454,8 +532,8 @@ func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bit // addColumn adds column for the given ch to br. // -// The added column is valid until bs, bm or ch is changed. -func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) { +// The added column is valid until ch is changed. +func (br *blockResult) addColumn(ch *columnHeader) { br.csBuf = append(br.csBuf, blockResultColumn{ name: getCanonicalColumnName(ch.name), valueType: ch.valueType, @@ -466,8 +544,8 @@ func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) c := &br.csBuf[len(br.csBuf)-1] br.svecs = append(br.svecs, searchValuesEncodedCreator{ - bs: bs, - bm: bm, + bs: br.bs, + bm: br.bm, ch: ch, }) c.valuesEncodedCreator = &br.svecs[len(br.svecs)-1] @@ -492,15 +570,15 @@ func (br *blockResult) addTimeColumn() { br.csInitialized = false } -func (br *blockResult) addStreamIDColumn(bs *blockSearch) { +func (br *blockResult) addStreamIDColumn() { bb := bbPool.Get() - bb.B = bs.bsw.bh.streamID.marshalString(bb.B) + bb.B = br.bs.bsw.bh.streamID.marshalString(bb.B) br.addConstColumn("_stream_id", bytesutil.ToUnsafeString(bb.B)) bbPool.Put(bb) } -func (br *blockResult) addStreamColumn(bs *blockSearch) bool { - streamStr := bs.getStreamStr() +func (br *blockResult) addStreamColumn() 
bool { + streamStr := br.bs.getStreamStr() if streamStr == "" { return false } @@ -562,16 +640,16 @@ func (br *blockResult) newValuesBucketedForColumn(c *blockResultColumn, bf *bySt func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []string { if v == "" { // Fast path - return a slice of empty strings without constructing the slice. - return getEmptyStrings(len(br.timestamps)) + return getEmptyStrings(br.rowsLen) } - // Slower path - construct slice of identical values with the len(br.timestamps) + // Slower path - construct slice of identical values with the length equal to br.rowsLen valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) v = br.getBucketedValue(v, bf) - for range br.timestamps { + for i := 0; i < br.rowsLen; i++ { valuesBuf = append(valuesBuf, v) } @@ -585,7 +663,7 @@ func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) - timestamps := br.timestamps + timestamps := br.getTimestamps() var s string if !bf.hasBucketConfig() { @@ -1401,7 +1479,11 @@ func getBlockResultColumnIdxByName(cs []*blockResultColumn, name string) int { } func (br *blockResult) skipRows(skipRows int) { - br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...) + timestamps := br.getTimestamps() + br.timestampsBuf = append(br.timestampsBuf[:0], timestamps[skipRows:]...) + br.rowsLen -= skipRows + br.checkTimestampsLen() + for _, c := range br.getColumns() { if c.values != nil { c.values = append(c.values[:0], c.values[skipRows:]...) @@ -1421,7 +1503,11 @@ func (br *blockResult) skipRows(skipRows int) { } func (br *blockResult) truncateRows(keepRows int) { - br.timestamps = br.timestamps[:keepRows] + timestamps := br.getTimestamps() + br.timestampsBuf = append(br.timestampsBuf[:0], timestamps[:keepRows]...) 
+ br.rowsLen = keepRows + br.checkTimestampsLen() + for _, c := range br.getColumns() { if c.values != nil { c.values = c.values[:keepRows] @@ -1676,10 +1762,10 @@ func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (flo func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { if c.isConst { v := c.valuesEncoded[0] - return uint64(len(v)) * uint64(len(br.timestamps)) + return uint64(len(v)) * uint64(br.rowsLen) } if c.isTime { - return uint64(len(time.RFC3339Nano)) * uint64(len(br.timestamps)) + return uint64(len(time.RFC3339Nano)) * uint64(br.rowsLen) } switch c.valueType { @@ -1707,7 +1793,7 @@ func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { case valueTypeIPv4: return c.sumLenStringValues(br) case valueTypeTimestampISO8601: - return uint64(len(iso8601Timestamp)) * uint64(len(br.timestamps)) + return uint64(len(iso8601Timestamp)) * uint64(br.rowsLen) default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) return 0 @@ -1729,7 +1815,7 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { if !ok { return 0, 0 } - return f * float64(len(br.timestamps)), len(br.timestamps) + return f * float64(br.rowsLen), br.rowsLen } if c.isTime { return 0, 0 @@ -1780,25 +1866,25 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint8(v)) } - return float64(sum), len(br.timestamps) + return float64(sum), br.rowsLen case valueTypeUint16: sum := uint64(0) for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint16(v)) } - return float64(sum), len(br.timestamps) + return float64(sum), br.rowsLen case valueTypeUint32: sum := uint64(0) for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint32(v)) } - return float64(sum), len(br.timestamps) + return float64(sum), br.rowsLen case valueTypeUint64: sum := float64(0) for _, v := range c.getValuesEncoded(br) { sum += float64(unmarshalUint64(v)) } 
- return sum, len(br.timestamps) + return sum, br.rowsLen case valueTypeFloat64: sum := float64(0) for _, v := range c.getValuesEncoded(br) { @@ -1807,7 +1893,7 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { sum += f } } - return sum, len(br.timestamps) + return sum, br.rowsLen case valueTypeIPv4: return 0, 0 case valueTypeTimestampISO8601: @@ -1864,15 +1950,15 @@ func truncateTimestampToYear(timestamp int64) int64 { return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano() } -func getEmptyStrings(rowsCount int) []string { +func getEmptyStrings(rowsLen int) []string { p := emptyStrings.Load() if p == nil { - values := make([]string, rowsCount) + values := make([]string, rowsLen) emptyStrings.Store(&values) return values } values := *p - return slicesutil.SetLength(values, rowsCount) + return slicesutil.SetLength(values, rowsLen) } var emptyStrings atomic.Pointer[[]string] diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 3512dd321..d260ba264 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -177,9 +177,9 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) { // fetch the requested columns to bs.br. 
if bs.bsw.so.needAllColumns { - bs.br.initAllColumns(bs, bm) + bs.br.initAllColumns() } else { - bs.br.initRequestedColumns(bs, bm) + bs.br.initRequestedColumns() } } diff --git a/lib/logstorage/filter_day_range.go b/lib/logstorage/filter_day_range.go index 0f7608894..4ad88f0a8 100644 --- a/lib/logstorage/filter_day_range.go +++ b/lib/logstorage/filter_day_range.go @@ -47,7 +47,7 @@ func (fr *filterDayRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - timestamps := br.timestamps + timestamps := br.getTimestamps() bm.forEachSetBit(func(idx int) bool { timestamp := timestamps[idx] return fr.matchTimestampValue(timestamp) diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index 3ad8233fb..8776730bb 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -44,9 +44,10 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { + timestamps := br.getTimestamps() minValueInt, maxValueInt := toInt64Range(minValue, maxValue) bm.forEachSetBit(func(idx int) bool { - timestamp := br.timestamps[idx] + timestamp := timestamps[idx] return timestamp >= minValueInt && timestamp <= maxValueInt }) return diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index 67cc5764a..90f288280 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -213,11 +213,12 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi t.Fatalf("unexpected number of columns in blockResult; got %d; want 2", len(cs)) } values := cs[0].getValues(br) + timestamps := br.getTimestamps() resultsMu.Lock() for i, v := range values { results = append(results, result{ value: strings.Clone(v), - timestamp: br.timestamps[i], + timestamp: timestamps[i], }) } resultsMu.Unlock() diff --git a/lib/logstorage/filter_time.go b/lib/logstorage/filter_time.go index 1009657f6..dfe1a5788 100644 --- a/lib/logstorage/filter_time.go +++ 
b/lib/logstorage/filter_time.go @@ -41,7 +41,7 @@ func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - timestamps := br.timestamps + timestamps := br.getTimestamps() bm.forEachSetBit(func(idx int) bool { timestamp := timestamps[idx] return ft.matchTimestampValue(timestamp) diff --git a/lib/logstorage/filter_week_range.go b/lib/logstorage/filter_week_range.go index c83ea89f9..3b17845ef 100644 --- a/lib/logstorage/filter_week_range.go +++ b/lib/logstorage/filter_week_range.go @@ -49,7 +49,7 @@ func (fr *filterWeekRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - timestamps := br.timestamps + timestamps := br.getTimestamps() bm.forEachSetBit(func(idx int) bool { timestamp := timestamps[idx] return fr.matchTimestampValue(timestamp) diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index 1f1c12873..997023167 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -79,7 +79,7 @@ type pipeCopyProcessor struct { } func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 15ef3c7ec..e61fcfcc6 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -59,7 +59,7 @@ type pipeDeleteProcessor struct { } func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_drop_empty_fields.go b/lib/logstorage/pipe_drop_empty_fields.go index 98f846f65..ba18ad65b 100644 --- a/lib/logstorage/pipe_drop_empty_fields.go +++ b/lib/logstorage/pipe_drop_empty_fields.go @@ -66,7 +66,7 @@ type pipeDropEmptyFieldsProcessorShardNopad struct { } func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } 
@@ -90,7 +90,7 @@ func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResu shard.wctx.init(workerID, pdp.ppNext) fields := shard.fields - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { fields = fields[:0] for i, values := range columnValues { v := values[rowIdx] diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 2aeb49a3c..89f518e31 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -151,7 +151,7 @@ type pipeExtractProcessorShardNopad struct { } func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -159,7 +159,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { shard := &pep.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pe.iff; iff != nil { iff.f.applyToBlockResult(br, bm) diff --git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go index 1f7357b21..3ce6ee503 100644 --- a/lib/logstorage/pipe_extract_regexp.go +++ b/lib/logstorage/pipe_extract_regexp.go @@ -175,7 +175,7 @@ type pipeExtractRegexpProcessorShardNopad struct { } func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -183,7 +183,7 @@ func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult shard := &pep.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pe.iff; iff != nil { iff.f.applyToBlockResult(br, bm) diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 177b3eb84..0ef824ed7 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -94,7 +94,7 @@ func (shard *pipeFieldNamesProcessorShard) getM() map[string]*uint64 { } 
func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -113,7 +113,7 @@ func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { // Assume that the column is set for all the rows in the block. // This is much faster than reading all the column values and counting non-empty rows. - *pHits += uint64(len(br.timestamps)) + *pHits += uint64(br.rowsLen) } } diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index 0b4f4186e..f08e5c608 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -79,7 +79,7 @@ type pipeFieldsProcessor struct { } func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go index 3dd22e5e0..7b7308342 100644 --- a/lib/logstorage/pipe_filter.go +++ b/lib/logstorage/pipe_filter.go @@ -83,14 +83,14 @@ type pipeFilterProcessorShardNopad struct { } func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } shard := &pfp.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() pfp.pf.f.applyToBlockResult(br, bm) if bm.areAllBitsSet() { diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index b6e80573e..765810186 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -136,7 +136,7 @@ type pipeFormatProcessorShardNopad struct { } func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -144,7 +144,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { pf := pfp.pf bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pf.iff; 
iff != nil { iff.f.applyToBlockResult(br, bm) @@ -157,7 +157,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { shard.rc.name = pf.resultField resultColumn := br.getColumnByName(pf.resultField) - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { v := "" if bm.isSetBit(rowIdx) { v = shard.formatRow(pf, br, rowIdx) diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index 71ddac098..6dec31bae 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -57,11 +57,11 @@ type pipeLimitProcessor struct { } func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } - rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) + rowsProcessed := plp.rowsProcessed.Add(uint64(br.rowsLen)) limit := plp.pl.limit if rowsProcessed <= limit { // Fast path - write all the rows to ppNext. @@ -73,7 +73,7 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { } // Slow path - overflow. Write the remaining rows if needed. - rowsProcessed -= uint64(len(br.timestamps)) + rowsProcessed -= uint64(br.rowsLen) if rowsProcessed >= limit { // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. 
return diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go index 7ce50bfb1..c3999351f 100644 --- a/lib/logstorage/pipe_math.go +++ b/lib/logstorage/pipe_math.go @@ -293,12 +293,12 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) rIdx := len(shard.rs) shard.rs = slicesutil.SetLength(shard.rs, len(shard.rs)+1) - shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+len(br.timestamps)) - shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-len(br.timestamps):] + shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+br.rowsLen) + shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-br.rowsLen:] if me.isConst { r := shard.rs[rIdx] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { r[i] = me.constValue } return @@ -331,7 +331,7 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) } func (pmp *pipeMathProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go index dc888f4e5..5f3f92314 100644 --- a/lib/logstorage/pipe_offset.go +++ b/lib/logstorage/pipe_offset.go @@ -51,16 +51,16 @@ type pipeOffsetProcessor struct { } func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } - rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps))) + rowsProcessed := pop.rowsProcessed.Add(uint64(br.rowsLen)) if rowsProcessed <= pop.po.offset { return } - rowsProcessed -= uint64(len(br.timestamps)) + rowsProcessed -= uint64(br.rowsLen) if rowsProcessed >= pop.po.offset { pop.ppNext.writeBlock(workerID, br) return diff --git a/lib/logstorage/pipe_pack.go b/lib/logstorage/pipe_pack.go index e911121a2..4093d27ee 100644 --- a/lib/logstorage/pipe_pack.go +++ b/lib/logstorage/pipe_pack.go @@ -64,7 +64,7 @@ type pipePackProcessorShardNopad struct { } func (ppp 
*pipePackProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -86,7 +86,7 @@ func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) { buf := shard.buf[:0] fields := shard.fields - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { fields = fields[:0] for _, c := range cs { v := c.getValueAtRow(br, rowIdx) diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index ff85a65d8..c35c409ee 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -83,7 +83,7 @@ type pipeRenameProcessor struct { } func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 07fa5d553..caebb08b6 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -245,11 +245,11 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { shard.columnValues = columnValues // Generate byColumns - valuesEncoded := make([]string, len(br.timestamps)) + valuesEncoded := make([]string, br.rowsLen) shard.stateSizeBudget -= len(valuesEncoded) * int(unsafe.Sizeof(valuesEncoded[0])) bb := bbPool.Get() - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { // Marshal all the columns per each row into a single string // and sort rows by the resulting string. 
bb.B = bb.B[:0] @@ -267,8 +267,8 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { } bbPool.Put(bb) - i64Values := make([]int64, len(br.timestamps)) - f64Values := make([]float64, len(br.timestamps)) + i64Values := make([]int64, br.rowsLen) + f64Values := make([]float64, br.rowsLen) for i := range f64Values { f64Values[i] = nan } @@ -347,7 +347,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { blockIdx := len(shard.blocks) - 1 rowRefs := shard.rowRefs rowRefsLen := len(rowRefs) - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { rowRefs = append(rowRefs, sortRowRef{ blockIdx: blockIdx, rowIdx: i, @@ -405,7 +405,7 @@ func (shard *pipeSortProcessorShard) Less(i, j int) bool { } func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -686,8 +686,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort if cA.c.isTime && cB.c.isTime { // Fast path - sort by _time - tA := bA.br.timestamps[rrA.rowIdx] - tB := bB.br.timestamps[rrB.rowIdx] + timestampsA := bA.br.getTimestamps() + timestampsB := bB.br.getTimestamps() + tA := timestampsA[rrA.rowIdx] + tB := timestampsB[rrB.rowIdx] if tA == tB { continue } diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index 6d9d26c20..5a25b3444 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -182,7 +182,8 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { byColumns := shard.byColumns[:0] byColumnsIsTime := shard.byColumnsIsTime[:0] bb := bbPool.Get() - for rowIdx, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for rowIdx, timestamp := range timestamps { byColumns = byColumns[:0] bb.B = bb.B[:0] for i, values := range byColumnValues { @@ -234,7 +235,8 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { // add rows to shard byColumns := 
shard.byColumns[:0] - for rowIdx, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for rowIdx, timestamp := range timestamps { byColumns = byColumns[:0] for i, values := range byColumnValues { @@ -307,7 +309,7 @@ func (shard *pipeTopkProcessorShard) sortRows(stopCh <-chan struct{}) { } func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index c26b5a8f8..9d1906fd3 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -269,7 +269,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // Slower generic path for a column with different values. var psg *pipeStatsGroup keyBuf := shard.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { if i <= 0 || values[i-1] != values[i] { keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) psg = shard.getPipeStatsGroup(keyBuf) @@ -312,7 +312,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // The slowest path - group by multiple columns with different values across rows. 
var psg *pipeStatsGroup keyBuf := shard.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { // Verify whether the key for 'by (...)' fields equals the previous key sameValue := i > 0 for _, values := range columnValues { @@ -338,7 +338,7 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) { funcs := shard.ps.funcs for i := range funcs { bm := &shard.bms[i] - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() iff := funcs[i].iff @@ -400,7 +400,7 @@ func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowI } func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index b35824a2c..679e19386 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -139,7 +139,8 @@ func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestam } cs := br.getColumns() - for i, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for i, timestamp := range timestamps { fields := make([]Field, len(cs)) stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) @@ -210,7 +211,8 @@ func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) { cs := br.getColumns() cStreamID := br.getColumnByName("_stream_id") stateSize := 0 - for i, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for i, timestamp := range timestamps { fields := make([]Field, len(cs)) stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) @@ -250,7 +252,7 @@ func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextR } func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { diff --git 
a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index 26eb55fb7..6230ad161 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -145,7 +145,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { // Take into account all the columns in br. keyBuf := shard.keyBuf cs := br.getColumns() - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { keyBuf = keyBuf[:0] for _, c := range cs { v := c.getValueAtRow(br, i) @@ -162,7 +162,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { c := br.getColumnByName(byFields[0]) if c.isConst { v := c.valuesEncoded[0] - shard.updateState(v, uint64(len(br.timestamps))) + shard.updateState(v, uint64(br.rowsLen)) return } if c.valueType == valueTypeDict { @@ -197,7 +197,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { shard.columnValues = columnValues keyBuf := shard.keyBuf - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { keyBuf = keyBuf[:0] for _, values := range columnValues { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) @@ -228,7 +228,7 @@ func (shard *pipeTopProcessorShard) getM() map[string]*uint64 { } func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 8e33d7eb0..ef1b2985f 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -147,7 +147,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { // Take into account all the columns in br. 
keyBuf := shard.keyBuf cs := br.getColumns() - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { keyBuf = keyBuf[:0] for _, c := range cs { v := c.getValueAtRow(br, i) @@ -164,7 +164,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { c := br.getColumnByName(byFields[0]) if c.isConst { v := c.valuesEncoded[0] - shard.updateState(v, uint64(len(br.timestamps))) + shard.updateState(v, uint64(br.rowsLen)) return true } if c.valueType == valueTypeDict { @@ -207,7 +207,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { shard.columnValues = columnValues keyBuf := shard.keyBuf - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { seenValue := true for _, values := range columnValues { if needHits || i == 0 || values[i-1] != values[i] { @@ -251,7 +251,7 @@ func (shard *pipeUniqProcessorShard) getM() map[string]*uint64 { } func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index fbc4589f1..c4c53fca1 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -148,7 +148,7 @@ type pipeUnpackProcessorShardNopad struct { } func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -157,7 +157,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { shard.uctx.init(pup.fieldPrefix) bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if pup.iff != nil { pup.iff.f.applyToBlockResult(br, bm) @@ -172,7 +172,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { v := c.valuesEncoded[0] shard.uctx.resetFields() pup.unpackFunc(&shard.uctx, v) - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { if bm.isSetBit(rowIdx) { 
shard.wctx.writeRow(rowIdx, shard.uctx.fields) } else { diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go index 11a012529..edeb3d5d2 100644 --- a/lib/logstorage/pipe_unroll.go +++ b/lib/logstorage/pipe_unroll.go @@ -106,7 +106,7 @@ type pipeUnrollProcessorShardNopad struct { } func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -115,7 +115,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { shard.wctx.init(workerID, pup.ppNext, false, false, br) bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pu.iff; iff != nil { iff.f.applyToBlockResult(br, bm) @@ -133,7 +133,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { } fields := shard.fields - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { if bm.isSetBit(rowIdx) { if needStop(pup.stopCh) { return diff --git a/lib/logstorage/pipe_update.go b/lib/logstorage/pipe_update.go index 2684bd9a5..15fc02415 100644 --- a/lib/logstorage/pipe_update.go +++ b/lib/logstorage/pipe_update.go @@ -62,14 +62,14 @@ type pipeUpdateProcessorShardNopad struct { } func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } shard := &pup.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pup.iff; iff != nil { iff.f.applyToBlockResult(br, bm) diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go index 418f235bc..5a638c38d 100644 --- a/lib/logstorage/pipe_utils_test.go +++ b/lib/logstorage/pipe_utils_test.go @@ -136,7 +136,7 @@ func (pp *testPipeProcessor) writeBlock(_ uint, br *blockResult) { columnValues = append(columnValues, values) } - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { row := make([]Field, 
len(columnValues)) for j, values := range columnValues { r := &row[j] diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index 8b852ef0e..9b7255c8e 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -18,7 +18,7 @@ func (sc *statsCount) String() string { func (sc *statsCount) updateNeededFields(neededFields fieldsSet) { if len(sc.fields) == 0 { - // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps) + // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as blockResult.rowsLen return } neededFields.addFields(sc.fields) @@ -41,7 +41,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields if len(fields) == 0 { // Fast path - unconditionally count all the columns. - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } if len(fields) == 1 { @@ -49,12 +49,12 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { c := br.getColumnByName(fields[0]) if c.isConst { if c.valuesEncoded[0] != "" { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) } return 0 } if c.isTime { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } switch c.valueType { @@ -68,7 +68,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { case valueTypeDict: zeroDictIdx := slices.Index(c.dictValues, "") if zeroDictIdx < 0 { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } for _, v := range c.getValuesEncoded(br) { @@ -78,7 +78,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } return 0 case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: - 
scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) @@ -87,7 +87,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } // Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count(). - bm := getBitmap(len(br.timestamps)) + bm := getBitmap(br.rowsLen) defer putBitmap(bm) bm.setBits() @@ -95,13 +95,13 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { c := br.getColumnByName(f) if c.isConst { if c.valuesEncoded[0] != "" { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } continue } if c.isTime { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } @@ -113,7 +113,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { }) case valueTypeDict: if !slices.Contains(c.dictValues, "") { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } valuesEncoded := c.getValuesEncoded(br) @@ -122,7 +122,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { return c.dictValues[dictIdx] == "" }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) @@ -130,7 +130,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } } - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) scp.rowsCount -= uint64(bm.onesCount()) return 0 } diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go index 69c180911..9d2193080 100644 --- a/lib/logstorage/stats_count_empty.go +++ 
b/lib/logstorage/stats_count_empty.go @@ -36,7 +36,7 @@ type statsCountEmptyProcessor struct { func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields if len(fields) == 0 { - bm := getBitmap(len(br.timestamps)) + bm := getBitmap(br.rowsLen) bm.setBits() for _, c := range br.getColumns() { values := c.getValues(br) @@ -53,7 +53,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int c := br.getColumnByName(fields[0]) if c.isConst { if c.valuesEncoded[0] == "" { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) } return 0 } @@ -88,7 +88,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int } // Slow path - count rows containing empty value for all the fields enumerated inside count_empty(). - bm := getBitmap(len(br.timestamps)) + bm := getBitmap(br.rowsLen) defer putBitmap(bm) bm.setBits() diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index f61d461db..b7aa87e4e 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -64,7 +64,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { sup.columnValues = columnValues keyBuf := sup.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { seenKey := true for _, values := range columnValues { if i == 0 || values[i-1] != values[i] { @@ -103,8 +103,8 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { // This guarantees that keys do not clash for different column types across blocks. 
c := br.getColumnByName(fields[0]) if c.isTime { - // Count unique br.timestamps - timestamps := br.timestamps + // Count unique timestamps + timestamps := br.getTimestamps() keyBuf := sup.keyBuf[:0] for i, timestamp := range timestamps { if i > 0 && timestamps[i-1] == timestamps[i] { @@ -180,7 +180,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { sup.columnValues = columnValues keyBuf := sup.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { seenKey := true for _, values := range columnValues { if i == 0 || values[i-1] != values[i] { @@ -247,10 +247,11 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in // This guarantees that keys do not clash for different column types across blocks. c := br.getColumnByName(fields[0]) if c.isTime { - // Count unique br.timestamps + // Count unique timestamps + timestamps := br.getTimestamps() keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 1) - keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx]) + keyBuf = encoding.MarshalInt64(keyBuf, timestamps[rowIdx]) stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index c92cd7a75..22eb0cbee 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -80,20 +80,12 @@ func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { } func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } if c.isTime { - // Special case for time column - timestamps := br.timestamps - maxTimestamp := timestamps[len(timestamps)-1] - for _, timestamp := range timestamps[:len(timestamps)-1] { - if timestamp > maxTimestamp { - maxTimestamp = timestamp - } - } - + maxTimestamp := br.getMaxTimestamp() bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], 
maxTimestamp) smp.updateStateBytes(bb.B) diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index fe9890dab..4be8d550e 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -82,20 +82,12 @@ func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { } func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } if c.isTime { - // Special case for time column - timestamps := br.timestamps - minTimestamp := timestamps[0] - for _, timestamp := range timestamps[1:] { - if timestamp < minTimestamp { - minTimestamp = timestamp - } - } - + minTimestamp := br.getMinTimestamp() bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) smp.updateStateBytes(bb.B) diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go index 4bf988f28..16c8446d6 100644 --- a/lib/logstorage/stats_quantile.go +++ b/lib/logstorage/stats_quantile.go @@ -96,7 +96,7 @@ func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *bloc if c.isConst { f, ok := tryParseFloat64(c.valuesEncoded[0]) if ok { - for range br.timestamps { + for i := 0; i < br.rowsLen; i++ { stateSizeIncrease += h.update(f) } } diff --git a/lib/logstorage/stats_row_any.go b/lib/logstorage/stats_row_any.go index 0060189ce..04942c662 100644 --- a/lib/logstorage/stats_row_any.go +++ b/lib/logstorage/stats_row_any.go @@ -39,7 +39,7 @@ type statsRowAnyProcessor struct { } func (sap *statsRowAnyProcessor) updateStatsForAllRows(br *blockResult) int { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return 0 } if sap.captured { diff --git a/lib/logstorage/stats_row_max.go b/lib/logstorage/stats_row_max.go index 31c3689a9..9982abebd 100644 --- a/lib/logstorage/stats_row_max.go +++ b/lib/logstorage/stats_row_max.go @@ -60,8 +60,9 @@ func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int { return 
stateSizeIncrease } if c.isTime { + maxTimestamp := br.getMaxTimestamp() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, 0) bbPool.Put(bb) @@ -124,8 +125,9 @@ func (smp *statsRowMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) return stateSizeIncrease } if c.isTime { + timestamps := br.getTimestamps() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], timestamps[rowIdx]) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, rowIdx) bbPool.Put(bb) diff --git a/lib/logstorage/stats_row_min.go b/lib/logstorage/stats_row_min.go index 66415dd90..e0f724b79 100644 --- a/lib/logstorage/stats_row_min.go +++ b/lib/logstorage/stats_row_min.go @@ -60,8 +60,9 @@ func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } if c.isTime { + minTimestamp := br.getMinTimestamp() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, 0) bbPool.Put(bb) @@ -124,8 +125,9 @@ func (smp *statsRowMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) return stateSizeIncrease } if c.isTime { + timestamps := br.getTimestamps() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], timestamps[rowIdx]) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, rowIdx) bbPool.Put(bb) diff --git a/lib/logstorage/stats_values.go b/lib/logstorage/stats_values.go index ceab48998..aab5475d5 100644 --- a/lib/logstorage/stats_values.go +++ 
b/lib/logstorage/stats_values.go @@ -64,12 +64,12 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum stateSizeIncrease += len(v) values := svp.values - for range br.timestamps { + for i := 0; i < br.rowsLen; i++ { values = append(values, v) } svp.values = values - stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0])) + stateSizeIncrease += br.rowsLen * int(unsafe.Sizeof(values[0])) return stateSizeIncrease } if c.valueType == valueTypeDict { @@ -86,7 +86,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum } svp.values = values - stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0])) + stateSizeIncrease += br.rowsLen * int(unsafe.Sizeof(values[0])) return stateSizeIncrease } @@ -100,7 +100,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum } svp.values = values - stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0])) + stateSizeIncrease += br.rowsLen * int(unsafe.Sizeof(values[0])) return stateSizeIncrease } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 3f9185f54..58724fa58 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -86,7 +86,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, } writeBlockResult := func(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -101,7 +101,9 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, Values: values, }) } - writeBlock(workerID, br.timestamps, csDst) + + timestamps := br.getTimestamps() + writeBlock(workerID, timestamps, csDst) brs.cs = csDst putBlockRows(brs) @@ -233,7 +235,7 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID var values []string var valuesLock sync.Mutex writeBlockResult := func(_ uint, br *blockResult) { - if len(br.timestamps) == 0 { + 
if br.rowsLen == 0 { return } @@ -396,7 +398,7 @@ func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []Tenant var results []ValueWithHits var resultsLock sync.Mutex writeBlockResult := func(_ uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -656,7 +658,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } bs.search(bsw, bm) - if len(bs.br.timestamps) > 0 { + if bs.br.rowsLen > 0 { processBlockResult(workerID, &bs.br) } bsw.reset() diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index be39d2d05..0dd9d4f6a 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -876,7 +876,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -893,7 +893,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -926,7 +926,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -948,7 +948,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, 
br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -978,7 +978,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -999,7 +999,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock)