VictoriaMetrics/lib/logstorage/block_result.go
Aliaksandr Valialkin 1716c4e609
lib/logstorage: properly parse timezone offset at TryParseTimestampRFC3339Nano()
The TryParseTimestampRFC3339Nano() must properly parse RFC3339 timestamps with timezone offsets.

While at it, make the tryParseTimestampISO8601 function private in order to prevent
improper usage of this function from outside the lib/logstorage package.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6508
2024-06-25 14:54:24 +02:00

1975 lines
48 KiB
Go

package logstorage
import (
"math"
"slices"
"strconv"
"strings"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// blockResult holds results for a single block of log entries.
//
// It is expected that its contents are accessed only from a single goroutine at a time.
type blockResult struct {
// a holds all the bytes behind the requested column values in the block.
a arena
// valuesBuf holds all the requested column values in the block.
valuesBuf []string
// timestamps contains timestamps for the selected log entries in the block.
timestamps []int64
// csBuf contains requested columns.
csBuf []blockResultColumn
// csEmpty contains non-existing columns, which were referenced via getColumnByName()
csEmpty []blockResultColumn
// cs contains cached pointers to requested columns returned from getColumns() if csInitialized=true.
cs []*blockResultColumn
// csInitialized is set to true if cs is properly initialized and can be returned from getColumns().
csInitialized bool
// fvecs holds the value creators registered by appendFilteredColumn().
//
// Columns added by appendFilteredColumn() store pointers to fvecs entries.
fvecs []filteredValuesEncodedCreator
// svecs holds the value creators registered by addColumn().
//
// Columns added by addColumn() store pointers to svecs entries.
svecs []searchValuesEncodedCreator
}
// reset brings br back to the empty state, while keeping the already allocated buffers for further reuse.
func (br *blockResult) reset() {
	// Zero the reusable slices first, so they stop referencing stale data.
	clear(br.valuesBuf)
	clear(br.csBuf)
	clear(br.csEmpty)
	clear(br.cs)
	clear(br.fvecs)
	clear(br.svecs)

	// Truncate the slices without dropping their capacity.
	br.valuesBuf = br.valuesBuf[:0]
	br.timestamps = br.timestamps[:0]
	br.csBuf = br.csBuf[:0]
	br.csEmpty = br.csEmpty[:0]
	br.cs = br.cs[:0]
	br.fvecs = br.fvecs[:0]
	br.svecs = br.svecs[:0]

	br.csInitialized = false
	br.a.reset()
}
// clone returns a copy of br, which owns all of its data.
//
// Neither br.csEmpty nor br.fvecs / br.svecs are copied:
// csEmpty is re-populated by the caller via getColumnByName(),
// while fvecs and svecs may point to external data.
func (br *blockResult) clone() *blockResult {
	columns := br.getColumns()

	// Make sure values are populated in every column before calculating the needed buffer sizes.
	for _, c := range columns {
		_ = c.getValues(br)
	}

	// Calculate the sizes of the backing buffers needed for holding cloned column values.
	arenaLen, valuesLen := 0, 0
	for _, c := range columns {
		arenaLen += c.neededBackingBufLen()
		valuesLen += c.neededBackingValuesBufLen()
	}

	dst := &blockResult{}
	dst.a.preallocate(arenaLen)
	dst.valuesBuf = make([]string, 0, valuesLen)
	dst.timestamps = append(make([]int64, 0, len(br.timestamps)), br.timestamps...)

	clonedColumns := make([]blockResultColumn, len(columns))
	for i := range columns {
		clonedColumns[i] = columns[i].clone(dst)
	}
	dst.csBuf = clonedColumns

	return dst
}
// initFromFilterAllColumns initializes br from brSrc by copying the rows identified by the set bits at bm.
//
// The br is valid until brSrc or bm is updated.
func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) {
	br.reset()

	// Copy timestamps for the selected rows.
	allTimestamps := brSrc.timestamps
	selected := br.timestamps[:0]
	bm.forEachSetBitReadonly(func(rowIdx int) {
		selected = append(selected, allTimestamps[rowIdx])
	})
	br.timestamps = selected

	// Register every source column with the bm filter applied.
	srcColumns := brSrc.getColumns()
	for i := range srcColumns {
		br.appendFilteredColumn(brSrc, srcColumns[i], bm)
	}
}
// appendFilteredColumn adds cSrc with the given bm filter to br.
//
// Nothing is added if br contains no rows.
//
// The br is valid until brSrc, cSrc or bm is updated.
func (br *blockResult) appendFilteredColumn(brSrc *blockResult, cSrc *blockResultColumn, bm *bitmap) {
	if len(br.timestamps) == 0 {
		return
	}

	col := blockResultColumn{
		name: cSrc.name,
	}
	switch {
	case cSrc.isConst:
		// Const columns share the single encoded value with the source column.
		col.isConst = true
		col.valuesEncoded = cSrc.valuesEncoded
	case cSrc.isTime:
		col.isTime = true
	default:
		col.valueType = cSrc.valueType
		col.minValue = cSrc.minValue
		col.maxValue = cSrc.maxValue
		col.dictValues = cSrc.dictValues

		// Register the creator, which produces the filtered encoded values on request.
		br.fvecs = append(br.fvecs, filteredValuesEncodedCreator{
			br: brSrc,
			c:  cSrc,
			bm: bm,
		})
		col.valuesEncodedCreator = &br.fvecs[len(br.fvecs)-1]
	}

	br.csBuf = append(br.csBuf, col)
	br.csInitialized = false
}
// filteredValuesEncodedCreator creates encoded values for a column
// by selecting the rows with set bits at bm from the column c belonging to br.
type filteredValuesEncodedCreator struct {
// br is the source blockResult, which owns c.
br *blockResult
// c is the source column to take encoded values from.
c *blockResultColumn
// bm selects the rows to take from c.
bm *bitmap
}
// newValuesEncoded appends the encoded values of fvec.c selected by fvec.bm to br.valuesBuf
// and returns the appended values.
func (fvec *filteredValuesEncodedCreator) newValuesEncoded(br *blockResult) []string {
	// Obtain source values before reading br.valuesBuf.
	src := fvec.c.getValuesEncoded(fvec.br)

	dst := br.valuesBuf
	start := len(dst)
	fvec.bm.forEachSetBitReadonly(func(rowIdx int) {
		dst = append(dst, src[rowIdx])
	})
	br.valuesBuf = dst

	return dst[start:]
}
// cloneValues copies the given values into br and returns the copied values.
//
// nil is returned for nil values.
func (br *blockResult) cloneValues(values []string) []string {
	if values == nil {
		return nil
	}

	start := len(br.valuesBuf)
	for i := range values {
		br.addValue(values[i])
	}
	return br.valuesBuf[start:]
}
// addValue appends v to br.valuesBuf.
//
// The value is copied into br.a unless it equals the previously added value;
// in that case the previous copy is reused.
func (br *blockResult) addValue(v string) {
	n := len(br.valuesBuf)
	if n == 0 || br.valuesBuf[n-1] != v {
		br.valuesBuf = append(br.valuesBuf, br.a.copyString(v))
		return
	}

	// Share the already stored string with the preceding value.
	br.valuesBuf = append(br.valuesBuf, br.valuesBuf[n-1])
}
// sizeBytes returns the size of br in bytes.
//
// The struct itself, the arena and the capacities of valuesBuf, timestamps, csBuf and cs are taken into account.
func (br *blockResult) sizeBytes() int {
	structSize := int(unsafe.Sizeof(*br))
	arenaSize := br.a.sizeBytes()
	valuesBufSize := cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0]))
	timestampsSize := cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0]))
	csBufSize := cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0]))
	csSize := cap(br.cs) * int(unsafe.Sizeof(br.cs[0]))

	return structSize + arenaSize + valuesBufSize + timestampsSize + csBufSize + csSize
}
// setResultColumns resets br and then sets the given rcs as its columns.
//
// br.timestamps is filled with rowsCount zeroes.
//
// The br is valid only until rcs are modified.
func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) {
	br.reset()

	br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsCount)

	for i := 0; i < len(rcs); i++ {
		br.addResultColumn(&rcs[i])
	}
}
// addResultColumn adds rc to br as either a const column or a string column.
//
// It panics if the number of values at rc doesn't match the number of rows at br.
func (br *blockResult) addResultColumn(rc *resultColumn) {
	if rowsWant, rowsGot := len(br.timestamps), len(rc.values); rowsGot != rowsWant {
		logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, rowsWant, rowsGot)
	}

	col := blockResultColumn{
		name: rc.name,
	}
	if areConstValues(rc.values) {
		// Storing a single value for const columns allows reducing memory usage after br cloning
		col.isConst = true
		col.valuesEncoded = rc.values[:1]
	} else {
		col.valueType = valueTypeString
		col.valuesEncoded = rc.values
	}

	br.csBuf = append(br.csBuf, col)
	br.csInitialized = false
}
// initAllColumns initializes all the columns in br according to bs and bm.
//
// Columns listed at bs.bsw.so.unneededColumnNames are skipped.
// The br is reset if the _stream column is needed, but it couldn't be added.
//
// The initialized columns are valid until bs and bm are changed.
func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
	skippedNames := bs.bsw.so.unneededColumnNames
	isNeeded := func(name string) bool {
		return !slices.Contains(skippedNames, name)
	}

	if isNeeded("_time") {
		br.addTimeColumn()
	}

	if isNeeded("_stream_id") {
		br.addStreamIDColumn(bs)
	}

	if isNeeded("_stream") && !br.addStreamColumn(bs) {
		// Skip the current block, since the associated stream tags are missing
		br.reset()
		return
	}

	if isNeeded("_msg") {
		if v := bs.csh.getConstColumnValue("_msg"); v != "" {
			br.addConstColumn("_msg", v)
		} else if ch := bs.csh.getColumnHeader("_msg"); ch != nil {
			br.addColumn(bs, bm, ch)
		} else {
			br.addConstColumn("_msg", "")
		}
	}

	// The remaining const columns; _msg has been already handled above.
	for _, cc := range bs.csh.constColumns {
		if !isMsgFieldName(cc.Name) && isNeeded(cc.Name) {
			br.addConstColumn(cc.Name, cc.Value)
		}
	}

	// The remaining non-const columns; _msg has been already handled above.
	headers := bs.csh.columnHeaders
	for i := range headers {
		ch := &headers[i]
		if isMsgFieldName(ch.name) || !isNeeded(ch.name) {
			continue
		}
		br.addColumn(bs, bm, ch)
	}

	br.csInitFast()
}
// initRequestedColumns initializes only the columns listed at bs.bsw.so.neededColumnNames in br according to bs and bm.
//
// The br is reset if the _stream column is requested, but it couldn't be added.
//
// The initialized columns are valid until bs and bm are changed.
func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) {
	for _, name := range bs.bsw.so.neededColumnNames {
		switch name {
		case "_time":
			br.addTimeColumn()
			continue
		case "_stream_id":
			br.addStreamIDColumn(bs)
			continue
		case "_stream":
			if br.addStreamColumn(bs) {
				continue
			}
			// Skip the current block, since the associated stream tags are missing.
			br.reset()
			return
		}

		// A regular column: try const value first, then column header, and fall back to empty const column.
		if v := bs.csh.getConstColumnValue(name); v != "" {
			br.addConstColumn(name, v)
			continue
		}
		if ch := bs.csh.getColumnHeader(name); ch != nil {
			br.addColumn(bs, bm, ch)
			continue
		}
		br.addConstColumn(name, "")
	}

	br.csInitFast()
}
// mustInit resets br and initializes br.timestamps for the rows selected by bm in bs.
//
// br.timestamps is left empty if bm has no set bits.
// It is filled with zeroes if the _time column isn't requested.
func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
	br.reset()

	if bm.isZero() {
		// There are no matching log entries in the block, so there is nothing to initialize.
		return
	}

	// Determine whether real timestamps are needed.
	so := bs.bsw.so
	var needTime bool
	if so.needAllColumns {
		needTime = !slices.Contains(so.unneededColumnNames, "_time")
	} else {
		needTime = slices.Contains(so.neededColumnNames, "_time")
	}
	if !needTime {
		// The fastest path - it is enough to fill br.timestamps with zeroes.
		br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], bm.onesCount())
		return
	}

	// The _time column is requested, so real timestamps are needed.
	src := bs.getTimestamps()
	if bm.areAllBitsSet() {
		// All the rows are selected - copy all the timestamps without filtering.
		br.timestamps = append(br.timestamps[:0], src...)
		return
	}

	// Copy only the timestamps for the selected rows.
	dst := br.timestamps[:0]
	bm.forEachSetBitReadonly(func(rowIdx int) {
		dst = append(dst, src[rowIdx])
	})
	br.timestamps = dst
}
// newValuesEncodedFromColumnHeader appends the encoded values for the column ch at the rows selected by bm
// to br.valuesBuf and returns the appended values.
//
// Every visited value is validated according to ch.valueType before being added:
// the encoded size must match the type, and dict indexes must fit into ch.valuesDict.values.
// A validation failure or an unknown value type results in a panic.
func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bitmap, ch *columnHeader) []string {
	valuesBufLen := len(br.valuesBuf)

	switch ch.valueType {
	case valueTypeString:
		visitValuesReadonly(bs, ch, bm, br.addValue)
	case valueTypeDict:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 1 {
				logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
			}
			dictIdx := v[0]
			if int(dictIdx) >= len(ch.valuesDict.values) {
				logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
			}
			br.addValue(v)
		})
	case valueTypeUint8:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 1 {
				logger.Panicf("FATAL: %s: unexpected size for uint8 column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeUint16:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 2 {
				logger.Panicf("FATAL: %s: unexpected size for uint16 column %q; got %d bytes; want 2 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeUint32:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 4 {
				logger.Panicf("FATAL: %s: unexpected size for uint32 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeUint64:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 8 {
				logger.Panicf("FATAL: %s: unexpected size for uint64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeFloat64:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 8 {
				logger.Panicf("FATAL: %s: unexpected size for float64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeIPv4:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 4 {
				logger.Panicf("FATAL: %s: unexpected size for ipv4 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeTimestampISO8601:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 8 {
				logger.Panicf("FATAL: %s: unexpected size for timestamp column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	default:
		logger.Panicf("FATAL: %s: unknown valueType=%d for column %q", bs.partPath(), ch.valueType, ch.name)
	}

	return br.valuesBuf[valuesBufLen:]
}
// addColumn adds the column described by ch to br.
//
// The values for the column are created via a searchValuesEncodedCreator registered at br.svecs.
//
// The added column is valid until bs, bm or ch is changed.
func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) {
	// Register the creator of encoded values for the column.
	br.svecs = append(br.svecs, searchValuesEncodedCreator{
		bs: bs,
		bm: bm,
		ch: ch,
	})
	creator := &br.svecs[len(br.svecs)-1]

	col := blockResultColumn{
		name:       getCanonicalColumnName(ch.name),
		valueType:  ch.valueType,
		minValue:   ch.minValue,
		maxValue:   ch.maxValue,
		dictValues: ch.valuesDict.values,
	}
	col.valuesEncodedCreator = creator

	br.csBuf = append(br.csBuf, col)
	br.csInitialized = false
}
// searchValuesEncodedCreator creates encoded values for a column
// by reading the column ch from bs at the rows with set bits at bm.
type searchValuesEncodedCreator struct {
// bs is the block search to read values from.
bs *blockSearch
// bm selects the rows to read.
bm *bitmap
// ch is the header of the column to read.
ch *columnHeader
}
// newValuesEncoded creates encoded values in br for the column svec.ch at the rows selected by svec.bm.
func (svec *searchValuesEncodedCreator) newValuesEncoded(br *blockResult) []string {
	bs, bm, ch := svec.bs, svec.bm, svec.ch
	return br.newValuesEncodedFromColumnHeader(bs, bm, ch)
}
// addTimeColumn adds the _time column to br.
func (br *blockResult) addTimeColumn() {
	timeColumn := blockResultColumn{
		name:   "_time",
		isTime: true,
	}
	br.csBuf = append(br.csBuf, timeColumn)
	br.csInitialized = false
}
// addStreamIDColumn adds the const _stream_id column with the stream id of the block header at bs.
func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
	bb := bbPool.Get()

	streamID := &bs.bsw.bh.streamID
	bb.B = streamID.marshalString(bb.B)

	// addConstColumn is called before bb is returned to the pool, while bb.B is still owned here.
	value := bytesutil.ToUnsafeString(bb.B)
	br.addConstColumn("_stream_id", value)

	bbPool.Put(bb)
}
// addStreamColumn adds the const _stream column for the block at bs.
//
// The stream cached at bs.prevStream is used if it belongs to the same stream id as the block.
// Otherwise the stream is obtained via addStreamColumnSlow.
//
// false is returned if the column couldn't be added.
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
	if bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
		cached := bs.prevStream
		if len(cached) > 0 {
			br.addConstColumn("_stream", bytesutil.ToUnsafeString(cached))
			return true
		}
		// The cached stream is empty, so there is nothing to add.
		return false
	}
	return br.addStreamColumnSlow(bs)
}
// addStreamColumnSlow looks up stream tags for the stream id of the block at bs,
// adds the const _stream column to br and caches the result at bs.prevStreamID and bs.prevStream.
//
// false is returned if the stream tags couldn't be found; an empty stream is cached in this case.
func (br *blockResult) addStreamColumnSlow(bs *blockSearch) bool {
	bb := bbPool.Get()
	defer bbPool.Put(bb)

	streamID := &bs.bsw.bh.streamID
	bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID)

	// The stream id is remembered regardless of the lookup outcome.
	bs.prevStreamID = *streamID

	if len(bb.B) == 0 {
		// Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
		// was recently registered and its tags aren't visible to search yet.
		// The stream tags must become visible in a few seconds.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
		bs.prevStream = bs.prevStream[:0]
		return false
	}

	// Convert the marshaled stream tags into their string representation.
	st := GetStreamTags()
	mustUnmarshalStreamTags(st, bb.B)
	bb.B = st.marshalString(bb.B[:0])
	PutStreamTags(st)

	stream := bytesutil.ToUnsafeString(bb.B)
	br.addConstColumn("_stream", stream)
	bs.prevStream = append(bs.prevStream[:0], stream...)

	return true
}
// addConstColumn adds a const column with the given name and value to br.
//
// Both the name and the value are copied into br, so the caller may reuse them after the call.
func (br *blockResult) addConstColumn(name, value string) {
	ownedName := br.a.copyString(name)

	start := len(br.valuesBuf)
	br.addValue(value)

	col := blockResultColumn{
		name:          ownedName,
		isConst:       true,
		valuesEncoded: br.valuesBuf[start:],
	}
	br.csBuf = append(br.csBuf, col)
	br.csInitialized = false
}
// newValuesBucketedForColumn returns string values for the column c bucketed according to bf.
//
// It dispatches to the bucketing function matching the column kind and value type.
// It panics on unknown value type.
func (br *blockResult) newValuesBucketedForColumn(c *blockResultColumn, bf *byStatsField) []string {
	switch {
	case c.isConst:
		return br.getBucketedConstValues(c.valuesEncoded[0], bf)
	case c.isTime:
		return br.getBucketedTimestampValues(bf)
	}

	encoded := c.getValuesEncoded(br)

	switch c.valueType {
	case valueTypeString:
		return br.getBucketedStringValues(encoded, bf)
	case valueTypeDict:
		return br.getBucketedDictValues(encoded, c.dictValues, bf)
	case valueTypeUint8:
		return br.getBucketedUint8Values(encoded, bf)
	case valueTypeUint16:
		return br.getBucketedUint16Values(encoded, bf)
	case valueTypeUint32:
		return br.getBucketedUint32Values(encoded, bf)
	case valueTypeUint64:
		return br.getBucketedUint64Values(encoded, bf)
	case valueTypeFloat64:
		return br.getBucketedFloat64Values(encoded, bf)
	case valueTypeIPv4:
		return br.getBucketedIPv4Values(encoded, bf)
	case valueTypeTimestampISO8601:
		return br.getBucketedTimestampISO8601Values(encoded, bf)
	}

	logger.Panicf("BUG: unknown valueType=%d", c.valueType)
	return nil
}
// getBucketedConstValues returns len(br.timestamps) copies of v bucketed according to bf.
func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []string {
	rowsCount := len(br.timestamps)
	if v == "" {
		// Fast path - a shared slice of empty strings is returned without constructing a new one.
		return getEmptyStrings(rowsCount)
	}

	// Slower path - bucket the value once and repeat it for every row.
	bucketed := br.getBucketedValue(v, bf)
	start := len(br.valuesBuf)
	for i := 0; i < rowsCount; i++ {
		br.valuesBuf = append(br.valuesBuf, bucketed)
	}
	return br.valuesBuf[start:]
}
// getBucketedTimestampValues returns string representations of br.timestamps bucketed according to bf.
//
// The strings are formatted via marshalTimestampRFC3339NanoString and stored in br.a,
// while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	start := len(dst)

	timestamps := br.timestamps

	// last holds the most recently formatted value; it is reused for runs of identical values.
	var last string

	if bf.hasBucketConfig() {
		step := int64(bf.bucketSize)
		if step <= 0 {
			step = 1
		}
		offset := int64(bf.bucketOffset)

		var prevBucket int64
		for i, ts := range timestamps {
			if i > 0 && timestamps[i-1] == ts {
				dst = append(dst, last)
				continue
			}

			// Round the timestamp down to the start of its bucket.
			ts -= offset
			switch bf.bucketSizeStr {
			case "month":
				ts = truncateTimestampToMonth(ts)
			case "year":
				ts = truncateTimestampToYear(ts)
			default:
				ts -= ts % step
			}
			ts += offset

			if i > 0 && prevBucket == ts {
				dst = append(dst, last)
				continue
			}
			prevBucket = ts

			n := len(arenaBuf)
			arenaBuf = marshalTimestampRFC3339NanoString(arenaBuf, ts)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i, ts := range timestamps {
			if i > 0 && timestamps[i-1] == ts {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalTimestampRFC3339NanoString(arenaBuf, ts)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[start:]
}
// getBucketedStringValues returns values bucketed according to bf.
//
// The original values are returned as is if bf has no bucket config.
// Otherwise the bucketed values are appended to br.valuesBuf.
func (br *blockResult) getBucketedStringValues(values []string, bf *byStatsField) []string {
	if !bf.hasBucketConfig() {
		return values
	}

	dst := br.valuesBuf
	start := len(dst)

	// last holds the most recently bucketed value; it is reused for runs of identical source values.
	var last string
	for i, v := range values {
		if i == 0 || values[i-1] != v {
			last = br.getBucketedValue(v, bf)
		}
		dst = append(dst, last)
	}

	br.valuesBuf = dst
	return dst[start:]
}
// getBucketedDictValues returns string values for the dict-encoded valuesEncoded bucketed according to bf.
//
// Every item at valuesEncoded is expected to hold a single byte, which is an index into dictValues.
// The returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedDictValues(valuesEncoded, dictValues []string, bf *byStatsField) []string {
	// Bucket dict values before taking a snapshot of br.valuesBuf, since getBucketedStringValues
	// may append the bucketed values to br.valuesBuf. Appending via a stale snapshot would overwrite
	// the bucketed dict values while they are still being read in the loop below.
	dictValues = br.getBucketedStringValues(dictValues, bf)

	valuesBuf := br.valuesBuf
	valuesBufLen := len(valuesBuf)

	for _, v := range valuesEncoded {
		dictIdx := v[0]
		valuesBuf = append(valuesBuf, dictValues[dictIdx])
	}

	br.valuesBuf = valuesBuf

	return valuesBuf[valuesBufLen:]
}
// getBucketedUint8Values returns string representations for valuesEncoded decoded via unmarshalUint8
// and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedUint8Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	start := len(dst)

	// last holds the most recently formatted value; it is reused for runs of identical values.
	var last string

	if bf.hasBucketConfig() {
		step := uint64(bf.bucketSize)
		if step == 0 {
			step = 1
		}
		offset := uint64(int64(bf.bucketOffset))

		var prevBucket uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			// Round the value down to the start of its bucket.
			bucket := uint64(unmarshalUint8(valuesEncoded[i]))
			bucket -= offset
			bucket -= bucket % step
			bucket += offset

			if i > 0 && bucket == prevBucket {
				dst = append(dst, last)
				continue
			}
			prevBucket = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint8String(arenaBuf, unmarshalUint8(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.valuesBuf = dst
	br.a.b = arenaBuf

	return dst[start:]
}
// getBucketedUint16Values returns string representations for valuesEncoded decoded via unmarshalUint16
// and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedUint16Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	start := len(dst)

	// last holds the most recently formatted value; it is reused for runs of identical values.
	var last string

	if bf.hasBucketConfig() {
		step := uint64(bf.bucketSize)
		if step == 0 {
			step = 1
		}
		offset := uint64(int64(bf.bucketOffset))

		var prevBucket uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			// Round the value down to the start of its bucket.
			bucket := uint64(unmarshalUint16(valuesEncoded[i]))
			bucket -= offset
			bucket -= bucket % step
			bucket += offset

			if i > 0 && bucket == prevBucket {
				dst = append(dst, last)
				continue
			}
			prevBucket = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint16String(arenaBuf, unmarshalUint16(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.valuesBuf = dst
	br.a.b = arenaBuf

	return dst[start:]
}
// getBucketedUint32Values returns string representations for valuesEncoded decoded via unmarshalUint32
// and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedUint32Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	start := len(dst)

	// last holds the most recently formatted value; it is reused for runs of identical values.
	var last string

	if bf.hasBucketConfig() {
		step := uint64(bf.bucketSize)
		if step == 0 {
			step = 1
		}
		offset := uint64(int64(bf.bucketOffset))

		var prevBucket uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			// Round the value down to the start of its bucket.
			bucket := uint64(unmarshalUint32(valuesEncoded[i]))
			bucket -= offset
			bucket -= bucket % step
			bucket += offset

			if i > 0 && bucket == prevBucket {
				dst = append(dst, last)
				continue
			}
			prevBucket = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint32String(arenaBuf, unmarshalUint32(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.valuesBuf = dst
	br.a.b = arenaBuf

	return dst[start:]
}
// getBucketedUint64Values returns string representations for valuesEncoded decoded via unmarshalUint64
// and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedUint64Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	start := len(dst)

	// last holds the most recently formatted value; it is reused for runs of identical values.
	var last string

	if bf.hasBucketConfig() {
		step := uint64(bf.bucketSize)
		if step == 0 {
			step = 1
		}
		offset := uint64(int64(bf.bucketOffset))

		var prevBucket uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			// Round the value down to the start of its bucket.
			bucket := unmarshalUint64(valuesEncoded[i])
			bucket -= offset
			bucket -= bucket % step
			bucket += offset

			if i > 0 && bucket == prevBucket {
				dst = append(dst, last)
				continue
			}
			prevBucket = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, unmarshalUint64(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.valuesBuf = dst
	br.a.b = arenaBuf

	return dst[start:]
}
// getBucketedFloat64Values returns string representations for valuesEncoded decoded via unmarshalFloat64
// and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedFloat64Values(valuesEncoded []string, bf *byStatsField) []string {
	buf := br.a.b
	valuesBuf := br.valuesBuf
	valuesBufLen := len(valuesBuf)

	// s holds the most recently formatted value; it is reused for runs of identical values.
	var s string

	if !bf.hasBucketConfig() {
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			f := unmarshalFloat64(v)

			bufLen := len(buf)
			buf = marshalFloat64String(buf, f)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	} else {
		bucketSize := bf.bucketSize
		if bucketSize <= 0 {
			bucketSize = 1
		}

		// Scale both the value and the bucket size by the same power of 10,
		// so the remainder can be calculated on integers.
		_, e := decimal.FromFloat(bucketSize)
		p10 := math.Pow10(int(-e))
		bucketSizeP10 := int64(bucketSize * p10)

		fPrev := float64(0)
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			f := unmarshalFloat64(v)

			f -= bf.bucketOffset

			// emulate f % bucketSize for float64 values
			fP10 := int64(f * p10)
			fP10 -= fP10 % bucketSizeP10
			f = float64(fP10) / p10

			f += bf.bucketOffset

			// fPrev and s are valid only after the first value is formatted.
			// Without the i > 0 check the first value would be replaced by an empty string
			// when its bucket equals the zero value of fPrev.
			if i > 0 && fPrev == f {
				valuesBuf = append(valuesBuf, s)
				continue
			}
			fPrev = f

			bufLen := len(buf)
			buf = marshalFloat64String(buf, f)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	}

	br.valuesBuf = valuesBuf
	br.a.b = buf

	return br.valuesBuf[valuesBufLen:]
}
// getBucketedIPv4Values returns string representations for valuesEncoded decoded via unmarshalIPv4
// and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedIPv4Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	start := len(dst)

	// last holds the most recently formatted value; it is reused for runs of identical values.
	var last string

	if bf.hasBucketConfig() {
		step := uint32(bf.bucketSize)
		if step == 0 {
			step = 1
		}
		offset := uint32(int32(bf.bucketOffset))

		var prevBucket uint32
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			// Round the address down to the start of its bucket.
			bucket := unmarshalIPv4(valuesEncoded[i])
			bucket -= offset
			bucket -= bucket % step
			bucket += offset

			if i > 0 && bucket == prevBucket {
				dst = append(dst, last)
				continue
			}
			prevBucket = bucket

			n := len(arenaBuf)
			arenaBuf = marshalIPv4String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalIPv4String(arenaBuf, unmarshalIPv4(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.valuesBuf = dst
	br.a.b = arenaBuf

	return dst[start:]
}
// getBucketedTimestampISO8601Values returns string representations for valuesEncoded
// decoded via unmarshalTimestampISO8601 and bucketed according to bf.
//
// The strings are stored in br.a, while the returned slice is backed by br.valuesBuf.
func (br *blockResult) getBucketedTimestampISO8601Values(valuesEncoded []string, bf *byStatsField) []string {
	buf := br.a.b
	valuesBuf := br.valuesBuf
	valuesBufLen := len(valuesBuf)

	// s holds the most recently formatted value; it is reused for runs of identical values.
	var s string

	if !bf.hasBucketConfig() {
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			n := unmarshalTimestampISO8601(v)

			bufLen := len(buf)
			buf = marshalTimestampISO8601String(buf, n)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	} else {
		bucketSizeInt := int64(bf.bucketSize)
		if bucketSizeInt <= 0 {
			bucketSizeInt = 1
		}
		bucketOffsetInt := int64(bf.bucketOffset)

		timestampPrev := int64(0)
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			timestamp := unmarshalTimestampISO8601(v)

			// Round the timestamp down to the start of its bucket.
			// Month and year buckets are truncated on calendar boundaries, so the generic
			// modulo truncation must not be applied to them.
			timestamp -= bucketOffsetInt
			switch bf.bucketSizeStr {
			case "month":
				timestamp = truncateTimestampToMonth(timestamp)
			case "year":
				timestamp = truncateTimestampToYear(timestamp)
			default:
				timestamp -= timestamp % bucketSizeInt
			}
			timestamp += bucketOffsetInt

			// timestampPrev and s are valid only after the first value is formatted.
			if i > 0 && timestampPrev == timestamp {
				valuesBuf = append(valuesBuf, s)
				continue
			}
			timestampPrev = timestamp

			bufLen := len(buf)
			buf = marshalTimestampISO8601String(buf, timestamp)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	}

	br.valuesBuf = valuesBuf
	br.a.b = buf

	return valuesBuf[valuesBufLen:]
}
// getBucketedValue returns s bucketed according to the given bf.
//
// The value is tried to be parsed as float64, then as timestamp, then as IPv4 address and then as duration.
// The first successful parse determines how the value is bucketed.
// s is returned as is if bf has no bucket config, or if s is empty, or if s cannot be parsed.
//
// The bucketed value is stored in br.a.
func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
	if !bf.hasBucketConfig() || len(s) == 0 {
		return s
	}

	if first := s[0]; first != '-' && (first < '0' || first > '9') {
		// Fast path - the value cannot be bucketed, since it starts with unexpected chars.
		return s
	}

	if f, ok := tryParseFloat64(s); ok {
		step := bf.bucketSize
		if step <= 0 {
			step = 1
		}

		// Scale both the value and the bucket size by the same power of 10
		// in order to emulate f % step on integers.
		_, e := decimal.FromFloat(step)
		scale := math.Pow10(int(-e))

		f -= bf.bucketOffset
		scaled := int64(f * scale)
		scaled -= scaled % int64(step*scale)
		f = float64(scaled) / scale
		f += bf.bucketOffset

		n := len(br.a.b)
		br.a.b = marshalFloat64String(br.a.b, f)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	// There is no need in calling tryParseTimestampISO8601 here, since TryParseTimestampRFC3339Nano
	// should successfully parse ISO8601 timestamps.
	if ts, ok := TryParseTimestampRFC3339Nano(s); ok {
		step := int64(bf.bucketSize)
		if step <= 0 {
			step = 1
		}
		offset := int64(bf.bucketOffset)

		ts -= offset
		switch bf.bucketSizeStr {
		case "month":
			ts = truncateTimestampToMonth(ts)
		case "year":
			ts = truncateTimestampToYear(ts)
		default:
			ts -= ts % step
		}
		ts += offset

		n := len(br.a.b)
		br.a.b = marshalTimestampRFC3339NanoString(br.a.b, ts)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	if ip, ok := tryParseIPv4(s); ok {
		step := uint32(bf.bucketSize)
		if step == 0 {
			step = 1
		}
		offset := uint32(int32(bf.bucketOffset))

		ip -= offset
		ip -= ip % step
		ip += offset

		n := len(br.a.b)
		br.a.b = marshalIPv4String(br.a.b, ip)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	if d, ok := tryParseDuration(s); ok {
		step := int64(bf.bucketSize)
		if step <= 0 {
			step = 1
		}
		offset := int64(bf.bucketOffset)

		d -= offset
		d -= d % step
		d += offset

		n := len(br.a.b)
		br.a.b = marshalDurationString(br.a.b, d)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	// None of the parsers accepted s, so it is returned unchanged.
	return s
}
// copyColumns copies every column from srcColumnNames to the column with the same index at dstColumnNames.
func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
	for i := range srcColumnNames {
		br.copySingleColumn(srcColumnNames[i], dstColumnNames[i])
	}
}
// copySingleColumn copies the column srcName into the column dstName.
//
// The previous dstName column is dropped. An empty const dstName column is added if srcName is missing.
func (br *blockResult) copySingleColumn(srcName, dstName string) {
	columns := br.getColumns()

	// The new set of columns is appended after the existing ones and is moved to the start at the end.
	oldLen := len(br.csBuf)
	srcFound := false
	for _, c := range columns {
		if c.name != dstName {
			br.csBuf = append(br.csBuf, *c)
		}
		if c.name != srcName {
			continue
		}
		dup := *c
		dup.name = dstName
		br.csBuf = append(br.csBuf, dup)
		srcFound = true
	}
	if !srcFound {
		br.addConstColumn(dstName, "")
	}

	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}
// renameColumns renames every column from srcColumnNames to the name with the same index at dstColumnNames.
func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) {
	for i := range srcColumnNames {
		br.renameSingleColumn(srcColumnNames[i], dstColumnNames[i])
	}
}
// renameSingleColumn renames the column srcName to dstName.
//
// The previous dstName column is dropped. An empty const dstName column is added if srcName is missing.
func (br *blockResult) renameSingleColumn(srcName, dstName string) {
	columns := br.getColumns()

	// The new set of columns is appended after the existing ones and is moved to the start at the end.
	oldLen := len(br.csBuf)
	srcFound := false
	for _, c := range columns {
		switch c.name {
		case srcName:
			renamed := *c
			renamed.name = dstName
			br.csBuf = append(br.csBuf, renamed)
			srcFound = true
		case dstName:
			// The old destination column is replaced by the renamed one, so drop it.
		default:
			br.csBuf = append(br.csBuf, *c)
		}
	}
	if !srcFound {
		br.addConstColumn(dstName, "")
	}

	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}
// deleteColumns removes the columns with the given columnNames from br.
func (br *blockResult) deleteColumns(columnNames []string) {
	if len(columnNames) == 0 {
		return
	}

	columns := br.getColumns()

	// The remaining columns are appended after the existing ones and are moved to the start at the end.
	oldLen := len(br.csBuf)
	for _, c := range columns {
		if slices.Contains(columnNames, c.name) {
			continue
		}
		br.csBuf = append(br.csBuf, *c)
	}

	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}
// setColumns leaves only the columns with the given columnNames in br.
//
// Empty const columns are added for the names missing in br.
func (br *blockResult) setColumns(columnNames []string) {
	if br.areSameColumns(columnNames) {
		// Fast path - br already contains exactly the requested columns.
		return
	}

	// Slow path - build the requested set of columns after the existing ones and move it to the start at the end.
	columns := br.getColumns()
	oldLen := len(br.csBuf)

	for _, c := range columns {
		if !slices.Contains(columnNames, c.name) {
			continue
		}
		br.csBuf = append(br.csBuf, *c)
	}

	for _, name := range columnNames {
		if getBlockResultColumnIdxByName(columns, name) >= 0 {
			continue
		}
		br.addConstColumn(name, "")
	}

	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}
// areSameColumns returns true if br contains columns with exactly the given columnNames in exactly the given order.
func (br *blockResult) areSameColumns(columnNames []string) bool {
	cs := br.getColumns()
	if len(columnNames) != len(cs) {
		return false
	}
	for i := range columnNames {
		if cs[i].name != columnNames[i] {
			return false
		}
	}
	return true
}
// getColumnByName returns the column with the given columnName.
//
// Empty columnName refers to the "_msg" column.
//
// If br has no column with the given name, then a const column with empty value is returned.
// Such columns are stored in br.csEmpty and are re-used on subsequent calls with the same columnName.
func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
	columnName = getCanonicalColumnName(columnName)

	cs := br.getColumns()
	if idx := getBlockResultColumnIdxByName(cs, columnName); idx >= 0 {
		return cs[idx]
	}

	// Try re-using the previously created empty column with the given name.
	for i := range br.csEmpty {
		if c := &br.csEmpty[i]; c.name == columnName {
			return c
		}
	}

	// There is no such column yet - register a new empty column.
	br.csEmpty = append(br.csEmpty, blockResultColumn{
		name:          br.a.copyString(columnName),
		isConst:       true,
		valuesEncoded: getEmptyStrings(1),
	})
	lastIdx := len(br.csEmpty) - 1
	return &br.csEmpty[lastIdx]
}
// getColumns returns pointers to columns stored in br.
//
// The result is cached in br.cs; it is rebuilt via csInit() when br.csInitialized is false.
func (br *blockResult) getColumns() []*blockResultColumn {
	if br.csInitialized {
		return br.cs
	}
	br.csInit()
	return br.cs
}
// csInit initializes br.cs with pointers to br.csBuf items.
//
// If br.csBuf contains multiple columns with the same name, then the last one wins,
// while it is placed at the position of the first column with this name.
func (br *blockResult) csInit() {
	clear(br.cs)
	dst := br.cs[:0]
	for i := range br.csBuf {
		col := &br.csBuf[i]
		if pos := getBlockResultColumnIdxByName(dst, col.name); pos < 0 {
			dst = append(dst, col)
		} else {
			// Override the previously seen column with the same name.
			dst[pos] = col
		}
	}
	br.cs = dst
	br.csInitialized = true
}
// csInitFast initializes br.cs with pointers to all the br.csBuf items.
//
// Contrary to csInit(), it doesn't search for columns with identical names,
// so presumably it must be used only when br.csBuf contains unique column names - TODO confirm against callers.
func (br *blockResult) csInitFast() {
	clear(br.cs)

	n := len(br.csBuf)
	dst := slicesutil.SetLength(br.cs, n)
	for i := 0; i < n; i++ {
		dst[i] = &br.csBuf[i]
	}

	br.cs = dst
	br.csInitialized = true
}
// getBlockResultColumnIdxByName returns the index of the first column with the given name in cs.
//
// -1 is returned if cs has no column with the given name.
func getBlockResultColumnIdxByName(cs []*blockResultColumn, name string) int {
	for idx := range cs {
		if cs[idx].name == name {
			return idx
		}
	}
	return -1
}
// skipRows drops the first skipRows rows from br.
//
// The remaining rows are moved to the beginning of the already existing slices,
// so the backing arrays of timestamps and column values are modified in place.
func (br *blockResult) skipRows(skipRows int) {
	br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...)
	for _, c := range br.getColumns() {
		if c.values != nil {
			c.values = append(c.values[:0], c.values[skipRows:]...)
		}
		if c.isConst {
			// Const column holds a single value in valuesEncoded[0], so there is nothing to skip there.
			continue
		}
		// NOTE(review): the in-place moves above and below are correct only if c.values, c.valuesEncoded
		// and c.valuesBucketed never share the same backing array - TODO confirm this for all the value types.
		valuesEncoded := c.getValuesEncoded(br)
		if valuesEncoded != nil {
			c.valuesEncoded = append(valuesEncoded[:0], valuesEncoded[skipRows:]...)
		}
		if c.valuesBucketed != nil {
			c.valuesBucketed = append(c.valuesBucketed[:0], c.valuesBucketed[skipRows:]...)
		}
	}
}
// truncateRows leaves the first keepRows rows in br and drops the remaining rows.
//
// The caller is expected to pass keepRows, which doesn't exceed the number of rows in br.
func (br *blockResult) truncateRows(keepRows int) {
	br.timestamps = br.timestamps[:keepRows]
	for _, c := range br.getColumns() {
		if c.values != nil {
			c.values = c.values[:keepRows]
		}
		if c.isConst {
			// Const column holds a single value in valuesEncoded[0], so there is nothing to truncate there.
			continue
		}

		valuesEncoded := c.getValuesEncoded(br)
		if valuesEncoded != nil {
			c.valuesEncoded = valuesEncoded[:keepRows]
		}
		if c.valuesBucketed != nil {
			// Keep the first keepRows bucketed values in the same way as for values and valuesEncoded.
			// Moving the tail to the beginning is what skipRows() does; it would leave the wrong rows here.
			c.valuesBucketed = c.valuesBucketed[:keepRows]
		}
	}
}
// blockResultColumn represents named column from blockResult.
//
// blockResultColumn doesn't own any referred data - all the referred data must be owned by blockResult.
// This simplifies copying, resetting and re-using of the struct.
type blockResultColumn struct {
	// name is column name
	name string

	// isConst is set to true if the column is const.
	//
	// The column value is stored in valuesEncoded[0]
	isConst bool

	// isTime is set to true if the column contains _time values.
	//
	// The column values are stored in blockResult.timestamps, while valuesEncoded is nil.
	isTime bool

	// valueType is the type of non-const value
	valueType valueType

	// minValue is the minimum encoded value for uint*, ipv4, timestamp and float64 value
	//
	// It is used for fast detection of whether the given column contains values in the given range
	minValue uint64

	// maxValue is the maximum encoded value for uint*, ipv4, timestamp and float64 value
	//
	// It is used for fast detection of whether the given column contains values in the given range
	maxValue uint64

	// dictValues contains dict values for valueType=valueTypeDict.
	dictValues []string

	// valuesEncoded contains encoded values for non-const and non-time column after getValuesEncoded() call
	valuesEncoded []string

	// values contains decoded values after getValues() call
	values []string

	// valuesBucketed contains values after getValuesBucketed() call
	valuesBucketed []string

	// valuesEncodedCreator must return valuesEncoded.
	//
	// This interface must be set for non-const and non-time columns if valuesEncoded field isn't set.
	valuesEncodedCreator columnValuesEncodedCreator

	// bucketSizeStr contains bucketSizeStr for valuesBucketed
	bucketSizeStr string

	// bucketOffsetStr contains bucketOffsetStr for valuesBucketed
	bucketOffsetStr string
}
// columnValuesEncodedCreator must return encoded values for the current column.
//
// blockResultColumn.getValuesEncoded() calls newValuesEncoded() when valuesEncoded is nil
// and stores the returned values in valuesEncoded.
type columnValuesEncodedCreator interface {
	// newValuesEncoded returns encoded values for the column in the given br.
	newValuesEncoded(br *blockResult) []string
}
// clone returns a copy of c, which refers only to data owned by br.
//
// c.valuesEncoded must be already initialized for non-time column.
//
// The returned column is valid until br is reset.
func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
	cNew := blockResultColumn{
		name:            br.a.copyString(c.name),
		isConst:         c.isConst,
		isTime:          c.isTime,
		valueType:       c.valueType,
		minValue:        c.minValue,
		maxValue:        c.maxValue,
		dictValues:      br.cloneValues(c.dictValues),
		bucketSizeStr:   c.bucketSizeStr,
		bucketOffsetStr: c.bucketOffsetStr,
	}

	if c.valuesEncoded == nil && !c.isTime {
		logger.Panicf("BUG: valuesEncoded must be non-nil for non-time column %q; isConst=%v; valueType=%d", c.name, c.isConst, c.valueType)
	}
	cNew.valuesEncoded = br.cloneValues(c.valuesEncoded)

	// values aren't cloned for string columns, so cNew.values remains nil for them.
	if c.valueType != valueTypeString {
		cNew.values = br.cloneValues(c.values)
	}
	cNew.valuesBucketed = br.cloneValues(c.valuesBucketed)

	// cNew.valuesEncodedCreator is intentionally left nil, since c.valuesEncodedCreator may refer to data,
	// which may change over time, while c.valuesEncoded is already copied to cNew.

	return cNew
}
// neededBackingBufLen returns the total size in bytes of the column name and of all the values referred by c.
//
// c.values aren't taken into account for string columns.
func (c *blockResultColumn) neededBackingBufLen() int {
	total := len(c.name) +
		valuesSizeBytes(c.dictValues) +
		valuesSizeBytes(c.valuesEncoded) +
		valuesSizeBytes(c.valuesBucketed)
	if c.valueType != valueTypeString {
		total += valuesSizeBytes(c.values)
	}
	return total
}
// neededBackingValuesBufLen returns the total number of values referred by c.
//
// c.values aren't taken into account for string columns.
func (c *blockResultColumn) neededBackingValuesBufLen() int {
	total := len(c.dictValues) + len(c.valuesEncoded) + len(c.valuesBucketed)
	if c.valueType != valueTypeString {
		total += len(c.values)
	}
	return total
}
// valuesSizeBytes returns the summary length in bytes of all the given values.
func valuesSizeBytes(values []string) int {
	total := 0
	for i := range values {
		total += len(values[i])
	}
	return total
}
// getValueAtRow returns the value at the given rowIdx.
//
// The returned value is valid until br.reset() is called.
func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string {
	switch {
	case c.isConst:
		// Const column holds the same value for every row.
		return c.valuesEncoded[0]
	case c.values != nil:
		// The values are already decoded, so there is no need to call getValues().
		return c.values[rowIdx]
	default:
		// Decode all the values and return the requested one.
		return c.getValues(br)[rowIdx]
	}
}
// getValuesBucketed returns column values bucketed according to bf.
//
// Plain values are returned if bf has no bucket config.
// The bucketed values are cached in c together with the bucket config they were created for.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValuesBucketed(br *blockResult, bf *byStatsField) []string {
	if !bf.hasBucketConfig() {
		return c.getValues(br)
	}

	isCached := c.valuesBucketed != nil && c.bucketSizeStr == bf.bucketSizeStr && c.bucketOffsetStr == bf.bucketOffsetStr
	if !isCached {
		c.valuesBucketed = br.newValuesBucketedForColumn(c, bf)
		c.bucketSizeStr = bf.bucketSizeStr
		c.bucketOffsetStr = bf.bucketOffsetStr
	}
	return c.valuesBucketed
}
// getValues returns decoded values for the given column.
//
// The values are created on the first call and are cached in c.values.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValues(br *blockResult) []string {
	if c.values == nil {
		c.values = br.newValuesBucketedForColumn(c, zeroByStatsField)
	}
	return c.values
}
// getValuesEncoded returns encoded values for the given column.
//
// nil is returned for the time column.
// The values are created via c.valuesEncodedCreator on the first call and are cached in c.valuesEncoded.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string {
	if c.isTime {
		return nil
	}
	if c.valuesEncoded != nil {
		return c.valuesEncoded
	}
	c.valuesEncoded = c.valuesEncodedCreator.newValuesEncoded(br)
	return c.valuesEncoded
}
// getFloatValueAtRow returns the value at the given rowIdx converted to float64.
//
// The second returned result is false if the value cannot be converted to float64.
// It is always false for time, ipv4 and ISO8601 timestamp columns.
func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (float64, bool) {
	if c.isConst {
		return tryParseFloat64(c.valuesEncoded[0])
	}
	if c.isTime {
		return 0, false
	}

	encoded := c.getValuesEncoded(br)

	switch c.valueType {
	case valueTypeString:
		return tryParseFloat64(encoded[rowIdx])
	case valueTypeDict:
		// The first byte of the encoded value is the index in c.dictValues.
		idx := encoded[rowIdx][0]
		return tryParseFloat64(c.dictValues[idx])
	case valueTypeUint8:
		return float64(unmarshalUint8(encoded[rowIdx])), true
	case valueTypeUint16:
		return float64(unmarshalUint16(encoded[rowIdx])), true
	case valueTypeUint32:
		return float64(unmarshalUint32(encoded[rowIdx])), true
	case valueTypeUint64:
		return float64(unmarshalUint64(encoded[rowIdx])), true
	case valueTypeFloat64:
		f := unmarshalFloat64(encoded[rowIdx])
		return f, !math.IsNaN(f)
	case valueTypeIPv4, valueTypeTimestampISO8601:
		return 0, false
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
		return 0, false
	}
}
// sumLenValues returns the summary length in bytes of all the values in the column c.
//
// The length of the time.RFC3339Nano layout string is used as the length of every value in the time column,
// while the length of iso8601Timestamp is used as the length of every value in the ISO8601 timestamp column.
func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 {
	rowsCount := uint64(len(br.timestamps))

	if c.isConst {
		return uint64(len(c.valuesEncoded[0])) * rowsCount
	}
	if c.isTime {
		return uint64(len(time.RFC3339Nano)) * rowsCount
	}

	switch c.valueType {
	case valueTypeString,
		valueTypeUint8,
		valueTypeUint16,
		valueTypeUint32,
		valueTypeUint64,
		valueTypeFloat64,
		valueTypeIPv4:
		return c.sumLenStringValues(br)
	case valueTypeDict:
		total := uint64(0)
		for _, ve := range c.getValuesEncoded(br) {
			// The first byte of the encoded value is the index in c.dictValues.
			total += uint64(len(c.dictValues[ve[0]]))
		}
		return total
	case valueTypeTimestampISO8601:
		return uint64(len(iso8601Timestamp)) * rowsCount
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
		return 0
	}
}
// sumLenStringValues returns the summary length in bytes of all the decoded values in the column c.
func (c *blockResultColumn) sumLenStringValues(br *blockResult) uint64 {
	values := c.getValues(br)
	total := uint64(0)
	for i := range values {
		total += uint64(len(values[i]))
	}
	return total
}
// sumValues returns the sum of numeric values in the column c together with the number of the summed values.
//
// (0, 0) is returned for time, ipv4 and ISO8601 timestamp columns.
//
// NOTE(review): const values are parsed with tryParseFloat64(), while string and dict values
// are parsed with tryParseNumber() - confirm that this difference is intentional.
func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) {
	if c.isConst {
		f, ok := tryParseFloat64(c.valuesEncoded[0])
		if ok {
			return f * float64(len(br.timestamps)), len(br.timestamps)
		}
		return 0, 0
	}
	if c.isTime {
		return 0, 0
	}

	switch c.valueType {
	case valueTypeString:
		var (
			total   float64
			count   int
			cur     float64
			isValid bool
		)
		encoded := c.getValuesEncoded(br)
		for i, v := range encoded {
			// Re-use the parse result from the previous row if the value didn't change.
			if i == 0 || v != encoded[i-1] {
				cur, isValid = tryParseNumber(v)
			}
			if !isValid {
				continue
			}
			total += cur
			count++
		}
		return total, count
	case valueTypeDict:
		// Parse every dict value only once; nan marks dict values, which cannot be parsed.
		floats := encoding.GetFloat64s(len(c.dictValues))
		parsed := floats.A
		for i, dv := range c.dictValues {
			if f, ok := tryParseNumber(dv); ok {
				parsed[i] = f
			} else {
				parsed[i] = nan
			}
		}
		total := float64(0)
		count := 0
		for _, ve := range c.getValuesEncoded(br) {
			// The first byte of the encoded value is the index in c.dictValues.
			f := parsed[ve[0]]
			if math.IsNaN(f) {
				continue
			}
			total += f
			count++
		}
		encoding.PutFloat64s(floats)
		return total, count
	case valueTypeUint8:
		total := uint64(0)
		for _, ve := range c.getValuesEncoded(br) {
			total += uint64(unmarshalUint8(ve))
		}
		return float64(total), len(br.timestamps)
	case valueTypeUint16:
		total := uint64(0)
		for _, ve := range c.getValuesEncoded(br) {
			total += uint64(unmarshalUint16(ve))
		}
		return float64(total), len(br.timestamps)
	case valueTypeUint32:
		total := uint64(0)
		for _, ve := range c.getValuesEncoded(br) {
			total += uint64(unmarshalUint32(ve))
		}
		return float64(total), len(br.timestamps)
	case valueTypeUint64:
		// The sum is accumulated in float64 here, contrary to the smaller uint types.
		total := float64(0)
		for _, ve := range c.getValuesEncoded(br) {
			total += float64(unmarshalUint64(ve))
		}
		return total, len(br.timestamps)
	case valueTypeFloat64:
		// NOTE(review): NaN values are skipped in the sum, but they are still counted
		// in the returned number of values - confirm whether this is intentional.
		total := float64(0)
		for _, ve := range c.getValuesEncoded(br) {
			if f := unmarshalFloat64(ve); !math.IsNaN(f) {
				total += f
			}
		}
		return total, len(br.timestamps)
	case valueTypeIPv4, valueTypeTimestampISO8601:
		return 0, 0
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
		return 0, 0
	}
}
// resultColumn is a named column with result values.
//
// The column doesn't own the values it refers to.
type resultColumn struct {
	// name is the name of the column.
	name string

	// values contains the result values of the column.
	values []string
}

// reset clears the column name and drops all the values from rc.
func (rc *resultColumn) reset() {
	rc.resetValues()
	rc.name = ""
}

// resetValues drops all the values from rc, while keeping rc.values capacity for further re-use.
//
// The dropped items are cleared in order to remove references to the underlying strings.
func (rc *resultColumn) resetValues() {
	vs := rc.values
	clear(vs)
	rc.values = vs[:0]
}
// appendResultColumnWithName appends a column with the given name and without values to dst and returns the result.
//
// The length of dst is extended via slicesutil.SetLength(), and the values of the appended item are reset,
// so the item may re-use the previously allocated values slice.
func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn {
	n := len(dst)
	dst = slicesutil.SetLength(dst, n+1)

	last := &dst[n]
	last.resetValues()
	last.name = name

	return dst
}
// addValue appends the given value v to rc.values.
//
// v isn't copied, so rc is valid until v is modified.
func (rc *resultColumn) addValue(v string) {
	rc.values = append(rc.values, v)
}
// truncateTimestampToMonth returns the timestamp for the start of the UTC month, which contains the given timestamp.
//
// Both the given and the returned timestamps are in nanoseconds since Unix epoch.
func truncateTimestampToMonth(timestamp int64) int64 {
	year, month, _ := time.Unix(0, timestamp).UTC().Date()
	monthStart := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
	return monthStart.UnixNano()
}
// truncateTimestampToYear returns the timestamp for the start of the UTC year, which contains the given timestamp.
//
// Both the given and the returned timestamps are in nanoseconds since Unix epoch.
func truncateTimestampToYear(timestamp int64) int64 {
	year := time.Unix(0, timestamp).UTC().Year()
	yearStart := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	return yearStart.UnixNano()
}
// emptyStrings holds the slice of empty strings created by the first getEmptyStrings() call.
var emptyStrings atomic.Pointer[[]string]

// getEmptyStrings returns a slice with rowsCount items.
//
// The slice created by the first call is stored in emptyStrings; subsequent calls
// return it resized to rowsCount via slicesutil.SetLength().
//
// NOTE(review): the returned slice may share memory with the stored one,
// so callers presumably must not modify its items - TODO confirm.
func getEmptyStrings(rowsCount int) []string {
	if p := emptyStrings.Load(); p != nil {
		return slicesutil.SetLength(*p, rowsCount)
	}

	values := make([]string, rowsCount)
	emptyStrings.Store(&values)
	return values
}
// visitValuesReadonly calls f for every value of the column ch in bs, which corresponds to a set bit in bm.
//
// Column values aren't loaded if bm has no set bits.
func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string)) {
	if bm.isZero() {
		// Fast path - there are no values to visit.
		return
	}

	values := bs.getValuesForColumn(ch)
	visit := func(idx int) {
		f(values[idx])
	}
	bm.forEachSetBitReadonly(visit)
}
// getCanonicalColumnName returns "_msg" for empty columnName; otherwise columnName is returned as is.
func getCanonicalColumnName(columnName string) string {
	if columnName != "" {
		return columnName
	}
	return "_msg"
}
// tryParseNumber tries parsing s as a number and returns the parsed value as float64.
//
// The following parsers are tried in order: tryParseFloat64(), tryParseDuration() and tryParseBytes().
// If all of them fail and s looks like a number according to isLikelyNumber(),
// then strconv.ParseFloat() and strconv.ParseInt() with auto-detected base are tried.
//
// False is returned in the second result if s cannot be parsed.
func tryParseNumber(s string) (float64, bool) {
	if s == "" {
		return 0, false
	}

	if f, ok := tryParseFloat64(s); ok {
		return f, true
	}
	if nsecs, ok := tryParseDuration(s); ok {
		return float64(nsecs), true
	}
	if n, ok := tryParseBytes(s); ok {
		return float64(n), true
	}

	if !isLikelyNumber(s) {
		return 0, false
	}
	if f, err := strconv.ParseFloat(s, 64); err == nil {
		return f, true
	}
	if n, err := strconv.ParseInt(s, 0, 64); err == nil {
		return float64(n), true
	}
	return 0, false
}
// isLikelyNumber returns true if s looks like a number.
//
// It returns false if s doesn't pass the isNumberPrefix() check, or if s looks like an IP address or a timestamp.
func isLikelyNumber(s string) bool {
	switch {
	case !isNumberPrefix(s):
		return false
	case strings.Count(s, ".") > 1:
		// Multiple dots - this is likely an IP address.
		return false
	case strings.IndexByte(s, ':') >= 0, strings.Count(s, "-") > 2:
		// A colon or more than two dashes - this is likely a timestamp.
		return false
	}
	return true
}
var (
	// nan is the not-a-number float64 value.
	nan = math.NaN()

	// inf is the positive infinity float64 value.
	inf = math.Inf(1)
)