This commit is contained in:
Aliaksandr Valialkin 2024-05-15 16:04:10 +02:00
parent 639b3091b5
commit 454f781cd1
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 20 additions and 138 deletions

View file

@ -396,6 +396,8 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)
br.csBuf = append(br.csBuf, blockResultColumn{ br.csBuf = append(br.csBuf, blockResultColumn{
name: getCanonicalColumnName(name), name: getCanonicalColumnName(name),
minValue: ch.minValue,
maxValue: ch.maxValue,
valueType: ch.valueType, valueType: ch.valueType,
dictValues: dictValues, dictValues: dictValues,
encodedValues: encodedValues, encodedValues: encodedValues,
@ -1333,6 +1335,12 @@ type blockResultColumn struct {
// name is column name. // name is column name.
name string name string
// minValue is the minimum value in the block for uint*, float64, ipv4 and timestamp valueType
minValue uint64
// maxValue is the maximum value in the block for uint*, float64, ipv4 and timestamp valueType
maxValue uint64
// isConst is set to true if the column is const. // isConst is set to true if the column is const.
// //
// The column value is stored in encodedValues[0] // The column value is stored in encodedValues[0]

View file

@ -122,88 +122,25 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu
for _, v := range c.dictValues { for _, v := range c.dictValues {
smp.updateStateString(v) smp.updateStateString(v)
} }
case valueTypeUint8: case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
maxN := unmarshalUint8(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint8(v)
if n > maxN {
maxN = n
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalUint8String(bb.B[:0], maxN) bb.B = marshalUint64String(bb.B[:0], c.maxValue)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeUint16:
maxN := unmarshalUint16(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint16(v)
if n > maxN {
maxN = n
}
}
bb := bbPool.Get()
bb.B = marshalUint16String(bb.B[:0], maxN)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeUint32:
maxN := unmarshalUint32(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint32(v)
if n > maxN {
maxN = n
}
}
bb := bbPool.Get()
bb.B = marshalUint32String(bb.B[:0], maxN)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeUint64:
maxN := unmarshalUint64(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint64(v)
if n > maxN {
maxN = n
}
}
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], maxN)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeFloat64: case valueTypeFloat64:
maxF := unmarshalFloat64(c.encodedValues[0]) f := math.Float64frombits(c.maxValue)
for _, v := range c.encodedValues[1:] {
f := unmarshalFloat64(v)
if math.IsNaN(maxF) || f > maxF {
maxF = f
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], maxF) bb.B = marshalFloat64String(bb.B[:0], f)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeIPv4: case valueTypeIPv4:
maxIP := unmarshalIPv4(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
ip := unmarshalIPv4(v)
if ip > maxIP {
maxIP = ip
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], maxIP) bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
maxTimestamp := unmarshalTimestampISO8601(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
timestamp := unmarshalTimestampISO8601(v)
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], maxTimestamp) bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
default: default:

View file

@ -122,88 +122,25 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu
for _, v := range c.dictValues { for _, v := range c.dictValues {
smp.updateStateString(v) smp.updateStateString(v)
} }
case valueTypeUint8: case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
minN := unmarshalUint8(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint8(v)
if n < minN {
minN = n
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalUint8String(bb.B[:0], minN) bb.B = marshalUint64String(bb.B[:0], c.minValue)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeUint16:
minN := unmarshalUint16(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint16(v)
if n < minN {
minN = n
}
}
bb := bbPool.Get()
bb.B = marshalUint16String(bb.B[:0], minN)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeUint32:
minN := unmarshalUint32(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint32(v)
if n < minN {
minN = n
}
}
bb := bbPool.Get()
bb.B = marshalUint32String(bb.B[:0], minN)
smp.updateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeUint64:
minN := unmarshalUint64(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
n := unmarshalUint64(v)
if n < minN {
minN = n
}
}
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], minN)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeFloat64: case valueTypeFloat64:
minF := unmarshalFloat64(c.encodedValues[0]) f := math.Float64frombits(c.minValue)
for _, v := range c.encodedValues[1:] {
f := unmarshalFloat64(v)
if math.IsNaN(minF) || f < minF {
minF = f
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], minF) bb.B = marshalFloat64String(bb.B[:0], f)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeIPv4: case valueTypeIPv4:
minIP := unmarshalIPv4(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
ip := unmarshalIPv4(v)
if ip < minIP {
minIP = ip
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], minIP) bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
minTimestamp := unmarshalTimestampISO8601(c.encodedValues[0])
for _, v := range c.encodedValues[1:] {
timestamp := unmarshalTimestampISO8601(v)
if timestamp < minTimestamp {
minTimestamp = timestamp
}
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], minTimestamp) bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
default: default: