This commit is contained in:
Aliaksandr Valialkin 2024-05-06 19:02:47 +02:00
parent 39e0412bfc
commit 97786db05e
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -286,17 +286,12 @@ func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint6
u64, ok := tryParseUint64(v)
if ok {
u64ValuesBuf = append(u64ValuesBuf, u64)
continue
}
u32, ok := tryParseIPv4(v)
if ok {
u64ValuesBuf = append(u64ValuesBuf, uint64(u32))
}
i64, ok := tryParseTimestampRFC3339Nano(v)
if ok {
u64ValuesBuf = append(u64ValuesBuf, uint64(i64))
}
i64, ok = tryParseDuration(v)
u64ValuesBuf = append(u64ValuesBuf, uint64(i64))
u32, _ := tryParseIPv4(v)
u64ValuesBuf = append(u64ValuesBuf, uint64(u32))
// Do not try parsing timestamp and duration, since they may be negative.
// This breaks sorting.
}
shard.u64ValuesBuf = u64ValuesBuf
@ -433,7 +428,7 @@ func (psp *pipeSortProcessor) flush() error {
}
}
if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) {d
if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) {
heap.Fix(&sh, 0)
shardNext = nil
@ -464,8 +459,8 @@ type pipeSortWriteContext struct {
}
func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx uint) {
rowRef := shard.rowRefs[rowIdx]
block := &shard.blocks[rowRef.blockIdx]
rr := shard.rowRefs[rowIdx]
block := &shard.blocks[rr.blockIdx]
byFields := shard.ps.byFields
rcs := wctx.rcs
@ -498,12 +493,12 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
}
for i, c := range block.byColumns {
v := c.values[rowRef.rowIdx]
v := c.values[rr.rowIdx]
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
for i, c := range block.otherColumns {
v := c.values[rowRef.rowIdx]
v := c.values[rr.rowIdx]
rcs[len(byFields)+i].addValue(v)
wctx.valuesLen += len(v)
}
@ -566,10 +561,10 @@ func (sh *pipeSortProcessorShardsHeap) Pop() any {
func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSortProcessorShard, rowIdxB uint) bool {
byFields := shardA.ps.byFields
rowRefA := shardA.rowRefs[rowIdxA]
rowRefB := shardB.rowRefs[rowIdxB]
csA := shardA.blocks[rowRefA.blockIdx].byColumns
csB := shardB.blocks[rowRefB.blockIdx].byColumns
rrA := shardA.rowRefs[rowIdxA]
rrB := shardB.rowRefs[rowIdxB]
csA := shardA.blocks[rrA.blockIdx].byColumns
csB := shardB.blocks[rrB.blockIdx].byColumns
for idx := range csA {
cA := &csA[idx]
cB := &csB[idx]
@ -577,8 +572,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
if len(cA.timestamps) > 0 && len(cB.timestamps) > 0 {
// Fast path - sort by _time
tA := cA.timestamps[rowIdxA]
tB := cB.timestamps[rowIdxB]
tA := cA.timestamps[rrA.rowIdx]
tB := cB.timestamps[rrB.rowIdx]
if tA == tB {
continue
}
@ -589,8 +584,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
}
// Try sorting by uint64 values at first
uA := cA.u64Values[rowIdxA]
uB := cB.u64Values[rowIdxB]
uA := cA.u64Values[rrA.rowIdx]
uB := cB.u64Values[rrB.rowIdx]
if uA != 0 && uB != 0 {
if uA == uB {
continue
@ -602,8 +597,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
}
// Try sorting by float64 then
fA := cA.f64Values[rowIdxA]
fB := cB.f64Values[rowIdxB]
fA := cA.f64Values[rrA.rowIdx]
fB := cB.f64Values[rrB.rowIdx]
if !math.IsNaN(fA) && !math.IsNaN(fB) {
if fA == fB {
continue
@ -615,8 +610,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
}
// Fall back to string sorting
sA := cA.values[rowIdxA]
sB := cB.values[rowIdxB]
sA := cA.values[rrA.rowIdx]
sB := cB.values[rrB.rowIdx]
if sA == sB {
continue
}