This commit is contained in:
Aliaksandr Valialkin 2024-05-09 13:58:06 +02:00
parent 0519b1a714
commit 5a883be63f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 102 additions and 88 deletions

View file

@ -29,14 +29,20 @@ type blockResult struct {
// timestamps contain timestamps for the selected log entries in the block.
timestamps []int64
// csOffset contains cs offset for the requested columns.
// csBufOffset contains csBuf offset for the requested columns.
//
// columns with indexes below csOffset are ignored.
// columns with indexes below csBufOffset are ignored.
// This is needed for simplifying data transformations at pipe stages.
csOffset int
csBufOffset int
// cs contains requested columns.
cs []blockResultColumn
// csBuf contains requested columns.
csBuf []blockResultColumn
// cs contains cached pointers to requested columns returned from getColumns() if csInitialized=true.
cs []*blockResultColumn
// csInitialized is set to true if cs is properly initialized and can be returned from getColumns().
csInitialized bool
}
func (br *blockResult) reset() {
@ -49,10 +55,15 @@ func (br *blockResult) reset() {
br.timestamps = br.timestamps[:0]
br.csOffset = 0
br.csBufOffset = 0
clear(br.csBuf)
br.csBuf = br.csBuf[:0]
clear(br.cs)
br.cs = br.cs[:0]
br.csInitialized = false
}
// clone returns a clone of br, which owns its own memory.
@ -82,7 +93,7 @@ func (br *blockResult) clone() *blockResult {
for i, c := range cs {
csNew[i] = c.clone(brNew)
}
brNew.cs = csNew
brNew.csBuf = csNew
return brNew
}
@ -116,6 +127,7 @@ func (br *blockResult) sizeBytes() int {
n += cap(br.buf)
n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0]))
n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0]))
n += cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0]))
n += cap(br.cs) * int(unsafe.Sizeof(br.cs[0]))
return n
@ -132,24 +144,25 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values))
cs := br.cs
csBuf := br.csBuf
for _, rc := range rcs {
if areConstValues(rc.values) {
// This optimization allows reducing memory usage after br cloning
cs = append(cs, blockResultColumn{
csBuf = append(csBuf, blockResultColumn{
name: rc.name,
isConst: true,
encodedValues: rc.values[:1],
})
} else {
cs = append(cs, blockResultColumn{
csBuf = append(csBuf, blockResultColumn{
name: rc.name,
valueType: valueTypeString,
encodedValues: rc.values,
})
}
}
br.cs = cs
br.csBuf = csBuf
br.csInitialized = false
}
func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
@ -349,21 +362,23 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)
dictValues = valuesBuf[valuesBufLen:]
name := getCanonicalColumnName(ch.name)
br.cs = append(br.cs, blockResultColumn{
br.csBuf = append(br.csBuf, blockResultColumn{
name: name,
valueType: ch.valueType,
dictValues: dictValues,
encodedValues: encodedValues,
})
br.csInitialized = false
br.buf = buf
br.valuesBuf = valuesBuf
}
func (br *blockResult) addTimeColumn() {
br.cs = append(br.cs, blockResultColumn{
br.csBuf = append(br.csBuf, blockResultColumn{
name: "_time",
isTime: true,
})
br.csInitialized = false
}
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
@ -401,11 +416,12 @@ func (br *blockResult) addConstColumn(name, value string) {
valuesBuf = append(valuesBuf, s)
br.valuesBuf = valuesBuf
br.cs = append(br.cs, blockResultColumn{
br.csBuf = append(br.csBuf, blockResultColumn{
name: name,
isConst: true,
encodedValues: valuesBuf[valuesBufLen:],
})
br.csInitialized = false
}
func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize, bucketOffset float64) []string {
@ -1027,20 +1043,21 @@ func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
return
}
cs := br.cs
csOffset := len(cs)
csBuf := br.csBuf
csBufOffset := len(csBuf)
for _, c := range br.getColumns() {
if idx := slices.Index(srcColumnNames, c.name); idx >= 0 {
c.name = dstColumnNames[idx]
cs = append(cs, c)
csBuf = append(csBuf, *c)
// continue is skipped intentionally in order to leave the original column in the columns list.
}
if !slices.Contains(dstColumnNames, c.name) {
cs = append(cs, c)
csBuf = append(csBuf, *c)
}
}
br.csOffset = csOffset
br.cs = cs
br.csBufOffset = csBufOffset
br.csBuf = csBuf
br.csInitialized = false
for _, dstColumnName := range dstColumnNames {
br.createMissingColumnByName(dstColumnName)
@ -1053,20 +1070,21 @@ func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) {
return
}
cs := br.cs
csOffset := len(cs)
csBuf := br.csBuf
csBufOffset := len(csBuf)
for _, c := range br.getColumns() {
if idx := slices.Index(srcColumnNames, c.name); idx >= 0 {
c.name = dstColumnNames[idx]
cs = append(cs, c)
csBuf = append(csBuf, *c)
continue
}
if !slices.Contains(dstColumnNames, c.name) {
cs = append(cs, c)
csBuf = append(csBuf, *c)
}
}
br.csOffset = csOffset
br.cs = cs
br.csBufOffset = csBufOffset
br.csBuf = csBuf
br.csInitialized = false
for _, dstColumnName := range dstColumnNames {
br.createMissingColumnByName(dstColumnName)
@ -1079,15 +1097,16 @@ func (br *blockResult) deleteColumns(columnNames []string) {
return
}
cs := br.cs
csOffset := len(cs)
csBuf := br.csBuf
csBufOffset := len(csBuf)
for _, c := range br.getColumns() {
if !slices.Contains(columnNames, c.name) {
cs = append(cs, c)
csBuf = append(csBuf, *c)
}
}
br.csOffset = csOffset
br.cs = cs
br.csBufOffset = csBufOffset
br.csBuf = csBuf
br.csInitialized = false
}
// setColumns sets the resulting columns to the given columnNames.
@ -1098,14 +1117,15 @@ func (br *blockResult) setColumns(columnNames []string) {
}
// Slow path - construct the requested columns
cs := br.cs
csOffset := len(cs)
csBuf := br.csBuf
csBufOffset := len(csBuf)
for _, columnName := range columnNames {
c := br.getColumnByName(columnName)
cs = append(cs, c)
csBuf = append(csBuf, *c)
}
br.csOffset = csOffset
br.cs = cs
br.csBufOffset = csBufOffset
br.csBuf = csBuf
br.csInitialized = false
}
func (br *blockResult) areSameColumns(columnNames []string) bool {
@ -1113,53 +1133,55 @@ func (br *blockResult) areSameColumns(columnNames []string) bool {
if len(cs) != len(columnNames) {
return false
}
for i := range cs {
if cs[i].name != columnNames[i] {
for i, c := range cs {
if c.name != columnNames[i] {
return false
}
}
return true
}
func (br *blockResult) getColumnByName(columnName string) blockResultColumn {
cs := br.getColumns()
for i := range cs {
if cs[i].name == columnName {
return cs[i]
func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
for _, c := range br.getColumns() {
if c.name == columnName {
return c
}
}
return blockResultColumn{
name: columnName,
isConst: true,
encodedValues: getEmptyStrings(1),
}
br.addConstColumn(columnName, "")
return &br.csBuf[len(br.csBuf)-1]
}
func (br *blockResult) createMissingColumnByName(columnName string) {
cs := br.getColumns()
for i := range cs {
if cs[i].name == columnName {
for _, c := range br.getColumns() {
if c.name == columnName {
return
}
}
br.cs = append(br.cs, blockResultColumn{
name: columnName,
isConst: true,
encodedValues: getEmptyStrings(1),
})
br.addConstColumn(columnName, "")
}
func (br *blockResult) getColumns() []blockResultColumn {
return br.cs[br.csOffset:]
func (br *blockResult) getColumns() []*blockResultColumn {
if br.csInitialized {
return br.cs
}
csBuf := br.csBuf[br.csBufOffset:]
clear(br.cs)
cs := br.cs[:0]
for i := range csBuf {
cs = append(cs, &csBuf[i])
}
br.cs = cs
br.csInitialized = true
return br.cs
}
func (br *blockResult) skipRows(skipRows int) {
br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...)
cs := br.getColumns()
for i := range cs {
c := &cs[i]
for _, c := range br.getColumns() {
if c.values != nil {
c.values = append(c.values[:0], c.values[skipRows:]...)
}
@ -1174,9 +1196,7 @@ func (br *blockResult) skipRows(skipRows int) {
func (br *blockResult) truncateRows(keepRows int) {
br.timestamps = br.timestamps[:keepRows]
cs := br.getColumns()
for i := range cs {
c := &cs[i]
for _, c := range br.getColumns() {
if c.values != nil {
c.values = c.values[:keepRows]
}

View file

@ -127,13 +127,13 @@ type sortBlock struct {
byColumns []sortBlockByColumn
// otherColumns refers to block data for columns other than the 'by(...)' columns
otherColumns []blockResultColumn
otherColumns []*blockResultColumn
}
// sortBlockByColumn represents data for a single column from 'sort by(...)' clause.
type sortBlockByColumn struct {
// c contains column data
c blockResultColumn
c *blockResultColumn
// i64Values contains int64 numbers parsed from values
i64Values []int64
@ -182,8 +182,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
// JSON-encode all the columns of each row into a single string
// and sort rows by the resulting string.
bb.B = bb.B[:0]
for j := range cs {
c := &cs[j]
for _, c := range cs {
v := c.getValueAtRow(br, i)
bb.B = marshalJSONKeyValue(bb.B, c.name, v)
bb.B = append(bb.B, ',')
@ -193,7 +192,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
bbPool.Put(bb)
byColumns := []sortBlockByColumn{
{
c: blockResultColumn{
c: &blockResultColumn{
valueType: valueTypeString,
encodedValues: rc.values,
},
@ -201,7 +200,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
f64Values: make([]float64, len(br.timestamps)),
},
}
shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]))
shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]) + unsafe.Sizeof(*byColumns[0].c))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{
@ -236,7 +235,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
// Collect values for other columns.
otherColumns := make([]blockResultColumn, 0, len(cs))
otherColumns := make([]*blockResultColumn, 0, len(cs))
for _, c := range cs {
isByField := false
for _, bf := range byFields {
@ -494,9 +493,8 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
wctx.valuesLen += len(v)
}
otherColumns := b.otherColumns
for i := range otherColumns {
v := otherColumns[i].getValueAtRow(br, rr.rowIdx)
for i, c := range b.otherColumns {
v := c.getValueAtRow(br, rr.rowIdx)
rcs[len(byFields)+i].addValue(v)
wctx.valuesLen += len(v)
}

View file

@ -47,11 +47,11 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0
if sup.su.containsStar {
// Count unique rows
columns := br.getColumns()
cs := br.getColumns()
keyBuf := sup.keyBuf[:0]
for i := range br.timestamps {
seenKey := true
for _, c := range columns {
for _, c := range cs {
values := c.getValues(br)
if i == 0 || values[i-1] != values[i] {
seenKey = false
@ -65,7 +65,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, c := range columns {
for _, c := range cs {
v := c.getValueAtRow(br, i)
if v != "" {
allEmptyValues = false

View file

@ -41,14 +41,13 @@ type statsUniqValuesProcessor struct {
func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0
if sup.su.containsStar {
columns := br.getColumns()
for i := range columns {
stateSizeIncrease += sup.updateStatsForAllRowsColumn(&columns[i], br)
for _, c := range br.getColumns() {
stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br)
}
} else {
for _, field := range sup.su.fields {
c := br.getColumnByName(field)
stateSizeIncrease += sup.updateStatsForAllRowsColumn(&c, br)
stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br)
}
}
return stateSizeIncrease
@ -110,14 +109,13 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
if sup.su.containsStar {
columns := br.getColumns()
for i := range columns {
stateSizeIncrease += sup.updateStatsForRowColumn(&columns[i], br, rowIdx)
for _, c := range br.getColumns() {
stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx)
}
} else {
for _, field := range sup.su.fields {
c := br.getColumnByName(field)
stateSizeIncrease += sup.updateStatsForRowColumn(&c, br, rowIdx)
stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx)
}
}
return stateSizeIncrease

View file

@ -67,9 +67,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
brs := getBlockRows()
csDst := brs.cs
csSrc := br.getColumns()
for i := range csSrc {
c := &csSrc[i]
for _, c := range br.getColumns() {
values := c.getValues(br)
csDst = append(csDst, BlockColumn{
Name: c.name,