diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 31693c8ed..21ed4ae46 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -29,14 +29,20 @@ type blockResult struct { // timestamps contain timestamps for the selected log entries in the block. timestamps []int64 - // csOffset contains cs offset for the requested columns. + // csBufOffset contains csBuf offset for the requested columns. // - // columns with indexes below csOffset are ignored. + // columns with indexes below csBufOffset are ignored. // This is needed for simplifying data transformations at pipe stages. - csOffset int + csBufOffset int - // cs contains requested columns. - cs []blockResultColumn + // csBuf contains requested columns. + csBuf []blockResultColumn + + // cs contains cached pointers to requested columns returned from getColumns() if csInitialized=true. + cs []*blockResultColumn + + // csInitialized is set to true if cs is properly initialized and can be returned from getColumns(). + csInitialized bool } func (br *blockResult) reset() { @@ -49,10 +55,15 @@ func (br *blockResult) reset() { br.timestamps = br.timestamps[:0] - br.csOffset = 0 + br.csBufOffset = 0 + + clear(br.csBuf) + br.csBuf = br.csBuf[:0] clear(br.cs) br.cs = br.cs[:0] + + br.csInitialized = false } // clone returns a clone of br, which owns its own memory. @@ -82,7 +93,7 @@ func (br *blockResult) clone() *blockResult { for i, c := range cs { csNew[i] = c.clone(brNew) } - brNew.cs = csNew + brNew.csBuf = csNew return brNew } @@ -116,6 +127,7 @@ func (br *blockResult) sizeBytes() int { n += cap(br.buf) n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0])) n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0])) + n += cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0])) n += cap(br.cs) * int(unsafe.Sizeof(br.cs[0])) return n @@ -132,24 +144,25 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values)) - cs := br.cs + csBuf := br.csBuf for _, rc := range rcs { if areConstValues(rc.values) { // This optimization allows reducing memory usage after br cloning - cs = append(cs, blockResultColumn{ + csBuf = append(csBuf, blockResultColumn{ name: rc.name, isConst: true, encodedValues: rc.values[:1], }) } else { - cs = append(cs, blockResultColumn{ + csBuf = append(csBuf, blockResultColumn{ name: rc.name, valueType: valueTypeString, encodedValues: rc.values, }) } } - br.cs = cs + br.csBuf = csBuf + br.csInitialized = false } func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { @@ -349,21 +362,23 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap) dictValues = valuesBuf[valuesBufLen:] name := getCanonicalColumnName(ch.name) - br.cs = append(br.cs, blockResultColumn{ + br.csBuf = append(br.csBuf, blockResultColumn{ name: name, valueType: ch.valueType, dictValues: dictValues, encodedValues: encodedValues, }) + br.csInitialized = false br.buf = buf br.valuesBuf = valuesBuf } func (br *blockResult) addTimeColumn() { - br.cs = append(br.cs, blockResultColumn{ + br.csBuf = append(br.csBuf, blockResultColumn{ name: "_time", isTime: true, }) + br.csInitialized = false } func (br *blockResult) addStreamColumn(bs *blockSearch) bool { @@ -401,11 +416,12 @@ func (br *blockResult) addConstColumn(name, value string) { valuesBuf = append(valuesBuf, s) br.valuesBuf = valuesBuf - br.cs = append(br.cs, blockResultColumn{ + br.csBuf = append(br.csBuf, blockResultColumn{ name: name, isConst: true, encodedValues: valuesBuf[valuesBufLen:], }) + br.csInitialized = false } func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize, bucketOffset float64) []string { @@ -1027,20 +1043,21 @@ func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) { return } - cs := br.cs - csOffset := len(cs) + csBuf := br.csBuf + csBufOffset := len(csBuf) for _, c := range br.getColumns() { if idx := slices.Index(srcColumnNames, c.name); idx >= 0 { c.name = dstColumnNames[idx] - cs = append(cs, c) + csBuf = append(csBuf, *c) // continue is skipped intentionally in order to leave the original column in the columns list. } if !slices.Contains(dstColumnNames, c.name) { - cs = append(cs, c) + csBuf = append(csBuf, *c) } } - br.csOffset = csOffset - br.cs = cs + br.csBufOffset = csBufOffset + br.csBuf = csBuf + br.csInitialized = false for _, dstColumnName := range dstColumnNames { br.createMissingColumnByName(dstColumnName) @@ -1053,20 +1070,21 @@ func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) { return } - cs := br.cs - csOffset := len(cs) + csBuf := br.csBuf + csBufOffset := len(csBuf) for _, c := range br.getColumns() { if idx := slices.Index(srcColumnNames, c.name); idx >= 0 { c.name = dstColumnNames[idx] - cs = append(cs, c) + csBuf = append(csBuf, *c) continue } if !slices.Contains(dstColumnNames, c.name) { - cs = append(cs, c) + csBuf = append(csBuf, *c) } } - br.csOffset = csOffset - br.cs = cs + br.csBufOffset = csBufOffset + br.csBuf = csBuf + br.csInitialized = false for _, dstColumnName := range dstColumnNames { br.createMissingColumnByName(dstColumnName) @@ -1079,15 +1097,16 @@ func (br *blockResult) deleteColumns(columnNames []string) { return } - cs := br.cs - csOffset := len(cs) + csBuf := br.csBuf + csBufOffset := len(csBuf) for _, c := range br.getColumns() { if !slices.Contains(columnNames, c.name) { - cs = append(cs, c) + csBuf = append(csBuf, *c) } } - br.csOffset = csOffset - br.cs = cs + br.csBufOffset = csBufOffset + br.csBuf = csBuf + br.csInitialized = false } // setColumns sets the resulting columns to the given columnNames. @@ -1098,14 +1117,15 @@ func (br *blockResult) setColumns(columnNames []string) { } // Slow path - construct the requested columns - cs := br.cs - csOffset := len(cs) + csBuf := br.csBuf + csBufOffset := len(csBuf) for _, columnName := range columnNames { c := br.getColumnByName(columnName) - cs = append(cs, c) + csBuf = append(csBuf, *c) } - br.csOffset = csOffset - br.cs = cs + br.csBufOffset = csBufOffset + br.csBuf = csBuf + br.csInitialized = false } func (br *blockResult) areSameColumns(columnNames []string) bool { @@ -1113,53 +1133,55 @@ func (br *blockResult) areSameColumns(columnNames []string) bool { if len(cs) != len(columnNames) { return false } - for i := range cs { - if cs[i].name != columnNames[i] { + for i, c := range cs { + if c.name != columnNames[i] { return false } } return true } -func (br *blockResult) getColumnByName(columnName string) blockResultColumn { - cs := br.getColumns() - for i := range cs { - if cs[i].name == columnName { - return cs[i] +func (br *blockResult) getColumnByName(columnName string) *blockResultColumn { + for _, c := range br.getColumns() { + if c.name == columnName { + return c } } - return blockResultColumn{ - name: columnName, - isConst: true, - encodedValues: getEmptyStrings(1), - } + br.addConstColumn(columnName, "") + return &br.csBuf[len(br.csBuf)-1] } func (br *blockResult) createMissingColumnByName(columnName string) { - cs := br.getColumns() - for i := range cs { - if cs[i].name == columnName { + for _, c := range br.getColumns() { + if c.name == columnName { return } } - br.cs = append(br.cs, blockResultColumn{ - name: columnName, - isConst: true, - encodedValues: getEmptyStrings(1), - }) + br.addConstColumn(columnName, "") } -func (br *blockResult) getColumns() []blockResultColumn { - return br.cs[br.csOffset:] +func (br *blockResult) getColumns() []*blockResultColumn { + if br.csInitialized { + return br.cs + } + + csBuf := br.csBuf[br.csBufOffset:] + clear(br.cs) + cs := br.cs[:0] + for i := range csBuf { + cs = append(cs, &csBuf[i]) + } + br.cs = cs + br.csInitialized = true + + return br.cs } func (br *blockResult) skipRows(skipRows int) { br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...) - cs := br.getColumns() - for i := range cs { - c := &cs[i] + for _, c := range br.getColumns() { if c.values != nil { c.values = append(c.values[:0], c.values[skipRows:]...) } @@ -1174,9 +1196,7 @@ func (br *blockResult) skipRows(skipRows int) { func (br *blockResult) truncateRows(keepRows int) { br.timestamps = br.timestamps[:keepRows] - cs := br.getColumns() - for i := range cs { - c := &cs[i] + for _, c := range br.getColumns() { if c.values != nil { c.values = c.values[:keepRows] } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 5207b0e96..c411a5be2 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -127,13 +127,13 @@ type sortBlock struct { byColumns []sortBlockByColumn // otherColumns refers block data for other than 'by(...)' columns - otherColumns []blockResultColumn + otherColumns []*blockResultColumn } // sortBlockByColumn represents data for a single column from 'sort by(...)' clause. type sortBlockByColumn struct { // c contains column data - c blockResultColumn + c *blockResultColumn // i64Values contains int64 numbers parsed from values i64Values []int64 @@ -182,8 +182,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { // JSON-encode all the columns per each row into a single string // and sort rows by the resulting string. bb.B = bb.B[:0] - for j := range cs { - c := &cs[j] + for _, c := range cs { v := c.getValueAtRow(br, i) bb.B = marshalJSONKeyValue(bb.B, c.name, v) bb.B = append(bb.B, ',') @@ -193,7 +192,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { bbPool.Put(bb) byColumns := []sortBlockByColumn{ { - c: blockResultColumn{ + c: &blockResultColumn{ valueType: valueTypeString, encodedValues: rc.values, }, @@ -201,7 +200,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { f64Values: make([]float64, len(br.timestamps)), }, } - shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0])) + shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]) + unsafe.Sizeof(*byColumns[0].c)) // Append br to shard.blocks. shard.blocks = append(shard.blocks, sortBlock{ @@ -236,7 +235,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0])) // Collect values for other columns. - otherColumns := make([]blockResultColumn, 0, len(cs)) + otherColumns := make([]*blockResultColumn, 0, len(cs)) for _, c := range cs { isByField := false for _, bf := range byFields { @@ -494,9 +493,8 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx wctx.valuesLen += len(v) } - otherColumns := b.otherColumns - for i := range otherColumns { - v := otherColumns[i].getValueAtRow(br, rr.rowIdx) + for i, c := range b.otherColumns { + v := c.getValueAtRow(br, rr.rowIdx) rcs[len(byFields)+i].addValue(v) wctx.valuesLen += len(v) } diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index ca5f9b0cf..931535667 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -47,11 +47,11 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease := 0 if sup.su.containsStar { // Count unique rows - columns := br.getColumns() + cs := br.getColumns() keyBuf := sup.keyBuf[:0] for i := range br.timestamps { seenKey := true - for _, c := range columns { + for _, c := range cs { values := c.getValues(br) if i == 0 || values[i-1] != values[i] { seenKey = false @@ -65,7 +65,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { allEmptyValues := true keyBuf = keyBuf[:0] - for _, c := range columns { + for _, c := range cs { v := c.getValueAtRow(br, i) if v != "" { allEmptyValues = false diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index 6542bb143..5602e9415 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -41,14 +41,13 @@ type statsUniqValuesProcessor struct { func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease := 0 if sup.su.containsStar { - columns := br.getColumns() - for i := range columns { - stateSizeIncrease += sup.updateStatsForAllRowsColumn(&columns[i], br) + for _, c := range br.getColumns() { + stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br) } } else { for _, field := range sup.su.fields { c := br.getColumnByName(field) - stateSizeIncrease += sup.updateStatsForAllRowsColumn(&c, br) + stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br) } } return stateSizeIncrease @@ -110,14 +109,13 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { stateSizeIncrease := 0 if sup.su.containsStar { - columns := br.getColumns() - for i := range columns { - stateSizeIncrease += sup.updateStatsForRowColumn(&columns[i], br, rowIdx) + for _, c := range br.getColumns() { + stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx) } } else { for _, field := range sup.su.fields { c := br.getColumnByName(field) - stateSizeIncrease += sup.updateStatsForRowColumn(&c, br, rowIdx) + stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx) } } return stateSizeIncrease diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 61fdc45f3..4240915e3 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -67,9 +67,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, brs := getBlockRows() csDst := brs.cs - csSrc := br.getColumns() - for i := range csSrc { - c := &csSrc[i] + for _, c := range br.getColumns() { values := c.getValues(br) csDst = append(csDst, BlockColumn{ Name: c.name,