From 4599429f516116ae96c7a6b99e0729626f5e9d9c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 25 Sep 2024 16:16:53 +0200 Subject: [PATCH] lib/logstorage: read timestamps column when it is really needed during query execution Previously timestamps column was read unconditionally on every query. This could significantly slow down queries, which do not need reading this column like in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070 . --- docs/VictoriaLogs/CHANGELOG.md | 1 + lib/logstorage/block_result.go | 246 +++++++++++++++-------- lib/logstorage/block_search.go | 4 +- lib/logstorage/filter_day_range.go | 2 +- lib/logstorage/filter_range.go | 3 +- lib/logstorage/filter_test.go | 3 +- lib/logstorage/filter_time.go | 2 +- lib/logstorage/filter_week_range.go | 2 +- lib/logstorage/pipe_copy.go | 2 +- lib/logstorage/pipe_delete.go | 2 +- lib/logstorage/pipe_drop_empty_fields.go | 4 +- lib/logstorage/pipe_extract.go | 4 +- lib/logstorage/pipe_extract_regexp.go | 4 +- lib/logstorage/pipe_field_names.go | 4 +- lib/logstorage/pipe_fields.go | 2 +- lib/logstorage/pipe_filter.go | 4 +- lib/logstorage/pipe_format.go | 6 +- lib/logstorage/pipe_limit.go | 6 +- lib/logstorage/pipe_math.go | 8 +- lib/logstorage/pipe_offset.go | 6 +- lib/logstorage/pipe_pack.go | 4 +- lib/logstorage/pipe_rename.go | 2 +- lib/logstorage/pipe_sort.go | 18 +- lib/logstorage/pipe_sort_topk.go | 8 +- lib/logstorage/pipe_stats.go | 8 +- lib/logstorage/pipe_stream_context.go | 8 +- lib/logstorage/pipe_top.go | 8 +- lib/logstorage/pipe_uniq.go | 8 +- lib/logstorage/pipe_unpack.go | 6 +- lib/logstorage/pipe_unroll.go | 6 +- lib/logstorage/pipe_update.go | 4 +- lib/logstorage/pipe_utils_test.go | 2 +- lib/logstorage/stats_count.go | 24 +-- lib/logstorage/stats_count_empty.go | 6 +- lib/logstorage/stats_count_uniq.go | 13 +- lib/logstorage/stats_max.go | 12 +- lib/logstorage/stats_min.go | 12 +- lib/logstorage/stats_quantile.go | 2 +- lib/logstorage/stats_row_any.go | 2 +- lib/logstorage/stats_row_max.go | 6 +- lib/logstorage/stats_row_min.go | 6 +- lib/logstorage/stats_values.go | 8 +- lib/logstorage/storage_search.go | 12 +- lib/logstorage/storage_search_test.go | 12 +- 44 files changed, 299 insertions(+), 213 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index e2c09cc5c..7c40abde5 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add button for enabling auto refresh, similarly to VictoriaMetrics vmui. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7017). +* FEATURE: improve performance of analytical queries, which do not need reading the `_time` field. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070). * BUGFIX: properly return logs without [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field when `*` query is passed to [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) together with positive `limit` arg. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785). Thanks to @jiekun for itentifying the root cause of the issue. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 2dc30a3fb..a4e14f695 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -21,14 +21,29 @@ import ( // // It is expected that its contents is accessed only from a single goroutine at a time. type blockResult struct { + // rowsLen is the number of rows in the given blockResult. + rowsLen int + + // bs is the associated blockSearch for the given blockResult. + // + // bs is nil for the blockResult constructed by pipes. + bs *blockSearch + + // bm is the associated bitmap for the given blockResult. + // + // bm is nil for the blockResult constructed by pipes. + bm *bitmap + // a holds all the bytes behind the requested column values in the block. a arena - // values holds all the requested column values in the block. + // valuesBuf holds all the requested column values in the block. valuesBuf []string - // timestamps contain timestamps for the selected log entries in the block. - timestamps []int64 + // timestampsBuf contains cached timestamps for the selected log entries in the block. + // + // timestamps must be obtained via blockResult.getTimestamps() call. + timestampsBuf []int64 // csBuf contains requested columns. csBuf []blockResultColumn @@ -47,12 +62,17 @@ type blockResult struct { } func (br *blockResult) reset() { + br.rowsLen = 0 + + br.cs = nil + br.bm = nil + br.a.reset() clear(br.valuesBuf) br.valuesBuf = br.valuesBuf[:0] - br.timestamps = br.timestamps[:0] + br.timestampsBuf = br.timestampsBuf[:0] clear(br.csBuf) br.csBuf = br.csBuf[:0] @@ -76,6 +96,11 @@ func (br *blockResult) reset() { func (br *blockResult) clone() *blockResult { brNew := &blockResult{} + brNew.rowsLen = br.rowsLen + + // do not clone br.cs, since it may be updated at any time. + // do not clone br.bm, since it may be updated at any time. + cs := br.getColumns() // Pre-populate values in every column in order to properly calculate the needed backing buffer size below. @@ -96,8 +121,10 @@ func (br *blockResult) clone() *blockResult { } brNew.valuesBuf = make([]string, 0, valuesBufLen) - brNew.timestamps = make([]int64, len(br.timestamps)) - copy(brNew.timestamps, br.timestamps) + srcTimestamps := br.getTimestamps() + brNew.timestampsBuf = make([]int64, len(srcTimestamps)) + copy(brNew.timestampsBuf, srcTimestamps) + brNew.checkTimestampsLen() csNew := make([]blockResultColumn, len(cs)) for i, c := range cs { @@ -112,18 +139,19 @@ func (br *blockResult) clone() *blockResult { return brNew } -// initFromFilterAllColumns initializes br from brSrc by copying rows identified by set bets at bm. +// initFromFilterAllColumns initializes br from brSrc by copying rows identified by set bits at bm. // // The br is valid until brSrc or bm is updated. func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) { br.reset() - srcTimestamps := brSrc.timestamps - dstTimestamps := br.timestamps[:0] + srcTimestamps := brSrc.getTimestamps() + dstTimestamps := br.timestampsBuf[:0] bm.forEachSetBitReadonly(func(idx int) { dstTimestamps = append(dstTimestamps, srcTimestamps[idx]) }) - br.timestamps = dstTimestamps + br.timestampsBuf = dstTimestamps + br.rowsLen = len(br.timestampsBuf) for _, cSrc := range brSrc.getColumns() { br.appendFilteredColumn(brSrc, cSrc, bm) @@ -134,7 +162,7 @@ func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) // // the br is valid until brSrc, cSrc or bm is updated. func (br *blockResult) appendFilteredColumn(brSrc *blockResult, cSrc *blockResultColumn, bm *bitmap) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } cDst := blockResultColumn{ @@ -211,7 +239,7 @@ func (br *blockResult) sizeBytes() int { n += br.a.sizeBytes() n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0])) - n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0])) + n += cap(br.timestampsBuf) * int(unsafe.Sizeof(br.timestampsBuf[0])) n += cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0])) n += cap(br.cs) * int(unsafe.Sizeof(br.cs[0])) @@ -221,10 +249,10 @@ func (br *blockResult) sizeBytes() int { // setResultColumns sets the given rcs as br columns. // // The br is valid only until rcs are modified. -func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) { +func (br *blockResult) setResultColumns(rcs []resultColumn, rowsLen int) { br.reset() - br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsCount) + br.rowsLen = rowsLen for i := range rcs { br.addResultColumn(&rcs[i]) @@ -232,8 +260,8 @@ func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) { } func (br *blockResult) addResultColumn(rc *resultColumn) { - if len(rc.values) != len(br.timestamps) { - logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, len(br.timestamps), len(rc.values)) + if len(rc.values) != br.rowsLen { + logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, br.rowsLen, len(rc.values)) } if areConstValues(rc.values) { // This optimization allows reducing memory usage after br cloning @@ -252,11 +280,9 @@ func (br *blockResult) addResultColumn(rc *resultColumn) { br.csInitialized = false } -// initAllColumns initializes all the columns in br according to bs and bm. -// -// The initialized columns are valid until bs and bm are changed. -func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { - unneededColumnNames := bs.bsw.so.unneededColumnNames +// initAllColumns initializes all the columns in br. +func (br *blockResult) initAllColumns() { + unneededColumnNames := br.bs.bsw.so.unneededColumnNames if !slices.Contains(unneededColumnNames, "_time") { // Add _time column @@ -265,12 +291,12 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { if !slices.Contains(unneededColumnNames, "_stream_id") { // Add _stream_id column - br.addStreamIDColumn(bs) + br.addStreamIDColumn() } if !slices.Contains(unneededColumnNames, "_stream") { // Add _stream column - if !br.addStreamColumn(bs) { + if !br.addStreamColumn() { // Skip the current block, since the associated stream tags are missing br.reset() return @@ -279,18 +305,18 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { if !slices.Contains(unneededColumnNames, "_msg") { // Add _msg column - v := bs.csh.getConstColumnValue("_msg") + v := br.bs.csh.getConstColumnValue("_msg") if v != "" { br.addConstColumn("_msg", v) - } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { - br.addColumn(bs, bm, ch) + } else if ch := br.bs.csh.getColumnHeader("_msg"); ch != nil { + br.addColumn(ch) } else { br.addConstColumn("_msg", "") } } // Add other const columns - for _, cc := range bs.csh.constColumns { + for _, cc := range br.bs.csh.constColumns { if isMsgFieldName(cc.Name) { continue } @@ -300,30 +326,28 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) { } // Add other non-const columns - chs := bs.csh.columnHeaders + chs := br.bs.csh.columnHeaders for i := range chs { ch := &chs[i] if isMsgFieldName(ch.name) { continue } if !slices.Contains(unneededColumnNames, ch.name) { - br.addColumn(bs, bm, ch) + br.addColumn(ch) } } br.csInitFast() } -// initRequestedColumns initialized only requested columns in br according to bs and bm. -// -// The initialized columns are valid until bs and bm are changed. -func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) { - for _, columnName := range bs.bsw.so.neededColumnNames { +// initRequestedColumns initialized only requested columns in br. +func (br *blockResult) initRequestedColumns() { + for _, columnName := range br.bs.bsw.so.neededColumnNames { switch columnName { case "_stream_id": - br.addStreamIDColumn(bs) + br.addStreamIDColumn() case "_stream": - if !br.addStreamColumn(bs) { + if !br.addStreamColumn() { // Skip the current block, since the associated stream tags are missing. br.reset() return @@ -331,11 +355,11 @@ func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) { case "_time": br.addTimeColumn() default: - v := bs.csh.getConstColumnValue(columnName) + v := br.bs.csh.getConstColumnValue(columnName) if v != "" { br.addConstColumn(columnName, v) - } else if ch := bs.csh.getColumnHeader(columnName); ch != nil { - br.addColumn(bs, bm, ch) + } else if ch := br.bs.csh.getColumnHeader(columnName); ch != nil { + br.addColumn(ch) } else { br.addConstColumn(columnName, "") } @@ -345,38 +369,92 @@ func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) { br.csInitFast() } +// mustInit initializes br with the given bs and bm. +// +// br is valid until bs or bm changes. func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.reset() - if bm.isZero() { - // Nothing to initialize for zero matching log entries in the block. + br.rowsLen = bm.onesCount() + if br.rowsLen == 0 { return } - // Initialize timestamps, since they are required for all the further work with br. - so := bs.bsw.so - if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") { - // The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes. - rowsLen := bm.onesCount() - br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen) + br.bs = bs + br.bm = bm +} + +func (br *blockResult) getMinTimestamp() int64 { + if br.bm != nil && br.bm.bitsLen == br.rowsLen { + return br.bs.bsw.bh.timestampsHeader.minTimestamp + } + + timestamps := br.getTimestamps() + if len(timestamps) == 0 { + return -1 << 63 + } + minTimestamp := timestamps[0] + for i := 1; i < len(timestamps); i++ { + if timestamps[i] < minTimestamp { + minTimestamp = timestamps[i] + } + } + return minTimestamp +} + +func (br *blockResult) getMaxTimestamp() int64 { + if br.bm != nil && br.bm.bitsLen == br.rowsLen { + return br.bs.bsw.bh.timestampsHeader.maxTimestamp + } + + timestamps := br.getTimestamps() + if len(timestamps) == 0 { + return (1 << 63) - 1 + } + maxTimestamp := timestamps[len(timestamps)-1] + for i := len(timestamps) - 2; i >= 0; i-- { + if timestamps[i] > maxTimestamp { + maxTimestamp = timestamps[i] + } + } + return maxTimestamp +} + +func (br *blockResult) getTimestamps() []int64 { + if br.rowsLen > 0 && len(br.timestampsBuf) == 0 { + br.initTimestamps() + } + return br.timestampsBuf +} + +func (br *blockResult) initTimestamps() { + if br.bs == nil { + br.timestampsBuf = fastnum.AppendInt64Zeros(br.timestampsBuf[:0], br.rowsLen) return } - // Slow path - the _time column is requested, so we need to initialize br.timestamps with real timestamps. - srcTimestamps := bs.getTimestamps() - if bm.areAllBitsSet() { + srcTimestamps := br.bs.getTimestamps() + if br.bm.areAllBitsSet() { // Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering. - br.timestamps = append(br.timestamps[:0], srcTimestamps...) + br.timestampsBuf = append(br.timestampsBuf[:0], srcTimestamps...) + br.checkTimestampsLen() return } // Slow path - copy only the needed timestamps to br according to filter results. - dstTimestamps := br.timestamps[:0] - bm.forEachSetBitReadonly(func(idx int) { + dstTimestamps := br.timestampsBuf[:0] + br.bm.forEachSetBitReadonly(func(idx int) { ts := srcTimestamps[idx] dstTimestamps = append(dstTimestamps, ts) }) - br.timestamps = dstTimestamps + br.timestampsBuf = dstTimestamps + br.checkTimestampsLen() +} + +func (br *blockResult) checkTimestampsLen() { + if len(br.timestampsBuf) != br.rowsLen { + logger.Panicf("BUG: unexpected number of timestamps; got %d; want %d", len(br.timestampsBuf), br.rowsLen) + } } func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bitmap, ch *columnHeader) []string { @@ -454,8 +532,8 @@ func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bit // addColumn adds column for the given ch to br. // -// The added column is valid until bs, bm or ch is changed. -func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) { +// The added column is valid until ch is changed. +func (br *blockResult) addColumn(ch *columnHeader) { br.csBuf = append(br.csBuf, blockResultColumn{ name: getCanonicalColumnName(ch.name), valueType: ch.valueType, @@ -466,8 +544,8 @@ func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) c := &br.csBuf[len(br.csBuf)-1] br.svecs = append(br.svecs, searchValuesEncodedCreator{ - bs: bs, - bm: bm, + bs: br.bs, + bm: br.bm, ch: ch, }) c.valuesEncodedCreator = &br.svecs[len(br.svecs)-1] @@ -492,15 +570,15 @@ func (br *blockResult) addTimeColumn() { br.csInitialized = false } -func (br *blockResult) addStreamIDColumn(bs *blockSearch) { +func (br *blockResult) addStreamIDColumn() { bb := bbPool.Get() - bb.B = bs.bsw.bh.streamID.marshalString(bb.B) + bb.B = br.bs.bsw.bh.streamID.marshalString(bb.B) br.addConstColumn("_stream_id", bytesutil.ToUnsafeString(bb.B)) bbPool.Put(bb) } -func (br *blockResult) addStreamColumn(bs *blockSearch) bool { - streamStr := bs.getStreamStr() +func (br *blockResult) addStreamColumn() bool { + streamStr := br.bs.getStreamStr() if streamStr == "" { return false } @@ -562,16 +640,16 @@ func (br *blockResult) newValuesBucketedForColumn(c *blockResultColumn, bf *bySt func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []string { if v == "" { // Fast path - return a slice of empty strings without constructing the slice. - return getEmptyStrings(len(br.timestamps)) + return getEmptyStrings(br.rowsLen) } - // Slower path - construct slice of identical values with the len(br.timestamps) + // Slower path - construct slice of identical values with the length equal to br.rowsLen valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) v = br.getBucketedValue(v, bf) - for range br.timestamps { + for i := 0; i < br.rowsLen; i++ { valuesBuf = append(valuesBuf, v) } @@ -585,7 +663,7 @@ func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) - timestamps := br.timestamps + timestamps := br.getTimestamps() var s string if !bf.hasBucketConfig() { @@ -1401,7 +1479,11 @@ func getBlockResultColumnIdxByName(cs []*blockResultColumn, name string) int { } func (br *blockResult) skipRows(skipRows int) { - br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...) + timestamps := br.getTimestamps() + br.timestampsBuf = append(br.timestampsBuf[:0], timestamps[skipRows:]...) + br.rowsLen -= skipRows + br.checkTimestampsLen() + for _, c := range br.getColumns() { if c.values != nil { c.values = append(c.values[:0], c.values[skipRows:]...) @@ -1421,7 +1503,11 @@ func (br *blockResult) skipRows(skipRows int) { } func (br *blockResult) truncateRows(keepRows int) { - br.timestamps = br.timestamps[:keepRows] + timestamps := br.getTimestamps() + br.timestampsBuf = append(br.timestampsBuf[:0], timestamps[:keepRows]...) + br.rowsLen = keepRows + br.checkTimestampsLen() + for _, c := range br.getColumns() { if c.values != nil { c.values = c.values[:keepRows] @@ -1676,10 +1762,10 @@ func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (flo func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { if c.isConst { v := c.valuesEncoded[0] - return uint64(len(v)) * uint64(len(br.timestamps)) + return uint64(len(v)) * uint64(br.rowsLen) } if c.isTime { - return uint64(len(time.RFC3339Nano)) * uint64(len(br.timestamps)) + return uint64(len(time.RFC3339Nano)) * uint64(br.rowsLen) } switch c.valueType { @@ -1707,7 +1793,7 @@ func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { case valueTypeIPv4: return c.sumLenStringValues(br) case valueTypeTimestampISO8601: - return uint64(len(iso8601Timestamp)) * uint64(len(br.timestamps)) + return uint64(len(iso8601Timestamp)) * uint64(br.rowsLen) default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) return 0 @@ -1729,7 +1815,7 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { if !ok { return 0, 0 } - return f * float64(len(br.timestamps)), len(br.timestamps) + return f * float64(br.rowsLen), br.rowsLen } if c.isTime { return 0, 0 @@ -1780,25 +1866,25 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint8(v)) } - return float64(sum), len(br.timestamps) + return float64(sum), br.rowsLen case valueTypeUint16: sum := uint64(0) for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint16(v)) } - return float64(sum), len(br.timestamps) + return float64(sum), br.rowsLen case valueTypeUint32: sum := uint64(0) for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint32(v)) } - return float64(sum), len(br.timestamps) + return float64(sum), br.rowsLen case valueTypeUint64: sum := float64(0) for _, v := range c.getValuesEncoded(br) { sum += float64(unmarshalUint64(v)) } - return sum, len(br.timestamps) + return sum, br.rowsLen case valueTypeFloat64: sum := float64(0) for _, v := range c.getValuesEncoded(br) { @@ -1807,7 +1893,7 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { sum += f } } - return sum, len(br.timestamps) + return sum, br.rowsLen case valueTypeIPv4: return 0, 0 case valueTypeTimestampISO8601: @@ -1864,15 +1950,15 @@ func truncateTimestampToYear(timestamp int64) int64 { return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano() } -func getEmptyStrings(rowsCount int) []string { +func getEmptyStrings(rowsLen int) []string { p := emptyStrings.Load() if p == nil { - values := make([]string, rowsCount) + values := make([]string, rowsLen) emptyStrings.Store(&values) return values } values := *p - return slicesutil.SetLength(values, rowsCount) + return slicesutil.SetLength(values, rowsLen) } var emptyStrings atomic.Pointer[[]string] diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 3512dd321..d260ba264 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -177,9 +177,9 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) { // fetch the requested columns to bs.br. if bs.bsw.so.needAllColumns { - bs.br.initAllColumns(bs, bm) + bs.br.initAllColumns() } else { - bs.br.initRequestedColumns(bs, bm) + bs.br.initRequestedColumns() } } diff --git a/lib/logstorage/filter_day_range.go b/lib/logstorage/filter_day_range.go index 0f7608894..4ad88f0a8 100644 --- a/lib/logstorage/filter_day_range.go +++ b/lib/logstorage/filter_day_range.go @@ -47,7 +47,7 @@ func (fr *filterDayRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - timestamps := br.timestamps + timestamps := br.getTimestamps() bm.forEachSetBit(func(idx int) bool { timestamp := timestamps[idx] return fr.matchTimestampValue(timestamp) diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index 3ad8233fb..8776730bb 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -44,9 +44,10 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { + timestamps := br.getTimestamps() minValueInt, maxValueInt := toInt64Range(minValue, maxValue) bm.forEachSetBit(func(idx int) bool { - timestamp := br.timestamps[idx] + timestamp := timestamps[idx] return timestamp >= minValueInt && timestamp <= maxValueInt }) return diff --git a/lib/logstorage/filter_test.go b/lib/logstorage/filter_test.go index 67cc5764a..90f288280 100644 --- a/lib/logstorage/filter_test.go +++ b/lib/logstorage/filter_test.go @@ -213,11 +213,12 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi t.Fatalf("unexpected number of columns in blockResult; got %d; want 2", len(cs)) } values := cs[0].getValues(br) + timestamps := br.getTimestamps() resultsMu.Lock() for i, v := range values { results = append(results, result{ value: strings.Clone(v), - timestamp: br.timestamps[i], + timestamp: timestamps[i], }) } resultsMu.Unlock() diff --git a/lib/logstorage/filter_time.go b/lib/logstorage/filter_time.go index 1009657f6..dfe1a5788 100644 --- a/lib/logstorage/filter_time.go +++ b/lib/logstorage/filter_time.go @@ -41,7 +41,7 @@ func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - timestamps := br.timestamps + timestamps := br.getTimestamps() bm.forEachSetBit(func(idx int) bool { timestamp := timestamps[idx] return ft.matchTimestampValue(timestamp) diff --git a/lib/logstorage/filter_week_range.go b/lib/logstorage/filter_week_range.go index c83ea89f9..3b17845ef 100644 --- a/lib/logstorage/filter_week_range.go +++ b/lib/logstorage/filter_week_range.go @@ -49,7 +49,7 @@ func (fr *filterWeekRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - timestamps := br.timestamps + timestamps := br.getTimestamps() bm.forEachSetBit(func(idx int) bool { timestamp := timestamps[idx] return fr.matchTimestampValue(timestamp) diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index 1f1c12873..997023167 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -79,7 +79,7 @@ type pipeCopyProcessor struct { } func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 15ef3c7ec..e61fcfcc6 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -59,7 +59,7 @@ type pipeDeleteProcessor struct { } func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_drop_empty_fields.go b/lib/logstorage/pipe_drop_empty_fields.go index 98f846f65..ba18ad65b 100644 --- a/lib/logstorage/pipe_drop_empty_fields.go +++ b/lib/logstorage/pipe_drop_empty_fields.go @@ -66,7 +66,7 @@ type pipeDropEmptyFieldsProcessorShardNopad struct { } func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -90,7 +90,7 @@ func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResu shard.wctx.init(workerID, pdp.ppNext) fields := shard.fields - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { fields = fields[:0] for i, values := range columnValues { v := values[rowIdx] diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 2aeb49a3c..89f518e31 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -151,7 +151,7 @@ type pipeExtractProcessorShardNopad struct { } func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -159,7 +159,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { shard := &pep.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pe.iff; iff != nil { iff.f.applyToBlockResult(br, bm) diff --git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go index 1f7357b21..3ce6ee503 100644 --- a/lib/logstorage/pipe_extract_regexp.go +++ b/lib/logstorage/pipe_extract_regexp.go @@ -175,7 +175,7 @@ type pipeExtractRegexpProcessorShardNopad struct { } func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -183,7 +183,7 @@ func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult shard := &pep.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pe.iff; iff != nil { iff.f.applyToBlockResult(br, bm) diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 177b3eb84..0ef824ed7 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -94,7 +94,7 @@ func (shard *pipeFieldNamesProcessorShard) getM() map[string]*uint64 { } func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -113,7 +113,7 @@ func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { // Assume that the column is set for all the rows in the block. // This is much faster than reading all the column values and counting non-empty rows. - *pHits += uint64(len(br.timestamps)) + *pHits += uint64(br.rowsLen) } } diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index 0b4f4186e..f08e5c608 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -79,7 +79,7 @@ type pipeFieldsProcessor struct { } func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go index 3dd22e5e0..7b7308342 100644 --- a/lib/logstorage/pipe_filter.go +++ b/lib/logstorage/pipe_filter.go @@ -83,14 +83,14 @@ type pipeFilterProcessorShardNopad struct { } func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } shard := &pfp.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() pfp.pf.f.applyToBlockResult(br, bm) if bm.areAllBitsSet() { diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index b6e80573e..765810186 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -136,7 +136,7 @@ type pipeFormatProcessorShardNopad struct { } func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -144,7 +144,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { pf := pfp.pf bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pf.iff; iff != nil { iff.f.applyToBlockResult(br, bm) @@ -157,7 +157,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { shard.rc.name = pf.resultField resultColumn := br.getColumnByName(pf.resultField) - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { v := "" if bm.isSetBit(rowIdx) { v = shard.formatRow(pf, br, rowIdx) diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index 71ddac098..6dec31bae 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -57,11 +57,11 @@ type pipeLimitProcessor struct { } func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } - rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) + rowsProcessed := plp.rowsProcessed.Add(uint64(br.rowsLen)) limit := plp.pl.limit if rowsProcessed <= limit { // Fast path - write all the rows to ppNext. @@ -73,7 +73,7 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { } // Slow path - overflow. Write the remaining rows if needed. - rowsProcessed -= uint64(len(br.timestamps)) + rowsProcessed -= uint64(br.rowsLen) if rowsProcessed >= limit { // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. return diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go index 7ce50bfb1..c3999351f 100644 --- a/lib/logstorage/pipe_math.go +++ b/lib/logstorage/pipe_math.go @@ -293,12 +293,12 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) rIdx := len(shard.rs) shard.rs = slicesutil.SetLength(shard.rs, len(shard.rs)+1) - shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+len(br.timestamps)) - shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-len(br.timestamps):] + shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+br.rowsLen) + shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-br.rowsLen:] if me.isConst { r := shard.rs[rIdx] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { r[i] = me.constValue } return @@ -331,7 +331,7 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) } func (pmp *pipeMathProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go index dc888f4e5..5f3f92314 100644 --- a/lib/logstorage/pipe_offset.go +++ b/lib/logstorage/pipe_offset.go @@ -51,16 +51,16 @@ type pipeOffsetProcessor struct { } func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } - rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps))) + rowsProcessed := pop.rowsProcessed.Add(uint64(br.rowsLen)) if rowsProcessed <= pop.po.offset { return } - rowsProcessed -= uint64(len(br.timestamps)) + rowsProcessed -= uint64(br.rowsLen) if rowsProcessed >= pop.po.offset { pop.ppNext.writeBlock(workerID, br) return diff --git a/lib/logstorage/pipe_pack.go b/lib/logstorage/pipe_pack.go index e911121a2..4093d27ee 100644 --- a/lib/logstorage/pipe_pack.go +++ b/lib/logstorage/pipe_pack.go @@ -64,7 +64,7 @@ type pipePackProcessorShardNopad struct { } func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -86,7 +86,7 @@ func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) { buf := shard.buf[:0] fields := shard.fields - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { fields = fields[:0] for _, c := range cs { v := c.getValueAtRow(br, rowIdx) diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index ff85a65d8..c35c409ee 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -83,7 +83,7 @@ type pipeRenameProcessor struct { } func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 07fa5d553..caebb08b6 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -245,11 +245,11 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { shard.columnValues = columnValues // Generate byColumns - valuesEncoded := make([]string, len(br.timestamps)) + valuesEncoded := make([]string, br.rowsLen) shard.stateSizeBudget -= len(valuesEncoded) * int(unsafe.Sizeof(valuesEncoded[0])) bb := bbPool.Get() - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { // Marshal all the columns per each row into a single string // and sort rows by the resulting string. bb.B = bb.B[:0] @@ -267,8 +267,8 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { } bbPool.Put(bb) - i64Values := make([]int64, len(br.timestamps)) - f64Values := make([]float64, len(br.timestamps)) + i64Values := make([]int64, br.rowsLen) + f64Values := make([]float64, br.rowsLen) for i := range f64Values { f64Values[i] = nan } @@ -347,7 +347,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { blockIdx := len(shard.blocks) - 1 rowRefs := shard.rowRefs rowRefsLen := len(rowRefs) - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { rowRefs = append(rowRefs, sortRowRef{ blockIdx: blockIdx, rowIdx: i, @@ -405,7 +405,7 @@ func (shard *pipeSortProcessorShard) Less(i, j int) bool { } func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -686,8 +686,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort if cA.c.isTime && cB.c.isTime { // Fast path - sort by _time - tA := bA.br.timestamps[rrA.rowIdx] - tB := bB.br.timestamps[rrB.rowIdx] + timestampsA := bA.br.getTimestamps() + timestampsB := bB.br.getTimestamps() + tA := timestampsA[rrA.rowIdx] + tB := timestampsB[rrB.rowIdx] if tA == tB { continue } diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index 6d9d26c20..5a25b3444 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -182,7 +182,8 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { byColumns := shard.byColumns[:0] byColumnsIsTime := shard.byColumnsIsTime[:0] bb := bbPool.Get() - for rowIdx, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for rowIdx, timestamp := range timestamps { byColumns = byColumns[:0] bb.B = bb.B[:0] for i, values := range byColumnValues { @@ -234,7 +235,8 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { // add rows to shard byColumns := shard.byColumns[:0] - for rowIdx, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for rowIdx, timestamp := range timestamps { byColumns = byColumns[:0] for i, values := range byColumnValues { @@ -307,7 +309,7 @@ func (shard *pipeTopkProcessorShard) sortRows(stopCh <-chan struct{}) { } func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index c26b5a8f8..9d1906fd3 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -269,7 +269,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // Slower generic path for a column with different values. var psg *pipeStatsGroup keyBuf := shard.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { if i <= 0 || values[i-1] != values[i] { keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) psg = shard.getPipeStatsGroup(keyBuf) @@ -312,7 +312,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // The slowest path - group by multiple columns with different values across rows. var psg *pipeStatsGroup keyBuf := shard.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { // Verify whether the key for 'by (...)' fields equals the previous key sameValue := i > 0 for _, values := range columnValues { @@ -338,7 +338,7 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) { funcs := shard.ps.funcs for i := range funcs { bm := &shard.bms[i] - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() iff := funcs[i].iff @@ -400,7 +400,7 @@ func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowI } func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index b35824a2c..679e19386 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -139,7 +139,8 @@ func getStreamRows(ctx context.Context, s *Storage, streamID string, minTimestam } cs := br.getColumns() - for i, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for i, timestamp := range timestamps { fields := make([]Field, len(cs)) stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) @@ -210,7 +211,8 @@ func (shard *pipeStreamContextProcessorShard) writeBlock(br *blockResult) { cs := br.getColumns() cStreamID := br.getColumnByName("_stream_id") stateSize := 0 - for i, timestamp := range br.timestamps { + timestamps := br.getTimestamps() + for i, timestamp := range timestamps { fields := make([]Field, len(cs)) stateSize += int(unsafe.Sizeof(fields[0])) * len(fields) @@ -250,7 +252,7 @@ func (shard *pipeStreamContextProcessorShard) getM() map[string][]streamContextR } func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 { diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index 26eb55fb7..6230ad161 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -145,7 +145,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { // Take into account all the columns in br. keyBuf := shard.keyBuf cs := br.getColumns() - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { keyBuf = keyBuf[:0] for _, c := range cs { v := c.getValueAtRow(br, i) @@ -162,7 +162,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { c := br.getColumnByName(byFields[0]) if c.isConst { v := c.valuesEncoded[0] - shard.updateState(v, uint64(len(br.timestamps))) + shard.updateState(v, uint64(br.rowsLen)) return } if c.valueType == valueTypeDict { @@ -197,7 +197,7 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) { shard.columnValues = columnValues keyBuf := shard.keyBuf - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { keyBuf = keyBuf[:0] for _, values := range columnValues { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) @@ -228,7 +228,7 @@ func (shard *pipeTopProcessorShard) getM() map[string]*uint64 { } func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 8e33d7eb0..ef1b2985f 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -147,7 +147,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { // Take into account all the columns in br. keyBuf := shard.keyBuf cs := br.getColumns() - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { keyBuf = keyBuf[:0] for _, c := range cs { v := c.getValueAtRow(br, i) @@ -164,7 +164,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { c := br.getColumnByName(byFields[0]) if c.isConst { v := c.valuesEncoded[0] - shard.updateState(v, uint64(len(br.timestamps))) + shard.updateState(v, uint64(br.rowsLen)) return true } if c.valueType == valueTypeDict { @@ -207,7 +207,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { shard.columnValues = columnValues keyBuf := shard.keyBuf - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { seenValue := true for _, values := range columnValues { if needHits || i == 0 || values[i-1] != values[i] { @@ -251,7 +251,7 @@ func (shard *pipeUniqProcessorShard) getM() map[string]*uint64 { } func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index fbc4589f1..c4c53fca1 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -148,7 +148,7 @@ type pipeUnpackProcessorShardNopad struct { } func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -157,7 +157,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { shard.uctx.init(pup.fieldPrefix) bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if pup.iff != nil { pup.iff.f.applyToBlockResult(br, bm) @@ -172,7 +172,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { v := c.valuesEncoded[0] shard.uctx.resetFields() pup.unpackFunc(&shard.uctx, v) - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { if bm.isSetBit(rowIdx) { shard.wctx.writeRow(rowIdx, shard.uctx.fields) } else { diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go index 11a012529..edeb3d5d2 100644 --- a/lib/logstorage/pipe_unroll.go +++ b/lib/logstorage/pipe_unroll.go @@ -106,7 +106,7 @@ type pipeUnrollProcessorShardNopad struct { } func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -115,7 +115,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { shard.wctx.init(workerID, pup.ppNext, false, false, br) bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pu.iff; iff != nil { iff.f.applyToBlockResult(br, bm) @@ -133,7 +133,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { } fields := shard.fields - for rowIdx := range br.timestamps { + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { if bm.isSetBit(rowIdx) { if needStop(pup.stopCh) { return diff --git a/lib/logstorage/pipe_update.go b/lib/logstorage/pipe_update.go index 2684bd9a5..15fc02415 100644 --- a/lib/logstorage/pipe_update.go +++ b/lib/logstorage/pipe_update.go @@ -62,14 +62,14 @@ type pipeUpdateProcessorShardNopad struct { } func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } shard := &pup.shards[workerID] bm := &shard.bm - bm.init(len(br.timestamps)) + bm.init(br.rowsLen) bm.setBits() if iff := pup.iff; iff != nil { iff.f.applyToBlockResult(br, bm) diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go index 418f235bc..5a638c38d 100644 --- a/lib/logstorage/pipe_utils_test.go +++ b/lib/logstorage/pipe_utils_test.go @@ -136,7 +136,7 @@ func (pp *testPipeProcessor) writeBlock(_ uint, br *blockResult) { columnValues = append(columnValues, values) } - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { row := make([]Field, len(columnValues)) for j, values := range columnValues { r := &row[j] diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index 8b852ef0e..9b7255c8e 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -18,7 +18,7 @@ func (sc *statsCount) String() string { func (sc *statsCount) updateNeededFields(neededFields fieldsSet) { if len(sc.fields) == 0 { - // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps) + // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as blockResult.rowsLen return } neededFields.addFields(sc.fields) @@ -41,7 +41,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields if len(fields) == 0 { // Fast path - unconditionally count all the columns. - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } if len(fields) == 1 { @@ -49,12 +49,12 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { c := br.getColumnByName(fields[0]) if c.isConst { if c.valuesEncoded[0] != "" { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) } return 0 } if c.isTime { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } switch c.valueType { @@ -68,7 +68,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { case valueTypeDict: zeroDictIdx := slices.Index(c.dictValues, "") if zeroDictIdx < 0 { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } for _, v := range c.getValuesEncoded(br) { @@ -78,7 +78,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } return 0 case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) @@ -87,7 +87,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } // Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count(). - bm := getBitmap(len(br.timestamps)) + bm := getBitmap(br.rowsLen) defer putBitmap(bm) bm.setBits() @@ -95,13 +95,13 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { c := br.getColumnByName(f) if c.isConst { if c.valuesEncoded[0] != "" { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } continue } if c.isTime { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } @@ -113,7 +113,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { }) case valueTypeDict: if !slices.Contains(c.dictValues, "") { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 } valuesEncoded := c.getValuesEncoded(br) @@ -122,7 +122,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { return c.dictValues[dictIdx] == "" }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) return 0 default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) @@ -130,7 +130,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } } - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) scp.rowsCount -= uint64(bm.onesCount()) return 0 } diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go index 69c180911..9d2193080 100644 --- a/lib/logstorage/stats_count_empty.go +++ b/lib/logstorage/stats_count_empty.go @@ -36,7 +36,7 @@ type statsCountEmptyProcessor struct { func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields if len(fields) == 0 { - bm := getBitmap(len(br.timestamps)) + bm := getBitmap(br.rowsLen) bm.setBits() for _, c := range br.getColumns() { values := c.getValues(br) @@ -53,7 +53,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int c := br.getColumnByName(fields[0]) if c.isConst { if c.valuesEncoded[0] == "" { - scp.rowsCount += uint64(len(br.timestamps)) + scp.rowsCount += uint64(br.rowsLen) } return 0 } @@ -88,7 +88,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int } // Slow path - count rows containing empty value for all the fields enumerated inside count_empty(). - bm := getBitmap(len(br.timestamps)) + bm := getBitmap(br.rowsLen) defer putBitmap(bm) bm.setBits() diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index f61d461db..b7aa87e4e 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -64,7 +64,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { sup.columnValues = columnValues keyBuf := sup.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { seenKey := true for _, values := range columnValues { if i == 0 || values[i-1] != values[i] { @@ -103,8 +103,8 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { // This guarantees that keys do not clash for different column types across blocks. c := br.getColumnByName(fields[0]) if c.isTime { - // Count unique br.timestamps - timestamps := br.timestamps + // Count unique timestamps + timestamps := br.getTimestamps() keyBuf := sup.keyBuf[:0] for i, timestamp := range timestamps { if i > 0 && timestamps[i-1] == timestamps[i] { @@ -180,7 +180,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { sup.columnValues = columnValues keyBuf := sup.keyBuf[:0] - for i := range br.timestamps { + for i := 0; i < br.rowsLen; i++ { seenKey := true for _, values := range columnValues { if i == 0 || values[i-1] != values[i] { @@ -247,10 +247,11 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in // This guarantees that keys do not clash for different column types across blocks. c := br.getColumnByName(fields[0]) if c.isTime { - // Count unique br.timestamps + // Count unique timestamps + timestamps := br.getTimestamps() keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 1) - keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx]) + keyBuf = encoding.MarshalInt64(keyBuf, timestamps[rowIdx]) stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index c92cd7a75..22eb0cbee 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -80,20 +80,12 @@ func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { } func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } if c.isTime { - // Special case for time column - timestamps := br.timestamps - maxTimestamp := timestamps[len(timestamps)-1] - for _, timestamp := range timestamps[:len(timestamps)-1] { - if timestamp > maxTimestamp { - maxTimestamp = timestamp - } - } - + maxTimestamp := br.getMaxTimestamp() bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) smp.updateStateBytes(bb.B) diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index fe9890dab..4be8d550e 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -82,20 +82,12 @@ func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { } func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } if c.isTime { - // Special case for time column - timestamps := br.timestamps - minTimestamp := timestamps[0] - for _, timestamp := range timestamps[1:] { - if timestamp < minTimestamp { - minTimestamp = timestamp - } - } - + minTimestamp := br.getMinTimestamp() bb := bbPool.Get() bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) smp.updateStateBytes(bb.B) diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go index 4bf988f28..16c8446d6 100644 --- a/lib/logstorage/stats_quantile.go +++ b/lib/logstorage/stats_quantile.go @@ -96,7 +96,7 @@ func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *bloc if c.isConst { f, ok := tryParseFloat64(c.valuesEncoded[0]) if ok { - for range br.timestamps { + for i := 0; i < br.rowsLen; i++ { stateSizeIncrease += h.update(f) } } diff --git a/lib/logstorage/stats_row_any.go b/lib/logstorage/stats_row_any.go index 0060189ce..04942c662 100644 --- a/lib/logstorage/stats_row_any.go +++ b/lib/logstorage/stats_row_any.go @@ -39,7 +39,7 @@ type statsRowAnyProcessor struct { } func (sap *statsRowAnyProcessor) updateStatsForAllRows(br *blockResult) int { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return 0 } if sap.captured { diff --git a/lib/logstorage/stats_row_max.go b/lib/logstorage/stats_row_max.go index 31c3689a9..9982abebd 100644 --- a/lib/logstorage/stats_row_max.go +++ b/lib/logstorage/stats_row_max.go @@ -60,8 +60,9 @@ func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } if c.isTime { + maxTimestamp := br.getMaxTimestamp() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, 0) bbPool.Put(bb) @@ -124,8 +125,9 @@ func (smp *statsRowMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) return stateSizeIncrease } if c.isTime { + timestamps := br.getTimestamps() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], timestamps[rowIdx]) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, rowIdx) bbPool.Put(bb) diff --git a/lib/logstorage/stats_row_min.go b/lib/logstorage/stats_row_min.go index 66415dd90..e0f724b79 100644 --- a/lib/logstorage/stats_row_min.go +++ b/lib/logstorage/stats_row_min.go @@ -60,8 +60,9 @@ func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } if c.isTime { + minTimestamp := br.getMinTimestamp() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, 0) bbPool.Put(bb) @@ -124,8 +125,9 @@ func (smp *statsRowMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) return stateSizeIncrease } if c.isTime { + timestamps := br.getTimestamps() bb := bbPool.Get() - bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx]) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], timestamps[rowIdx]) v := bytesutil.ToUnsafeString(bb.B) stateSizeIncrease += smp.updateState(v, br, rowIdx) bbPool.Put(bb) diff --git a/lib/logstorage/stats_values.go b/lib/logstorage/stats_values.go index ceab48998..aab5475d5 100644 --- a/lib/logstorage/stats_values.go +++ b/lib/logstorage/stats_values.go @@ -64,12 +64,12 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum stateSizeIncrease += len(v) values := svp.values - for range br.timestamps { + for i := 0; i < br.rowsLen; i++ { values = append(values, v) } svp.values = values - stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0])) + stateSizeIncrease += br.rowsLen * int(unsafe.Sizeof(values[0])) return stateSizeIncrease } if c.valueType == valueTypeDict { @@ -86,7 +86,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum } svp.values = values - stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0])) + stateSizeIncrease += br.rowsLen * int(unsafe.Sizeof(values[0])) return stateSizeIncrease } @@ -100,7 +100,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum } svp.values = values - stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0])) + stateSizeIncrease += br.rowsLen * int(unsafe.Sizeof(values[0])) return stateSizeIncrease } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 3f9185f54..58724fa58 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -86,7 +86,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, } writeBlockResult := func(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -101,7 +101,9 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, Values: values, }) } - writeBlock(workerID, br.timestamps, csDst) + + timestamps := br.getTimestamps() + writeBlock(workerID, timestamps, csDst) brs.cs = csDst putBlockRows(brs) @@ -233,7 +235,7 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID var values []string var valuesLock sync.Mutex writeBlockResult := func(_ uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -396,7 +398,7 @@ func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []Tenant var results []ValueWithHits var resultsLock sync.Mutex writeBlockResult := func(_ uint, br *blockResult) { - if len(br.timestamps) == 0 { + if br.rowsLen == 0 { return } @@ -656,7 +658,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } bs.search(bsw, bm) - if len(bs.br.timestamps) > 0 { + if bs.br.rowsLen > 0 { processBlockResult(workerID, &bs.br) } bsw.reset() diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index be39d2d05..0dd9d4f6a 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -876,7 +876,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -893,7 +893,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions(allTenantIDs, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -926,7 +926,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -948,7 +948,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -978,7 +978,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock) @@ -999,7 +999,7 @@ func TestStorageSearch(t *testing.T) { so := newTestGenericSearchOptions([]TenantID{tenantID}, f, []string{"_msg"}) var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCountTotal.Add(uint32(len(br.timestamps))) + rowsCountTotal.Add(uint32(br.rowsLen)) } s.search(workersCount, so, nil, processBlock)