This commit is contained in:
Aliaksandr Valialkin 2024-05-03 11:15:09 +02:00
parent caf1304ee4
commit 77e2d0be60
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
21 changed files with 1996 additions and 362 deletions

View file

@ -1069,7 +1069,7 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
## Stats
LogsQL supports calculating the following stats:
LogsQL supports calculating the following stats functions:
- The number of matching log entries. Examples:
- `error | stats count() as errors_total` returns the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word).
@ -1117,6 +1117,48 @@ LogsQL supports calculating the following stats:
across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped
by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value.
### Grouping stats by buckets
#### Time buckets
Stats can be bucketed by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) with the `_time:bucket_duration` syntax inside `by(...)` clause.
For example, the following query returns per-minute number of log messages with the `error` [word](#word) for the last 10 minutes:
```logsql
_time:10m error | stats by (_time:1m) count() errors_per_minute
```
#### Numeric buckets
Stats can be bucketed by any numeric [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the `field_name:bucket_size` syntax inside `by(...)` clause.
For example, the following query returns the number of log messages with the `status=200` [phrase](#phrase-filter) bucketed by `request_duration_seconds` numeric field with `0.5` step:
```logsql
_time:10m "status=200" | stats by (request_duration_seconds:0.5) count() requests
```
The `bucket_size` can contain the following convenient suffixes:
- `KB` - the `bucket_size` is multiplied by `1000` in this case. For example, `10KB`.
- `MB` - the `bucket_size` is multiplied by `1_000_000` in this case. For example, `10MB`.
- `GB` - the `bucket_size` is multiplied by `1_000_000_000` in this case. For example, `10GB`.
- `TB` - the `bucket_size` is multiplied by `1_000_000_000_000` in this case. For example, `10TB`.
- `KiB` - the `bucket_size` is multiplied by `1024` in this case. For example, `10KiB`.
- `MiB` - the `bucket_size` is multiplied by `1024*1024` in this case. For example, `10MiB`.
- `GiB` - the `bucket_size` is multiplied by `1024*1024*1024` in this case. For example, `10GiB`.
- `TiB` - the `bucket_size` is multiplied by `1024*1024*1024*1024` in this case. For example, `10TiB`.
#### IPv4 mask buckets
Stats can be bucketed by [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with [IPv4 addresses](https://en.wikipedia.org/wiki/IP_address)
via the `ip_field_name:/network_mask` syntax inside `by(...)` clause. For example, the following query returns the number of log entries per `/24` subnetwork during the last 10 minutes:
```logsql
_time:10m | stats by (ip:/24) count() requests_per_subnet
```
### Calculating multiple stats
Stats calculations can be combined. For example, the following query calculates the number of log messages with the `error` [word](#word),
the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the sum of `duration`
[field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `namespace` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
@ -1128,6 +1170,8 @@ error | stats by (namespace)
sum(duration) as duration_sum
```
### Stats TODO
LogsQL will support calculating the following additional stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
and fields created by [transformations](#transformations):

View file

@ -711,10 +711,10 @@ type timestampsHeader struct {
// blockSize is the size of the timestamps block inside timestampsFilename file
blockSize uint64
// minTimestamp is the mimumum timestamp seen in the block
// minTimestamp is the mimumum timestamp seen in the block in nanoseconds
minTimestamp int64
// maxTimestamp is the maximum timestamp seen in the block
// maxTimestamp is the maximum timestamp seen in the block in nanoseconds
maxTimestamp int64
// marshalType is the type used for encoding the timestamps block

View file

@ -1,15 +1,18 @@
package logstorage
import (
"encoding/binary"
"math"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// blockResult holds results for a single block of log entries.
//
// It is expected that its contents is accessed only from a single goroutine at a time.
type blockResult struct {
// buf holds all the bytes behind the requested column values in the block.
buf []byte
@ -335,6 +338,582 @@ func (br *blockResult) addConstColumn(name, value string) {
})
}
func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize float64) []string {
if c.isConst {
return br.getBucketedConstValues(c.encodedValues[0], bucketSize)
}
if c.isTime {
return br.getBucketedTimestampValues(bucketSize)
}
switch c.valueType {
case valueTypeString:
return br.getBucketedStringValues(c.encodedValues, bucketSize)
case valueTypeDict:
return br.getBucketedDictValues(c.encodedValues, c.dictValues, bucketSize)
case valueTypeUint8:
return br.getBucketedUint8Values(c.encodedValues, bucketSize)
case valueTypeUint16:
return br.getBucketedUint16Values(c.encodedValues, bucketSize)
case valueTypeUint32:
return br.getBucketedUint32Values(c.encodedValues, bucketSize)
case valueTypeUint64:
return br.getBucketedUint64Values(c.encodedValues, bucketSize)
case valueTypeFloat64:
return br.getBucketedFloat64Values(c.encodedValues, bucketSize)
case valueTypeIPv4:
return br.getBucketedIPv4Values(c.encodedValues, bucketSize)
case valueTypeTimestampISO8601:
return br.getBucketedTimestampISO8601Values(c.encodedValues, bucketSize)
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return nil
}
}
func (br *blockResult) getBucketedConstValues(v string, bucketSize float64) []string {
if v == "" {
// Fast path - return a slice of empty strings without constructing the slice.
return getEmptyStrings(len(br.timestamps))
}
// Slower path - construct slice of identical values with the len(br.timestamps)
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
v = br.getBucketedValue(v, bucketSize)
for range br.timestamps {
valuesBuf = append(valuesBuf, v)
}
br.valuesBuf = valuesBuf
return valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
timestamps := br.timestamps
var s string
if bucketSize <= 1 {
for i := range timestamps {
if i > 0 && timestamps[i-1] == timestamps[i] {
valuesBuf = append(valuesBuf, s)
continue
}
bufLen := len(buf)
buf = marshalTimestampRFC3339Nano(buf, timestamps[i])
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := int64(bucketSize)
var prevTimestamp int64
for i := range timestamps {
if i > 0 && timestamps[i-1] == timestamps[i] {
valuesBuf = append(valuesBuf, s)
continue
}
timestamp := timestamps[i]
timestamp -= timestamp % bucketSizeInt
if i > 0 && timestamp == prevTimestamp {
valuesBuf = append(valuesBuf, s)
continue
}
prevTimestamp = timestamp
bufLen := len(buf)
buf = marshalTimestampRFC3339Nano(buf, timestamp)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.buf = buf
br.valuesBuf = valuesBuf
return valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedStringValues(values []string, bucketSize float64) []string {
if bucketSize <= 0 {
return values
}
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
for i := range values {
if i > 0 && values[i-1] == values[i] {
valuesBuf = append(valuesBuf, s)
continue
}
s = br.getBucketedValue(values[i], bucketSize)
valuesBuf = append(valuesBuf, s)
}
br.valuesBuf = valuesBuf
return valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bucketSize float64) []string {
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
dictValues = br.getBucketedStringValues(dictValues, bucketSize)
for _, v := range encodedValues {
dictIdx := v[0]
valuesBuf = append(valuesBuf, dictValues[dictIdx])
}
br.valuesBuf = valuesBuf
return valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 1 || bucketSize >= (1<<8) {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
n := uint64(v[0])
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := uint64(bucketSize)
var nPrev uint64
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
n := uint64(v[0])
n -= n % bucketSizeInt
if i > 0 && n == nPrev {
valuesBuf = append(valuesBuf, s)
continue
}
nPrev = n
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.valuesBuf = valuesBuf
br.buf = buf
return br.valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 1 || bucketSize >= (1<<16) {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint16(b))
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := uint64(bucketSize)
var nPrev uint64
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint16(b))
n -= n % bucketSizeInt
if i > 0 && n == nPrev {
valuesBuf = append(valuesBuf, s)
continue
}
nPrev = n
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.valuesBuf = valuesBuf
br.buf = buf
return br.valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 1 || bucketSize >= (1<<32) {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint32(b))
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := uint64(bucketSize)
var nPrev uint64
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint32(b))
n -= n % bucketSizeInt
if i > 0 && n == nPrev {
valuesBuf = append(valuesBuf, s)
continue
}
nPrev = n
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.valuesBuf = valuesBuf
br.buf = buf
return br.valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 1 || bucketSize >= (1<<64) {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := uint64(bucketSize)
var nPrev uint64
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
n -= n % bucketSizeInt
if i > 0 && n == nPrev {
valuesBuf = append(valuesBuf, s)
continue
}
nPrev = n
bufLen := len(buf)
buf = marshalUint64(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.valuesBuf = valuesBuf
br.buf = buf
return br.valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 0 {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
f := math.Float64frombits(n)
bufLen := len(buf)
buf = marshalFloat64(buf, f)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
_, e := decimal.FromFloat(bucketSize)
p10 := math.Pow10(int(-e))
bucketSizeP10 := int64(bucketSize * p10)
var fPrev float64
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
f := math.Float64frombits(n)
// emulate f % bucketSize for float64 values
fP10 := int64(f * p10)
fP10 -= fP10 % bucketSizeP10
f = float64(fP10) / p10
if i > 0 && f == fPrev {
valuesBuf = append(valuesBuf, s)
continue
}
fPrev = f
bufLen := len(buf)
buf = marshalFloat64(buf, f)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.valuesBuf = valuesBuf
br.buf = buf
return br.valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 1 {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
bufLen := len(buf)
buf = toIPv4String(buf, v)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := uint32(bucketSize)
var nPrev uint32
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := binary.BigEndian.Uint32(b)
n -= n % bucketSizeInt
if i > 0 && n == nPrev {
valuesBuf = append(valuesBuf, s)
continue
}
nPrev = n
bufLen := len(buf)
buf = marshalIPv4(buf, n)
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
}
br.valuesBuf = valuesBuf
br.buf = buf
return valuesBuf[valuesBufLen:]
}
func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bucketSize float64) []string {
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
var s string
if bucketSize <= 1 {
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
bufLen := len(buf)
buf = marshalTimestampISO8601(buf, int64(n))
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
} else {
bucketSizeInt := uint64(bucketSize)
var nPrev uint64
bb := bbPool.Get()
for i, v := range encodedValues {
if i > 0 && encodedValues[i-1] == encodedValues[i] {
valuesBuf = append(valuesBuf, s)
continue
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
n -= n % bucketSizeInt
if i > 0 && n == nPrev {
valuesBuf = append(valuesBuf, s)
continue
}
nPrev = n
bufLen := len(buf)
buf = marshalTimestampISO8601(buf, int64(n))
s = bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
bbPool.Put(bb)
}
br.valuesBuf = valuesBuf
br.buf = buf
return valuesBuf[valuesBufLen:]
}
// getBucketedValue returns bucketed s according to the given bucketSize.
func (br *blockResult) getBucketedValue(s string, bucketSize float64) string {
if bucketSize <= 0 {
return s
}
if len(s) == 0 {
return s
}
c := s[0]
if (c < '0' || c > '9') && c != '-' {
// Fast path - the value cannot be bucketed, since it starts with unexpected chars.
return s
}
if f, ok := tryParseFloat64(s); ok {
// emulate f % bucketSize for float64 values
_, e := decimal.FromFloat(bucketSize)
p10 := math.Pow10(int(-e))
fP10 := int64(f * p10)
fP10 -= fP10 % int64(bucketSize*p10)
f = float64(fP10) / p10
bufLen := len(br.buf)
br.buf = marshalFloat64(br.buf, f)
return bytesutil.ToUnsafeString(br.buf[bufLen:])
}
if nsecs, ok := tryParseTimestampISO8601(s); ok {
nsecs -= nsecs % int64(bucketSize)
bufLen := len(br.buf)
br.buf = marshalTimestampISO8601(br.buf, nsecs)
return bytesutil.ToUnsafeString(br.buf[bufLen:])
}
if nsecs, ok := tryParseTimestampRFC3339Nano(s); ok {
nsecs -= nsecs % int64(bucketSize)
bufLen := len(br.buf)
br.buf = marshalTimestampRFC3339Nano(br.buf, nsecs)
return bytesutil.ToUnsafeString(br.buf[bufLen:])
}
if n, ok := tryParseIPv4(s); ok {
n -= n % uint32(bucketSize)
bufLen := len(br.buf)
br.buf = marshalIPv4(br.buf, n)
return bytesutil.ToUnsafeString(br.buf[bufLen:])
}
if nsecs, ok := tryParseDuration(s); ok {
nsecs -= nsecs % int64(bucketSize)
bufLen := len(br.buf)
br.buf = marshalDuration(br.buf, nsecs)
return bytesutil.ToUnsafeString(br.buf[bufLen:])
}
// Couldn't parse s, so return it as is.
return s
}
func (br *blockResult) addEmptyStringColumn(columnName string) {
br.cs = append(br.cs, blockResultColumn{
name: columnName,
@ -378,7 +957,9 @@ func (br *blockResult) getColumnByName(columnName string) blockResultColumn {
}
cs := br.getColumns()
for i := range cs {
// iterate columns in reverse order, so overridden column results are returned instead of original column results.
for i := len(cs) - 1; i >= 0; i-- {
if cs[i].name == columnName {
return cs[i]
}
@ -429,15 +1010,6 @@ func (br *blockResult) truncateRows(keepRows int) {
}
}
func (br *blockResult) appendColumnValues(dst [][]string, columnNames []string) [][]string {
for _, columnName := range columnNames {
c := br.getColumnByName(columnName)
values := c.getValues(br)
dst = append(dst, values)
}
return dst
}
type blockResultColumn struct {
// name is column name.
name string
@ -455,15 +1027,21 @@ type blockResultColumn struct {
// valueType is the type of non-cost value
valueType valueType
// dictValues contain dictionary values for valueTypeDict column
// dictValues contains dictionary values for valueTypeDict column
dictValues []string
// encodedValues contain encoded values for non-const column
// encodedValues contains encoded values for non-const column
encodedValues []string
// values contain decoded values after getValues() call for the given column
// values contains decoded values after getValues() call
values []string
// bucketedValues contains values after getBucketedValues() call
bucketedValues []string
// bucketSize contains bucketSize for bucketedValues
bucketSize float64
// buf and valuesBuf are used by addValue() in order to re-use memory across resetRows().
buf []byte
valuesBuf []string
@ -557,129 +1135,31 @@ func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string {
return values[rowIdx]
}
// getValues returns values for the given column, bucketed according to bucketSize.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getBucketedValues(br *blockResult, bucketSize float64) []string {
if bucketSize <= 0 {
return c.getValues(br)
}
if values := c.bucketedValues; values != nil && c.bucketSize == bucketSize {
return values
}
c.bucketedValues = br.getBucketedColumnValues(c, bucketSize)
c.bucketSize = bucketSize
return c.bucketedValues
}
// getValues returns values for the given column.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValues(br *blockResult) []string {
if c.values != nil {
return c.values
if values := c.values; values != nil {
return values
}
buf := br.buf
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
if c.isConst {
v := c.encodedValues[0]
if v == "" {
// Fast path - return a slice of empty strings without constructing it.
c.values = getEmptyStrings(len(br.timestamps))
return c.values
}
// Slower path - construct slice of identical values with the len(br.timestamps)
for range br.timestamps {
valuesBuf = append(valuesBuf, v)
}
c.values = valuesBuf[valuesBufLen:]
br.valuesBuf = valuesBuf
return c.values
}
if c.isTime {
for _, timestamp := range br.timestamps {
t := time.Unix(0, timestamp).UTC()
bufLen := len(buf)
buf = t.AppendFormat(buf, time.RFC3339Nano)
s := bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
c.values = valuesBuf[valuesBufLen:]
br.buf = buf
br.valuesBuf = valuesBuf
return c.values
}
appendValue := func(v string) {
bufLen := len(buf)
buf = append(buf, v...)
s := bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
}
switch c.valueType {
case valueTypeString:
c.values = c.encodedValues
return c.values
case valueTypeDict:
dictValues := c.dictValues
for _, v := range c.encodedValues {
dictIdx := v[0]
appendValue(dictValues[dictIdx])
}
case valueTypeUint8:
bb := bbPool.Get()
for _, v := range c.encodedValues {
n := uint64(v[0])
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
case valueTypeUint16:
bb := bbPool.Get()
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint16(b))
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
case valueTypeUint32:
bb := bbPool.Get()
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint32(b))
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
case valueTypeUint64:
bb := bbPool.Get()
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
case valueTypeFloat64:
bb := bbPool.Get()
for _, v := range c.encodedValues {
bb.B = toFloat64String(bb.B[:0], v)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
case valueTypeIPv4:
bb := bbPool.Get()
for _, v := range c.encodedValues {
bb.B = toIPv4String(bb.B[:0], v)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
case valueTypeTimestampISO8601:
bb := bbPool.Get()
for _, v := range c.encodedValues {
bb.B = toTimestampISO8601String(bb.B[:0], v)
appendValue(bytesutil.ToUnsafeString(bb.B))
}
bbPool.Put(bb)
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
}
c.values = valuesBuf[valuesBufLen:]
br.buf = buf
br.valuesBuf = valuesBuf
c.values = br.getBucketedColumnValues(c, 0)
return c.values
}

View file

@ -84,12 +84,12 @@ func (fe *filterExact) apply(bs *blockSearch, bm *bitmap) {
func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) {
n, ok := tryParseTimestampISO8601(value)
if !ok || n < ch.minValue || n > ch.maxValue {
if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) {
bm.resetBits()
return
}
bb := bbPool.Get()
bb.B = encoding.MarshalUint64(bb.B, n)
bb.B = encoding.MarshalUint64(bb.B, uint64(n))
matchBinaryValue(bs, ch, bm, bb.B, tokens)
bbPool.Put(bb)
}

View file

@ -242,7 +242,7 @@ func (fi *filterIn) initTimestampISO8601Values() {
continue
}
bufLen := len(buf)
buf = encoding.MarshalUint64(buf, n)
buf = encoding.MarshalUint64(buf, uint64(n))
s := bytesutil.ToUnsafeString(buf[bufLen:])
m[s] = struct{}{}
}

View file

@ -1,7 +1,6 @@
package logstorage
import (
"strconv"
"unicode/utf8"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -192,12 +191,12 @@ func matchMinMaxValueLen(ch *columnHeader, minLen, maxLen uint64) bool {
bb := bbPool.Get()
defer bbPool.Put(bb)
bb.B = strconv.AppendUint(bb.B[:0], ch.minValue, 10)
bb.B = marshalUint64(bb.B[:0], ch.minValue)
s := bytesutil.ToUnsafeString(bb.B)
if maxLen < uint64(len(s)) {
return false
}
bb.B = strconv.AppendUint(bb.B[:0], ch.maxValue, 10)
bb.B = marshalUint64(bb.B[:0], ch.maxValue)
s = bytesutil.ToUnsafeString(bb.B)
return minLen <= uint64(len(s))
}

View file

@ -2,7 +2,6 @@ package logstorage
import (
"fmt"
"strconv"
"strings"
"sync"
"unicode/utf8"
@ -323,7 +322,7 @@ func toUint8String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string {
logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 1", bs.partPath(), len(v))
}
n := uint64(v[0])
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
bb.B = marshalUint64(bb.B[:0], n)
return bytesutil.ToUnsafeString(bb.B)
}
@ -333,7 +332,7 @@ func toUint16String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint16(b))
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
bb.B = marshalUint64(bb.B[:0], n)
return bytesutil.ToUnsafeString(bb.B)
}
@ -343,7 +342,7 @@ func toUint32String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint32(b))
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
bb.B = marshalUint64(bb.B[:0], n)
return bytesutil.ToUnsafeString(bb.B)
}
@ -353,6 +352,6 @@ func toUint64String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
bb.B = strconv.AppendUint(bb.B[:0], n, 10)
bb.B = marshalUint64(bb.B[:0], n)
return bytesutil.ToUnsafeString(bb.B)
}

View file

@ -4,9 +4,13 @@ package logstorage
//
// It is expressed as `_time:(start, end]` in LogsQL.
type filterTime struct {
// mintimestamp is the minimum timestamp in nanoseconds to find
minTimestamp int64
// maxTimestamp is the maximum timestamp in nanoseconds to find
maxTimestamp int64
// stringRepr is string representation of the filter
stringRepr string
}

View file

@ -219,7 +219,7 @@ func ParseQuery(s string) (*Query, error) {
f, err := parseFilter(lex)
if err != nil {
return nil, fmt.Errorf("%w; context: %s", err, lex.context())
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context())
}
q := &Query{
f: f,
@ -227,12 +227,12 @@ func ParseQuery(s string) (*Query, error) {
pipes, err := parsePipes(lex)
if err != nil {
return nil, fmt.Errorf("%w; context: %s", err, lex.context())
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context())
}
q.pipes = pipes
if !lex.isEnd() {
return nil, fmt.Errorf("unexpected unparsed tail; context: %s", lex.context())
return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]", lex.context())
}
return q, nil
@ -344,25 +344,25 @@ func parseGenericFilter(lex *lexer, fieldName string) (filter, error) {
case lex.isKeyword(",", ")", "[", "]"):
return nil, fmt.Errorf("unexpected token %q", lex.token)
}
phrase := getCompoundPhrase(lex, fieldName == "")
phrase := getCompoundPhrase(lex, fieldName != "")
return parseFilterForPhrase(lex, phrase, fieldName)
}
func getCompoundPhrase(lex *lexer, stopOnColon bool) string {
func getCompoundPhrase(lex *lexer, allowColon bool) string {
phrase := lex.token
rawPhrase := lex.rawToken
lex.nextToken()
suffix := getCompoundSuffix(lex, stopOnColon)
suffix := getCompoundSuffix(lex, allowColon)
if suffix == "" {
return phrase
}
return rawPhrase + suffix
}
func getCompoundSuffix(lex *lexer, stopOnColon bool) string {
func getCompoundSuffix(lex *lexer, allowColon bool) string {
s := ""
stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", ""}
if stopOnColon {
if !allowColon {
stopTokens = append(stopTokens, ":")
}
for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) {
@ -495,7 +495,7 @@ func parseFuncArgMaybePrefix(lex *lexer, funcName, fieldName string, callback fu
phrase := lex.token
lex.nextToken()
if !lex.isKeyword("(") {
phrase += getCompoundSuffix(lex, fieldName == "")
phrase += getCompoundSuffix(lex, fieldName != "")
return parseFilterForPhrase(lex, phrase, fieldName)
}
if !lex.mustNextToken() {
@ -676,7 +676,7 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
case lex.isKeyword("["):
includeMinValue = true
default:
phrase := funcName + getCompoundSuffix(lex, fieldName == "")
phrase := funcName + getCompoundSuffix(lex, fieldName != "")
return parseFilterForPhrase(lex, phrase, fieldName)
}
if !lex.mustNextToken() {
@ -765,7 +765,7 @@ func parseFuncArgs(lex *lexer, fieldName string, callback func(args []string) (f
funcName := lex.token
lex.nextToken()
if !lex.isKeyword("(") {
phrase := funcName + getCompoundSuffix(lex, fieldName == "")
phrase := funcName + getCompoundSuffix(lex, fieldName != "")
return parseFilterForPhrase(lex, phrase, fieldName)
}
if !lex.mustNextToken() {
@ -824,9 +824,9 @@ func parseFilterTimeWithOffset(lex *lexer) (*filterTime, error) {
return nil, fmt.Errorf("missing offset for _time filter %s", ft)
}
s := getCompoundToken(lex)
d, err := promutils.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("cannot parse offset for _time filter %s: %w", ft, err)
d, ok := tryParseDuration(s)
if !ok {
return nil, fmt.Errorf("cannot parse offset %q for _time filter %s: %w", s, ft, err)
}
offset := int64(d)
ft.minTimestamp -= offset
@ -862,9 +862,9 @@ func parseFilterTime(lex *lexer) (*filterTime, error) {
return ft, nil
}
// Parse _time:duration, which transforms to '_time:(now-duration, now]'
d, err := promutils.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("cannot parse duration in _time filter: %w", err)
d, ok := tryParseDuration(s)
if !ok {
return nil, fmt.Errorf("cannot parse duration %q in _time filter", s)
}
if d < 0 {
d = -d

View file

@ -862,6 +862,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`)
f(`* | stats by (x, y) count(*) foo, uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, uniq(a, b) as bar`)
// stats pipe with grouping buckets
f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`)
f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count() as foo`)
// multiple different pipes
f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`)
f(`* | skip 100 | head 20 | skip 10`, `* | skip 100 | head 20 | skip 10`)
@ -1130,6 +1134,10 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats uniq`)
f(`foo | stats uniq()`)
// invalid grouping fields
f(`foo | stats by(foo:bar) count() baz`)
f(`foo | stats by(foo:/bar) count() baz`)
// invalid by clause
f(`foo | stats by`)
f(`foo | stats by bar`)

View file

@ -16,8 +16,8 @@ import (
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stats
type pipeStats struct {
// byFields contains field names from 'by(...)' clause.
byFields []string
// byFields contains field names with optional buckets from 'by(...)' clause.
byFields []*byField
// resultNames contains names of output results generated by funcs.
resultNames []string
@ -64,7 +64,11 @@ type statsProcessor interface {
func (ps *pipeStats) String() string {
s := "stats "
if len(ps.byFields) > 0 {
s += "by (" + fieldNamesString(ps.byFields) + ") "
a := make([]string, len(ps.byFields))
for i := range ps.byFields {
a[i] = ps.byFields[i].String()
}
s += "by (" + strings.Join(a, ", ") + ") "
}
if len(ps.funcs) == 0 {
@ -185,18 +189,29 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
}
if len(byFields) == 1 {
// Special case for grouping by a single column.
c := br.getColumnByName(byFields[0])
bf := byFields[0]
c := br.getColumnByName(bf.name)
if c.isConst {
// Fast path for column with constant value.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(c.encodedValues[0]))
v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize)
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
for _, sfp := range shard.getStatsProcessors(shard.keyBuf) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br)
}
return
}
// Slower path for column with different values.
values := c.getValues(br)
values := c.getBucketedValues(br, bf.bucketSize)
if areConstValues(values) {
// Fast path for column with constant values.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
for _, sfp := range shard.getStatsProcessors(shard.keyBuf) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br)
}
return
}
// Slower generic path for a column with different values.
var sfps []statsProcessor
keyBuf := shard.keyBuf[:0]
for i := range br.timestamps {
@ -212,34 +227,39 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
return
}
// Obtain columns for byFields
columnValues := shard.columnValues[:0]
for _, bf := range byFields {
c := br.getColumnByName(bf.name)
values := c.getBucketedValues(br, bf.bucketSize)
columnValues = append(columnValues, values)
}
shard.columnValues = columnValues
// Verify whether all the 'by (...)' columns are constant.
areAllConstColumns := true
keyBuf := shard.keyBuf[:0]
for _, f := range byFields {
c := br.getColumnByName(f)
if !c.isConst {
for _, values := range columnValues {
if !areConstValues(values) {
areAllConstColumns = false
break
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.encodedValues[0]))
}
shard.keyBuf = keyBuf
if areAllConstColumns {
// Fast path for constant 'by (...)' columns.
keyBuf := shard.keyBuf[:0]
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
}
for _, sfp := range shard.getStatsProcessors(keyBuf) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br)
}
shard.keyBuf = keyBuf
return
}
// The slowest path - group by multiple columns with different values across rows.
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
shard.columnValues = br.appendColumnValues(shard.columnValues[:0], byFields)
columnValues := shard.columnValues
var sfps []statsProcessor
keyBuf := shard.keyBuf[:0]
for i := range br.timestamps {
// Verify whether the key for 'by (...)' fields equals the previous key
sameValue := sfps != nil
@ -305,8 +325,8 @@ func (psp *pipeStatsProcessor) flush() error {
var values []string
var br blockResult
for _, f := range byFields {
br.addEmptyStringColumn(f)
for _, bf := range byFields {
br.addEmptyStringColumn(bf.name)
}
for _, resultName := range psp.ps.resultNames {
br.addEmptyStringColumn(resultName)
@ -358,20 +378,22 @@ func (psp *pipeStatsProcessor) flush() error {
func (ps *pipeStats) neededFields() []string {
var neededFields []string
m := make(map[string]struct{})
updateNeededFields := func(fields []string) {
for _, field := range fields {
if _, ok := m[field]; !ok {
m[field] = struct{}{}
neededFields = append(neededFields, field)
}
for _, bf := range ps.byFields {
name := bf.name
if _, ok := m[name]; !ok {
m[name] = struct{}{}
neededFields = append(neededFields, name)
}
}
updateNeededFields(ps.byFields)
for _, f := range ps.funcs {
fields := f.neededFields()
updateNeededFields(fields)
for _, fieldName := range f.neededFields() {
if _, ok := m[fieldName]; !ok {
m[fieldName] = struct{}{}
neededFields = append(neededFields, fieldName)
}
}
}
return neededFields
@ -385,11 +407,11 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
var ps pipeStats
if lex.isKeyword("by") {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
bfs, err := parseByFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by': %w", err)
return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
}
ps.byFields = fields
ps.byFields = bfs
}
var resultNames []string
@ -476,6 +498,124 @@ func parseResultName(lex *lexer) (string, error) {
return resultName, nil
}
// byField represents by(...) field.
//
// It can have either `name` representation of `name:bucket` representation,
// where `bucket` can contain duration, size or numeric value for creating different buckets
// for 'value/bucket'.
type byField struct {
name string
// bucketSizeStr is string representation of the bucket size
bucketSizeStr string
// bucketSize is the bucket for grouping the given field values with value/bucketSize calculations.
bucketSize float64
}
func (bf *byField) String() string {
s := quoteTokenIfNeeded(bf.name)
if bf.bucketSizeStr != "" {
s += ":" + bf.bucketSizeStr
}
return s
}
func parseByFields(lex *lexer) ([]*byField, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing `(`")
}
var bfs []*byField
for {
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing field name or ')'")
}
if lex.isKeyword(")") {
lex.nextToken()
return bfs, nil
}
if lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected `,`")
}
fieldName, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name: %w", err)
}
bf := &byField{
name: fieldName,
}
if lex.isKeyword(":") {
lex.nextToken()
bucketSizeStr := lex.token
lex.nextToken()
if bucketSizeStr == "/" {
bucketSizeStr += lex.token
lex.nextToken()
}
bucketSize, ok := tryParseBucketSize(bucketSizeStr)
if !ok {
return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr)
}
if bucketSize < 0 {
return nil, fmt.Errorf("bucketSize for the field %q cannot be negative; got %q", fieldName, bucketSizeStr)
}
bf.bucketSizeStr = bucketSizeStr
bf.bucketSize = bucketSize
}
bfs = append(bfs, bf)
switch {
case lex.isKeyword(")"):
lex.nextToken()
return bfs, nil
case lex.isKeyword(","):
default:
return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
}
}
}
// tryParseBucketSize tries parsing bucket size, which can have the following formats:
//
// - integer number: 12345
// - floating-point number: 1.2345
// - duration: 1.5s - it is converted to the number of nanoseconds
// - bytes: 1.5KiB
// - ipv4 mask: /24
func tryParseBucketSize(s string) (float64, bool) {
// Try parsing s as floating point number
if f, ok := tryParseFloat64(s); ok {
return f, true
}
// Try parsing s as duration (1s, 5m, etc.)
if nsecs, ok := tryParseDuration(s); ok {
return float64(nsecs), true
}
// Try parsing s as bytes (KiB, MB, etc.)
if n, ok := tryParseBytes(s); ok {
return float64(n), true
}
if n, ok := tryParseIPv4Mask(s); ok {
return float64(n), true
}
return 0, false
}
func parseFieldNamesForFunc(lex *lexer, funcName string) ([]string, error) {
if !lex.isKeyword(funcName) {
return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err)
}
return fields, nil
}
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing `(`")
@ -509,10 +649,10 @@ func parseFieldNamesInParens(lex *lexer) ([]string, error) {
}
func parseFieldName(lex *lexer) (string, error) {
if lex.isKeyword(",", "(", ")", "[", "]", "|", "") {
if lex.isKeyword(",", "(", ")", "[", "]", "|", ":", "") {
return "", fmt.Errorf("unexpected token: %q", lex.token)
}
token := getCompoundPhrase(lex, true)
token := getCompoundPhrase(lex, false)
return token, nil
}
@ -526,3 +666,16 @@ func fieldNamesString(fields []string) string {
}
return strings.Join(a, ", ")
}
func areConstValues(values []string) bool {
if len(values) == 0 {
return false
}
v := values[0]
for i := 1; i < len(values); i++ {
if v != values[i] {
return false
}
}
return true
}

View file

@ -0,0 +1,61 @@
package logstorage
import (
"testing"
)
func TestTryParseBucketSize_Success(t *testing.T) {
f := func(s string, resultExpected float64) {
t.Helper()
result, ok := tryParseBucketSize(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if result != resultExpected {
t.Fatalf("unexpected result; got %f; want %f", result, resultExpected)
}
}
// integers
f("0", 0)
f("123", 123)
f("1_234_678", 1234678)
f("-1_234_678", -1234678)
// floating-point numbers
f("0.0", 0)
f("123.435", 123.435)
f("1_000.433_344", 1000.433344)
f("-1_000.433_344", -1000.433344)
// durations
f("5m", 5*nsecsPerMinute)
f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond)
f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond))
// bytes
f("1b", 1)
f("1k", 1_000)
f("1Kb", 1_000)
f("5.5KiB", 5.5*(1<<10))
f("10MB500KB10B", 10*1_000_000+500*1_000+10)
f("10m0k", 10*1_000_000)
}
func TestTryParseBucketSize_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseBucketSize(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
f("")
f("foo")
// negative bytes are forbidden
f("-10MB")
}

View file

@ -93,10 +93,9 @@ func (sap *statsAvgProcessor) finalizeStats() string {
}
func parseStatsAvg(lex *lexer) (*statsAvg, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
fields, err := parseFieldNamesForFunc(lex, "avg")
if err != nil {
return nil, fmt.Errorf("cannot parse 'avg' args: %w", err)
return nil, err
}
if len(fields) == 0 {
return nil, fmt.Errorf("'avg' must contain at least one arg")

View file

@ -1,7 +1,6 @@
package logstorage
import (
"fmt"
"slices"
"strconv"
"unsafe"
@ -195,10 +194,9 @@ func (scp *statsCountProcessor) finalizeStats() string {
}
func parseStatsCount(lex *lexer) (*statsCount, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
fields, err := parseFieldNamesForFunc(lex, "count")
if err != nil {
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
return nil, err
}
sc := &statsCount{
fields: fields,

View file

@ -93,10 +93,9 @@ func (smp *statsMaxProcessor) finalizeStats() string {
}
func parseStatsMax(lex *lexer) (*statsMax, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
fields, err := parseFieldNamesForFunc(lex, "max")
if err != nil {
return nil, fmt.Errorf("cannot parse 'max' args: %w", err)
return nil, err
}
if len(fields) == 0 {
return nil, fmt.Errorf("'max' must contain at least one arg")

View file

@ -93,10 +93,9 @@ func (smp *statsMinProcessor) finalizeStats() string {
}
func parseStatsMin(lex *lexer) (*statsMin, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
fields, err := parseFieldNamesForFunc(lex, "min")
if err != nil {
return nil, fmt.Errorf("cannot parse 'min' args: %w", err)
return nil, err
}
if len(fields) == 0 {
return nil, fmt.Errorf("'min' must contain at least one arg")

View file

@ -107,10 +107,9 @@ func (ssp *statsSumProcessor) finalizeStats() string {
}
func parseStatsSum(lex *lexer) (*statsSum, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
fields, err := parseFieldNamesForFunc(lex, "sum")
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' args: %w", err)
return nil, err
}
if len(fields) == 0 {
return nil, fmt.Errorf("'sum' must contain at least one arg")

View file

@ -1,7 +1,6 @@
package logstorage
import (
"fmt"
"slices"
"strconv"
"unsafe"
@ -175,8 +174,13 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int {
// Slow path for multiple columns.
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
sup.columnValues = br.appendColumnValues(sup.columnValues[:0], fields)
columnValues := sup.columnValues
columnValues := sup.columnValues[:0]
for _, f := range fields {
c := br.getColumnByName(f)
values := c.getValues(br)
columnValues = append(columnValues, values)
}
sup.columnValues = columnValues
keyBuf := sup.keyBuf[:0]
for i := range br.timestamps {
@ -352,10 +356,9 @@ func (sup *statsUniqProcessor) finalizeStats() string {
}
func parseStatsUniq(lex *lexer) (*statsUniq, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
fields, err := parseFieldNamesForFunc(lex, "uniq")
if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
return nil, err
}
su := &statsUniq{
fields: fields,

View file

@ -156,7 +156,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu
}
n := uint64(v[0])
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
dstBuf = marshalUint64(dstBuf, n)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeUint16:
@ -167,7 +167,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint16(b))
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
dstBuf = marshalUint64(dstBuf, n)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeUint32:
@ -178,7 +178,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint32(b))
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
dstBuf = marshalUint64(dstBuf, n)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeUint64:
@ -189,7 +189,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
dstBuf = marshalUint64(dstBuf, n)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeDict:
@ -239,19 +239,18 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu
func toTimestampISO8601String(dst []byte, v string) []byte {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
t := time.Unix(0, int64(n)).UTC()
dst = t.AppendFormat(dst, iso8601Timestamp)
dst = marshalTimestampISO8601(dst, int64(n))
return dst
}
func toIPv4String(dst []byte, v string) []byte {
dst = strconv.AppendUint(dst, uint64(v[0]), 10)
dst = marshalUint64(dst, uint64(v[0]))
dst = append(dst, '.')
dst = strconv.AppendUint(dst, uint64(v[1]), 10)
dst = marshalUint64(dst, uint64(v[1]))
dst = append(dst, '.')
dst = strconv.AppendUint(dst, uint64(v[2]), 10)
dst = marshalUint64(dst, uint64(v[2]))
dst = append(dst, '.')
dst = strconv.AppendUint(dst, uint64(v[3]), 10)
dst = marshalUint64(dst, uint64(v[3]))
return dst
}
@ -259,7 +258,7 @@ func toFloat64String(dst []byte, v string) []byte {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
f := math.Float64frombits(n)
dst = strconv.AppendFloat(dst, f, 'g', -1, 64)
dst = marshalFloat64(dst, f)
return dst
}
@ -279,10 +278,10 @@ func putValuesDecoder(vd *valuesDecoder) {
var valuesDecoderPool sync.Pool
func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u64s := encoding.GetUint64s(len(srcValues))
defer encoding.PutUint64s(u64s)
u64s := encoding.GetInt64s(len(srcValues))
defer encoding.PutInt64s(u64s)
a := u64s.A
var minValue, maxValue uint64
var minValue, maxValue int64
for i, v := range srcValues {
n, ok := tryParseTimestampISO8601(v)
if !ok {
@ -298,14 +297,69 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) (
}
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint64(dstBuf, n)
dstBuf = encoding.MarshalUint64(dstBuf, uint64(n))
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeTimestampISO8601, minValue, maxValue
return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue)
}
func tryParseTimestampISO8601(s string) (uint64, bool) {
// tryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds.
//
// The returned timestamp can be negative if s is smaller than 1970 year.
func tryParseTimestampRFC3339Nano(s string) (int64, bool) {
// Do not parse timestamps with timezone other than Z, since they cannot be converted back
// to the same string representation in general case.
// This may break search.
if len(s) < len("2006-01-02T15:04:05Z") {
return 0, false
}
secs, ok, tail := tryParseTimestampSecs(s)
if !ok {
return 0, false
}
s = tail
nsecs := secs * 1e9
// Parse optional fractional part of seconds.
n := strings.IndexByte(s, 'Z')
if n < 0 || n != len(s)-1 {
return 0, false
}
s = s[:n]
if len(s) == 0 {
return nsecs, true
}
if s[0] == '.' {
s = s[1:]
}
digits := len(s)
if digits > 9 {
return 0, false
}
n64, ok := tryParseUint64(s)
if !ok {
return 0, false
}
if digits < 9 {
n64 *= uint64(math.Pow10(9 - digits))
}
nsecs += int64(n64)
return nsecs, true
}
// marshalTimestampRFC3339Nano appends RFC3339Nano-formatted nsecs to dst and returns the result.
func marshalTimestampRFC3339Nano(dst []byte, nsecs int64) []byte {
return time.Unix(0, nsecs).UTC().AppendFormat(dst, time.RFC3339Nano)
}
// tryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds.
//
// The returned timestamp can be negative if s is smaller than 1970 year.
func tryParseTimestampISO8601(s string) (int64, bool) {
// Do not parse timestamps with timezone, since they cannot be converted back
// to the same string representation in general case.
// This may break search.
@ -313,117 +367,155 @@ func tryParseTimestampISO8601(s string) (uint64, bool) {
return 0, false
}
secs, ok, tail := tryParseTimestampSecs(s)
if !ok {
return 0, false
}
s = tail
nsecs := secs * 1e9
if s[0] != '.' {
return 0, false
}
s = s[1:]
// Parse milliseconds
tzDelimiter := s[len("000")]
if tzDelimiter != 'Z' {
return 0, false
}
millisecondStr := s[:len("000")]
msecs, ok := tryParseUint64(millisecondStr)
if !ok {
return 0, false
}
s = s[len("000")+1:]
if len(s) != 0 {
logger.Panicf("BUG: unexpected tail in timestamp: %q", s)
}
nsecs += int64(msecs) * 1e6
return nsecs, true
}
// marshalTimestampISO8601 appends ISO8601-formatted nsecs to dst and returns the result.
func marshalTimestampISO8601(dst []byte, nsecs int64) []byte {
return time.Unix(0, nsecs).UTC().AppendFormat(dst, iso8601Timestamp)
}
const iso8601Timestamp = "2006-01-02T15:04:05.000Z"
// tryParseTimestampSecs parses YYYY-MM-DDTHH:mm:ss into unix timestamp in seconds.
func tryParseTimestampSecs(s string) (int64, bool, string) {
// Parse year
if s[len("YYYY")] != '-' {
return 0, false
return 0, false, s
}
yearStr := s[:len("YYYY")]
n, ok := tryParseUint64(yearStr)
if !ok || n > 3000 {
return 0, false
if !ok || n < 1677 || n > 2262 {
return 0, false, s
}
year := int(n)
s = s[len("YYYY")+1:]
// Parse month
if s[len("MM")] != '-' {
return 0, false
return 0, false, s
}
monthStr := s[:len("MM")]
n, ok = tryParseUint64(monthStr)
if !ok || n < 1 || n > 12 {
return 0, false
if !ok {
return 0, false, s
}
month := time.Month(n)
s = s[len("MM")+1:]
// Parse day
if s[len("DD")] != 'T' {
return 0, false
return 0, false, s
}
dayStr := s[:len("DD")]
n, ok = tryParseUint64(dayStr)
if !ok || n < 1 || n > 31 {
return 0, false
if !ok {
return 0, false, s
}
day := int(n)
s = s[len("DD")+1:]
// Parse hour
if s[len("HH")] != ':' {
return 0, false
return 0, false, s
}
hourStr := s[:len("HH")]
n, ok = tryParseUint64(hourStr)
if !ok || n > 23 {
return 0, false
if !ok {
return 0, false, s
}
hour := int(n)
s = s[len("HH")+1:]
// Parse minute
if s[len("MM")] != ':' {
return 0, false
return 0, false, s
}
minuteStr := s[:len("MM")]
n, ok = tryParseUint64(minuteStr)
if !ok || n > 59 {
return 0, false
if !ok {
return 0, false, s
}
minute := int(n)
s = s[len("MM")+1:]
// Parse second
if s[len("SS")] != '.' {
return 0, false
}
secondStr := s[:len("SS")]
n, ok = tryParseUint64(secondStr)
if !ok || n > 59 {
return 0, false
if !ok {
return 0, false, s
}
second := int(n)
s = s[len("SS")+1:]
s = s[len("SS"):]
// Parse millisecond
tzDelimiter := s[len("000")]
if tzDelimiter != 'Z' {
return 0, false
secs := time.Date(year, month, day, hour, minute, second, 0, time.UTC).Unix()
if secs < int64(-1<<63)/1e9 || secs >= int64((1<<63)-1)/1e9 {
// Too big or too small timestamp
return 0, false, s
}
millisecondStr := s[:len("000")]
n, ok = tryParseUint64(millisecondStr)
if !ok || n > 999 {
return 0, false
}
millisecond := int(n)
s = s[len("000")+1:]
if len(s) != 0 {
return 0, false
}
t := time.Date(year, month, day, hour, minute, second, millisecond*1e6, time.UTC)
ts := t.UnixNano()
return uint64(ts), true
return secs, true, s
}
// tryParseUint64 parses s as uint64 value.
func tryParseUint64(s string) (uint64, bool) {
if len(s) == 0 || len(s) > 18 {
if len(s) == 0 || len(s) > len("18_446_744_073_709_551_615") {
return 0, false
}
n := uint64(0)
for i := 0; i < len(s); i++ {
ch := s[i]
if ch == '_' {
continue
}
if ch < '0' || ch > '9' {
return 0, false
}
if n > ((1<<64)-1)/10 {
return 0, false
}
n *= 10
n += uint64(ch - '0')
d := uint64(ch - '0')
if n > (1<<64)-1-d {
return 0, false
}
n += d
}
return n, true
}
const iso8601Timestamp = "2006-01-02T15:04:05.000Z"
// marshalUint64 appends string representation of n to dst and returns the result.
func marshalUint64(dst []byte, n uint64) []byte {
return strconv.AppendUint(dst, n, 10)
}
func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u32s := encoding.GetUint32s(len(srcValues))
@ -452,6 +544,7 @@ func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []st
return dstBuf, dstValues, valueTypeIPv4, uint64(minValue), uint64(maxValue)
}
// tryParseIPv4 tries parsing ipv4 from s.
func tryParseIPv4(s string) (uint32, bool) {
if len(s) < len("1.1.1.1") || len(s) > len("255.255.255.255") || strings.Count(s, ".") != 3 {
// Fast path - the entry isn't IPv4
@ -509,6 +602,18 @@ func tryParseIPv4(s string) (uint32, bool) {
return ipv4, true
}
// marshalIPv4 appends string representation of IPv4 address in n to dst and returns the result.
func marshalIPv4(dst []byte, n uint32) []byte {
dst = marshalUint64(dst, uint64(n>>24))
dst = append(dst, '.')
dst = marshalUint64(dst, uint64((n>>16)&0xff))
dst = append(dst, '.')
dst = marshalUint64(dst, uint64((n>>8)&0xff))
dst = append(dst, '.')
dst = marshalUint64(dst, uint64(n&0xff))
return dst
}
func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u64s := encoding.GetUint64s(len(srcValues))
defer encoding.PutUint64s(u64s)
@ -538,6 +643,20 @@ func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, [
return dstBuf, dstValues, valueTypeFloat64, minValueU64, maxValueU64
}
// tryParseFloat64Prefix tries parsing float64 number at the beginning of s and returns the remaining tail.
func tryParseFloat64Prefix(s string) (float64, bool, string) {
i := 0
for i < len(s) && (s[i] >= '0' && s[i] <= '9' || s[i] == '.' || s[i] == '_') {
i++
}
if i == 0 {
return 0, false, s
}
f, ok := tryParseFloat64(s[:i])
return f, ok, s[i:]
}
// tryParseFloat64 tries parsing s as float64.
func tryParseFloat64(s string) (float64, bool) {
if len(s) == 0 || len(s) > 20 {
return 0, false
@ -578,13 +697,292 @@ func tryParseFloat64(s string) (float64, bool) {
if !ok {
return 0, false
}
f := math.FMA(float64(nFrac), math.Pow10(-len(sFrac)), float64(nInt))
p10 := math.Pow10(strings.Count(sFrac, "_") - len(sFrac))
f := math.FMA(float64(nFrac), p10, float64(nInt))
if minus {
f = -f
}
return f, true
}
// marshalFloat64 appends formatted f to dst and returns the result.
func marshalFloat64(dst []byte, f float64) []byte {
return strconv.AppendFloat(dst, f, 'f', -1, 64)
}
// tryParseBytes parses user-readable bytes representation in s.
//
// Supported suffixes:
//
// K, KB - for 1000
func tryParseBytes(s string) (int64, bool) {
if len(s) == 0 {
return 0, false
}
n := int64(0)
for len(s) > 0 {
f, ok, tail := tryParseFloat64Prefix(s)
if !ok {
return 0, false
}
s = tail
if len(s) == 0 {
n += int64(f)
continue
}
if len(s) >= 3 {
prefix := s[:3]
switch {
case strings.EqualFold(prefix, "kib"):
n += int64(f * (1 << 10))
s = s[3:]
continue
case strings.EqualFold(prefix, "mib"):
n += int64(f * (1 << 20))
s = s[3:]
continue
case strings.EqualFold(prefix, "gib"):
n += int64(f * (1 << 30))
s = s[3:]
continue
case strings.EqualFold(prefix, "tib"):
n += int64(f * (1 << 40))
s = s[3:]
continue
}
}
if len(s) >= 2 {
prefix := s[:2]
switch {
case strings.EqualFold(prefix, "ki"):
n += int64(f * (1 << 10))
s = s[2:]
continue
case strings.EqualFold(prefix, "mi"):
n += int64(f * (1 << 20))
s = s[2:]
continue
case strings.EqualFold(prefix, "gi"):
n += int64(f * (1 << 30))
s = s[2:]
continue
case strings.EqualFold(prefix, "ti"):
n += int64(f * (1 << 40))
s = s[2:]
continue
case strings.EqualFold(prefix, "kb"):
n += int64(f * 1_000)
s = s[2:]
continue
case strings.EqualFold(prefix, "mb"):
n += int64(f * 1_000_000)
s = s[2:]
continue
case strings.EqualFold(prefix, "gb"):
n += int64(f * 1_000_000_000)
s = s[2:]
continue
case strings.EqualFold(prefix, "tb"):
n += int64(f * 1_000_000_000_000)
s = s[2:]
continue
}
}
prefix := s[:1]
switch {
case strings.EqualFold(prefix, "b"):
n += int64(f)
s = s[1:]
continue
case strings.EqualFold(prefix, "k"):
n += int64(f * 1_000)
s = s[1:]
continue
case strings.EqualFold(prefix, "m"):
n += int64(f * 1_000_000)
s = s[1:]
continue
case strings.EqualFold(prefix, "g"):
n += int64(f * 1_000_000_000)
s = s[1:]
continue
case strings.EqualFold(prefix, "t"):
n += int64(f * 1_000_000_000_000)
s = s[1:]
continue
}
}
return n, true
}
// tryParseIPv4Mask parses '/num' ipv4 mask and returns (1<<(32-num))
func tryParseIPv4Mask(s string) (uint64, bool) {
if len(s) == 0 || s[0] != '/' {
return 0, false
}
s = s[1:]
n, ok := tryParseUint64(s)
if !ok || n > 32 {
return 0, false
}
return 1 << (32 - uint8(n)), true
}
// tryParseDuration parses the given duration in nanoseconds and returns the result.
func tryParseDuration(s string) (int64, bool) {
if len(s) == 0 {
return 0, false
}
isMinus := s[0] == '-'
if isMinus {
s = s[1:]
}
nsecs := int64(0)
for len(s) > 0 {
f, ok, tail := tryParseFloat64Prefix(s)
if !ok {
return 0, false
}
s = tail
if len(s) == 0 {
return 0, false
}
if len(s) >= 3 {
prefix := s[:3]
if strings.EqualFold(prefix, "µs") {
nsecs += int64(f * nsecsPerMicrosecond)
s = s[3:]
continue
}
}
if len(s) >= 2 {
prefix := s[:2]
switch {
case strings.EqualFold(prefix, "ms"):
nsecs += int64(f * nsecsPerMillisecond)
s = s[2:]
continue
case strings.EqualFold(prefix, "ns"):
nsecs += int64(f)
s = s[2:]
continue
}
}
prefix := s[:1]
switch {
case strings.EqualFold(prefix, "y"):
nsecs += int64(f * nsecsPerYear)
s = s[1:]
case strings.EqualFold(prefix, "w"):
nsecs += int64(f * nsecsPerWeek)
s = s[1:]
continue
case strings.EqualFold(prefix, "d"):
nsecs += int64(f * nsecsPerDay)
s = s[1:]
continue
case strings.EqualFold(prefix, "h"):
nsecs += int64(f * nsecsPerHour)
s = s[1:]
continue
case strings.EqualFold(prefix, "m"):
nsecs += int64(f * nsecsPerMinute)
s = s[1:]
continue
case strings.EqualFold(prefix, "s"):
nsecs += int64(f * nsecsPerSecond)
s = s[1:]
continue
default:
return 0, false
}
}
if isMinus {
nsecs = -nsecs
}
return nsecs, true
}
// marshalDuration appends string representation of nsec duration to dst and returns the result.
func marshalDuration(dst []byte, nsecs int64) []byte {
if nsecs == 0 {
return append(dst, '0')
}
if nsecs < 0 {
dst = append(dst, '-')
nsecs = -nsecs
}
formatFloat64Seconds := nsecs >= nsecsPerSecond
if nsecs >= nsecsPerWeek {
weeks := nsecs / nsecsPerWeek
nsecs -= weeks * nsecsPerWeek
dst = marshalUint64(dst, uint64(weeks))
dst = append(dst, 'w')
}
if nsecs >= nsecsPerDay {
days := nsecs / nsecsPerDay
nsecs -= days * nsecsPerDay
dst = marshalUint64(dst, uint64(days))
dst = append(dst, 'd')
}
if nsecs >= nsecsPerHour {
hours := nsecs / nsecsPerHour
nsecs -= hours * nsecsPerHour
dst = marshalUint64(dst, uint64(hours))
dst = append(dst, 'h')
}
if nsecs >= nsecsPerMinute {
minutes := nsecs / nsecsPerMinute
nsecs -= minutes * nsecsPerMinute
dst = marshalUint64(dst, uint64(minutes))
dst = append(dst, 'm')
}
if nsecs >= nsecsPerSecond {
if formatFloat64Seconds {
seconds := float64(nsecs) / nsecsPerSecond
dst = marshalFloat64(dst, seconds)
dst = append(dst, 's')
return dst
}
seconds := nsecs / nsecsPerSecond
nsecs -= seconds * nsecsPerSecond
dst = marshalUint64(dst, uint64(seconds))
dst = append(dst, 's')
}
if nsecs >= nsecsPerMillisecond {
msecs := nsecs / nsecsPerMillisecond
nsecs -= msecs * nsecsPerMillisecond
dst = marshalUint64(dst, uint64(msecs))
dst = append(dst, "ms"...)
}
if nsecs >= nsecsPerMicrosecond {
usecs := nsecs / nsecsPerMicrosecond
nsecs -= usecs * nsecsPerMicrosecond
dst = marshalUint64(dst, uint64(usecs))
dst = append(dst, "µs"...)
}
if nsecs > 0 {
dst = marshalUint64(dst, uint64(nsecs))
dst = append(dst, "ns"...)
}
return dst
}
const (
nsecsPerYear = 365 * 24 * 3600 * 1e9
nsecsPerWeek = 7 * 24 * 3600 * 1e9
nsecsPerDay = 24 * 3600 * 1e9
nsecsPerHour = 3600 * 1e9
nsecsPerMinute = 60 * 1e9
nsecsPerSecond = 1e9
nsecsPerMillisecond = 1e6
nsecsPerMicrosecond = 1e3
)
func tryUintEncoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u64s := encoding.GetUint64s(len(srcValues))
defer encoding.PutUint64s(u64s)

View file

@ -96,133 +96,601 @@ func TestValuesEncoder(t *testing.T) {
f(values, valueTypeFloat64, 4607182418800017408, 4613937818241073152)
}
func TestTryParseIPv4(t *testing.T) {
f := func(s string, nExpected uint32, okExpected bool) {
func TestTryParseIPv4_Success(t *testing.T) {
f := func(s string) {
t.Helper()
n, ok := tryParseIPv4(s)
if n != nExpected {
t.Fatalf("unexpected n; got %d; want %d", n, nExpected)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if ok != okExpected {
t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected)
data := marshalIPv4(nil, n)
if string(data) != s {
t.Fatalf("unexpected ip; got %q; want %q", data, s)
}
}
f("", 0, false)
f("foo", 0, false)
f("a.b.c.d", 0, false)
f("1.2.3.4", 0x01020304, true)
f("255.255.255.255", 0xffffffff, true)
f("0.0.0.0", 0, true)
f("127.0.0.1", 0x7f000001, true)
f("127.0.0.x", 0, false)
f("127.0.x.0", 0, false)
f("127.x.0.0", 0, false)
f("x.0.0.0", 0, false)
f("127.127.127.256", 0, false)
f("127.127.256.127", 0, false)
f("127.256.127.127", 0, false)
f("256.127.127.127", 0, false)
f("-1.127.127.127", 0, false)
f("127.-1.127.127", 0, false)
f("127.127.-1.127", 0, false)
f("127.127.127.-1", 0, false)
f("0.0.0.0")
f("1.2.3.4")
f("255.255.255.255")
f("127.0.0.1")
}
func TestTryParseTimestampISO8601(t *testing.T) {
f := func(s string, timestampExpected uint64, okExpected bool) {
func TestTryParseIPv4_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
timestamp, ok := tryParseTimestampISO8601(s)
if timestamp != timestampExpected {
t.Fatalf("unexpected timestamp; got %d; want %d", timestamp, timestampExpected)
}
if ok != okExpected {
t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected)
_, ok := tryParseIPv4(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
f("2023-01-15T23:45:51.123Z", 1673826351123000000, true)
f("")
f("foo")
f("a.b.c.d")
f("127.0.0.x")
f("127.0.x.0")
f("127.x.0.0")
f("x.0.0.0")
// Invalid milliseconds
f("2023-01-15T22:15:51.12345Z", 0, false)
f("2023-01-15T22:15:51.12Z", 0, false)
f("2023-01-15T22:15:51Z", 0, false)
// Too big octets
f("127.127.127.256")
f("127.127.256.127")
f("127.256.127.127")
f("256.127.127.127")
// Missing Z
f("2023-01-15T23:45:51.123", 0, false)
// Negative octets
f("-1.127.127.127")
f("127.-1.127.127")
f("127.127.-1.127")
f("127.127.127.-1")
}
// Invalid timestamp
f("foo", 0, false)
f("2023-01-15T23:45:51.123Zxyabcd", 0, false)
f("2023-01-15T23:45:51.123Z01:00", 0, false)
func TestTryParseTimestampRFC3339Nano_Success(t *testing.T) {
f := func(s string) {
t.Helper()
nsecs, ok := tryParseTimestampRFC3339Nano(s)
if !ok {
t.Fatalf("cannot parse timestamp %q", s)
}
data := marshalTimestampRFC3339Nano(nil, nsecs)
if string(data) != s {
t.Fatalf("unexpected timestamp; got %q; want %q", data, s)
}
}
// No fractional seconds
f("2023-01-15T23:45:51Z")
// Different number of fractional seconds
f("2023-01-15T23:45:51.1Z")
f("2023-01-15T23:45:51.12Z")
f("2023-01-15T23:45:51.123Z")
f("2023-01-15T23:45:51.1234Z")
f("2023-01-15T23:45:51.12345Z")
f("2023-01-15T23:45:51.123456Z")
f("2023-01-15T23:45:51.1234567Z")
f("2023-01-15T23:45:51.12345678Z")
f("2023-01-15T23:45:51.123456789Z")
// The minimum possible timestamp
f("1677-09-21T00:12:44Z")
// The maximum possible timestamp
f("2262-04-11T23:47:15.999999999Z")
}
func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseTimestampRFC3339Nano(s)
if ok {
t.Fatalf("expecting faulure when parsing %q", s)
}
}
// invalid length
f("")
f("foobar")
// Missing Z at the end
f("2023-01-15T22:15:51")
f("2023-01-15T22:15:51.123")
// missing fractional part after dot
f("2023-01-15T22:15:51.Z")
// timestamp with timezone
f("2023-01-16T00:45:51.123+01:00", 0, false)
f("2023-01-16T00:45:51+01:00")
f("2023-01-16T00:45:51.123+01:00")
// too small year
f("1676-09-21T00:12:43Z")
// too big year
f("2263-04-11T23:47:17Z")
// too small timestamp
f("1677-09-21T00:12:43.999999999Z")
// too big timestamp
f("2262-04-11T23:47:16Z")
// invalid year
f("YYYY-04-11T23:47:17Z")
// invalid moth
f("2023-MM-11T23:47:17Z")
// invalid day
f("2023-01-DDT23:47:17Z")
// invalid hour
f("2023-01-23Thh:47:17Z")
// invalid minute
f("2023-01-23T23:mm:17Z")
// invalid second
f("2023-01-23T23:33:ssZ")
}
func TestTryParseFloat64(t *testing.T) {
f := func(s string, valueExpected float64, okExpected bool) {
func TestTryParseTimestampISO8601_Success(t *testing.T) {
f := func(s string) {
t.Helper()
value, ok := tryParseFloat64(s)
if value != valueExpected {
t.Fatalf("unexpected value; got %v; want %v", value, valueExpected)
nsecs, ok := tryParseTimestampISO8601(s)
if !ok {
t.Fatalf("cannot parse timestamp %q", s)
}
if ok != okExpected {
t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected)
data := marshalTimestampISO8601(nil, nsecs)
if string(data) != s {
t.Fatalf("unexpected timestamp; got %q; want %q", data, s)
}
}
f("0", 0, true)
f("1234567890", 1234567890, true)
f("-1.234567", -1.234567, true)
// regular timestamp
f("2023-01-15T23:45:51.123Z")
// The minimum possible timestamp
f("1677-09-21T00:12:44.000Z")
// The maximum possible timestamp
f("2262-04-11T23:47:15.999Z")
}
func TestTryParseTimestampISO8601_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseTimestampISO8601(s)
if ok {
t.Fatalf("expecting faulure when parsing %q", s)
}
}
// invalid length
f("")
f("foobar")
// Missing Z at the end
f("2023-01-15T22:15:51.123")
f("2023-01-15T22:15:51.1234")
// timestamp with timezone
f("2023-01-16T00:45:51.123+01:00")
// too small year
f("1676-09-21T00:12:43.434Z")
// too big year
f("2263-04-11T23:47:17.434Z")
// too small timestamp
f("1677-09-21T00:12:43.999Z")
// too big timestamp
f("2262-04-11T23:47:16.000Z")
// invalid year
f("YYYY-04-11T23:47:17.123Z")
// invalid moth
f("2023-MM-11T23:47:17.123Z")
// invalid day
f("2023-01-DDT23:47:17.123Z")
// invalid hour
f("2023-01-23Thh:47:17.123Z")
// invalid minute
f("2023-01-23T23:mm:17.123Z")
// invalid second
f("2023-01-23T23:33:ss.123Z")
}
func TestTryParseDuration_Success(t *testing.T) {
f := func(s string, nsecsExpected int64) {
t.Helper()
nsecs, ok := tryParseDuration(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if nsecs != nsecsExpected {
t.Fatalf("unexpected value; got %d; want %d", nsecs, nsecsExpected)
}
}
// zero duration
f("0s", 0)
f("0S", 0)
f("0.0w0d0h0s0.0ms", 0)
f("-0w", 0)
// positive duration
f("1s", nsecsPerSecond)
f("1.5ms", 1.5*nsecsPerMillisecond)
f("1µs", nsecsPerMicrosecond)
f("1ns", 1)
f("1NS", 1)
f("1nS", 1)
f("1Ns", 1)
f("1h", nsecsPerHour)
f("1H", nsecsPerHour)
f("1.5d", 1.5*nsecsPerDay)
f("1.5D", 1.5*nsecsPerDay)
f("1.5w", 1.5*nsecsPerWeek)
f("1.5W", 1.5*nsecsPerWeek)
f("2.5y", 2.5*nsecsPerYear)
f("1m5.123456789s", nsecsPerMinute+5.123456789*nsecsPerSecond)
// composite duration
f("1h5m", nsecsPerHour+5*nsecsPerMinute)
f("1.1h5m2.5s3_456ns", 1.1*nsecsPerHour+5*nsecsPerMinute+2.5*nsecsPerSecond+3456)
// nedgative duration
f("-1h5m3s", -(nsecsPerHour + 5*nsecsPerMinute + 3*nsecsPerSecond))
}
func TestTryParseDuration_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseDuration(s)
if ok {
t.Fatalf("expecting error for parsing %q", s)
}
}
// empty string
f("")
// missing suffix
f("2")
f("2.5")
// invalid string
f("foobar")
f("1foo")
f("1soo")
f("3.43e")
f("3.43es")
// superflouous space
f(" 2s")
f("2s ")
f("2s 3ms")
}
func TestMarshalDuration(t *testing.T) {
f := func(nsecs int64, resultExpected string) {
t.Helper()
result := marshalDuration(nil, nsecs)
if string(result) != resultExpected {
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
}
}
f(0, "0")
f(1, "1ns")
f(-1, "-1ns")
f(12345, "12µs345ns")
f(123456789, "123ms456µs789ns")
f(12345678901, "12.345678901s")
f(1234567890143, "20m34.567890143s")
f(1234567890123457, "2w6h56m7.890123457s")
}
func TestTryParseBytes_Success(t *testing.T) {
f := func(s string, resultExpected int64) {
t.Helper()
result, ok := tryParseBytes(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if result != resultExpected {
t.Fatalf("unexpected result; got %d; want %d", result, resultExpected)
}
}
f("123.456", 123)
f("1_500", 1_500)
f("2.5b", 2)
f("2.5B", 2)
f("1.5k", 1_500)
f("1.5m", 1_500_000)
f("1.5g", 1_500_000_000)
f("1.5t", 1_500_000_000_000)
f("1.5K", 1_500)
f("1.5M", 1_500_000)
f("1.5G", 1_500_000_000)
f("1.5T", 1_500_000_000_000)
f("1.5kb", 1_500)
f("1.5mb", 1_500_000)
f("1.5gb", 1_500_000_000)
f("1.5tb", 1_500_000_000_000)
f("1.5Kb", 1_500)
f("1.5Mb", 1_500_000)
f("1.5Gb", 1_500_000_000)
f("1.5Tb", 1_500_000_000_000)
f("1.5KB", 1_500)
f("1.5MB", 1_500_000)
f("1.5GB", 1_500_000_000)
f("1.5TB", 1_500_000_000_000)
f("1.5ki", 1.5*(1<<10))
f("1.5mi", 1.5*(1<<20))
f("1.5gi", 1.5*(1<<30))
f("1.5ti", 1.5*(1<<40))
f("1.5Ki", 1.5*(1<<10))
f("1.5Mi", 1.5*(1<<20))
f("1.5Gi", 1.5*(1<<30))
f("1.5Ti", 1.5*(1<<40))
f("1.5KI", 1.5*(1<<10))
f("1.5MI", 1.5*(1<<20))
f("1.5GI", 1.5*(1<<30))
f("1.5TI", 1.5*(1<<40))
f("1.5kib", 1.5*(1<<10))
f("1.5mib", 1.5*(1<<20))
f("1.5gib", 1.5*(1<<30))
f("1.5tib", 1.5*(1<<40))
f("1.5kiB", 1.5*(1<<10))
f("1.5miB", 1.5*(1<<20))
f("1.5giB", 1.5*(1<<30))
f("1.5tiB", 1.5*(1<<40))
f("1.5KiB", 1.5*(1<<10))
f("1.5MiB", 1.5*(1<<20))
f("1.5GiB", 1.5*(1<<30))
f("1.5TiB", 1.5*(1<<40))
f("1MiB500KiB200B", (1<<20)+500*(1<<10)+200)
}
func TestTryParseBytes_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseBytes(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
// empty string
f("")
// invalid number
f("foobar")
// invalid suffix
f("123q")
f("123qs")
f("123qsb")
f("123sqsb")
f("123s5qsb")
}
func TestTryParseFloat64_Success(t *testing.T) {
f := func(s string, resultExpected float64) {
t.Helper()
result, ok := tryParseFloat64(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if !float64Equal(result, resultExpected) {
t.Fatalf("unexpected value; got %f; want %f", result, resultExpected)
}
}
f("0", 0)
f("1", 1)
f("-1", -1)
f("1234567890", 1234567890)
f("1_234_567_890", 1234567890)
f("-1.234_567", -1.234567)
f("0.345", 0.345)
f("-0.345", -0.345)
}
func float64Equal(a, b float64) bool {
return math.Abs(a-b)*math.Abs(max(a, b)) < 1e-15
}
func TestTryParseFloat64_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseFloat64(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
// Empty value
f("", 0, false)
f("")
// Plus in the value isn't allowed, since it cannot be convered back to the same string representation
f("+123", 0, false)
f("+123")
// Dot at the beginning and the end of value isn't allowed, since it cannot converted back to the same string representation
f(".123", 0, false)
f("123.", 0, false)
f(".123")
f("123.")
// Multiple dots aren't allowed
f("123.434.55", 0, false)
f("123.434.55")
// Invalid dots
f("-.123", 0, false)
f(".", 0, false)
f("-.123")
f(".")
// Scientific notation isn't allowed, since it cannot be converted back to the same string representation
f("12e5", 0, false)
f("12e5")
// Minus in the middle of string isn't allowed
f("12-5", 0, false)
f("12-5")
}
func TestTryParseUint64(t *testing.T) {
f := func(s string, valueExpected uint64, okExpected bool) {
func TestMarshalFloat64(t *testing.T) {
f := func(f float64, resultExpected string) {
t.Helper()
value, ok := tryParseUint64(s)
if value != valueExpected {
t.Fatalf("unexpected value; got %d; want %d", value, valueExpected)
}
if ok != okExpected {
t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected)
result := marshalFloat64(nil, f)
if string(result) != resultExpected {
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
}
}
f("0", 0, true)
f("123456789012345678", 123456789012345678, true)
f(0, "0")
f(1234, "1234")
f(-12345678, "-12345678")
f(1.234, "1.234")
f(-1.234567, "-1.234567")
}
func TestTryParseUint64_Success(t *testing.T) {
f := func(s string, resultExpected uint64) {
t.Helper()
result, ok := tryParseUint64(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if result != resultExpected {
t.Fatalf("unexpected value; got %d; want %d", result, resultExpected)
}
}
f("0", 0)
f("123", 123)
f("123456", 123456)
f("123456789", 123456789)
f("123456789012", 123456789012)
f("123456789012345", 123456789012345)
f("123456789012345678", 123456789012345678)
f("12345678901234567890", 12345678901234567890)
f("12_345_678_901_234_567_890", 12345678901234567890)
// the maximum possible value
f("18446744073709551615", 18446744073709551615)
}
func TestTryParseUint64_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseUint64(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
// empty value
f("", 0, false)
f("")
// too big value
f("1234567890123456789", 0, false)
f("18446744073709551616")
// invalid value
f("foo", 0, false)
f("foo")
}
func TestMarshalUint64(t *testing.T) {
f := func(n uint64, resultExpected string) {
t.Helper()
result := marshalUint64(nil, n)
if string(result) != resultExpected {
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
}
}
f(0, "0")
f(123456, "123456")
// the maximum possible value
f(18446744073709551615, "18446744073709551615")
f(18_446_744_073_709_551_615, "18446744073709551615")
}
func TestTryParseIPv4Mask_Success(t *testing.T) {
f := func(s string, resultExpected uint64) {
t.Helper()
result, ok := tryParseIPv4Mask(s)
if !ok {
t.Fatalf("cannot parse %q", s)
}
if result != resultExpected {
t.Fatalf("unexpected result; got %d; want %d", result, resultExpected)
}
}
f("/0", 1<<32)
f("/1", 1<<31)
f("/8", 1<<24)
f("/24", 1<<8)
f("/32", 1)
}
func TestTryParseIPv4Mask_Failure(t *testing.T) {
f := func(s string) {
t.Helper()
_, ok := tryParseIPv4Mask(s)
if ok {
t.Fatalf("expecting error when parsing %q", s)
}
}
// Empty mask
f("")
// Invalid prefix
f("foo")
// Non-numeric mask
f("/foo")
// Too big mask
f("/33")
// Negative mask
f("/-1")
}

View file

@ -5,12 +5,35 @@ import (
"testing"
)
func BenchmarkTryParseTimestampRFC3339Nano(b *testing.B) {
a := []string{
"2023-01-15T23:45:51Z",
"2023-02-15T23:45:51.123Z",
"2024-02-15T23:45:51.123456Z",
"2025-02-15T22:45:51.123456789Z",
"2023-02-15T22:45:51.000000000Z",
}
b.SetBytes(int64(len(a)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for _, s := range a {
_, ok := tryParseTimestampRFC3339Nano(s)
if !ok {
panic(fmt.Errorf("cannot parse timestamp %q", s))
}
}
}
})
}
func BenchmarkTryParseTimestampISO8601(b *testing.B) {
a := []string{
"2023-01-15T23:45:51.123Z",
"2023-02-15T23:45:51.123Z",
"2023-02-15T23:45:51.123+01:00",
"2023-02-15T22:45:51.123-10:30",
"2024-02-15T23:45:51.123Z",
"2025-02-15T22:45:51.123Z",
"2023-02-15T22:45:51.000Z",
}