diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 5e1a95cc8..4e7995c3e 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -188,17 +188,16 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { byColumnsBufLen := len(byColumnsBuf) for _, bf := range byFields { c := br.getColumnByName(bf.name) - values := c.getValues(br) - values = shard.copyValues(values) - u64Values := shard.createUint64Values(values) - f64Values := shard.createFloat64Values(values) - timestamps := shard.createTimestampsIfNeeded(br.timestamps, c.isTime) - byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{ - values: values, - u64Values: u64Values, - f64Values: f64Values, - timestamps: timestamps, - }) + byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{}) + bc := &byColumnsBuf[len(byColumnsBuf)-1] + if c.isTime { + bc.timestamps = shard.copyTimestamps(br.timestamps) + } else { + values := c.getValues(br) + bc.values = shard.copyValues(values) + bc.u64Values = shard.createUint64Values(values) + bc.f64Values = shard.createFloat64Values(values) + } } shard.byColumnsBuf = byColumnsBuf byColumns := byColumnsBuf[byColumnsBufLen:] @@ -312,11 +311,7 @@ func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []floa return f64ValuesBuf[f64ValuesBufLen:] } -func (shard *pipeSortProcessorShard) createTimestampsIfNeeded(timestamps []int64, isTime bool) []int64 { - if !isTime { - return nil - } - +func (shard *pipeSortProcessorShard) copyTimestamps(timestamps []int64) []int64 { timestampsBuf := shard.timestampsBuf timestampsBufLen := len(shard.timestampsBuf) timestampsBuf = append(timestampsBuf, timestamps...) @@ -461,9 +456,12 @@ func (psp *pipeSortProcessor) flush() error { } type pipeSortWriteContext struct { - psp *pipeSortProcessor - rcs []resultColumn - br blockResult + psp *pipeSortProcessor + rcs []resultColumn + br blockResult + + auxBuf []byte + valuesLen int } @@ -502,11 +500,20 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx } byColumns := b.byColumns + auxBuf := wctx.auxBuf for i := range byFields { - v := byColumns[i].values[rr.rowIdx] - rcs[i].addValue(v) - wctx.valuesLen += len(v) + bc := &byColumns[i] + if len(bc.timestamps) > 0 { + auxBuf = marshalTimestampISO8601(auxBuf[:0], bc.timestamps[rr.rowIdx]) + rcs[i].addValue(bytesutil.ToUnsafeString(auxBuf)) + wctx.valuesLen += len(auxBuf) + } else { + v := bc.values[rr.rowIdx] + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } } + wctx.auxBuf = auxBuf otherColumns := b.otherColumns for i := range otherColumns { @@ -594,6 +601,14 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } return tA < tB } + if len(cA.timestamps) > 0 { + // treat timestamps as smaller than other values + return true + } + if len(cB.timestamps) > 0 { + // treat timestamps as smaller than other values + return false + } // Try sorting by uint64 values at first uA := cA.u64Values[rrA.rowIdx]