mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
f0d8284c8a
commit
b1df5ce183
2 changed files with 209 additions and 161 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"math"
|
"math"
|
||||||
"slices"
|
"slices"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
|
@ -54,6 +55,68 @@ func (br *blockResult) reset() {
|
||||||
br.cs = br.cs[:0]
|
br.cs = br.cs[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clone returns a clone of br, which owns its own memory.
|
||||||
|
func (br *blockResult) clone() *blockResult {
|
||||||
|
brNew := &blockResult{}
|
||||||
|
|
||||||
|
cs := br.getColumns()
|
||||||
|
|
||||||
|
bufLen := 0
|
||||||
|
for _, c := range cs {
|
||||||
|
bufLen += c.neededBackingBufLen()
|
||||||
|
}
|
||||||
|
brNew.buf = make([]byte, 0, bufLen)
|
||||||
|
|
||||||
|
valuesBufLen := 0
|
||||||
|
for _, c := range cs {
|
||||||
|
valuesBufLen += c.neededBackingValuesBufLen()
|
||||||
|
}
|
||||||
|
brNew.valuesBuf = make([]string, 0, valuesBufLen)
|
||||||
|
|
||||||
|
brNew.streamID = br.streamID
|
||||||
|
|
||||||
|
brNew.timestamps = make([]int64, len(br.timestamps))
|
||||||
|
copy(brNew.timestamps, br.timestamps)
|
||||||
|
|
||||||
|
csNew := make([]blockResultColumn, len(cs))
|
||||||
|
for i, c := range cs {
|
||||||
|
csNew[i] = c.clone(brNew)
|
||||||
|
}
|
||||||
|
brNew.cs = csNew
|
||||||
|
|
||||||
|
return brNew
|
||||||
|
}
|
||||||
|
|
||||||
|
// cloneValues clones the given values into br and returns the cloned values.
|
||||||
|
func (br *blockResult) cloneValues(values []string) []string {
|
||||||
|
buf := br.buf
|
||||||
|
valuesBuf := br.valuesBuf
|
||||||
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
|
for _, v := range values {
|
||||||
|
bufLen := len(buf)
|
||||||
|
buf = append(buf, v...)
|
||||||
|
valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
|
||||||
|
}
|
||||||
|
|
||||||
|
br.valuesBuf = valuesBuf
|
||||||
|
br.buf = buf
|
||||||
|
|
||||||
|
return valuesBuf[valuesBufLen:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// sizeBytes returns the size of br in bytes.
|
||||||
|
func (br *blockResult) sizeBytes() int {
|
||||||
|
n := int(unsafe.Sizeof(*br))
|
||||||
|
|
||||||
|
n += cap(br.buf)
|
||||||
|
n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0]))
|
||||||
|
n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0]))
|
||||||
|
n += cap(br.cs) * int(unsafe.Sizeof(br.cs[0]))
|
||||||
|
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
// setResultColumns sets the given rcs as br columns.
|
// setResultColumns sets the given rcs as br columns.
|
||||||
//
|
//
|
||||||
// The returned result is valid only until rcs are modified.
|
// The returned result is valid only until rcs are modified.
|
||||||
|
@ -67,12 +130,21 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
|
||||||
|
|
||||||
cs := br.cs
|
cs := br.cs
|
||||||
for _, rc := range rcs {
|
for _, rc := range rcs {
|
||||||
|
if areConstValues(rc.values) {
|
||||||
|
// This optimization allows reducing memory usage after br cloning
|
||||||
|
cs = append(cs, blockResultColumn{
|
||||||
|
name: rc.name,
|
||||||
|
isConst: true,
|
||||||
|
encodedValues: rc.values[:1],
|
||||||
|
})
|
||||||
|
} else {
|
||||||
cs = append(cs, blockResultColumn{
|
cs = append(cs, blockResultColumn{
|
||||||
name: rc.name,
|
name: rc.name,
|
||||||
valueType: valueTypeString,
|
valueType: valueTypeString,
|
||||||
encodedValues: rc.values,
|
encodedValues: rc.values,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
br.cs = cs
|
br.cs = cs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -940,13 +1012,6 @@ func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) addEmptyStringColumn(columnName string) {
|
|
||||||
br.cs = append(br.cs, blockResultColumn{
|
|
||||||
name: columnName,
|
|
||||||
valueType: valueTypeString,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// copyColumns copies columns from srcColumnNames to dstColumnNames.
|
// copyColumns copies columns from srcColumnNames to dstColumnNames.
|
||||||
func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
|
func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
|
||||||
if len(srcColumnNames) == 0 {
|
if len(srcColumnNames) == 0 {
|
||||||
|
@ -1155,6 +1220,51 @@ type blockResultColumn struct {
|
||||||
bucketOffset float64
|
bucketOffset float64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clone returns a clone of c backed by data from br.
|
||||||
|
func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
|
||||||
|
var cNew blockResultColumn
|
||||||
|
|
||||||
|
cNew.name = c.name
|
||||||
|
cNew.isConst = c.isConst
|
||||||
|
cNew.isTime = c.isTime
|
||||||
|
cNew.valueType = c.valueType
|
||||||
|
cNew.dictValues = br.cloneValues(c.dictValues)
|
||||||
|
cNew.encodedValues = br.cloneValues(c.encodedValues)
|
||||||
|
// do not copy c.values and c.bucketedValues - they should be re-created from scrach if needed
|
||||||
|
cNew.bucketSize = c.bucketSize
|
||||||
|
cNew.bucketOffset = c.bucketOffset
|
||||||
|
|
||||||
|
return cNew
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *blockResultColumn) neededBackingBufLen() int {
|
||||||
|
n := 0
|
||||||
|
|
||||||
|
n += valuesSizeBytes(c.dictValues)
|
||||||
|
n += valuesSizeBytes(c.encodedValues)
|
||||||
|
// do not take into account c.values and c.bucketedValues, since they should be re-created from scratch if needed
|
||||||
|
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *blockResultColumn) neededBackingValuesBufLen() int {
|
||||||
|
n := 0
|
||||||
|
|
||||||
|
n += len(c.dictValues)
|
||||||
|
n += len(c.encodedValues)
|
||||||
|
// do not take into account c.values and c.bucketedValues, since they should be re-created from scratch if needed
|
||||||
|
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func valuesSizeBytes(values []string) int {
|
||||||
|
n := 0
|
||||||
|
for _, v := range values {
|
||||||
|
n += len(v)
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
// getValueAtRow returns value for the value at the given rowIdx.
|
// getValueAtRow returns value for the value at the given rowIdx.
|
||||||
//
|
//
|
||||||
// The returned value is valid until br.reset() is called.
|
// The returned value is valid until br.reset() is called.
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
)
|
)
|
||||||
|
@ -49,7 +48,7 @@ func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
||||||
maxStateSize := int64(float64(memory.Allowed()) * 0.1)
|
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
|
||||||
|
|
||||||
shards := make([]pipeSortProcessorShard, workersCount)
|
shards := make([]pipeSortProcessorShard, workersCount)
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
|
@ -97,40 +96,22 @@ type pipeSortProcessorShardNopad struct {
|
||||||
// ps point to the parent pipeSort.
|
// ps point to the parent pipeSort.
|
||||||
ps *pipeSort
|
ps *pipeSort
|
||||||
|
|
||||||
// buf holds all the logs data written to the given shard.
|
// u64ValuesBuf holds uint64 values parsed from values for speeding up the sorting.
|
||||||
buf []byte
|
|
||||||
|
|
||||||
// valuesBuf holds all the string values written to the given shard
|
|
||||||
// The actual strings are stored in buf.
|
|
||||||
valuesBuf []string
|
|
||||||
|
|
||||||
// u64ValuesBuf holds uint64 values parsed from valuesBuf for speeding up the sorting.
|
|
||||||
u64ValuesBuf []uint64
|
u64ValuesBuf []uint64
|
||||||
|
|
||||||
// f64ValuesBuf holds float64 values parsed from valuesBuf for speeding up the sorting.
|
// f64ValuesBuf holds float64 values parsed from values for speeding up the sorting.
|
||||||
f64ValuesBuf []float64
|
f64ValuesBuf []float64
|
||||||
|
|
||||||
// timestampsBuf holds timestamps if _time columns are used for sorting.
|
|
||||||
// This speeds up sorting by _time.
|
|
||||||
timestampsBuf []int64
|
|
||||||
|
|
||||||
// byColumnsBuf holds `by(...)` columns written to the shard.
|
|
||||||
byColumnsBuf []sortBlockByColumn
|
|
||||||
|
|
||||||
// otherColumnsBuf holds other than `by(...)` columns written to the shard.
|
|
||||||
otherColumnsBuf []sortBlockOtherColumn
|
|
||||||
|
|
||||||
// blocks holds all the blocks with logs written to the shard.
|
// blocks holds all the blocks with logs written to the shard.
|
||||||
blocks []sortBlock
|
blocks []sortBlock
|
||||||
|
|
||||||
// rowRefs holds references to all the rows stored in blocks.
|
// rowRefs holds references to all the rows stored in blocks.
|
||||||
//
|
//
|
||||||
// Sorting sorts rowRefs, while blocks remain unchanged.
|
// Sorting sorts rowRefs, while blocks remain unchanged. This should speed up sorting.
|
||||||
// This should speed up sorting.
|
|
||||||
rowRefs []sortRowRef
|
rowRefs []sortRowRef
|
||||||
|
|
||||||
// rowRefNext points to the next index at rowRefs during merge shards phase
|
// rowRefNext points to the next index at rowRefs during merge shards phase
|
||||||
rowRefNext uint
|
rowRefNext int
|
||||||
|
|
||||||
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
||||||
// The per-shard budget is provided in chunks from the parent pipeSortProcessor.
|
// The per-shard budget is provided in chunks from the parent pipeSortProcessor.
|
||||||
|
@ -138,81 +119,88 @@ type pipeSortProcessorShardNopad struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// sortBlock represents a block of logs for sorting.
|
// sortBlock represents a block of logs for sorting.
|
||||||
//
|
|
||||||
// It doesn't own the data it refers - all the data belongs to pipeSortProcessorShard.
|
|
||||||
type sortBlock struct {
|
type sortBlock struct {
|
||||||
|
// br is a result block to sort
|
||||||
|
br *blockResult
|
||||||
|
|
||||||
// byColumns refers block data for 'by(...)' columns
|
// byColumns refers block data for 'by(...)' columns
|
||||||
byColumns []sortBlockByColumn
|
byColumns []sortBlockByColumn
|
||||||
|
|
||||||
// otherColumns refers block data for other than 'by(...)' columns
|
// otherColumns refers block data for other than 'by(...)' columns
|
||||||
otherColumns []sortBlockOtherColumn
|
otherColumns []blockResultColumn
|
||||||
}
|
}
|
||||||
|
|
||||||
// sortBlockByColumn represents data for a single column from 'sort by(...)' clause.
|
// sortBlockByColumn represents data for a single column from 'sort by(...)' clause.
|
||||||
//
|
|
||||||
// It doesn't own the data it refers - all the data belongs to pipeSortProcessorShard.
|
|
||||||
type sortBlockByColumn struct {
|
type sortBlockByColumn struct {
|
||||||
// values contains column values
|
// c contains column data
|
||||||
values []string
|
c blockResultColumn
|
||||||
|
|
||||||
// u64Values contains uint6464 numbers parsed from values
|
// u64Values contains uint64 numbers parsed from values
|
||||||
u64Values []uint64
|
u64Values []uint64
|
||||||
|
|
||||||
// f64Values contains float64 numbers parsed from values
|
// f64Values contains float64 numbers parsed from values
|
||||||
f64Values []float64
|
f64Values []float64
|
||||||
|
|
||||||
// timestamps contains timestamps for blockResultColumn.isTime column
|
|
||||||
timestamps []int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// sortBlockOtherColumn represents data for a single column outside 'sort by(...)' clause.
|
|
||||||
//
|
|
||||||
// It doesn't own the data it refers - all the data belongs to pipeSortProcessorShard.
|
|
||||||
type sortBlockOtherColumn struct {
|
|
||||||
// name is the column name
|
|
||||||
name string
|
|
||||||
|
|
||||||
// values contains column values
|
|
||||||
values []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// sortRowRef is the reference to a single log entry written to `sort` pipe.
|
// sortRowRef is the reference to a single log entry written to `sort` pipe.
|
||||||
type sortRowRef struct {
|
type sortRowRef struct {
|
||||||
// blockIdx is the index of the block at pipeSortProcessorShard.blocks.
|
// blockIdx is the index of the block at pipeSortProcessorShard.blocks.
|
||||||
blockIdx uint
|
blockIdx int
|
||||||
|
|
||||||
// rowIdx is the index of the log entry inside the block referenced by blockIdx.
|
// rowIdx is the index of the log entry inside the block referenced by blockIdx.
|
||||||
rowIdx uint
|
rowIdx int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sortBlockByColumn) getU64ValueAtRow(rowIdx int) uint64 {
|
||||||
|
if c.c.isConst {
|
||||||
|
return c.u64Values[0]
|
||||||
|
}
|
||||||
|
return c.u64Values[rowIdx]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
|
||||||
|
if c.c.isConst {
|
||||||
|
return c.f64Values[0]
|
||||||
|
}
|
||||||
|
return c.f64Values[rowIdx]
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeBlock writes br with the given byFields to shard.
|
// writeBlock writes br with the given byFields to shard.
|
||||||
func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
||||||
|
// clone br, so it could be owned by shard
|
||||||
|
br = br.clone()
|
||||||
|
|
||||||
byFields := shard.ps.byFields
|
byFields := shard.ps.byFields
|
||||||
cs := br.getColumns()
|
|
||||||
|
|
||||||
// Collect values for columns from byFields.
|
// Collect values for columns from byFields.
|
||||||
byColumnsBuf := shard.byColumnsBuf
|
byColumns := make([]sortBlockByColumn, len(byFields))
|
||||||
byColumnsBufLen := len(byColumnsBuf)
|
for i, bf := range byFields {
|
||||||
for _, bf := range byFields {
|
|
||||||
c := br.getColumnByName(bf.name)
|
c := br.getColumnByName(bf.name)
|
||||||
byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{})
|
bc := &byColumns[i]
|
||||||
bc := &byColumnsBuf[len(byColumnsBuf)-1]
|
bc.c = c
|
||||||
|
|
||||||
if c.isTime {
|
if c.isTime {
|
||||||
bc.timestamps = shard.copyTimestamps(br.timestamps)
|
// Do not initialize bc.values, bc.u64Values and bc.f64Values, since they aren't used.
|
||||||
} else {
|
// This saves some memory.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if c.isConst {
|
||||||
|
// Do not initialize bc.values in order to save some memory.
|
||||||
|
bc.u64Values = shard.createUint64Values(c.encodedValues)
|
||||||
|
bc.f64Values = shard.createFloat64Values(c.encodedValues)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// pre-populate values in order to track better br memory usage
|
||||||
values := c.getValues(br)
|
values := c.getValues(br)
|
||||||
bc.values = shard.copyValues(values)
|
|
||||||
bc.u64Values = shard.createUint64Values(values)
|
bc.u64Values = shard.createUint64Values(values)
|
||||||
bc.f64Values = shard.createFloat64Values(values)
|
bc.f64Values = shard.createFloat64Values(values)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
shard.byColumnsBuf = byColumnsBuf
|
|
||||||
byColumns := byColumnsBuf[byColumnsBufLen:]
|
|
||||||
shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
|
shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
|
||||||
|
|
||||||
// Collect values for other columns.
|
// Collect values for other columns.
|
||||||
otherColumnsBuf := shard.otherColumnsBuf
|
cs := br.getColumns()
|
||||||
otherColumnsBufLen := len(otherColumnsBuf)
|
otherColumns := make([]blockResultColumn, 0, len(cs))
|
||||||
for _, c := range cs {
|
for _, c := range cs {
|
||||||
isByField := false
|
isByField := false
|
||||||
for _, bf := range byFields {
|
for _, bf := range byFields {
|
||||||
|
@ -221,65 +209,35 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if isByField {
|
if !isByField {
|
||||||
continue
|
otherColumns = append(otherColumns, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
values := c.getValues(br)
|
|
||||||
values = shard.copyValues(values)
|
|
||||||
otherColumnsBuf = append(otherColumnsBuf, sortBlockOtherColumn{
|
|
||||||
name: c.name,
|
|
||||||
values: values,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
shard.otherColumnsBuf = otherColumnsBuf
|
|
||||||
otherColumns := otherColumnsBuf[otherColumnsBufLen:]
|
|
||||||
shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
|
shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
|
||||||
|
|
||||||
// Add row references to rowRefs.
|
// Add row references to rowRefs.
|
||||||
blockIdx := uint(len(shard.blocks))
|
blockIdx := len(shard.blocks)
|
||||||
rowRefs := shard.rowRefs
|
rowRefs := shard.rowRefs
|
||||||
rowRefsLen := len(rowRefs)
|
rowRefsLen := len(rowRefs)
|
||||||
for i := range br.timestamps {
|
for i := range br.timestamps {
|
||||||
rowRefs = append(rowRefs, sortRowRef{
|
rowRefs = append(rowRefs, sortRowRef{
|
||||||
blockIdx: blockIdx,
|
blockIdx: blockIdx,
|
||||||
rowIdx: uint(i),
|
rowIdx: i,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
shard.rowRefs = rowRefs
|
shard.rowRefs = rowRefs
|
||||||
shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0]))
|
shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0]))
|
||||||
|
|
||||||
// Add byColumns and otherColumns to blocks.
|
// Append br to shard.blocks.
|
||||||
shard.blocks = append(shard.blocks, sortBlock{
|
shard.blocks = append(shard.blocks, sortBlock{
|
||||||
|
br: br,
|
||||||
byColumns: byColumns,
|
byColumns: byColumns,
|
||||||
otherColumns: otherColumns,
|
otherColumns: otherColumns,
|
||||||
})
|
})
|
||||||
|
shard.stateSizeBudget -= br.sizeBytes()
|
||||||
shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
|
shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
|
||||||
}
|
}
|
||||||
|
|
||||||
// copyValues copies values to the shard and returns the copied values.
|
|
||||||
func (shard *pipeSortProcessorShard) copyValues(values []string) []string {
|
|
||||||
buf := shard.buf
|
|
||||||
bufLenOriginal := len(buf)
|
|
||||||
|
|
||||||
valuesBuf := shard.valuesBuf
|
|
||||||
valuesBufLen := len(valuesBuf)
|
|
||||||
|
|
||||||
for _, v := range values {
|
|
||||||
bufLen := len(buf)
|
|
||||||
buf = append(buf, v...)
|
|
||||||
valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
|
|
||||||
}
|
|
||||||
|
|
||||||
shard.valuesBuf = valuesBuf
|
|
||||||
shard.buf = buf
|
|
||||||
|
|
||||||
shard.stateSizeBudget -= len(buf) - bufLenOriginal
|
|
||||||
shard.stateSizeBudget -= (len(valuesBuf) - valuesBufLen) * int(unsafe.Sizeof(valuesBuf[0]))
|
|
||||||
|
|
||||||
return valuesBuf[valuesBufLen:]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint64 {
|
func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint64 {
|
||||||
u64ValuesBuf := shard.u64ValuesBuf
|
u64ValuesBuf := shard.u64ValuesBuf
|
||||||
u64ValuesBufLen := len(u64ValuesBuf)
|
u64ValuesBufLen := len(u64ValuesBuf)
|
||||||
|
@ -318,17 +276,6 @@ func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []floa
|
||||||
return f64ValuesBuf[f64ValuesBufLen:]
|
return f64ValuesBuf[f64ValuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeSortProcessorShard) copyTimestamps(timestamps []int64) []int64 {
|
|
||||||
timestampsBuf := shard.timestampsBuf
|
|
||||||
timestampsBufLen := len(shard.timestampsBuf)
|
|
||||||
timestampsBuf = append(timestampsBuf, timestamps...)
|
|
||||||
shard.timestampsBuf = timestampsBuf
|
|
||||||
|
|
||||||
shard.stateSizeBudget -= (len(timestampsBuf) - timestampsBufLen) * int(unsafe.Sizeof(timestampsBuf[0]))
|
|
||||||
|
|
||||||
return shard.timestampsBuf[timestampsBufLen:]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (psp *pipeSortProcessorShard) Len() int {
|
func (psp *pipeSortProcessorShard) Len() int {
|
||||||
return len(psp.rowRefs)
|
return len(psp.rowRefs)
|
||||||
}
|
}
|
||||||
|
@ -339,7 +286,7 @@ func (psp *pipeSortProcessorShard) Swap(i, j int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psp *pipeSortProcessorShard) Less(i, j int) bool {
|
func (psp *pipeSortProcessorShard) Less(i, j int) bool {
|
||||||
return sortBlockLess(psp, uint(i), psp, uint(j))
|
return sortBlockLess(psp, i, psp, j)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) {
|
func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
|
@ -414,12 +361,13 @@ func (psp *pipeSortProcessor) flush() error {
|
||||||
psp: psp,
|
psp: psp,
|
||||||
}
|
}
|
||||||
var shardNext *pipeSortProcessorShard
|
var shardNext *pipeSortProcessorShard
|
||||||
|
|
||||||
for len(sh) > 1 {
|
for len(sh) > 1 {
|
||||||
shard := sh[0]
|
shard := sh[0]
|
||||||
wctx.writeRow(shard, shard.rowRefNext)
|
wctx.writeRow(shard, shard.rowRefNext)
|
||||||
shard.rowRefNext++
|
shard.rowRefNext++
|
||||||
|
|
||||||
if shard.rowRefNext >= uint(len(shard.rowRefs)) {
|
if shard.rowRefNext >= len(shard.rowRefs) {
|
||||||
_ = heap.Pop(&sh)
|
_ = heap.Pop(&sh)
|
||||||
shardNext = nil
|
shardNext = nil
|
||||||
|
|
||||||
|
@ -452,7 +400,7 @@ func (psp *pipeSortProcessor) flush() error {
|
||||||
}
|
}
|
||||||
if len(sh) == 1 {
|
if len(sh) == 1 {
|
||||||
shard := sh[0]
|
shard := sh[0]
|
||||||
for shard.rowRefNext < uint(len(shard.rowRefs)) {
|
for shard.rowRefNext < len(shard.rowRefs) {
|
||||||
wctx.writeRow(shard, shard.rowRefNext)
|
wctx.writeRow(shard, shard.rowRefNext)
|
||||||
shard.rowRefNext++
|
shard.rowRefNext++
|
||||||
}
|
}
|
||||||
|
@ -467,12 +415,10 @@ type pipeSortWriteContext struct {
|
||||||
rcs []resultColumn
|
rcs []resultColumn
|
||||||
br blockResult
|
br blockResult
|
||||||
|
|
||||||
auxBuf []byte
|
|
||||||
|
|
||||||
valuesLen int
|
valuesLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx uint) {
|
func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx int) {
|
||||||
rr := shard.rowRefs[rowIdx]
|
rr := shard.rowRefs[rowIdx]
|
||||||
b := &shard.blocks[rr.blockIdx]
|
b := &shard.blocks[rr.blockIdx]
|
||||||
|
|
||||||
|
@ -506,25 +452,17 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
|
||||||
wctx.rcs = rcs
|
wctx.rcs = rcs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
br := b.br
|
||||||
byColumns := b.byColumns
|
byColumns := b.byColumns
|
||||||
auxBuf := wctx.auxBuf
|
for i := range byColumns {
|
||||||
for i := range byFields {
|
v := byColumns[i].c.getValueAtRow(br, rr.rowIdx)
|
||||||
bc := &byColumns[i]
|
|
||||||
if len(bc.timestamps) > 0 {
|
|
||||||
auxBuf = marshalTimestampISO8601(auxBuf[:0], bc.timestamps[rr.rowIdx])
|
|
||||||
rcs[i].addValue(bytesutil.ToUnsafeString(auxBuf))
|
|
||||||
wctx.valuesLen += len(auxBuf)
|
|
||||||
} else {
|
|
||||||
v := bc.values[rr.rowIdx]
|
|
||||||
rcs[i].addValue(v)
|
rcs[i].addValue(v)
|
||||||
wctx.valuesLen += len(v)
|
wctx.valuesLen += len(v)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
wctx.auxBuf = auxBuf
|
|
||||||
|
|
||||||
otherColumns := b.otherColumns
|
otherColumns := b.otherColumns
|
||||||
for i := range otherColumns {
|
for i := range otherColumns {
|
||||||
v := otherColumns[i].values[rr.rowIdx]
|
v := otherColumns[i].getValueAtRow(br, rr.rowIdx)
|
||||||
rcs[len(byFields)+i].addValue(v)
|
rcs[len(byFields)+i].addValue(v)
|
||||||
wctx.valuesLen += len(v)
|
wctx.valuesLen += len(v)
|
||||||
}
|
}
|
||||||
|
@ -584,22 +522,22 @@ func (sh *pipeSortProcessorShardsHeap) Pop() any {
|
||||||
return x
|
return x
|
||||||
}
|
}
|
||||||
|
|
||||||
func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSortProcessorShard, rowIdxB uint) bool {
|
func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSortProcessorShard, rowIdxB int) bool {
|
||||||
byFields := shardA.ps.byFields
|
byFields := shardA.ps.byFields
|
||||||
|
|
||||||
rrA := shardA.rowRefs[rowIdxA]
|
rrA := shardA.rowRefs[rowIdxA]
|
||||||
rrB := shardB.rowRefs[rowIdxB]
|
rrB := shardB.rowRefs[rowIdxB]
|
||||||
csA := shardA.blocks[rrA.blockIdx].byColumns
|
bA := &shardA.blocks[rrA.blockIdx]
|
||||||
csB := shardB.blocks[rrB.blockIdx].byColumns
|
bB := &shardB.blocks[rrB.blockIdx]
|
||||||
for idx := range csA {
|
for idx := range bA.byColumns {
|
||||||
cA := &csA[idx]
|
cA := &bA.byColumns[idx]
|
||||||
cB := &csB[idx]
|
cB := &bB.byColumns[idx]
|
||||||
bf := byFields[idx]
|
bf := byFields[idx]
|
||||||
|
|
||||||
if len(cA.timestamps) > 0 && len(cB.timestamps) > 0 {
|
if cA.c.isTime && cB.c.isTime {
|
||||||
// Fast path - sort by _time
|
// Fast path - sort by _time
|
||||||
tA := cA.timestamps[rrA.rowIdx]
|
tA := bA.br.timestamps[rrA.rowIdx]
|
||||||
tB := cB.timestamps[rrB.rowIdx]
|
tB := bB.br.timestamps[rrB.rowIdx]
|
||||||
if tA == tB {
|
if tA == tB {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -608,18 +546,18 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
|
||||||
}
|
}
|
||||||
return tA < tB
|
return tA < tB
|
||||||
}
|
}
|
||||||
if len(cA.timestamps) > 0 {
|
if cA.c.isTime {
|
||||||
// treat timestamps as smaller than other values
|
// treat timestamps as smaller than other values
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if len(cB.timestamps) > 0 {
|
if cB.c.isTime {
|
||||||
// treat timestamps as smaller than other values
|
// treat timestamps as smaller than other values
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try sorting by uint64 values at first
|
// Try sorting by uint64 values at first
|
||||||
uA := cA.u64Values[rrA.rowIdx]
|
uA := cA.getU64ValueAtRow(rrA.rowIdx)
|
||||||
uB := cB.u64Values[rrB.rowIdx]
|
uB := cB.getU64ValueAtRow(rrB.rowIdx)
|
||||||
if uA != 0 && uB != 0 {
|
if uA != 0 && uB != 0 {
|
||||||
if uA == uB {
|
if uA == uB {
|
||||||
continue
|
continue
|
||||||
|
@ -631,8 +569,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try sorting by float64 then
|
// Try sorting by float64 then
|
||||||
fA := cA.f64Values[rrA.rowIdx]
|
fA := cA.getF64ValueAtRow(rrA.rowIdx)
|
||||||
fB := cB.f64Values[rrB.rowIdx]
|
fB := cB.getF64ValueAtRow(rrB.rowIdx)
|
||||||
if !math.IsNaN(fA) && !math.IsNaN(fB) {
|
if !math.IsNaN(fA) && !math.IsNaN(fB) {
|
||||||
if fA == fB {
|
if fA == fB {
|
||||||
continue
|
continue
|
||||||
|
@ -644,8 +582,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fall back to string sorting
|
// Fall back to string sorting
|
||||||
sA := cA.values[rrA.rowIdx]
|
sA := cA.c.getValueAtRow(bA.br, rrA.rowIdx)
|
||||||
sB := cB.values[rrB.rowIdx]
|
sB := cB.c.getValueAtRow(bB.br, rrB.rowIdx)
|
||||||
if sA == sB {
|
if sA == sB {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue