From 97786db05ec6ecd34b984209aa32dc1744c74e36 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 6 May 2024 19:02:47 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_sort.go | 49 +++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index d02ed6763..11387c625 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -286,17 +286,12 @@ func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint6 u64, ok := tryParseUint64(v) if ok { u64ValuesBuf = append(u64ValuesBuf, u64) + continue } - u32, ok := tryParseIPv4(v) - if ok { - u64ValuesBuf = append(u64ValuesBuf, uint64(u32)) - } - i64, ok := tryParseTimestampRFC3339Nano(v) - if ok { - u64ValuesBuf = append(u64ValuesBuf, uint64(i64)) - } - i64, ok = tryParseDuration(v) - u64ValuesBuf = append(u64ValuesBuf, uint64(i64)) + u32, _ := tryParseIPv4(v) + u64ValuesBuf = append(u64ValuesBuf, uint64(u32)) + // Do not try parsing timestamp and duration, since they may be negative. + // This breaks sorting. } shard.u64ValuesBuf = u64ValuesBuf @@ -433,7 +428,7 @@ func (psp *pipeSortProcessor) flush() error { } } - if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) {d + if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) { heap.Fix(&sh, 0) shardNext = nil @@ -464,8 +459,8 @@ type pipeSortWriteContext struct { } func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx uint) { - rowRef := shard.rowRefs[rowIdx] - block := &shard.blocks[rowRef.blockIdx] + rr := shard.rowRefs[rowIdx] + block := &shard.blocks[rr.blockIdx] byFields := shard.ps.byFields rcs := wctx.rcs @@ -498,12 +493,12 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx } for i, c := range block.byColumns { - v := c.values[rowRef.rowIdx] + v := c.values[rr.rowIdx] rcs[i].addValue(v) wctx.valuesLen += len(v) } for i, c := range block.otherColumns { - v := c.values[rowRef.rowIdx] + v := c.values[rr.rowIdx] rcs[len(byFields)+i].addValue(v) wctx.valuesLen += len(v) } @@ -566,10 +561,10 @@ func (sh *pipeSortProcessorShardsHeap) Pop() any { func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSortProcessorShard, rowIdxB uint) bool { byFields := shardA.ps.byFields - rowRefA := shardA.rowRefs[rowIdxA] - rowRefB := shardB.rowRefs[rowIdxB] - csA := shardA.blocks[rowRefA.blockIdx].byColumns - csB := shardB.blocks[rowRefB.blockIdx].byColumns + rrA := shardA.rowRefs[rowIdxA] + rrB := shardB.rowRefs[rowIdxB] + csA := shardA.blocks[rrA.blockIdx].byColumns + csB := shardB.blocks[rrB.blockIdx].byColumns for idx := range csA { cA := &csA[idx] cB := &csB[idx] @@ -577,8 +572,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor if len(cA.timestamps) > 0 && len(cB.timestamps) > 0 { // Fast path - sort by _time - tA := cA.timestamps[rowIdxA] - tB := cB.timestamps[rowIdxB] + tA := cA.timestamps[rrA.rowIdx] + tB := cB.timestamps[rrB.rowIdx] if tA == tB { continue } @@ -589,8 +584,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } // Try sorting by uint64 values at first - uA := cA.u64Values[rowIdxA] - uB := cB.u64Values[rowIdxB] + uA := cA.u64Values[rrA.rowIdx] + uB := cB.u64Values[rrB.rowIdx] if uA != 0 && uB != 0 { if uA == uB { continue @@ -602,8 +597,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } // Try sorting by float64 then - fA := cA.f64Values[rowIdxA] - fB := cB.f64Values[rowIdxB] + fA := cA.f64Values[rrA.rowIdx] + fB := cB.f64Values[rrB.rowIdx] if !math.IsNaN(fA) && !math.IsNaN(fB) { if fA == fB { continue @@ -615,8 +610,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } // Fall back to string sorting - sA := cA.values[rowIdxA] - sB := cB.values[rowIdxB] + sA := cA.values[rrA.rowIdx] + sB := cB.values[rrB.rowIdx] if sA == sB { continue }