This commit is contained in:
Aliaksandr Valialkin 2024-05-17 15:32:00 +02:00
parent 306d6e4f6d
commit c5ced867dc
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -78,6 +78,7 @@ type pipeTopkProcessorShardNopad struct {
byColumnValues [][]string
otherColumnValues []pipeTopkOtherColumn
byColumns []string
byColumnsIsTime []bool
otherColumns []Field
// stateSizeBudget is the remaining budget for the whole state size for the shard.
@ -87,7 +88,9 @@ type pipeTopkProcessorShardNopad struct {
type pipeTopkRow struct {
byColumns []string
byColumnsIsTime []bool
otherColumns []Field
timestamp int64
}
type pipeTopkOtherColumn struct {
@ -101,6 +104,8 @@ func (r *pipeTopkRow) clone() *pipeTopkRow {
byColumnsCopy[i] = strings.Clone(r.byColumns[i])
}
byColumnsIsTime := append([]bool{}, r.byColumnsIsTime...)
otherColumnsCopy := make([]Field, len(r.otherColumns))
for i := range otherColumnsCopy {
src := &r.otherColumns[i]
@ -111,7 +116,9 @@ func (r *pipeTopkRow) clone() *pipeTopkRow {
return &pipeTopkRow{
byColumns: byColumnsCopy,
byColumnsIsTime: byColumnsIsTime,
otherColumns: otherColumnsCopy,
timestamp: r.timestamp,
}
}
@ -123,6 +130,8 @@ func (r *pipeTopkRow) sizeBytes() int {
}
n += len(r.byColumns) * int(unsafe.Sizeof(r.byColumns[0]))
n += len(r.byColumnsIsTime) * int(unsafe.Sizeof(r.byColumnsIsTime[0]))
for _, f := range r.otherColumns {
n += len(f.Name) + len(f.Value)
}
@ -170,14 +179,16 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
byColumnValues := shard.byColumnValues[:0]
for _, c := range cs {
byColumnValues = append(byColumnValues, c.getValues(br))
values := c.getValues(br)
byColumnValues = append(byColumnValues, values)
}
shard.byColumnValues = byColumnValues
byColumns := shard.byColumns[:0]
byColumnsIsTime := shard.byColumnsIsTime[:0]
otherColumns := shard.otherColumns[:0]
bb := bbPool.Get()
for rowIdx := range br.timestamps {
for rowIdx, timestamp := range br.timestamps {
byColumns = byColumns[:0]
bb.B = bb.B[:0]
for i, values := range byColumnValues {
@ -186,6 +197,7 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
bb.B = append(bb.B, ',')
}
byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B))
byColumnsIsTime = append(byColumnsIsTime, false)
otherColumns = otherColumns[:0]
for i, values := range byColumnValues {
@ -195,20 +207,30 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
})
}
shard.addRow(byColumns, otherColumns)
shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp)
}
bbPool.Put(bb)
shard.byColumns = byColumns
shard.byColumnsIsTime = byColumnsIsTime
shard.otherColumns = otherColumns
} else {
// Sort by byFields
byColumnValues := shard.byColumnValues[:0]
byColumnsIsTime := shard.byColumnsIsTime[:0]
for _, bf := range byFields {
c := br.getColumnByName(bf.name)
byColumnValues = append(byColumnValues, c.getValues(br))
byColumnsIsTime = append(byColumnsIsTime, c.isTime)
var values []string
if !c.isTime {
values = c.getValues(br)
}
byColumnValues = append(byColumnValues, values)
}
shard.byColumnValues = byColumnValues
shard.byColumnsIsTime = byColumnsIsTime
otherColumnValues := shard.otherColumnValues[:0]
for _, c := range cs {
@ -231,10 +253,15 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
// add rows to shard
byColumns := shard.byColumns[:0]
otherColumns := shard.otherColumns[:0]
for rowIdx := range br.timestamps {
for rowIdx, timestamp := range br.timestamps {
byColumns = byColumns[:0]
for _, values := range byColumnValues {
byColumns = append(byColumns, values[rowIdx])
for i, values := range byColumnValues {
v := ""
if !byColumnsIsTime[i] {
v = values[rowIdx]
}
byColumns = append(byColumns, v)
}
otherColumns = otherColumns[:0]
@ -245,17 +272,19 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
})
}
shard.addRow(byColumns, otherColumns)
shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp)
}
shard.byColumns = byColumns
shard.otherColumns = otherColumns
}
}
func (shard *pipeTopkProcessorShard) addRow(byColumns []string, otherColumns []Field) {
func (shard *pipeTopkProcessorShard) addRow(byColumns []string, byColumnsIsTime []bool, otherColumns []Field, timestamp int64) {
r := &shard.tmpRow
r.byColumns = byColumns
r.byColumnsIsTime = byColumnsIsTime
r.otherColumns = otherColumns
r.timestamp = timestamp
rows := shard.rows
if len(rows) > 0 && !topkLess(shard.ps, r, rows[0]) {
@ -458,9 +487,15 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
wctx.rcs = rcs
}
var tmpBuf []byte
byColumns := r.byColumns
byColumnsIsTime := r.byColumnsIsTime
for i := range byFields {
v := byColumns[i]
if byColumnsIsTime[i] {
tmpBuf = marshalTimestampRFC3339NanoString(tmpBuf[:0], r.timestamp)
v = bytesutil.ToUnsafeString(tmpBuf)
}
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
@ -532,25 +567,59 @@ func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool {
byFields := ps.byFields
csA := a.byColumns
csB := b.byColumns
isTimeA := a.byColumnsIsTime
for k := range csA {
csB := b.byColumns
isTimeB := b.byColumnsIsTime
for i := range csA {
isDesc := ps.isDesc
if len(byFields) > 0 && byFields[k].isDesc {
if len(byFields) > 0 && byFields[i].isDesc {
isDesc = !isDesc
}
vA := csA[k]
vB := csB[k]
if isTimeA[i] && isTimeB[i] {
// Fast path - compare timestamps
if a.timestamp == b.timestamp {
continue
}
if isDesc {
return b.timestamp < a.timestamp
}
return a.timestamp < b.timestamp
}
vA := csA[i]
vB := csB[i]
var bb *bytesutil.ByteBuffer
if isTimeA[i] || isTimeB[i] {
bb = bbPool.Get()
}
if isTimeA[i] {
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], a.timestamp)
vA = bytesutil.ToUnsafeString(bb.B)
} else if isTimeB[i] {
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], a.timestamp)
vB = bytesutil.ToUnsafeString(bb.B)
}
if vA == vB {
if bb != nil {
bbPool.Put(bb)
}
continue
}
if isDesc {
return lessString(vB, vA)
vA, vB = vB, vA
}
return lessString(vA, vB)
ok := lessString(vA, vB)
if bb != nil {
bbPool.Put(bb)
}
return ok
}
return false
}