mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
5556dda356
commit
e954330e54
4 changed files with 249 additions and 147 deletions
|
@ -1280,6 +1280,19 @@ over the last 5 minutes:
|
||||||
_time:5m | stats by (_time:1m) count() logs_total, count_uniq(ip) ips_total
|
_time:5m | stats by (_time:1m) count() logs_total, count_uniq(ip) ips_total
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Additionally, the following `step` values are supported:
|
||||||
|
|
||||||
|
- `nanosecond` - equals to `1ns` [duration](#duration-values).
|
||||||
|
- `microsecond` - equals to `1µs` [duration](#duration-values).
|
||||||
|
- `millisecond` - equals to `1ms` [duration](#duration-values).
|
||||||
|
- `second` - equals to `1s` [duration](#duration-values).
|
||||||
|
- `minute` - equals to `1m` [duration](#duration-values).
|
||||||
|
- `hour` - equalst to `1h` [duration](#duration-values).
|
||||||
|
- `day` - equals to `1d` [duration](#duration-values).
|
||||||
|
- `week` - equals to `1w` [duration](#duration-values).
|
||||||
|
- `month` - equals to one month. It properly takes into account the number of days per each month.
|
||||||
|
- `year` - equals to one year. It properly takes into account the number of days per each year.
|
||||||
|
|
||||||
#### Stats by time buckets with timezone offset
|
#### Stats by time buckets with timezone offset
|
||||||
|
|
||||||
VictoriaLogs stores [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) values as [Unix time](https://en.wikipedia.org/wiki/Unix_time)
|
VictoriaLogs stores [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) values as [Unix time](https://en.wikipedia.org/wiki/Unix_time)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"math"
|
"math"
|
||||||
"slices"
|
"slices"
|
||||||
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
@ -443,40 +444,40 @@ func (br *blockResult) addConstColumn(name, value string) {
|
||||||
br.csInitialized = false
|
br.csInitialized = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bf *byStatsField) []string {
|
||||||
if c.isConst {
|
if c.isConst {
|
||||||
return br.getBucketedConstValues(c.encodedValues[0], bucketSize, bucketOffset)
|
return br.getBucketedConstValues(c.encodedValues[0], bf)
|
||||||
}
|
}
|
||||||
if c.isTime {
|
if c.isTime {
|
||||||
return br.getBucketedTimestampValues(bucketSize, bucketOffset)
|
return br.getBucketedTimestampValues(bf)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch c.valueType {
|
switch c.valueType {
|
||||||
case valueTypeString:
|
case valueTypeString:
|
||||||
return br.getBucketedStringValues(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedStringValues(c.encodedValues, bf)
|
||||||
case valueTypeDict:
|
case valueTypeDict:
|
||||||
return br.getBucketedDictValues(c.encodedValues, c.dictValues, bucketSize, bucketOffset)
|
return br.getBucketedDictValues(c.encodedValues, c.dictValues, bf)
|
||||||
case valueTypeUint8:
|
case valueTypeUint8:
|
||||||
return br.getBucketedUint8Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedUint8Values(c.encodedValues, bf)
|
||||||
case valueTypeUint16:
|
case valueTypeUint16:
|
||||||
return br.getBucketedUint16Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedUint16Values(c.encodedValues, bf)
|
||||||
case valueTypeUint32:
|
case valueTypeUint32:
|
||||||
return br.getBucketedUint32Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedUint32Values(c.encodedValues, bf)
|
||||||
case valueTypeUint64:
|
case valueTypeUint64:
|
||||||
return br.getBucketedUint64Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedUint64Values(c.encodedValues, bf)
|
||||||
case valueTypeFloat64:
|
case valueTypeFloat64:
|
||||||
return br.getBucketedFloat64Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedFloat64Values(c.encodedValues, bf)
|
||||||
case valueTypeIPv4:
|
case valueTypeIPv4:
|
||||||
return br.getBucketedIPv4Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedIPv4Values(c.encodedValues, bf)
|
||||||
case valueTypeTimestampISO8601:
|
case valueTypeTimestampISO8601:
|
||||||
return br.getBucketedTimestampISO8601Values(c.encodedValues, bucketSize, bucketOffset)
|
return br.getBucketedTimestampISO8601Values(c.encodedValues, bf)
|
||||||
default:
|
default:
|
||||||
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []string {
|
||||||
if v == "" {
|
if v == "" {
|
||||||
// Fast path - return a slice of empty strings without constructing the slice.
|
// Fast path - return a slice of empty strings without constructing the slice.
|
||||||
return getEmptyStrings(len(br.timestamps))
|
return getEmptyStrings(len(br.timestamps))
|
||||||
|
@ -487,7 +488,7 @@ func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
v = br.getBucketedValue(v, bucketSize, bucketOffset)
|
v = br.getBucketedValue(v, bf)
|
||||||
for range br.timestamps {
|
for range br.timestamps {
|
||||||
valuesBuf = append(valuesBuf, v)
|
valuesBuf = append(valuesBuf, v)
|
||||||
}
|
}
|
||||||
|
@ -497,7 +498,7 @@ func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset
|
||||||
return valuesBuf[valuesBufLen:]
|
return valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
@ -505,7 +506,7 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float
|
||||||
timestamps := br.timestamps
|
timestamps := br.timestamps
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if bucketSize <= 1 && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i := range timestamps {
|
for i := range timestamps {
|
||||||
if i > 0 && timestamps[i-1] == timestamps[i] {
|
if i > 0 && timestamps[i-1] == timestamps[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -518,9 +519,11 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := int64(bucketSize)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
bucketOffsetInt := int64(bucketOffset)
|
if bucketSizeInt <= 0 {
|
||||||
var prevTimestamp int64
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := int64(bf.bucketOffset)
|
||||||
for i := range timestamps {
|
for i := range timestamps {
|
||||||
if i > 0 && timestamps[i-1] == timestamps[i] {
|
if i > 0 && timestamps[i-1] == timestamps[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -529,14 +532,15 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float
|
||||||
|
|
||||||
timestamp := timestamps[i]
|
timestamp := timestamps[i]
|
||||||
timestamp -= bucketOffsetInt
|
timestamp -= bucketOffsetInt
|
||||||
timestamp -= timestamp % bucketSizeInt
|
if bf.bucketSizeStr == "month" {
|
||||||
timestamp += bucketOffsetInt
|
timestamp = truncateTimestampToMonth(timestamp)
|
||||||
if i > 0 && timestamp == prevTimestamp {
|
} else if bf.bucketSizeStr == "year" {
|
||||||
valuesBuf = append(valuesBuf, s)
|
timestamp = truncateTimestampToYear(timestamp)
|
||||||
continue
|
} else {
|
||||||
|
timestamp -= timestamp % bucketSizeInt
|
||||||
}
|
}
|
||||||
|
timestamp += bucketOffsetInt
|
||||||
|
|
||||||
prevTimestamp = timestamp
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalTimestampRFC3339Nano(buf, timestamp)
|
buf = marshalTimestampRFC3339Nano(buf, timestamp)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -550,8 +554,8 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float
|
||||||
return valuesBuf[valuesBufLen:]
|
return valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedStringValues(values []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedStringValues(values []string, bf *byStatsField) []string {
|
||||||
if bucketSize <= 0 && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,7 +569,7 @@ func (br *blockResult) getBucketedStringValues(values []string, bucketSize, buck
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
s = br.getBucketedValue(values[i], bucketSize, bucketOffset)
|
s = br.getBucketedValue(values[i], bf)
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,11 +578,11 @@ func (br *blockResult) getBucketedStringValues(values []string, bucketSize, buck
|
||||||
return valuesBuf[valuesBufLen:]
|
return valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bf *byStatsField) []string {
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
dictValues = br.getBucketedStringValues(dictValues, bucketSize, bucketOffset)
|
dictValues = br.getBucketedStringValues(dictValues, bf)
|
||||||
for _, v := range encodedValues {
|
for _, v := range encodedValues {
|
||||||
dictIdx := v[0]
|
dictIdx := v[0]
|
||||||
valuesBuf = append(valuesBuf, dictValues[dictIdx])
|
valuesBuf = append(valuesBuf, dictValues[dictIdx])
|
||||||
|
@ -589,14 +593,14 @@ func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string,
|
||||||
return valuesBuf[valuesBufLen:]
|
return valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if (bucketSize <= 1 || bucketSize >= (1<<8)) && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -610,9 +614,12 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := uint64(bucketSize)
|
bucketSizeInt := uint64(bf.bucketSize)
|
||||||
bucketOffsetInt := uint64(int64(bucketOffset))
|
if bucketSizeInt <= 0 {
|
||||||
var nPrev uint64
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := uint64(int64(bf.bucketOffset))
|
||||||
|
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -623,12 +630,7 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize
|
||||||
n -= bucketOffsetInt
|
n -= bucketOffsetInt
|
||||||
n -= n % bucketSizeInt
|
n -= n % bucketSizeInt
|
||||||
n += bucketOffsetInt
|
n += bucketOffsetInt
|
||||||
if i > 0 && n == nPrev {
|
|
||||||
valuesBuf = append(valuesBuf, s)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nPrev = n
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalUint64(buf, n)
|
buf = marshalUint64(buf, n)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -642,14 +644,14 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize
|
||||||
return br.valuesBuf[valuesBufLen:]
|
return br.valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if (bucketSize <= 1 || bucketSize >= (1<<16)) && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -664,9 +666,12 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := uint64(bucketSize)
|
bucketSizeInt := uint64(bf.bucketSize)
|
||||||
bucketOffsetInt := uint64(int64(bucketOffset))
|
if bucketSizeInt <= 0 {
|
||||||
var nPrev uint64
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := uint64(int64(bf.bucketOffset))
|
||||||
|
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -678,12 +683,7 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz
|
||||||
n -= bucketOffsetInt
|
n -= bucketOffsetInt
|
||||||
n -= n % bucketSizeInt
|
n -= n % bucketSizeInt
|
||||||
n += bucketOffsetInt
|
n += bucketOffsetInt
|
||||||
if i > 0 && n == nPrev {
|
|
||||||
valuesBuf = append(valuesBuf, s)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nPrev = n
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalUint64(buf, n)
|
buf = marshalUint64(buf, n)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -697,14 +697,14 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz
|
||||||
return br.valuesBuf[valuesBufLen:]
|
return br.valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if (bucketSize <= 1 || bucketSize >= (1<<32)) && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -719,9 +719,12 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := uint64(bucketSize)
|
bucketSizeInt := uint64(bf.bucketSize)
|
||||||
bucketOffsetInt := uint64(int64(bucketOffset))
|
if bucketSizeInt <= 0 {
|
||||||
var nPrev uint64
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := uint64(int64(bf.bucketOffset))
|
||||||
|
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -733,12 +736,7 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz
|
||||||
n -= bucketOffsetInt
|
n -= bucketOffsetInt
|
||||||
n -= n % bucketSizeInt
|
n -= n % bucketSizeInt
|
||||||
n += bucketOffsetInt
|
n += bucketOffsetInt
|
||||||
if i > 0 && n == nPrev {
|
|
||||||
valuesBuf = append(valuesBuf, s)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nPrev = n
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalUint64(buf, n)
|
buf = marshalUint64(buf, n)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -752,14 +750,14 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz
|
||||||
return br.valuesBuf[valuesBufLen:]
|
return br.valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if (bucketSize <= 1 || bucketSize >= (1<<64)) && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -774,9 +772,12 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := uint64(bucketSize)
|
bucketSizeInt := uint64(bf.bucketSize)
|
||||||
bucketOffsetInt := uint64(int64(bucketOffset))
|
if bucketSizeInt <= 0 {
|
||||||
var nPrev uint64
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := uint64(int64(bf.bucketOffset))
|
||||||
|
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -788,12 +789,7 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz
|
||||||
n -= bucketOffsetInt
|
n -= bucketOffsetInt
|
||||||
n -= n % bucketSizeInt
|
n -= n % bucketSizeInt
|
||||||
n += bucketOffsetInt
|
n += bucketOffsetInt
|
||||||
if i > 0 && n == nPrev {
|
|
||||||
valuesBuf = append(valuesBuf, s)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nPrev = n
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalUint64(buf, n)
|
buf = marshalUint64(buf, n)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -807,14 +803,14 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz
|
||||||
return br.valuesBuf[valuesBufLen:]
|
return br.valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if bucketSize <= 0 && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -831,10 +827,14 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
bucketSize := bf.bucketSize
|
||||||
|
if bucketSize <= 0 {
|
||||||
|
bucketSize = 1
|
||||||
|
}
|
||||||
|
|
||||||
_, e := decimal.FromFloat(bucketSize)
|
_, e := decimal.FromFloat(bucketSize)
|
||||||
p10 := math.Pow10(int(-e))
|
p10 := math.Pow10(int(-e))
|
||||||
bucketSizeP10 := int64(bucketSize * p10)
|
bucketSizeP10 := int64(bucketSize * p10)
|
||||||
var fPrev float64
|
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -845,21 +845,15 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi
|
||||||
n := encoding.UnmarshalUint64(b)
|
n := encoding.UnmarshalUint64(b)
|
||||||
f := math.Float64frombits(n)
|
f := math.Float64frombits(n)
|
||||||
|
|
||||||
f -= bucketOffset
|
f -= bf.bucketOffset
|
||||||
|
|
||||||
// emulate f % bucketSize for float64 values
|
// emulate f % bucketSize for float64 values
|
||||||
fP10 := int64(f * p10)
|
fP10 := int64(f * p10)
|
||||||
fP10 -= fP10 % bucketSizeP10
|
fP10 -= fP10 % bucketSizeP10
|
||||||
f = float64(fP10) / p10
|
f = float64(fP10) / p10
|
||||||
|
|
||||||
f += bucketOffset
|
f += bf.bucketOffset
|
||||||
|
|
||||||
if i > 0 && f == fPrev {
|
|
||||||
valuesBuf = append(valuesBuf, s)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fPrev = f
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalFloat64(buf, f)
|
buf = marshalFloat64(buf, f)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -873,14 +867,14 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi
|
||||||
return br.valuesBuf[valuesBufLen:]
|
return br.valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if bucketSize <= 1 && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -893,9 +887,12 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize,
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := uint32(bucketSize)
|
bucketSizeInt := uint32(bf.bucketSize)
|
||||||
bucketOffsetInt := uint32(int32(bucketOffset))
|
if bucketSizeInt <= 0 {
|
||||||
var nPrev uint32
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := uint32(int32(bf.bucketOffset))
|
||||||
|
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -907,12 +904,7 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize,
|
||||||
n -= bucketOffsetInt
|
n -= bucketOffsetInt
|
||||||
n -= n % bucketSizeInt
|
n -= n % bucketSizeInt
|
||||||
n += bucketOffsetInt
|
n += bucketOffsetInt
|
||||||
if i > 0 && n == nPrev {
|
|
||||||
valuesBuf = append(valuesBuf, s)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nPrev = n
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalIPv4(buf, n)
|
buf = marshalIPv4(buf, n)
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
@ -926,14 +918,14 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize,
|
||||||
return valuesBuf[valuesBufLen:]
|
return valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bucketSize, bucketOffset float64) []string {
|
func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bf *byStatsField) []string {
|
||||||
buf := br.buf
|
buf := br.buf
|
||||||
valuesBuf := br.valuesBuf
|
valuesBuf := br.valuesBuf
|
||||||
valuesBufLen := len(valuesBuf)
|
valuesBufLen := len(valuesBuf)
|
||||||
|
|
||||||
var s string
|
var s string
|
||||||
|
|
||||||
if bucketSize <= 1 && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
|
@ -949,9 +941,12 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string,
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bucketSizeInt := uint64(bucketSize)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
bucketOffsetInt := uint64(int64(bucketOffset))
|
if bucketSizeInt <= 0 {
|
||||||
var nPrev uint64
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffsetInt := int64(bf.bucketOffset)
|
||||||
|
|
||||||
bb := bbPool.Get()
|
bb := bbPool.Get()
|
||||||
for i, v := range encodedValues {
|
for i, v := range encodedValues {
|
||||||
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
if i > 0 && encodedValues[i-1] == encodedValues[i] {
|
||||||
|
@ -960,18 +955,20 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string,
|
||||||
}
|
}
|
||||||
|
|
||||||
b := bytesutil.ToUnsafeBytes(v)
|
b := bytesutil.ToUnsafeBytes(v)
|
||||||
n := encoding.UnmarshalUint64(b)
|
timestamp := int64(encoding.UnmarshalUint64(b))
|
||||||
n -= bucketOffsetInt
|
timestamp -= bucketOffsetInt
|
||||||
n -= n % bucketSizeInt
|
if bf.bucketSizeStr == "month" {
|
||||||
n += bucketOffsetInt
|
timestamp = truncateTimestampToMonth(timestamp)
|
||||||
if i > 0 && n == nPrev {
|
} else if bf.bucketSizeStr == "year" {
|
||||||
valuesBuf = append(valuesBuf, s)
|
timestamp = truncateTimestampToYear(timestamp)
|
||||||
continue
|
} else {
|
||||||
|
timestamp -= timestamp % bucketSizeInt
|
||||||
}
|
}
|
||||||
|
timestamp -= timestamp % bucketSizeInt
|
||||||
|
timestamp += bucketOffsetInt
|
||||||
|
|
||||||
nPrev = n
|
|
||||||
bufLen := len(buf)
|
bufLen := len(buf)
|
||||||
buf = marshalTimestampISO8601(buf, int64(n))
|
buf = marshalTimestampISO8601(buf, int64(timestamp))
|
||||||
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
s = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
valuesBuf = append(valuesBuf, s)
|
valuesBuf = append(valuesBuf, s)
|
||||||
}
|
}
|
||||||
|
@ -984,9 +981,9 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string,
|
||||||
return valuesBuf[valuesBufLen:]
|
return valuesBuf[valuesBufLen:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBucketedValue returns bucketed s according to the given bucketSize and bucketOffset
|
// getBucketedValue returns bucketed s according to the given bf
|
||||||
func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float64) string {
|
func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
|
||||||
if bucketSize <= 0 && bucketOffset == 0 {
|
if !bf.hasBucketConfig() {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
if len(s) == 0 {
|
if len(s) == 0 {
|
||||||
|
@ -1000,7 +997,12 @@ func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float
|
||||||
}
|
}
|
||||||
|
|
||||||
if f, ok := tryParseFloat64(s); ok {
|
if f, ok := tryParseFloat64(s); ok {
|
||||||
f -= bucketOffset
|
bucketSize := bf.bucketSize
|
||||||
|
if bucketSize <= 0 {
|
||||||
|
bucketSize = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
f -= bf.bucketOffset
|
||||||
|
|
||||||
// emulate f % bucketSize for float64 values
|
// emulate f % bucketSize for float64 values
|
||||||
_, e := decimal.FromFloat(bucketSize)
|
_, e := decimal.FromFloat(bucketSize)
|
||||||
|
@ -1009,44 +1011,84 @@ func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float
|
||||||
fP10 -= fP10 % int64(bucketSize*p10)
|
fP10 -= fP10 % int64(bucketSize*p10)
|
||||||
f = float64(fP10) / p10
|
f = float64(fP10) / p10
|
||||||
|
|
||||||
f += bucketOffset
|
f += bf.bucketOffset
|
||||||
|
|
||||||
bufLen := len(br.buf)
|
bufLen := len(br.buf)
|
||||||
br.buf = marshalFloat64(br.buf, f)
|
br.buf = marshalFloat64(br.buf, f)
|
||||||
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
||||||
}
|
}
|
||||||
|
|
||||||
if nsecs, ok := tryParseTimestampISO8601(s); ok {
|
if timestamp, ok := tryParseTimestampISO8601(s); ok {
|
||||||
nsecs -= int64(bucketOffset)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
nsecs -= nsecs % int64(bucketSize)
|
if bucketSizeInt <= 0 {
|
||||||
nsecs += int64(bucketOffset)
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffset := int64(bf.bucketOffset)
|
||||||
|
|
||||||
|
timestamp -= bucketOffset
|
||||||
|
if bf.bucketSizeStr == "month" {
|
||||||
|
timestamp = truncateTimestampToMonth(timestamp)
|
||||||
|
} else if bf.bucketSizeStr == "year" {
|
||||||
|
timestamp = truncateTimestampToYear(timestamp)
|
||||||
|
} else {
|
||||||
|
timestamp -= timestamp % bucketSizeInt
|
||||||
|
}
|
||||||
|
timestamp += bucketOffset
|
||||||
|
|
||||||
bufLen := len(br.buf)
|
bufLen := len(br.buf)
|
||||||
br.buf = marshalTimestampISO8601(br.buf, nsecs)
|
br.buf = marshalTimestampISO8601(br.buf, timestamp)
|
||||||
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
||||||
}
|
}
|
||||||
|
|
||||||
if nsecs, ok := tryParseTimestampRFC3339Nano(s); ok {
|
if timestamp, ok := tryParseTimestampRFC3339Nano(s); ok {
|
||||||
nsecs -= int64(bucketOffset)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
nsecs -= nsecs % int64(bucketSize)
|
if bucketSizeInt <= 0 {
|
||||||
nsecs += int64(bucketOffset)
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffset := int64(bf.bucketOffset)
|
||||||
|
|
||||||
|
timestamp -= bucketOffset
|
||||||
|
if bf.bucketSizeStr == "month" {
|
||||||
|
timestamp = truncateTimestampToMonth(timestamp)
|
||||||
|
} else if bf.bucketSizeStr == "year" {
|
||||||
|
timestamp = truncateTimestampToYear(timestamp)
|
||||||
|
} else {
|
||||||
|
timestamp -= timestamp % bucketSizeInt
|
||||||
|
}
|
||||||
|
timestamp += bucketOffset
|
||||||
|
|
||||||
bufLen := len(br.buf)
|
bufLen := len(br.buf)
|
||||||
br.buf = marshalTimestampRFC3339Nano(br.buf, nsecs)
|
br.buf = marshalTimestampRFC3339Nano(br.buf, timestamp)
|
||||||
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
||||||
}
|
}
|
||||||
|
|
||||||
if n, ok := tryParseIPv4(s); ok {
|
if n, ok := tryParseIPv4(s); ok {
|
||||||
n -= uint32(int32(bucketOffset))
|
bucketSizeInt := uint32(bf.bucketSize)
|
||||||
n -= n % uint32(bucketSize)
|
if bucketSizeInt <= 0 {
|
||||||
n += uint32(int32(bucketOffset))
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffset := uint32(int32(bf.bucketOffset))
|
||||||
|
|
||||||
|
n -= bucketOffset
|
||||||
|
n -= n % bucketSizeInt
|
||||||
|
n += bucketOffset
|
||||||
|
|
||||||
bufLen := len(br.buf)
|
bufLen := len(br.buf)
|
||||||
br.buf = marshalIPv4(br.buf, n)
|
br.buf = marshalIPv4(br.buf, n)
|
||||||
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
||||||
}
|
}
|
||||||
|
|
||||||
if nsecs, ok := tryParseDuration(s); ok {
|
if nsecs, ok := tryParseDuration(s); ok {
|
||||||
nsecs -= int64(bucketOffset)
|
bucketSizeInt := int64(bf.bucketSize)
|
||||||
nsecs -= nsecs % int64(bucketSize)
|
if bucketSizeInt <= 0 {
|
||||||
nsecs += int64(bucketOffset)
|
bucketSizeInt = 1
|
||||||
|
}
|
||||||
|
bucketOffset := int64(bf.bucketOffset)
|
||||||
|
|
||||||
|
nsecs -= bucketOffset
|
||||||
|
nsecs -= nsecs % bucketSizeInt
|
||||||
|
nsecs += bucketOffset
|
||||||
|
|
||||||
bufLen := len(br.buf)
|
bufLen := len(br.buf)
|
||||||
br.buf = marshalDuration(br.buf, nsecs)
|
br.buf = marshalDuration(br.buf, nsecs)
|
||||||
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
return bytesutil.ToUnsafeString(br.buf[bufLen:])
|
||||||
|
@ -1261,11 +1303,11 @@ type blockResultColumn struct {
|
||||||
// bucketedValues contains values after getBucketedValues() call
|
// bucketedValues contains values after getBucketedValues() call
|
||||||
bucketedValues []string
|
bucketedValues []string
|
||||||
|
|
||||||
// bucketSize contains bucketSize for bucketedValues
|
// bucketSizeStr contains bucketSizeStr for bucketedValues
|
||||||
bucketSize float64
|
bucketSizeStr string
|
||||||
|
|
||||||
// bucketOffset contains bucketOffset for bucketedValues
|
// bucketOffsetStr contains bucketOffset for bucketedValues
|
||||||
bucketOffset float64
|
bucketOffsetStr string
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone returns a clone of c backed by data from br.
|
// clone returns a clone of c backed by data from br.
|
||||||
|
@ -1279,8 +1321,8 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
|
||||||
cNew.dictValues = br.cloneValues(c.dictValues)
|
cNew.dictValues = br.cloneValues(c.dictValues)
|
||||||
cNew.encodedValues = br.cloneValues(c.encodedValues)
|
cNew.encodedValues = br.cloneValues(c.encodedValues)
|
||||||
// do not copy c.values and c.bucketedValues - they should be re-created from scrach if needed
|
// do not copy c.values and c.bucketedValues - they should be re-created from scrach if needed
|
||||||
cNew.bucketSize = c.bucketSize
|
cNew.bucketSizeStr = c.bucketSizeStr
|
||||||
cNew.bucketOffset = c.bucketOffset
|
cNew.bucketOffsetStr = c.bucketOffsetStr
|
||||||
|
|
||||||
return cNew
|
return cNew
|
||||||
}
|
}
|
||||||
|
@ -1331,20 +1373,20 @@ func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string {
|
||||||
return values[rowIdx]
|
return values[rowIdx]
|
||||||
}
|
}
|
||||||
|
|
||||||
// getValues returns values for the given column, bucketed according to bucketSize and bucketOffset.
|
// getValues returns values for the given column, bucketed according to bf.
|
||||||
//
|
//
|
||||||
// The returned values are valid until br.reset() is called.
|
// The returned values are valid until br.reset() is called.
|
||||||
func (c *blockResultColumn) getBucketedValues(br *blockResult, bucketSize, bucketOffset float64) []string {
|
func (c *blockResultColumn) getBucketedValues(br *blockResult, bf *byStatsField) []string {
|
||||||
if bucketSize <= 0 {
|
if !bf.hasBucketConfig() {
|
||||||
return c.getValues(br)
|
return c.getValues(br)
|
||||||
}
|
}
|
||||||
if values := c.bucketedValues; values != nil && c.bucketSize == bucketSize && c.bucketOffset == bucketOffset {
|
if values := c.bucketedValues; values != nil && c.bucketSizeStr == bf.bucketSizeStr && c.bucketOffsetStr == bf.bucketOffsetStr {
|
||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
c.bucketedValues = br.getBucketedColumnValues(c, bucketSize, bucketOffset)
|
c.bucketedValues = br.getBucketedColumnValues(c, bf)
|
||||||
c.bucketSize = bucketSize
|
c.bucketSizeStr = bf.bucketSizeStr
|
||||||
c.bucketOffset = bucketOffset
|
c.bucketOffsetStr = bf.bucketOffsetStr
|
||||||
return c.bucketedValues
|
return c.bucketedValues
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1356,7 +1398,7 @@ func (c *blockResultColumn) getValues(br *blockResult) []string {
|
||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
c.values = br.getBucketedColumnValues(c, 0, 0)
|
c.values = br.getBucketedColumnValues(c, zeroByStatsField)
|
||||||
return c.values
|
return c.values
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1738,5 +1780,15 @@ func (rc *resultColumn) addValue(v string) {
|
||||||
rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
|
rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func truncateTimestampToMonth(timestamp int64) int64 {
|
||||||
|
t := time.Unix(0, timestamp).UTC()
|
||||||
|
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC).UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
func truncateTimestampToYear(timestamp int64) int64 {
|
||||||
|
t := time.Unix(0, timestamp).UTC()
|
||||||
|
return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
var nan = math.NaN()
|
var nan = math.NaN()
|
||||||
var inf = math.Inf(1)
|
var inf = math.Inf(1)
|
||||||
|
|
|
@ -927,6 +927,16 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||||
f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`)
|
f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`)
|
||||||
f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`)
|
f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`)
|
||||||
f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`)
|
f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:nanosecond) count() foo`, `* | stats by (_time:nanosecond) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:microsecond) count() foo`, `* | stats by (_time:microsecond) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:millisecond) count() foo`, `* | stats by (_time:millisecond) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:second) count() foo`, `* | stats by (_time:second) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:minute) count() foo`, `* | stats by (_time:minute) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:hour) count() foo`, `* | stats by (_time:hour) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:day) count() foo`, `* | stats by (_time:day) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:week) count() foo`, `* | stats by (_time:week) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:month) count() foo`, `* | stats by (_time:month) count(*) as foo`)
|
||||||
|
f(`* | stats by (_time:year offset 6.5h) count() foo`, `* | stats by (_time:year offset 6.5h) count(*) as foo`)
|
||||||
|
|
||||||
// sort pipe
|
// sort pipe
|
||||||
f(`* | sort`, `* | sort`)
|
f(`* | sort`, `* | sort`)
|
||||||
|
|
|
@ -176,14 +176,14 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
c := br.getColumnByName(bf.name)
|
c := br.getColumnByName(bf.name)
|
||||||
if c.isConst {
|
if c.isConst {
|
||||||
// Fast path for column with constant value.
|
// Fast path for column with constant value.
|
||||||
v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize, bf.bucketOffset)
|
v := br.getBucketedValue(c.encodedValues[0], bf)
|
||||||
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
|
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
|
||||||
psg := shard.getPipeStatsGroup(shard.keyBuf)
|
psg := shard.getPipeStatsGroup(shard.keyBuf)
|
||||||
shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
|
shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
values := c.getBucketedValues(br, bf.bucketSize, bf.bucketOffset)
|
values := c.getBucketedValues(br, bf)
|
||||||
if areConstValues(values) {
|
if areConstValues(values) {
|
||||||
// Fast path for column with constant values.
|
// Fast path for column with constant values.
|
||||||
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
|
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
|
||||||
|
@ -210,7 +210,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
columnValues := shard.columnValues[:0]
|
columnValues := shard.columnValues[:0]
|
||||||
for _, bf := range byFields {
|
for _, bf := range byFields {
|
||||||
c := br.getColumnByName(bf.name)
|
c := br.getColumnByName(bf.name)
|
||||||
values := c.getBucketedValues(br, bf.bucketSize, bf.bucketOffset)
|
values := c.getBucketedValues(br, bf)
|
||||||
columnValues = append(columnValues, values)
|
columnValues = append(columnValues, values)
|
||||||
}
|
}
|
||||||
shard.columnValues = columnValues
|
shard.columnValues = columnValues
|
||||||
|
@ -544,6 +544,8 @@ func parseResultName(lex *lexer) (string, error) {
|
||||||
return resultName, nil
|
return resultName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var zeroByStatsField = &byStatsField{}
|
||||||
|
|
||||||
// byStatsField represents 'by (...)' part of the pipeStats.
|
// byStatsField represents 'by (...)' part of the pipeStats.
|
||||||
//
|
//
|
||||||
// It can have either 'name' representation or 'name:bucket' or 'name:buket offset off' representation,
|
// It can have either 'name' representation or 'name:bucket' or 'name:buket offset off' representation,
|
||||||
|
@ -576,6 +578,10 @@ func (bf *byStatsField) String() string {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bf *byStatsField) hasBucketConfig() bool {
|
||||||
|
return len(bf.bucketSizeStr) > 0 || len(bf.bucketOffsetStr) > 0
|
||||||
|
}
|
||||||
|
|
||||||
func parseByStatsFields(lex *lexer) ([]*byStatsField, error) {
|
func parseByStatsFields(lex *lexer) ([]*byStatsField, error) {
|
||||||
if !lex.isKeyword("(") {
|
if !lex.isKeyword("(") {
|
||||||
return nil, fmt.Errorf("missing `(`")
|
return nil, fmt.Errorf("missing `(`")
|
||||||
|
@ -603,12 +609,14 @@ func parseByStatsFields(lex *lexer) ([]*byStatsField, error) {
|
||||||
bucketSizeStr += lex.token
|
bucketSizeStr += lex.token
|
||||||
lex.nextToken()
|
lex.nextToken()
|
||||||
}
|
}
|
||||||
bucketSize, ok := tryParseBucketSize(bucketSizeStr)
|
if bucketSizeStr != "year" && bucketSizeStr != "month" {
|
||||||
if !ok {
|
bucketSize, ok := tryParseBucketSize(bucketSizeStr)
|
||||||
return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr)
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr)
|
||||||
|
}
|
||||||
|
bf.bucketSize = bucketSize
|
||||||
}
|
}
|
||||||
bf.bucketSizeStr = bucketSizeStr
|
bf.bucketSizeStr = bucketSizeStr
|
||||||
bf.bucketSize = bucketSize
|
|
||||||
|
|
||||||
// Parse bucket offset
|
// Parse bucket offset
|
||||||
if lex.isKeyword("offset") {
|
if lex.isKeyword("offset") {
|
||||||
|
@ -672,6 +680,25 @@ func tryParseBucketOffset(s string) (float64, bool) {
|
||||||
// - bytes: 1.5KiB
|
// - bytes: 1.5KiB
|
||||||
// - ipv4 mask: /24
|
// - ipv4 mask: /24
|
||||||
func tryParseBucketSize(s string) (float64, bool) {
|
func tryParseBucketSize(s string) (float64, bool) {
|
||||||
|
switch s {
|
||||||
|
case "nanosecond":
|
||||||
|
return 1, true
|
||||||
|
case "microsecond":
|
||||||
|
return nsecsPerMicrosecond, true
|
||||||
|
case "millisecond":
|
||||||
|
return nsecsPerMillisecond, true
|
||||||
|
case "second":
|
||||||
|
return nsecsPerSecond, true
|
||||||
|
case "minute":
|
||||||
|
return nsecsPerMinute, true
|
||||||
|
case "hour":
|
||||||
|
return nsecsPerHour, true
|
||||||
|
case "day":
|
||||||
|
return nsecsPerDay, true
|
||||||
|
case "week":
|
||||||
|
return nsecsPerWeek, true
|
||||||
|
}
|
||||||
|
|
||||||
// Try parsing s as floating point number
|
// Try parsing s as floating point number
|
||||||
if f, ok := tryParseFloat64(s); ok {
|
if f, ok := tryParseFloat64(s); ok {
|
||||||
return f, true
|
return f, true
|
||||||
|
|
Loading…
Reference in a new issue