mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
9f33220e51
commit
866c070f32
1 changed files with 37 additions and 22 deletions
|
@ -188,17 +188,16 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
|||
byColumnsBufLen := len(byColumnsBuf)
|
||||
for _, bf := range byFields {
|
||||
c := br.getColumnByName(bf.name)
|
||||
values := c.getValues(br)
|
||||
values = shard.copyValues(values)
|
||||
u64Values := shard.createUint64Values(values)
|
||||
f64Values := shard.createFloat64Values(values)
|
||||
timestamps := shard.createTimestampsIfNeeded(br.timestamps, c.isTime)
|
||||
byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{
|
||||
values: values,
|
||||
u64Values: u64Values,
|
||||
f64Values: f64Values,
|
||||
timestamps: timestamps,
|
||||
})
|
||||
byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{})
|
||||
bc := &byColumnsBuf[len(byColumnsBuf)-1]
|
||||
if c.isTime {
|
||||
bc.timestamps = shard.copyTimestamps(br.timestamps)
|
||||
} else {
|
||||
values := c.getValues(br)
|
||||
bc.values = shard.copyValues(values)
|
||||
bc.u64Values = shard.createUint64Values(values)
|
||||
bc.f64Values = shard.createFloat64Values(values)
|
||||
}
|
||||
}
|
||||
shard.byColumnsBuf = byColumnsBuf
|
||||
byColumns := byColumnsBuf[byColumnsBufLen:]
|
||||
|
@ -312,11 +311,7 @@ func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []floa
|
|||
return f64ValuesBuf[f64ValuesBufLen:]
|
||||
}
|
||||
|
||||
func (shard *pipeSortProcessorShard) createTimestampsIfNeeded(timestamps []int64, isTime bool) []int64 {
|
||||
if !isTime {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (shard *pipeSortProcessorShard) copyTimestamps(timestamps []int64) []int64 {
|
||||
timestampsBuf := shard.timestampsBuf
|
||||
timestampsBufLen := len(shard.timestampsBuf)
|
||||
timestampsBuf = append(timestampsBuf, timestamps...)
|
||||
|
@ -461,9 +456,12 @@ func (psp *pipeSortProcessor) flush() error {
|
|||
}
|
||||
|
||||
type pipeSortWriteContext struct {
|
||||
psp *pipeSortProcessor
|
||||
rcs []resultColumn
|
||||
br blockResult
|
||||
psp *pipeSortProcessor
|
||||
rcs []resultColumn
|
||||
br blockResult
|
||||
|
||||
auxBuf []byte
|
||||
|
||||
valuesLen int
|
||||
}
|
||||
|
||||
|
@ -502,11 +500,20 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
|
|||
}
|
||||
|
||||
byColumns := b.byColumns
|
||||
auxBuf := wctx.auxBuf
|
||||
for i := range byFields {
|
||||
v := byColumns[i].values[rr.rowIdx]
|
||||
rcs[i].addValue(v)
|
||||
wctx.valuesLen += len(v)
|
||||
bc := &byColumns[i]
|
||||
if len(bc.timestamps) > 0 {
|
||||
auxBuf = marshalTimestampISO8601(auxBuf[:0], bc.timestamps[rr.rowIdx])
|
||||
rcs[i].addValue(bytesutil.ToUnsafeString(auxBuf))
|
||||
wctx.valuesLen += len(auxBuf)
|
||||
} else {
|
||||
v := bc.values[rr.rowIdx]
|
||||
rcs[i].addValue(v)
|
||||
wctx.valuesLen += len(v)
|
||||
}
|
||||
}
|
||||
wctx.auxBuf = auxBuf
|
||||
|
||||
otherColumns := b.otherColumns
|
||||
for i := range otherColumns {
|
||||
|
@ -594,6 +601,14 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
|
|||
}
|
||||
return tA < tB
|
||||
}
|
||||
if len(cA.timestamps) > 0 {
|
||||
// treat timestamps as smaller than other values
|
||||
return true
|
||||
}
|
||||
if len(cB.timestamps) > 0 {
|
||||
// treat timestamps as smaller than other values
|
||||
return false
|
||||
}
|
||||
|
||||
// Try sorting by uint64 values at first
|
||||
uA := cA.u64Values[rrA.rowIdx]
|
||||
|
|
Loading…
Reference in a new issue