diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index 20e2d6eb0..f92ea86a1 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -78,6 +78,7 @@ type pipeTopkProcessorShardNopad struct { byColumnValues [][]string otherColumnValues []pipeTopkOtherColumn byColumns []string + byColumnsIsTime []bool otherColumns []Field // stateSizeBudget is the remaining budget for the whole state size for the shard. @@ -86,8 +87,10 @@ type pipeTopkProcessorShardNopad struct { } type pipeTopkRow struct { - byColumns []string - otherColumns []Field + byColumns []string + byColumnsIsTime []bool + otherColumns []Field + timestamp int64 } type pipeTopkOtherColumn struct { @@ -101,6 +104,8 @@ func (r *pipeTopkRow) clone() *pipeTopkRow { byColumnsCopy[i] = strings.Clone(r.byColumns[i]) } + byColumnsIsTime := append([]bool{}, r.byColumnsIsTime...) + otherColumnsCopy := make([]Field, len(r.otherColumns)) for i := range otherColumnsCopy { src := &r.otherColumns[i] @@ -110,8 +115,10 @@ func (r *pipeTopkRow) clone() *pipeTopkRow { } return &pipeTopkRow{ - byColumns: byColumnsCopy, - otherColumns: otherColumnsCopy, + byColumns: byColumnsCopy, + byColumnsIsTime: byColumnsIsTime, + otherColumns: otherColumnsCopy, + timestamp: r.timestamp, } } @@ -123,6 +130,8 @@ func (r *pipeTopkRow) sizeBytes() int { } n += len(r.byColumns) * int(unsafe.Sizeof(r.byColumns[0])) + n += len(r.byColumnsIsTime) * int(unsafe.Sizeof(r.byColumnsIsTime[0])) + for _, f := range r.otherColumns { n += len(f.Name) + len(f.Value) } @@ -170,14 +179,16 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { byColumnValues := shard.byColumnValues[:0] for _, c := range cs { - byColumnValues = append(byColumnValues, c.getValues(br)) + values := c.getValues(br) + byColumnValues = append(byColumnValues, values) } shard.byColumnValues = byColumnValues byColumns := shard.byColumns[:0] + byColumnsIsTime := shard.byColumnsIsTime[:0] otherColumns := shard.otherColumns[:0] bb := bbPool.Get() - for rowIdx := range br.timestamps { + for rowIdx, timestamp := range br.timestamps { byColumns = byColumns[:0] bb.B = bb.B[:0] for i, values := range byColumnValues { @@ -186,6 +197,7 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { bb.B = append(bb.B, ',') } byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B)) + byColumnsIsTime = append(byColumnsIsTime, false) otherColumns = otherColumns[:0] for i, values := range byColumnValues { @@ -195,20 +207,30 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { }) } - shard.addRow(byColumns, otherColumns) + shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp) } bbPool.Put(bb) shard.byColumns = byColumns + shard.byColumnsIsTime = byColumnsIsTime shard.otherColumns = otherColumns } else { // Sort by byFields byColumnValues := shard.byColumnValues[:0] + byColumnsIsTime := shard.byColumnsIsTime[:0] for _, bf := range byFields { c := br.getColumnByName(bf.name) - byColumnValues = append(byColumnValues, c.getValues(br)) + + byColumnsIsTime = append(byColumnsIsTime, c.isTime) + + var values []string + if !c.isTime { + values = c.getValues(br) + } + byColumnValues = append(byColumnValues, values) } shard.byColumnValues = byColumnValues + shard.byColumnsIsTime = byColumnsIsTime otherColumnValues := shard.otherColumnValues[:0] for _, c := range cs { @@ -231,10 +253,15 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { // add rows to shard byColumns := shard.byColumns[:0] otherColumns := shard.otherColumns[:0] - for rowIdx := range br.timestamps { + for rowIdx, timestamp := range br.timestamps { byColumns = byColumns[:0] - for _, values := range byColumnValues { - byColumns = append(byColumns, values[rowIdx]) + + for i, values := range byColumnValues { + v := "" + if !byColumnsIsTime[i] { + v = values[rowIdx] + } + byColumns = append(byColumns, v) } otherColumns = otherColumns[:0] @@ -245,17 +272,19 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { }) } - shard.addRow(byColumns, otherColumns) + shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp) } shard.byColumns = byColumns shard.otherColumns = otherColumns } } -func (shard *pipeTopkProcessorShard) addRow(byColumns []string, otherColumns []Field) { +func (shard *pipeTopkProcessorShard) addRow(byColumns []string, byColumnsIsTime []bool, otherColumns []Field, timestamp int64) { r := &shard.tmpRow r.byColumns = byColumns + r.byColumnsIsTime = byColumnsIsTime r.otherColumns = otherColumns + r.timestamp = timestamp rows := shard.rows if len(rows) > 0 && !topkLess(shard.ps, r, rows[0]) { @@ -458,9 +487,15 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.rcs = rcs } + var tmpBuf []byte byColumns := r.byColumns + byColumnsIsTime := r.byColumnsIsTime for i := range byFields { v := byColumns[i] + if byColumnsIsTime[i] { + tmpBuf = marshalTimestampRFC3339NanoString(tmpBuf[:0], r.timestamp) + v = bytesutil.ToUnsafeString(tmpBuf) + } rcs[i].addValue(v) wctx.valuesLen += len(v) } @@ -532,25 +567,59 @@ func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool { byFields := ps.byFields csA := a.byColumns - csB := b.byColumns + isTimeA := a.byColumnsIsTime - for k := range csA { + csB := b.byColumns + isTimeB := b.byColumnsIsTime + + for i := range csA { isDesc := ps.isDesc - if len(byFields) > 0 && byFields[k].isDesc { + if len(byFields) > 0 && byFields[i].isDesc { isDesc = !isDesc } - vA := csA[k] - vB := csB[k] + if isTimeA[i] && isTimeB[i] { + // Fast path - compare timestamps + if a.timestamp == b.timestamp { + continue + } + if isDesc { + return b.timestamp < a.timestamp + } + return a.timestamp < b.timestamp + } + + vA := csA[i] + vB := csB[i] + + var bb *bytesutil.ByteBuffer + + if isTimeA[i] || isTimeB[i] { + bb = bbPool.Get() + } + if isTimeA[i] { + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], a.timestamp) + vA = bytesutil.ToUnsafeString(bb.B) + } else if isTimeB[i] { + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], a.timestamp) + vB = bytesutil.ToUnsafeString(bb.B) + } if vA == vB { + if bb != nil { + bbPool.Put(bb) + } continue } if isDesc { - return lessString(vB, vA) + vA, vB = vB, vA } - return lessString(vA, vB) + ok := lessString(vA, vB) + if bb != nil { + bbPool.Put(bb) + } + return ok } return false }