mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
e54966456c
commit
a19611336c
1 changed files with 12 additions and 1 deletions
|
@ -399,6 +399,10 @@ func (psp *pipeSortProcessor) flush() error {
|
||||||
sh = append(sh, shard)
|
sh = append(sh, shard)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(sh) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
heap.Init(&sh)
|
heap.Init(&sh)
|
||||||
|
|
||||||
wctx := &pipeSortWriteContext{
|
wctx := &pipeSortWriteContext{
|
||||||
|
@ -413,6 +417,13 @@ func (psp *pipeSortProcessor) flush() error {
|
||||||
if shard.rowRefNext >= uint(len(shard.rowRefs)) {
|
if shard.rowRefNext >= uint(len(shard.rowRefs)) {
|
||||||
_ = heap.Pop(&sh)
|
_ = heap.Pop(&sh)
|
||||||
shardNext = nil
|
shardNext = nil
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-psp.stopCh:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,7 +484,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
|
||||||
// send the current block to bbBase and construct new columns
|
// send the current block to bbBase and construct new columns
|
||||||
wctx.flush()
|
wctx.flush()
|
||||||
|
|
||||||
rcs = rcs[:0]
|
rcs = wctx.rcs[:0]
|
||||||
for _, bf := range byFields {
|
for _, bf := range byFields {
|
||||||
rcs = append(rcs, resultColumn{
|
rcs = append(rcs, resultColumn{
|
||||||
name: bf.name,
|
name: bf.name,
|
||||||
|
|
Loading…
Reference in a new issue