From 77e2d0be600b7ea6528921feb3ae7b4c9a5186aa Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 3 May 2024 11:15:09 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/LogsQL.md | 46 +- lib/logstorage/block_header.go | 4 +- lib/logstorage/block_result.go | 744 +++++++++++++++---- lib/logstorage/filter_exact.go | 4 +- lib/logstorage/filter_in.go | 2 +- lib/logstorage/filter_len_range.go | 5 +- lib/logstorage/filter_prefix.go | 9 +- lib/logstorage/filter_time.go | 4 + lib/logstorage/parser.go | 34 +- lib/logstorage/parser_test.go | 8 + lib/logstorage/pipe_stats.go | 225 +++++- lib/logstorage/pipe_stats_test.go | 61 ++ lib/logstorage/stats_avg.go | 5 +- lib/logstorage/stats_count.go | 6 +- lib/logstorage/stats_max.go | 5 +- lib/logstorage/stats_min.go | 5 +- lib/logstorage/stats_sum.go | 5 +- lib/logstorage/stats_unique.go | 15 +- lib/logstorage/values_encoder.go | 520 +++++++++++-- lib/logstorage/values_encoder_test.go | 624 ++++++++++++++-- lib/logstorage/values_encoder_timing_test.go | 27 +- 21 files changed, 1996 insertions(+), 362 deletions(-) create mode 100644 lib/logstorage/pipe_stats_test.go diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 04b595503..09432a7f4 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1069,7 +1069,7 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo ## Stats -LogsQL supports calculating the following stats: +LogsQL supports calculating the following stats functions: - The number of matching log entries. Examples: - `error | stats count() as errors_total` returns the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word). 
@@ -1117,6 +1117,48 @@ LogsQL supports calculating the following stats: across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. +### Grouping stats by buckets + +#### Time buckets + +Stats can be bucketed by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) with the `_time:bucket_duration` syntax inside `by(...)` clause. +For example, the following query returns per-minute number of log messages with the `error` [word](#word) for the last 10 minutes: + +```logsql +_time:10m error | stats by (_time:1m) count() errors_per_minute +``` + +#### Numeric buckets + +Stats can be bucketed by any numeric [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the `field_name:bucket_size` syntax inside `by(...)` clause. +For example, the following query returns the number of log messages with the `status=200` [phrase](#phrase-filter) bucketed by `request_duration_seconds` numeric field with `0.5` step: + +```logsql +_time:10m "status=200" | stats by (request_duration_seconds:0.5) count() requests +``` + +The `bucket_size` can contain the following convenient suffixes: + +- `KB` - the `bucket_size` is multiplied by `1000` in this case. For example, `10KB`. +- `MB` - the `bucket_size` is multiplied by `1_000_000` in this case. For example, `10MB`. +- `GB` - the `bucket_size` is multiplied by `1_000_000_000` in this case. For example, `10GB`. +- `TB` - the `bucket_size` is multiplied by `1_000_000_000_000` in this case. For example, `10TB`. +- `KiB` - the `bucket_size` is multiplied by `1024` in this case. For example, `10KiB`. +- `MiB` - the `bucket_size` is multiplied by `1024*1024` in this case. For example, `10MiB`. +- `GiB` - the `bucket_size` is multiplied by `1024*1024*1024` in this case. For example, `10GiB`. 
+- `TiB` - the `bucket_size` is multiplied by `1024*1024*1024*1024` in this case. For example, `10TiB`. + +#### IPv4 mask buckets + +Stats can be bucketed by [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with [IPv4 addresses](https://en.wikipedia.org/wiki/IP_address) +via the `ip_field_name:/network_mask` syntax inside `by(...)` clause. For example, the following query returns the number of log entries per `/24` subnetwork during the last 10 minutes: + +```logsql +_time:10m | stats by (ip:/24) count() requests_per_subnet +``` + +### Calculating multiple stats + Stats calculations can be combined. For example, the following query calculates the number of log messages with the `error` [word](#word), the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the sum of `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `namespace` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model): @@ -1128,6 +1170,8 @@ error | stats by (namespace) sum(duration) as duration_sum ``` +### Stats TODO + LogsQL will support calculating the following additional stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) and fields created by [transformations](#transformations): diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go index 490c79216..72b463cd7 100644 --- a/lib/logstorage/block_header.go +++ b/lib/logstorage/block_header.go @@ -711,10 +711,10 @@ type timestampsHeader struct { // blockSize is the size of the timestamps block inside timestampsFilename file blockSize uint64 - // minTimestamp is the mimumum timestamp seen in the block + // minTimestamp is the minimum timestamp seen in the block in nanoseconds minTimestamp int64 - // maxTimestamp is the maximum timestamp seen in the block + // maxTimestamp is the maximum timestamp seen in
the block in nanoseconds maxTimestamp int64 // marshalType is the type used for encoding the timestamps block diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 637873e31..80ed98596 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1,15 +1,18 @@ package logstorage import ( + "encoding/binary" "math" - "strconv" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) +// blockResult holds results for a single block of log entries. +// +// It is expected that its contents is accessed only from a single goroutine at a time. type blockResult struct { // buf holds all the bytes behind the requested column values in the block. buf []byte @@ -335,6 +338,582 @@ func (br *blockResult) addConstColumn(name, value string) { }) } +func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize float64) []string { + if c.isConst { + return br.getBucketedConstValues(c.encodedValues[0], bucketSize) + } + if c.isTime { + return br.getBucketedTimestampValues(bucketSize) + } + + switch c.valueType { + case valueTypeString: + return br.getBucketedStringValues(c.encodedValues, bucketSize) + case valueTypeDict: + return br.getBucketedDictValues(c.encodedValues, c.dictValues, bucketSize) + case valueTypeUint8: + return br.getBucketedUint8Values(c.encodedValues, bucketSize) + case valueTypeUint16: + return br.getBucketedUint16Values(c.encodedValues, bucketSize) + case valueTypeUint32: + return br.getBucketedUint32Values(c.encodedValues, bucketSize) + case valueTypeUint64: + return br.getBucketedUint64Values(c.encodedValues, bucketSize) + case valueTypeFloat64: + return br.getBucketedFloat64Values(c.encodedValues, bucketSize) + case valueTypeIPv4: + return br.getBucketedIPv4Values(c.encodedValues, bucketSize) + case 
valueTypeTimestampISO8601: + return br.getBucketedTimestampISO8601Values(c.encodedValues, bucketSize) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return nil + } +} + +func (br *blockResult) getBucketedConstValues(v string, bucketSize float64) []string { + if v == "" { + // Fast path - return a slice of empty strings without constructing the slice. + return getEmptyStrings(len(br.timestamps)) + } + + // Slower path - construct slice of identical values with the len(br.timestamps) + + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + v = br.getBucketedValue(v, bucketSize) + for range br.timestamps { + valuesBuf = append(valuesBuf, v) + } + + br.valuesBuf = valuesBuf + + return valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + timestamps := br.timestamps + var s string + + if bucketSize <= 1 { + for i := range timestamps { + if i > 0 && timestamps[i-1] == timestamps[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + bufLen := len(buf) + buf = marshalTimestampRFC3339Nano(buf, timestamps[i]) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := int64(bucketSize) + var prevTimestamp int64 + for i := range timestamps { + if i > 0 && timestamps[i-1] == timestamps[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + timestamp := timestamps[i] + timestamp -= timestamp % bucketSizeInt + if i > 0 && timestamp == prevTimestamp { + valuesBuf = append(valuesBuf, s) + continue + } + + prevTimestamp = timestamp + bufLen := len(buf) + buf = marshalTimestampRFC3339Nano(buf, timestamp) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.buf = buf + br.valuesBuf = valuesBuf + + return valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedStringValues(values []string, bucketSize 
float64) []string { + if bucketSize <= 0 { + return values + } + + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + for i := range values { + if i > 0 && values[i-1] == values[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + s = br.getBucketedValue(values[i], bucketSize) + valuesBuf = append(valuesBuf, s) + } + + br.valuesBuf = valuesBuf + + return valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bucketSize float64) []string { + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + dictValues = br.getBucketedStringValues(dictValues, bucketSize) + for _, v := range encodedValues { + dictIdx := v[0] + valuesBuf = append(valuesBuf, dictValues[dictIdx]) + } + + br.valuesBuf = valuesBuf + + return valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 1 || bucketSize >= (1<<8) { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + n := uint64(v[0]) + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := uint64(bucketSize) + var nPrev uint64 + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + n := uint64(v[0]) + n -= n % bucketSizeInt + if i > 0 && n == nPrev { + valuesBuf = append(valuesBuf, s) + continue + } + + nPrev = n + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return br.valuesBuf[valuesBufLen:] +} + +func (br *blockResult) 
getBucketedUint16Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 1 || bucketSize >= (1<<16) { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := uint64(encoding.UnmarshalUint16(b)) + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := uint64(bucketSize) + var nPrev uint64 + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := uint64(encoding.UnmarshalUint16(b)) + n -= n % bucketSizeInt + if i > 0 && n == nPrev { + valuesBuf = append(valuesBuf, s) + continue + } + + nPrev = n + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return br.valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 1 || bucketSize >= (1<<32) { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := uint64(encoding.UnmarshalUint32(b)) + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := uint64(bucketSize) + var nPrev uint64 + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = 
append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := uint64(encoding.UnmarshalUint32(b)) + n -= n % bucketSizeInt + if i > 0 && n == nPrev { + valuesBuf = append(valuesBuf, s) + continue + } + + nPrev = n + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return br.valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 1 || bucketSize >= (1<<64) { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := uint64(bucketSize) + var nPrev uint64 + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + n -= n % bucketSizeInt + if i > 0 && n == nPrev { + valuesBuf = append(valuesBuf, s) + continue + } + + nPrev = n + bufLen := len(buf) + buf = marshalUint64(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return br.valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 0 { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + 
valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + f := math.Float64frombits(n) + + bufLen := len(buf) + buf = marshalFloat64(buf, f) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + _, e := decimal.FromFloat(bucketSize) + p10 := math.Pow10(int(-e)) + bucketSizeP10 := int64(bucketSize * p10) + var fPrev float64 + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + f := math.Float64frombits(n) + + // emulate f % bucketSize for float64 values + fP10 := int64(f * p10) + fP10 -= fP10 % bucketSizeP10 + f = float64(fP10) / p10 + + if i > 0 && f == fPrev { + valuesBuf = append(valuesBuf, s) + continue + } + + fPrev = f + bufLen := len(buf) + buf = marshalFloat64(buf, f) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return br.valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 1 { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + bufLen := len(buf) + buf = toIPv4String(buf, v) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := uint32(bucketSize) + var nPrev uint32 + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := binary.BigEndian.Uint32(b) + n -= n % bucketSizeInt + if i > 0 && n == nPrev { + valuesBuf = append(valuesBuf, s) + continue 
+ } + + nPrev = n + bufLen := len(buf) + buf = marshalIPv4(buf, n) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return valuesBuf[valuesBufLen:] +} + +func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bucketSize float64) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + var s string + + if bucketSize <= 1 { + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + + bufLen := len(buf) + buf = marshalTimestampISO8601(buf, int64(n)) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + } else { + bucketSizeInt := uint64(bucketSize) + var nPrev uint64 + bb := bbPool.Get() + for i, v := range encodedValues { + if i > 0 && encodedValues[i-1] == encodedValues[i] { + valuesBuf = append(valuesBuf, s) + continue + } + + b := bytesutil.ToUnsafeBytes(v) + n := encoding.UnmarshalUint64(b) + n -= n % bucketSizeInt + if i > 0 && n == nPrev { + valuesBuf = append(valuesBuf, s) + continue + } + + nPrev = n + bufLen := len(buf) + buf = marshalTimestampISO8601(buf, int64(n)) + s = bytesutil.ToUnsafeString(buf[bufLen:]) + valuesBuf = append(valuesBuf, s) + } + bbPool.Put(bb) + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return valuesBuf[valuesBufLen:] +} + +// getBucketedValue returns bucketed s according to the given bucketSize. +func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { + if bucketSize <= 0 { + return s + } + if len(s) == 0 { + return s + } + + c := s[0] + if (c < '0' || c > '9') && c != '-' { + // Fast path - the value cannot be bucketed, since it starts with unexpected chars. 
+ return s + } + + if f, ok := tryParseFloat64(s); ok { + // emulate f % bucketSize for float64 values + _, e := decimal.FromFloat(bucketSize) + p10 := math.Pow10(int(-e)) + fP10 := int64(f * p10) + fP10 -= fP10 % int64(bucketSize*p10) + f = float64(fP10) / p10 + + bufLen := len(br.buf) + br.buf = marshalFloat64(br.buf, f) + return bytesutil.ToUnsafeString(br.buf[bufLen:]) + } + + if nsecs, ok := tryParseTimestampISO8601(s); ok { + nsecs -= nsecs % int64(bucketSize) + bufLen := len(br.buf) + br.buf = marshalTimestampISO8601(br.buf, nsecs) + return bytesutil.ToUnsafeString(br.buf[bufLen:]) + } + + if nsecs, ok := tryParseTimestampRFC3339Nano(s); ok { + nsecs -= nsecs % int64(bucketSize) + bufLen := len(br.buf) + br.buf = marshalTimestampRFC3339Nano(br.buf, nsecs) + return bytesutil.ToUnsafeString(br.buf[bufLen:]) + } + + if n, ok := tryParseIPv4(s); ok { + n -= n % uint32(bucketSize) + bufLen := len(br.buf) + br.buf = marshalIPv4(br.buf, n) + return bytesutil.ToUnsafeString(br.buf[bufLen:]) + } + + if nsecs, ok := tryParseDuration(s); ok { + nsecs -= nsecs % int64(bucketSize) + bufLen := len(br.buf) + br.buf = marshalDuration(br.buf, nsecs) + return bytesutil.ToUnsafeString(br.buf[bufLen:]) + } + + // Couldn't parse s, so return it as is. + return s +} + func (br *blockResult) addEmptyStringColumn(columnName string) { br.cs = append(br.cs, blockResultColumn{ name: columnName, @@ -378,7 +957,9 @@ func (br *blockResult) getColumnByName(columnName string) blockResultColumn { } cs := br.getColumns() - for i := range cs { + + // iterate columns in reverse order, so overridden column results are returned instead of original column results. 
+ for i := len(cs) - 1; i >= 0; i-- { if cs[i].name == columnName { return cs[i] } @@ -429,15 +1010,6 @@ func (br *blockResult) truncateRows(keepRows int) { } } -func (br *blockResult) appendColumnValues(dst [][]string, columnNames []string) [][]string { - for _, columnName := range columnNames { - c := br.getColumnByName(columnName) - values := c.getValues(br) - dst = append(dst, values) - } - return dst -} - type blockResultColumn struct { // name is column name. name string @@ -455,15 +1027,21 @@ type blockResultColumn struct { // valueType is the type of non-cost value valueType valueType - // dictValues contain dictionary values for valueTypeDict column + // dictValues contains dictionary values for valueTypeDict column dictValues []string - // encodedValues contain encoded values for non-const column + // encodedValues contains encoded values for non-const column encodedValues []string - // values contain decoded values after getValues() call for the given column + // values contains decoded values after getValues() call values []string + // bucketedValues contains values after getBucketedValues() call + bucketedValues []string + + // bucketSize contains bucketSize for bucketedValues + bucketSize float64 + // buf and valuesBuf are used by addValue() in order to re-use memory across resetRows(). buf []byte valuesBuf []string @@ -557,129 +1135,31 @@ func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string { return values[rowIdx] } +// getBucketedValues returns values for the given column, bucketed according to bucketSize. +// +// The returned values are valid until br.reset() is called.
+func (c *blockResultColumn) getBucketedValues(br *blockResult, bucketSize float64) []string { + if bucketSize <= 0 { + return c.getValues(br) + } + if values := c.bucketedValues; values != nil && c.bucketSize == bucketSize { + return values + } + + c.bucketedValues = br.getBucketedColumnValues(c, bucketSize) + c.bucketSize = bucketSize + return c.bucketedValues +} + // getValues returns values for the given column. // // The returned values are valid until br.reset() is called. func (c *blockResultColumn) getValues(br *blockResult) []string { - if c.values != nil { - return c.values + if values := c.values; values != nil { + return values } - buf := br.buf - valuesBuf := br.valuesBuf - valuesBufLen := len(valuesBuf) - - if c.isConst { - v := c.encodedValues[0] - if v == "" { - // Fast path - return a slice of empty strings without constructing it. - c.values = getEmptyStrings(len(br.timestamps)) - return c.values - } - - // Slower path - construct slice of identical values with the len(br.timestamps) - for range br.timestamps { - valuesBuf = append(valuesBuf, v) - } - c.values = valuesBuf[valuesBufLen:] - br.valuesBuf = valuesBuf - return c.values - } - if c.isTime { - for _, timestamp := range br.timestamps { - t := time.Unix(0, timestamp).UTC() - bufLen := len(buf) - buf = t.AppendFormat(buf, time.RFC3339Nano) - s := bytesutil.ToUnsafeString(buf[bufLen:]) - valuesBuf = append(valuesBuf, s) - } - c.values = valuesBuf[valuesBufLen:] - br.buf = buf - br.valuesBuf = valuesBuf - return c.values - } - - appendValue := func(v string) { - bufLen := len(buf) - buf = append(buf, v...) 
- s := bytesutil.ToUnsafeString(buf[bufLen:]) - valuesBuf = append(valuesBuf, s) - } - - switch c.valueType { - case valueTypeString: - c.values = c.encodedValues - return c.values - case valueTypeDict: - dictValues := c.dictValues - for _, v := range c.encodedValues { - dictIdx := v[0] - appendValue(dictValues[dictIdx]) - } - case valueTypeUint8: - bb := bbPool.Get() - for _, v := range c.encodedValues { - n := uint64(v[0]) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - case valueTypeUint16: - bb := bbPool.Get() - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint16(b)) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - case valueTypeUint32: - bb := bbPool.Get() - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint32(b)) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - case valueTypeUint64: - bb := bbPool.Get() - for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - case valueTypeFloat64: - bb := bbPool.Get() - for _, v := range c.encodedValues { - bb.B = toFloat64String(bb.B[:0], v) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - case valueTypeIPv4: - bb := bbPool.Get() - for _, v := range c.encodedValues { - bb.B = toIPv4String(bb.B[:0], v) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - case valueTypeTimestampISO8601: - bb := bbPool.Get() - for _, v := range c.encodedValues { - bb.B = toTimestampISO8601String(bb.B[:0], v) - appendValue(bytesutil.ToUnsafeString(bb.B)) - } - bbPool.Put(bb) - default: - logger.Panicf("BUG: unknown valueType=%d", c.valueType) - 
} - - c.values = valuesBuf[valuesBufLen:] - br.buf = buf - br.valuesBuf = valuesBuf - + c.values = br.getBucketedColumnValues(c, 0) return c.values } diff --git a/lib/logstorage/filter_exact.go b/lib/logstorage/filter_exact.go index c868787fe..f4cf1de01 100644 --- a/lib/logstorage/filter_exact.go +++ b/lib/logstorage/filter_exact.go @@ -84,12 +84,12 @@ func (fe *filterExact) apply(bs *blockSearch, bm *bitmap) { func matchTimestampISO8601ByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []string) { n, ok := tryParseTimestampISO8601(value) - if !ok || n < ch.minValue || n > ch.maxValue { + if !ok || n < int64(ch.minValue) || n > int64(ch.maxValue) { bm.resetBits() return } bb := bbPool.Get() - bb.B = encoding.MarshalUint64(bb.B, n) + bb.B = encoding.MarshalUint64(bb.B, uint64(n)) matchBinaryValue(bs, ch, bm, bb.B, tokens) bbPool.Put(bb) } diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go index 36ca0c5c4..57177dbac 100644 --- a/lib/logstorage/filter_in.go +++ b/lib/logstorage/filter_in.go @@ -242,7 +242,7 @@ func (fi *filterIn) initTimestampISO8601Values() { continue } bufLen := len(buf) - buf = encoding.MarshalUint64(buf, n) + buf = encoding.MarshalUint64(buf, uint64(n)) s := bytesutil.ToUnsafeString(buf[bufLen:]) m[s] = struct{}{} } diff --git a/lib/logstorage/filter_len_range.go b/lib/logstorage/filter_len_range.go index 85050814c..b287dcf24 100644 --- a/lib/logstorage/filter_len_range.go +++ b/lib/logstorage/filter_len_range.go @@ -1,7 +1,6 @@ package logstorage import ( - "strconv" "unicode/utf8" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -192,12 +191,12 @@ func matchMinMaxValueLen(ch *columnHeader, minLen, maxLen uint64) bool { bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = strconv.AppendUint(bb.B[:0], ch.minValue, 10) + bb.B = marshalUint64(bb.B[:0], ch.minValue) s := bytesutil.ToUnsafeString(bb.B) if maxLen < uint64(len(s)) { return false } - bb.B = strconv.AppendUint(bb.B[:0], 
ch.maxValue, 10) + bb.B = marshalUint64(bb.B[:0], ch.maxValue) s = bytesutil.ToUnsafeString(bb.B) return minLen <= uint64(len(s)) } diff --git a/lib/logstorage/filter_prefix.go b/lib/logstorage/filter_prefix.go index ca61ddebf..ea22711fd 100644 --- a/lib/logstorage/filter_prefix.go +++ b/lib/logstorage/filter_prefix.go @@ -2,7 +2,6 @@ package logstorage import ( "fmt" - "strconv" "strings" "sync" "unicode/utf8" @@ -323,7 +322,7 @@ func toUint8String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 1", bs.partPath(), len(v)) } n := uint64(v[0]) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) + bb.B = marshalUint64(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } @@ -333,7 +332,7 @@ func toUint16String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string } b := bytesutil.ToUnsafeBytes(v) n := uint64(encoding.UnmarshalUint16(b)) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) + bb.B = marshalUint64(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } @@ -343,7 +342,7 @@ func toUint32String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string } b := bytesutil.ToUnsafeBytes(v) n := uint64(encoding.UnmarshalUint32(b)) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) + bb.B = marshalUint64(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } @@ -353,6 +352,6 @@ func toUint64String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string } b := bytesutil.ToUnsafeBytes(v) n := encoding.UnmarshalUint64(b) - bb.B = strconv.AppendUint(bb.B[:0], n, 10) + bb.B = marshalUint64(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } diff --git a/lib/logstorage/filter_time.go b/lib/logstorage/filter_time.go index 381b3de61..2cffbf4c4 100644 --- a/lib/logstorage/filter_time.go +++ b/lib/logstorage/filter_time.go @@ -4,9 +4,13 @@ package logstorage // // It is expressed as `_time:(start, end]` in LogsQL. 
type filterTime struct { + // minTimestamp is the minimum timestamp in nanoseconds to find minTimestamp int64 + + // maxTimestamp is the maximum timestamp in nanoseconds to find maxTimestamp int64 + // stringRepr is string representation of the filter stringRepr string } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index dc93e1eab..15d158da7 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -219,7 +219,7 @@ func ParseQuery(s string) (*Query, error) { f, err := parseFilter(lex) if err != nil { - return nil, fmt.Errorf("%w; context: %s", err, lex.context()) + return nil, fmt.Errorf("%w; context: [%s]", err, lex.context()) } q := &Query{ f: f, @@ -227,12 +227,12 @@ func ParseQuery(s string) (*Query, error) { pipes, err := parsePipes(lex) if err != nil { - return nil, fmt.Errorf("%w; context: %s", err, lex.context()) + return nil, fmt.Errorf("%w; context: [%s]", err, lex.context()) } q.pipes = pipes if !lex.isEnd() { - return nil, fmt.Errorf("unexpected unparsed tail; context: %s", lex.context()) + return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]", lex.context()) } return q, nil @@ -344,25 +344,25 @@ func parseGenericFilter(lex *lexer, fieldName string) (filter, error) { case lex.isKeyword(",", ")", "[", "]"): return nil, fmt.Errorf("unexpected token %q", lex.token) } - phrase := getCompoundPhrase(lex, fieldName == "") + phrase := getCompoundPhrase(lex, fieldName != "") return parseFilterForPhrase(lex, phrase, fieldName) } -func getCompoundPhrase(lex *lexer, stopOnColon bool) string { +func getCompoundPhrase(lex *lexer, allowColon bool) string { phrase := lex.token rawPhrase := lex.rawToken lex.nextToken() - suffix := getCompoundSuffix(lex, stopOnColon) + suffix := getCompoundSuffix(lex, allowColon) if suffix == "" { return phrase } return rawPhrase + suffix } -func getCompoundSuffix(lex *lexer, stopOnColon bool) string { +func getCompoundSuffix(lex *lexer, allowColon bool) string { s := "" stopTokens :=
[]string{"*", ",", "(", ")", "[", "]", "|", ""} - if stopOnColon { + if !allowColon { stopTokens = append(stopTokens, ":") } for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) { @@ -495,7 +495,7 @@ func parseFuncArgMaybePrefix(lex *lexer, funcName, fieldName string, callback fu phrase := lex.token lex.nextToken() if !lex.isKeyword("(") { - phrase += getCompoundSuffix(lex, fieldName == "") + phrase += getCompoundSuffix(lex, fieldName != "") return parseFilterForPhrase(lex, phrase, fieldName) } if !lex.mustNextToken() { @@ -676,7 +676,7 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) { case lex.isKeyword("["): includeMinValue = true default: - phrase := funcName + getCompoundSuffix(lex, fieldName == "") + phrase := funcName + getCompoundSuffix(lex, fieldName != "") return parseFilterForPhrase(lex, phrase, fieldName) } if !lex.mustNextToken() { @@ -765,7 +765,7 @@ func parseFuncArgs(lex *lexer, fieldName string, callback func(args []string) (f funcName := lex.token lex.nextToken() if !lex.isKeyword("(") { - phrase := funcName + getCompoundSuffix(lex, fieldName == "") + phrase := funcName + getCompoundSuffix(lex, fieldName != "") return parseFilterForPhrase(lex, phrase, fieldName) } if !lex.mustNextToken() { @@ -824,9 +824,9 @@ func parseFilterTimeWithOffset(lex *lexer) (*filterTime, error) { return nil, fmt.Errorf("missing offset for _time filter %s", ft) } s := getCompoundToken(lex) - d, err := promutils.ParseDuration(s) - if err != nil { - return nil, fmt.Errorf("cannot parse offset for _time filter %s: %w", ft, err) + d, ok := tryParseDuration(s) + if !ok { + return nil, fmt.Errorf("cannot parse offset %q for _time filter %s: %w", s, ft, err) } offset := int64(d) ft.minTimestamp -= offset @@ -862,9 +862,9 @@ func parseFilterTime(lex *lexer) (*filterTime, error) { return ft, nil } // Parse _time:duration, which transforms to '_time:(now-duration, now]' - d, err := promutils.ParseDuration(s) - if err != nil { - return nil, 
fmt.Errorf("cannot parse duration in _time filter: %w", err) + d, ok := tryParseDuration(s) + if !ok { + return nil, fmt.Errorf("cannot parse duration %q in _time filter", s) } if d < 0 { d = -d diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 211b154da..25f2c5688 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -862,6 +862,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`) f(`* | stats by (x, y) count(*) foo, uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, uniq(a, b) as bar`) + // stats pipe with grouping buckets + f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`) + f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count() as foo`) + // multiple different pipes f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`) f(`* | skip 100 | head 20 | skip 10`, `* | skip 100 | head 20 | skip 10`) @@ -1130,6 +1134,10 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats uniq`) f(`foo | stats uniq()`) + // invalid grouping fields + f(`foo | stats by(foo:bar) count() baz`) + f(`foo | stats by(foo:/bar) count() baz`) + // invalid by clause f(`foo | stats by`) f(`foo | stats by bar`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 8dd1cf9ab..2b43bc3d9 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -16,8 +16,8 @@ import ( // // See https://docs.victoriametrics.com/victorialogs/logsql/#stats type pipeStats struct { - // byFields contains field names from 'by(...)' clause. 
- byFields []string + // byFields contains field names with optional buckets from 'by(...)' clause. + byFields []*byField // resultNames contains names of output results generated by funcs. resultNames []string @@ -64,7 +64,11 @@ type statsProcessor interface { func (ps *pipeStats) String() string { s := "stats " if len(ps.byFields) > 0 { - s += "by (" + fieldNamesString(ps.byFields) + ") " + a := make([]string, len(ps.byFields)) + for i := range ps.byFields { + a[i] = ps.byFields[i].String() + } + s += "by (" + strings.Join(a, ", ") + ") " } if len(ps.funcs) == 0 { @@ -185,18 +189,29 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { } if len(byFields) == 1 { // Special case for grouping by a single column. - c := br.getColumnByName(byFields[0]) + bf := byFields[0] + c := br.getColumnByName(bf.name) if c.isConst { // Fast path for column with constant value. - shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(c.encodedValues[0])) + v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize) + shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) } return } - // Slower path for column with different values. - values := c.getValues(br) + values := c.getBucketedValues(br, bf.bucketSize) + if areConstValues(values) { + // Fast path for column with constant values. + shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) + for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { + shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) + } + return + } + + // Slower generic path for a column with different values. 
var sfps []statsProcessor keyBuf := shard.keyBuf[:0] for i := range br.timestamps { @@ -212,34 +227,39 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { return } + // Obtain columns for byFields + columnValues := shard.columnValues[:0] + for _, bf := range byFields { + c := br.getColumnByName(bf.name) + values := c.getBucketedValues(br, bf.bucketSize) + columnValues = append(columnValues, values) + } + shard.columnValues = columnValues + // Verify whether all the 'by (...)' columns are constant. areAllConstColumns := true - keyBuf := shard.keyBuf[:0] - for _, f := range byFields { - c := br.getColumnByName(f) - if !c.isConst { + for _, values := range columnValues { + if !areConstValues(values) { areAllConstColumns = false break } - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.encodedValues[0])) } - shard.keyBuf = keyBuf - if areAllConstColumns { // Fast path for constant 'by (...)' columns. + keyBuf := shard.keyBuf[:0] + for _, values := range columnValues { + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) + } for _, sfp := range shard.getStatsProcessors(keyBuf) { shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) } + shard.keyBuf = keyBuf return } // The slowest path - group by multiple columns with different values across rows. - - // Pre-calculate column values for byFields in order to speed up building group key in the loop below. 
- shard.columnValues = br.appendColumnValues(shard.columnValues[:0], byFields) - columnValues := shard.columnValues - var sfps []statsProcessor + keyBuf := shard.keyBuf[:0] for i := range br.timestamps { // Verify whether the key for 'by (...)' fields equals the previous key sameValue := sfps != nil @@ -305,8 +325,8 @@ func (psp *pipeStatsProcessor) flush() error { var values []string var br blockResult - for _, f := range byFields { - br.addEmptyStringColumn(f) + for _, bf := range byFields { + br.addEmptyStringColumn(bf.name) } for _, resultName := range psp.ps.resultNames { br.addEmptyStringColumn(resultName) @@ -358,20 +378,22 @@ func (psp *pipeStatsProcessor) flush() error { func (ps *pipeStats) neededFields() []string { var neededFields []string m := make(map[string]struct{}) - updateNeededFields := func(fields []string) { - for _, field := range fields { - if _, ok := m[field]; !ok { - m[field] = struct{}{} - neededFields = append(neededFields, field) - } + + for _, bf := range ps.byFields { + name := bf.name + if _, ok := m[name]; !ok { + m[name] = struct{}{} + neededFields = append(neededFields, name) } } - updateNeededFields(ps.byFields) - for _, f := range ps.funcs { - fields := f.neededFields() - updateNeededFields(fields) + for _, fieldName := range f.neededFields() { + if _, ok := m[fieldName]; !ok { + m[fieldName] = struct{}{} + neededFields = append(neededFields, fieldName) + } + } } return neededFields @@ -385,11 +407,11 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { var ps pipeStats if lex.isKeyword("by") { lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + bfs, err := parseByFields(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'by': %w", err) + return nil, fmt.Errorf("cannot parse 'by' clause: %w", err) } - ps.byFields = fields + ps.byFields = bfs } var resultNames []string @@ -476,6 +498,124 @@ func parseResultName(lex *lexer) (string, error) { return resultName, nil } +// byField represents by(...) field. 
+// +// It can have either `name` representation or `name:bucket` representation, +// where `bucket` can contain duration, size or numeric value for creating different buckets +// for 'value/bucket'. +type byField struct { + name string + + // bucketSizeStr is string representation of the bucket size + bucketSizeStr string + + // bucketSize is the bucket for grouping the given field values with value/bucketSize calculations. + bucketSize float64 +} + +func (bf *byField) String() string { + s := quoteTokenIfNeeded(bf.name) + if bf.bucketSizeStr != "" { + s += ":" + bf.bucketSizeStr + } + return s +} + +func parseByFields(lex *lexer) ([]*byField, error) { + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing `(`") + } + var bfs []*byField + for { + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing field name or ')'") + } + if lex.isKeyword(")") { + lex.nextToken() + return bfs, nil + } + if lex.isKeyword(",") { + return nil, fmt.Errorf("unexpected `,`") + } + fieldName, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse field name: %w", err) + } + bf := &byField{ + name: fieldName, + } + if lex.isKeyword(":") { + lex.nextToken() + bucketSizeStr := lex.token + lex.nextToken() + if bucketSizeStr == "/" { + bucketSizeStr += lex.token + lex.nextToken() + } + bucketSize, ok := tryParseBucketSize(bucketSizeStr) + if !ok { + return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr) + } + if bucketSize < 0 { + return nil, fmt.Errorf("bucketSize for the field %q cannot be negative; got %q", fieldName, bucketSizeStr) + } + bf.bucketSizeStr = bucketSizeStr + bf.bucketSize = bucketSize + } + bfs = append(bfs, bf) + switch { + case lex.isKeyword(")"): + lex.nextToken() + return bfs, nil + case lex.isKeyword(","): + default: + return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token) + } + } +} + +// tryParseBucketSize tries parsing bucket size, which can have the following
formats: +// +// - integer number: 12345 +// - floating-point number: 1.2345 +// - duration: 1.5s - it is converted to the number of nanoseconds +// - bytes: 1.5KiB +// - ipv4 mask: /24 +func tryParseBucketSize(s string) (float64, bool) { + // Try parsing s as floating point number + if f, ok := tryParseFloat64(s); ok { + return f, true + } + + // Try parsing s as duration (1s, 5m, etc.) + if nsecs, ok := tryParseDuration(s); ok { + return float64(nsecs), true + } + + // Try parsing s as bytes (KiB, MB, etc.) + if n, ok := tryParseBytes(s); ok { + return float64(n), true + } + + if n, ok := tryParseIPv4Mask(s); ok { + return float64(n), true + } + + return 0, false +} + +func parseFieldNamesForFunc(lex *lexer, funcName string) ([]string, error) { + if !lex.isKeyword(funcName) { + return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName) + } + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err) + } + return fields, nil +} + func parseFieldNamesInParens(lex *lexer) ([]string, error) { if !lex.isKeyword("(") { return nil, fmt.Errorf("missing `(`") @@ -509,10 +649,10 @@ func parseFieldNamesInParens(lex *lexer) ([]string, error) { } func parseFieldName(lex *lexer) (string, error) { - if lex.isKeyword(",", "(", ")", "[", "]", "|", "") { + if lex.isKeyword(",", "(", ")", "[", "]", "|", ":", "") { return "", fmt.Errorf("unexpected token: %q", lex.token) } - token := getCompoundPhrase(lex, true) + token := getCompoundPhrase(lex, false) return token, nil } @@ -526,3 +666,16 @@ func fieldNamesString(fields []string) string { } return strings.Join(a, ", ") } + +func areConstValues(values []string) bool { + if len(values) == 0 { + return false + } + v := values[0] + for i := 1; i < len(values); i++ { + if v != values[i] { + return false + } + } + return true +} diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go new file mode 
100644 index 000000000..1b409bd62 --- /dev/null +++ b/lib/logstorage/pipe_stats_test.go @@ -0,0 +1,61 @@ +package logstorage + +import ( + "testing" +) + +func TestTryParseBucketSize_Success(t *testing.T) { + f := func(s string, resultExpected float64) { + t.Helper() + + result, ok := tryParseBucketSize(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if result != resultExpected { + t.Fatalf("unexpected result; got %f; want %f", result, resultExpected) + } + } + + // integers + f("0", 0) + f("123", 123) + f("1_234_678", 1234678) + f("-1_234_678", -1234678) + + // floating-point numbers + f("0.0", 0) + f("123.435", 123.435) + f("1_000.433_344", 1000.433344) + f("-1_000.433_344", -1000.433344) + + // durations + f("5m", 5*nsecsPerMinute) + f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond) + f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond)) + + // bytes + f("1b", 1) + f("1k", 1_000) + f("1Kb", 1_000) + f("5.5KiB", 5.5*(1<<10)) + f("10MB500KB10B", 10*1_000_000+500*1_000+10) + f("10m0k", 10*1_000_000) +} + +func TestTryParseBucketSize_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseBucketSize(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) + } + } + + f("") + f("foo") + + // negative bytes are forbidden + f("-10MB") +} diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index 1e05ab830..edeb2a2a5 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -93,10 +93,9 @@ func (sap *statsAvgProcessor) finalizeStats() string { } func parseStatsAvg(lex *lexer) (*statsAvg, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + fields, err := parseFieldNamesForFunc(lex, "avg") if err != nil { - return nil, fmt.Errorf("cannot parse 'avg' args: %w", err) + return nil, err } if len(fields) == 0 { return nil, fmt.Errorf("'avg' must contain at least one arg") diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go 
index 4d066ed7f..81d9ebf0c 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -1,7 +1,6 @@ package logstorage import ( - "fmt" "slices" "strconv" "unsafe" @@ -195,10 +194,9 @@ func (scp *statsCountProcessor) finalizeStats() string { } func parseStatsCount(lex *lexer) (*statsCount, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + fields, err := parseFieldNamesForFunc(lex, "count") if err != nil { - return nil, fmt.Errorf("cannot parse 'count' args: %w", err) + return nil, err } sc := &statsCount{ fields: fields, diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 34ada87a1..b842810de 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -93,10 +93,9 @@ func (smp *statsMaxProcessor) finalizeStats() string { } func parseStatsMax(lex *lexer) (*statsMax, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + fields, err := parseFieldNamesForFunc(lex, "max") if err != nil { - return nil, fmt.Errorf("cannot parse 'max' args: %w", err) + return nil, err } if len(fields) == 0 { return nil, fmt.Errorf("'max' must contain at least one arg") diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 50e47e1fd..e15c47794 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -93,10 +93,9 @@ func (smp *statsMinProcessor) finalizeStats() string { } func parseStatsMin(lex *lexer) (*statsMin, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + fields, err := parseFieldNamesForFunc(lex, "min") if err != nil { - return nil, fmt.Errorf("cannot parse 'min' args: %w", err) + return nil, err } if len(fields) == 0 { return nil, fmt.Errorf("'min' must contain at least one arg") diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index 7515abf15..5ed33418a 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -107,10 +107,9 @@ func (ssp *statsSumProcessor) 
finalizeStats() string { } func parseStatsSum(lex *lexer) (*statsSum, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + fields, err := parseFieldNamesForFunc(lex, "sum") if err != nil { - return nil, fmt.Errorf("cannot parse 'sum' args: %w", err) + return nil, err } if len(fields) == 0 { return nil, fmt.Errorf("'sum' must contain at least one arg") diff --git a/lib/logstorage/stats_unique.go b/lib/logstorage/stats_unique.go index 19cfd00a9..dfb486807 100644 --- a/lib/logstorage/stats_unique.go +++ b/lib/logstorage/stats_unique.go @@ -1,7 +1,6 @@ package logstorage import ( - "fmt" "slices" "strconv" "unsafe" @@ -175,8 +174,13 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int { // Slow path for multiple columns. // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - sup.columnValues = br.appendColumnValues(sup.columnValues[:0], fields) - columnValues := sup.columnValues + columnValues := sup.columnValues[:0] + for _, f := range fields { + c := br.getColumnByName(f) + values := c.getValues(br) + columnValues = append(columnValues, values) + } + sup.columnValues = columnValues keyBuf := sup.keyBuf[:0] for i := range br.timestamps { @@ -352,10 +356,9 @@ func (sup *statsUniqProcessor) finalizeStats() string { } func parseStatsUniq(lex *lexer) (*statsUniq, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) + fields, err := parseFieldNamesForFunc(lex, "uniq") if err != nil { - return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err) + return nil, err } su := &statsUniq{ fields: fields, diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 73034bdec..50cb68760 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -156,7 +156,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu } n := uint64(v[0]) dstLen := len(dstBuf) - dstBuf = 
strconv.AppendUint(dstBuf, n, 10) + dstBuf = marshalUint64(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeUint16: @@ -167,7 +167,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu b := bytesutil.ToUnsafeBytes(v) n := uint64(encoding.UnmarshalUint16(b)) dstLen := len(dstBuf) - dstBuf = strconv.AppendUint(dstBuf, n, 10) + dstBuf = marshalUint64(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeUint32: @@ -178,7 +178,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu b := bytesutil.ToUnsafeBytes(v) n := uint64(encoding.UnmarshalUint32(b)) dstLen := len(dstBuf) - dstBuf = strconv.AppendUint(dstBuf, n, 10) + dstBuf = marshalUint64(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeUint64: @@ -189,7 +189,7 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu b := bytesutil.ToUnsafeBytes(v) n := encoding.UnmarshalUint64(b) dstLen := len(dstBuf) - dstBuf = strconv.AppendUint(dstBuf, n, 10) + dstBuf = marshalUint64(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeDict: @@ -239,19 +239,18 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valu func toTimestampISO8601String(dst []byte, v string) []byte { b := bytesutil.ToUnsafeBytes(v) n := encoding.UnmarshalUint64(b) - t := time.Unix(0, int64(n)).UTC() - dst = t.AppendFormat(dst, iso8601Timestamp) + dst = marshalTimestampISO8601(dst, int64(n)) return dst } func toIPv4String(dst []byte, v string) []byte { - dst = strconv.AppendUint(dst, uint64(v[0]), 10) + dst = marshalUint64(dst, uint64(v[0])) dst = append(dst, '.') - dst = strconv.AppendUint(dst, uint64(v[1]), 10) + dst = marshalUint64(dst, uint64(v[1])) dst = append(dst, '.') - dst = strconv.AppendUint(dst, uint64(v[2]), 10) + dst = marshalUint64(dst, uint64(v[2])) dst = append(dst, '.') - dst = strconv.AppendUint(dst, 
uint64(v[3]), 10) + dst = marshalUint64(dst, uint64(v[3])) return dst } @@ -259,7 +258,7 @@ func toFloat64String(dst []byte, v string) []byte { b := bytesutil.ToUnsafeBytes(v) n := encoding.UnmarshalUint64(b) f := math.Float64frombits(n) - dst = strconv.AppendFloat(dst, f, 'g', -1, 64) + dst = marshalFloat64(dst, f) return dst } @@ -279,10 +278,10 @@ func putValuesDecoder(vd *valuesDecoder) { var valuesDecoderPool sync.Pool func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) { - u64s := encoding.GetUint64s(len(srcValues)) - defer encoding.PutUint64s(u64s) + u64s := encoding.GetInt64s(len(srcValues)) + defer encoding.PutInt64s(u64s) a := u64s.A - var minValue, maxValue uint64 + var minValue, maxValue int64 for i, v := range srcValues { n, ok := tryParseTimestampISO8601(v) if !ok { @@ -298,14 +297,69 @@ func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ( } for _, n := range a { dstLen := len(dstBuf) - dstBuf = encoding.MarshalUint64(dstBuf, n) + dstBuf = encoding.MarshalUint64(dstBuf, uint64(n)) v := bytesutil.ToUnsafeString(dstBuf[dstLen:]) dstValues = append(dstValues, v) } - return dstBuf, dstValues, valueTypeTimestampISO8601, minValue, maxValue + return dstBuf, dstValues, valueTypeTimestampISO8601, uint64(minValue), uint64(maxValue) } -func tryParseTimestampISO8601(s string) (uint64, bool) { +// tryParseTimestampRFC3339Nano parses 'YYYY-MM-DDThh:mm:ss' with optional nanoseconds part and 'Z' tail and returns unix timestamp in nanoseconds. +// +// The returned timestamp can be negative if s is smaller than 1970 year. +func tryParseTimestampRFC3339Nano(s string) (int64, bool) { + // Do not parse timestamps with timezone other than Z, since they cannot be converted back + // to the same string representation in general case. + // This may break search. 
+ + if len(s) < len("2006-01-02T15:04:05Z") { + return 0, false + } + + secs, ok, tail := tryParseTimestampSecs(s) + if !ok { + return 0, false + } + s = tail + nsecs := secs * 1e9 + + // Parse optional fractional part of seconds. + n := strings.IndexByte(s, 'Z') + if n < 0 || n != len(s)-1 { + return 0, false + } + s = s[:n] + if len(s) == 0 { + return nsecs, true + } + if s[0] == '.' { + s = s[1:] + } + digits := len(s) + if digits > 9 { + return 0, false + } + n64, ok := tryParseUint64(s) + if !ok { + return 0, false + } + + if digits < 9 { + n64 *= uint64(math.Pow10(9 - digits)) + } + nsecs += int64(n64) + return nsecs, true +} + +// marshalTimestampRFC3339Nano appends RFC3339Nano-formatted nsecs to dst and returns the result. +func marshalTimestampRFC3339Nano(dst []byte, nsecs int64) []byte { + return time.Unix(0, nsecs).UTC().AppendFormat(dst, time.RFC3339Nano) +} + +// tryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds. +// +// The returned timestamp can be negative if s is smaller than 1970 year. +func tryParseTimestampISO8601(s string) (int64, bool) { // Do not parse timestamps with timezone, since they cannot be converted back // to the same string representation in general case. // This may break search. @@ -313,117 +367,155 @@ func tryParseTimestampISO8601(s string) (uint64, bool) { return 0, false } + secs, ok, tail := tryParseTimestampSecs(s) + if !ok { + return 0, false + } + s = tail + nsecs := secs * 1e9 + + if s[0] != '.' 
{ + return 0, false + } + s = s[1:] + + // Parse milliseconds + tzDelimiter := s[len("000")] + if tzDelimiter != 'Z' { + return 0, false + } + millisecondStr := s[:len("000")] + msecs, ok := tryParseUint64(millisecondStr) + if !ok { + return 0, false + } + s = s[len("000")+1:] + + if len(s) != 0 { + logger.Panicf("BUG: unexpected tail in timestamp: %q", s) + } + + nsecs += int64(msecs) * 1e6 + return nsecs, true +} + +// marshalTimestampISO8601 appends ISO8601-formatted nsecs to dst and returns the result. +func marshalTimestampISO8601(dst []byte, nsecs int64) []byte { + return time.Unix(0, nsecs).UTC().AppendFormat(dst, iso8601Timestamp) +} + +const iso8601Timestamp = "2006-01-02T15:04:05.000Z" + +// tryParseTimestampSecs parses YYYY-MM-DDTHH:mm:ss into unix timestamp in seconds. +func tryParseTimestampSecs(s string) (int64, bool, string) { // Parse year if s[len("YYYY")] != '-' { - return 0, false + return 0, false, s } yearStr := s[:len("YYYY")] n, ok := tryParseUint64(yearStr) - if !ok || n > 3000 { - return 0, false + if !ok || n < 1677 || n > 2262 { + return 0, false, s } year := int(n) s = s[len("YYYY")+1:] // Parse month if s[len("MM")] != '-' { - return 0, false + return 0, false, s } monthStr := s[:len("MM")] n, ok = tryParseUint64(monthStr) - if !ok || n < 1 || n > 12 { - return 0, false + if !ok { + return 0, false, s } month := time.Month(n) s = s[len("MM")+1:] // Parse day if s[len("DD")] != 'T' { - return 0, false + return 0, false, s } dayStr := s[:len("DD")] n, ok = tryParseUint64(dayStr) - if !ok || n < 1 || n > 31 { - return 0, false + if !ok { + return 0, false, s } day := int(n) s = s[len("DD")+1:] // Parse hour if s[len("HH")] != ':' { - return 0, false + return 0, false, s } hourStr := s[:len("HH")] n, ok = tryParseUint64(hourStr) - if !ok || n > 23 { - return 0, false + if !ok { + return 0, false, s } hour := int(n) s = s[len("HH")+1:] // Parse minute if s[len("MM")] != ':' { - return 0, false + return 0, false, s } minuteStr := 
s[:len("MM")] n, ok = tryParseUint64(minuteStr) - if !ok || n > 59 { - return 0, false + if !ok { + return 0, false, s } minute := int(n) s = s[len("MM")+1:] // Parse second - if s[len("SS")] != '.' { - return 0, false - } secondStr := s[:len("SS")] n, ok = tryParseUint64(secondStr) - if !ok || n > 59 { - return 0, false + if !ok { + return 0, false, s } second := int(n) - s = s[len("SS")+1:] + s = s[len("SS"):] - // Parse millisecond - tzDelimiter := s[len("000")] - if tzDelimiter != 'Z' { - return 0, false + secs := time.Date(year, month, day, hour, minute, second, 0, time.UTC).Unix() + if secs < int64(-1<<63)/1e9 || secs >= int64((1<<63)-1)/1e9 { + // Too big or too small timestamp + return 0, false, s } - millisecondStr := s[:len("000")] - n, ok = tryParseUint64(millisecondStr) - if !ok || n > 999 { - return 0, false - } - millisecond := int(n) - s = s[len("000")+1:] - - if len(s) != 0 { - return 0, false - } - - t := time.Date(year, month, day, hour, minute, second, millisecond*1e6, time.UTC) - ts := t.UnixNano() - return uint64(ts), true + return secs, true, s } +// tryParseUint64 parses s as uint64 value. func tryParseUint64(s string) (uint64, bool) { - if len(s) == 0 || len(s) > 18 { + if len(s) == 0 || len(s) > len("18_446_744_073_709_551_615") { return 0, false } n := uint64(0) for i := 0; i < len(s); i++ { ch := s[i] + if ch == '_' { + continue + } if ch < '0' || ch > '9' { return 0, false } + if n > ((1<<64)-1)/10 { + return 0, false + } n *= 10 - n += uint64(ch - '0') + d := uint64(ch - '0') + if n > (1<<64)-1-d { + return 0, false + } + n += d } return n, true } -const iso8601Timestamp = "2006-01-02T15:04:05.000Z" +// marshalUint64 appends string representation of n to dst and returns the result. 
+func marshalUint64(dst []byte, n uint64) []byte { + return strconv.AppendUint(dst, n, 10) +} func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) { u32s := encoding.GetUint32s(len(srcValues)) @@ -452,6 +544,7 @@ func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []st return dstBuf, dstValues, valueTypeIPv4, uint64(minValue), uint64(maxValue) } +// tryParseIPv4 tries parsing ipv4 from s. func tryParseIPv4(s string) (uint32, bool) { if len(s) < len("1.1.1.1") || len(s) > len("255.255.255.255") || strings.Count(s, ".") != 3 { // Fast path - the entry isn't IPv4 @@ -509,6 +602,18 @@ func tryParseIPv4(s string) (uint32, bool) { return ipv4, true } +// marshalIPv4 appends string representation of IPv4 address in n to dst and returns the result. +func marshalIPv4(dst []byte, n uint32) []byte { + dst = marshalUint64(dst, uint64(n>>24)) + dst = append(dst, '.') + dst = marshalUint64(dst, uint64((n>>16)&0xff)) + dst = append(dst, '.') + dst = marshalUint64(dst, uint64((n>>8)&0xff)) + dst = append(dst, '.') + dst = marshalUint64(dst, uint64(n&0xff)) + return dst +} + func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) { u64s := encoding.GetUint64s(len(srcValues)) defer encoding.PutUint64s(u64s) @@ -538,6 +643,20 @@ func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, [ return dstBuf, dstValues, valueTypeFloat64, minValueU64, maxValueU64 } +// tryParseFloat64Prefix tries parsing float64 number at the beginning of s and returns the remaining tail. +func tryParseFloat64Prefix(s string) (float64, bool, string) { + i := 0 + for i < len(s) && (s[i] >= '0' && s[i] <= '9' || s[i] == '.' || s[i] == '_') { + i++ + } + if i == 0 { + return 0, false, s + } + f, ok := tryParseFloat64(s[:i]) + return f, ok, s[i:] +} + +// tryParseFloat64 tries parsing s as float64. 
func tryParseFloat64(s string) (float64, bool) { if len(s) == 0 || len(s) > 20 { return 0, false @@ -578,13 +697,292 @@ func tryParseFloat64(s string) (float64, bool) { if !ok { return 0, false } - f := math.FMA(float64(nFrac), math.Pow10(-len(sFrac)), float64(nInt)) + p10 := math.Pow10(strings.Count(sFrac, "_") - len(sFrac)) + f := math.FMA(float64(nFrac), p10, float64(nInt)) if minus { f = -f } return f, true } +// marshalFloat64 appends formatted f to dst and returns the result. +func marshalFloat64(dst []byte, f float64) []byte { + return strconv.AppendFloat(dst, f, 'f', -1, 64) +} + +// tryParseBytes parses user-readable bytes representation in s. +// +// Supported suffixes: +// +// K, KB - for 1000 +func tryParseBytes(s string) (int64, bool) { + if len(s) == 0 { + return 0, false + } + n := int64(0) + for len(s) > 0 { + f, ok, tail := tryParseFloat64Prefix(s) + if !ok { + return 0, false + } + s = tail + if len(s) == 0 { + n += int64(f) + continue + } + if len(s) >= 3 { + prefix := s[:3] + switch { + case strings.EqualFold(prefix, "kib"): + n += int64(f * (1 << 10)) + s = s[3:] + continue + case strings.EqualFold(prefix, "mib"): + n += int64(f * (1 << 20)) + s = s[3:] + continue + case strings.EqualFold(prefix, "gib"): + n += int64(f * (1 << 30)) + s = s[3:] + continue + case strings.EqualFold(prefix, "tib"): + n += int64(f * (1 << 40)) + s = s[3:] + continue + } + } + if len(s) >= 2 { + prefix := s[:2] + switch { + case strings.EqualFold(prefix, "ki"): + n += int64(f * (1 << 10)) + s = s[2:] + continue + case strings.EqualFold(prefix, "mi"): + n += int64(f * (1 << 20)) + s = s[2:] + continue + case strings.EqualFold(prefix, "gi"): + n += int64(f * (1 << 30)) + s = s[2:] + continue + case strings.EqualFold(prefix, "ti"): + n += int64(f * (1 << 40)) + s = s[2:] + continue + case strings.EqualFold(prefix, "kb"): + n += int64(f * 1_000) + s = s[2:] + continue + case strings.EqualFold(prefix, "mb"): + n += int64(f * 1_000_000) + s = s[2:] + continue + case 
strings.EqualFold(prefix, "gb"): + n += int64(f * 1_000_000_000) + s = s[2:] + continue + case strings.EqualFold(prefix, "tb"): + n += int64(f * 1_000_000_000_000) + s = s[2:] + continue + } + } + prefix := s[:1] + switch { + case strings.EqualFold(prefix, "b"): + n += int64(f) + s = s[1:] + continue + case strings.EqualFold(prefix, "k"): + n += int64(f * 1_000) + s = s[1:] + continue + case strings.EqualFold(prefix, "m"): + n += int64(f * 1_000_000) + s = s[1:] + continue + case strings.EqualFold(prefix, "g"): + n += int64(f * 1_000_000_000) + s = s[1:] + continue + case strings.EqualFold(prefix, "t"): + n += int64(f * 1_000_000_000_000) + s = s[1:] + continue + } + } + return n, true +} + +// tryParseIPv4Mask parses '/num' ipv4 mask and returns (1<<(32-num)) +func tryParseIPv4Mask(s string) (uint64, bool) { + if len(s) == 0 || s[0] != '/' { + return 0, false + } + s = s[1:] + n, ok := tryParseUint64(s) + if !ok || n > 32 { + return 0, false + } + return 1 << (32 - uint8(n)), true +} + +// tryParseDuration parses the given duration in nanoseconds and returns the result. 
+func tryParseDuration(s string) (int64, bool) { + if len(s) == 0 { + return 0, false + } + isMinus := s[0] == '-' + if isMinus { + s = s[1:] + } + + nsecs := int64(0) + for len(s) > 0 { + f, ok, tail := tryParseFloat64Prefix(s) + if !ok { + return 0, false + } + s = tail + if len(s) == 0 { + return 0, false + } + if len(s) >= 3 { + prefix := s[:3] + if strings.EqualFold(prefix, "µs") { + nsecs += int64(f * nsecsPerMicrosecond) + s = s[3:] + continue + } + } + if len(s) >= 2 { + prefix := s[:2] + switch { + case strings.EqualFold(prefix, "ms"): + nsecs += int64(f * nsecsPerMillisecond) + s = s[2:] + continue + case strings.EqualFold(prefix, "ns"): + nsecs += int64(f) + s = s[2:] + continue + } + } + prefix := s[:1] + switch { + case strings.EqualFold(prefix, "y"): + nsecs += int64(f * nsecsPerYear) + s = s[1:] + case strings.EqualFold(prefix, "w"): + nsecs += int64(f * nsecsPerWeek) + s = s[1:] + continue + case strings.EqualFold(prefix, "d"): + nsecs += int64(f * nsecsPerDay) + s = s[1:] + continue + case strings.EqualFold(prefix, "h"): + nsecs += int64(f * nsecsPerHour) + s = s[1:] + continue + case strings.EqualFold(prefix, "m"): + nsecs += int64(f * nsecsPerMinute) + s = s[1:] + continue + case strings.EqualFold(prefix, "s"): + nsecs += int64(f * nsecsPerSecond) + s = s[1:] + continue + default: + return 0, false + } + } + + if isMinus { + nsecs = -nsecs + } + return nsecs, true +} + +// marshalDuration appends string representation of nsec duration to dst and returns the result. 
+func marshalDuration(dst []byte, nsecs int64) []byte { + if nsecs == 0 { + return append(dst, '0') + } + + if nsecs < 0 { + dst = append(dst, '-') + nsecs = -nsecs + } + formatFloat64Seconds := nsecs >= nsecsPerSecond + + if nsecs >= nsecsPerWeek { + weeks := nsecs / nsecsPerWeek + nsecs -= weeks * nsecsPerWeek + dst = marshalUint64(dst, uint64(weeks)) + dst = append(dst, 'w') + } + if nsecs >= nsecsPerDay { + days := nsecs / nsecsPerDay + nsecs -= days * nsecsPerDay + dst = marshalUint64(dst, uint64(days)) + dst = append(dst, 'd') + } + if nsecs >= nsecsPerHour { + hours := nsecs / nsecsPerHour + nsecs -= hours * nsecsPerHour + dst = marshalUint64(dst, uint64(hours)) + dst = append(dst, 'h') + } + if nsecs >= nsecsPerMinute { + minutes := nsecs / nsecsPerMinute + nsecs -= minutes * nsecsPerMinute + dst = marshalUint64(dst, uint64(minutes)) + dst = append(dst, 'm') + } + if nsecs >= nsecsPerSecond { + if formatFloat64Seconds { + seconds := float64(nsecs) / nsecsPerSecond + dst = marshalFloat64(dst, seconds) + dst = append(dst, 's') + return dst + } + seconds := nsecs / nsecsPerSecond + nsecs -= seconds * nsecsPerSecond + dst = marshalUint64(dst, uint64(seconds)) + dst = append(dst, 's') + } + if nsecs >= nsecsPerMillisecond { + msecs := nsecs / nsecsPerMillisecond + nsecs -= msecs * nsecsPerMillisecond + dst = marshalUint64(dst, uint64(msecs)) + dst = append(dst, "ms"...) + } + if nsecs >= nsecsPerMicrosecond { + usecs := nsecs / nsecsPerMicrosecond + nsecs -= usecs * nsecsPerMicrosecond + dst = marshalUint64(dst, uint64(usecs)) + dst = append(dst, "µs"...) + } + if nsecs > 0 { + dst = marshalUint64(dst, uint64(nsecs)) + dst = append(dst, "ns"...) 
+ } + return dst +} + +const ( + nsecsPerYear = 365 * 24 * 3600 * 1e9 + nsecsPerWeek = 7 * 24 * 3600 * 1e9 + nsecsPerDay = 24 * 3600 * 1e9 + nsecsPerHour = 3600 * 1e9 + nsecsPerMinute = 60 * 1e9 + nsecsPerSecond = 1e9 + nsecsPerMillisecond = 1e6 + nsecsPerMicrosecond = 1e3 +) + func tryUintEncoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) { u64s := encoding.GetUint64s(len(srcValues)) defer encoding.PutUint64s(u64s) diff --git a/lib/logstorage/values_encoder_test.go b/lib/logstorage/values_encoder_test.go index c1a7c5409..d7c068b61 100644 --- a/lib/logstorage/values_encoder_test.go +++ b/lib/logstorage/values_encoder_test.go @@ -96,133 +96,601 @@ func TestValuesEncoder(t *testing.T) { f(values, valueTypeFloat64, 4607182418800017408, 4613937818241073152) } -func TestTryParseIPv4(t *testing.T) { - f := func(s string, nExpected uint32, okExpected bool) { +func TestTryParseIPv4_Success(t *testing.T) { + f := func(s string) { t.Helper() + n, ok := tryParseIPv4(s) - if n != nExpected { - t.Fatalf("unexpected n; got %d; want %d", n, nExpected) + if !ok { + t.Fatalf("cannot parse %q", s) } - if ok != okExpected { - t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected) + data := marshalIPv4(nil, n) + if string(data) != s { + t.Fatalf("unexpected ip; got %q; want %q", data, s) } } - f("", 0, false) - f("foo", 0, false) - f("a.b.c.d", 0, false) - f("1.2.3.4", 0x01020304, true) - f("255.255.255.255", 0xffffffff, true) - f("0.0.0.0", 0, true) - f("127.0.0.1", 0x7f000001, true) - f("127.0.0.x", 0, false) - f("127.0.x.0", 0, false) - f("127.x.0.0", 0, false) - f("x.0.0.0", 0, false) - f("127.127.127.256", 0, false) - f("127.127.256.127", 0, false) - f("127.256.127.127", 0, false) - f("256.127.127.127", 0, false) - f("-1.127.127.127", 0, false) - f("127.-1.127.127", 0, false) - f("127.127.-1.127", 0, false) - f("127.127.127.-1", 0, false) + f("0.0.0.0") + f("1.2.3.4") + f("255.255.255.255") + f("127.0.0.1") } -func 
TestTryParseTimestampISO8601(t *testing.T) { - f := func(s string, timestampExpected uint64, okExpected bool) { +func TestTryParseIPv4_Failure(t *testing.T) { + f := func(s string) { t.Helper() - timestamp, ok := tryParseTimestampISO8601(s) - if timestamp != timestampExpected { - t.Fatalf("unexpected timestamp; got %d; want %d", timestamp, timestampExpected) - } - if ok != okExpected { - t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected) + + _, ok := tryParseIPv4(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) } } - f("2023-01-15T23:45:51.123Z", 1673826351123000000, true) + f("") + f("foo") + f("a.b.c.d") + f("127.0.0.x") + f("127.0.x.0") + f("127.x.0.0") + f("x.0.0.0") - // Invalid milliseconds - f("2023-01-15T22:15:51.12345Z", 0, false) - f("2023-01-15T22:15:51.12Z", 0, false) - f("2023-01-15T22:15:51Z", 0, false) + // Too big octets + f("127.127.127.256") + f("127.127.256.127") + f("127.256.127.127") + f("256.127.127.127") - // Missing Z - f("2023-01-15T23:45:51.123", 0, false) + // Negative octets + f("-1.127.127.127") + f("127.-1.127.127") + f("127.127.-1.127") + f("127.127.127.-1") +} - // Invalid timestamp - f("foo", 0, false) - f("2023-01-15T23:45:51.123Zxyabcd", 0, false) - f("2023-01-15T23:45:51.123Z01:00", 0, false) +func TestTryParseTimestampRFC3339Nano_Success(t *testing.T) { + f := func(s string) { + t.Helper() + nsecs, ok := tryParseTimestampRFC3339Nano(s) + if !ok { + t.Fatalf("cannot parse timestamp %q", s) + } + data := marshalTimestampRFC3339Nano(nil, nsecs) + if string(data) != s { + t.Fatalf("unexpected timestamp; got %q; want %q", data, s) + } + } + + // No fractional seconds + f("2023-01-15T23:45:51Z") + + // Different number of fractional seconds + f("2023-01-15T23:45:51.1Z") + f("2023-01-15T23:45:51.12Z") + f("2023-01-15T23:45:51.123Z") + f("2023-01-15T23:45:51.1234Z") + f("2023-01-15T23:45:51.12345Z") + f("2023-01-15T23:45:51.123456Z") + f("2023-01-15T23:45:51.1234567Z") + f("2023-01-15T23:45:51.12345678Z") + 
f("2023-01-15T23:45:51.123456789Z") + + // The minimum possible timestamp + f("1677-09-21T00:12:44Z") + + // The maximum possible timestamp + f("2262-04-11T23:47:15.999999999Z") +} + +func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + _, ok := tryParseTimestampRFC3339Nano(s) + if ok { + t.Fatalf("expecting faulure when parsing %q", s) + } + } + + // invalid length + f("") + f("foobar") + + // Missing Z at the end + f("2023-01-15T22:15:51") + f("2023-01-15T22:15:51.123") + + // missing fractional part after dot + f("2023-01-15T22:15:51.Z") // timestamp with timezone - f("2023-01-16T00:45:51.123+01:00", 0, false) + f("2023-01-16T00:45:51+01:00") + f("2023-01-16T00:45:51.123+01:00") + + // too small year + f("1676-09-21T00:12:43Z") + + // too big year + f("2263-04-11T23:47:17Z") + + // too small timestamp + f("1677-09-21T00:12:43.999999999Z") + + // too big timestamp + f("2262-04-11T23:47:16Z") + + // invalid year + f("YYYY-04-11T23:47:17Z") + + // invalid moth + f("2023-MM-11T23:47:17Z") + + // invalid day + f("2023-01-DDT23:47:17Z") + + // invalid hour + f("2023-01-23Thh:47:17Z") + + // invalid minute + f("2023-01-23T23:mm:17Z") + + // invalid second + f("2023-01-23T23:33:ssZ") } -func TestTryParseFloat64(t *testing.T) { - f := func(s string, valueExpected float64, okExpected bool) { +func TestTryParseTimestampISO8601_Success(t *testing.T) { + f := func(s string) { t.Helper() - - value, ok := tryParseFloat64(s) - if value != valueExpected { - t.Fatalf("unexpected value; got %v; want %v", value, valueExpected) + nsecs, ok := tryParseTimestampISO8601(s) + if !ok { + t.Fatalf("cannot parse timestamp %q", s) } - if ok != okExpected { - t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected) + data := marshalTimestampISO8601(nil, nsecs) + if string(data) != s { + t.Fatalf("unexpected timestamp; got %q; want %q", data, s) } } - f("0", 0, true) - f("1234567890", 1234567890, true) - f("-1.234567", -1.234567, true) + // 
regular timestamp + f("2023-01-15T23:45:51.123Z") + + // The minimum possible timestamp + f("1677-09-21T00:12:44.000Z") + + // The maximum possible timestamp + f("2262-04-11T23:47:15.999Z") +} + +func TestTryParseTimestampISO8601_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + _, ok := tryParseTimestampISO8601(s) + if ok { + t.Fatalf("expecting faulure when parsing %q", s) + } + } + + // invalid length + f("") + f("foobar") + + // Missing Z at the end + f("2023-01-15T22:15:51.123") + f("2023-01-15T22:15:51.1234") + + // timestamp with timezone + f("2023-01-16T00:45:51.123+01:00") + + // too small year + f("1676-09-21T00:12:43.434Z") + + // too big year + f("2263-04-11T23:47:17.434Z") + + // too small timestamp + f("1677-09-21T00:12:43.999Z") + + // too big timestamp + f("2262-04-11T23:47:16.000Z") + + // invalid year + f("YYYY-04-11T23:47:17.123Z") + + // invalid moth + f("2023-MM-11T23:47:17.123Z") + + // invalid day + f("2023-01-DDT23:47:17.123Z") + + // invalid hour + f("2023-01-23Thh:47:17.123Z") + + // invalid minute + f("2023-01-23T23:mm:17.123Z") + + // invalid second + f("2023-01-23T23:33:ss.123Z") +} + +func TestTryParseDuration_Success(t *testing.T) { + f := func(s string, nsecsExpected int64) { + t.Helper() + + nsecs, ok := tryParseDuration(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if nsecs != nsecsExpected { + t.Fatalf("unexpected value; got %d; want %d", nsecs, nsecsExpected) + } + } + + // zero duration + f("0s", 0) + f("0S", 0) + f("0.0w0d0h0s0.0ms", 0) + f("-0w", 0) + + // positive duration + f("1s", nsecsPerSecond) + f("1.5ms", 1.5*nsecsPerMillisecond) + f("1µs", nsecsPerMicrosecond) + f("1ns", 1) + f("1NS", 1) + f("1nS", 1) + f("1Ns", 1) + f("1h", nsecsPerHour) + f("1H", nsecsPerHour) + f("1.5d", 1.5*nsecsPerDay) + f("1.5D", 1.5*nsecsPerDay) + f("1.5w", 1.5*nsecsPerWeek) + f("1.5W", 1.5*nsecsPerWeek) + f("2.5y", 2.5*nsecsPerYear) + f("1m5.123456789s", nsecsPerMinute+5.123456789*nsecsPerSecond) + + // composite duration + 
f("1h5m", nsecsPerHour+5*nsecsPerMinute) + f("1.1h5m2.5s3_456ns", 1.1*nsecsPerHour+5*nsecsPerMinute+2.5*nsecsPerSecond+3456) + + // nedgative duration + f("-1h5m3s", -(nsecsPerHour + 5*nsecsPerMinute + 3*nsecsPerSecond)) +} + +func TestTryParseDuration_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseDuration(s) + if ok { + t.Fatalf("expecting error for parsing %q", s) + } + } + + // empty string + f("") + + // missing suffix + f("2") + f("2.5") + + // invalid string + f("foobar") + f("1foo") + f("1soo") + f("3.43e") + f("3.43es") + + // superflouous space + f(" 2s") + f("2s ") + f("2s 3ms") +} + +func TestMarshalDuration(t *testing.T) { + f := func(nsecs int64, resultExpected string) { + t.Helper() + + result := marshalDuration(nil, nsecs) + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + } + + f(0, "0") + f(1, "1ns") + f(-1, "-1ns") + f(12345, "12µs345ns") + f(123456789, "123ms456µs789ns") + f(12345678901, "12.345678901s") + f(1234567890143, "20m34.567890143s") + f(1234567890123457, "2w6h56m7.890123457s") +} + +func TestTryParseBytes_Success(t *testing.T) { + f := func(s string, resultExpected int64) { + t.Helper() + + result, ok := tryParseBytes(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if result != resultExpected { + t.Fatalf("unexpected result; got %d; want %d", result, resultExpected) + } + } + + f("123.456", 123) + f("1_500", 1_500) + + f("2.5b", 2) + f("2.5B", 2) + + f("1.5k", 1_500) + f("1.5m", 1_500_000) + f("1.5g", 1_500_000_000) + f("1.5t", 1_500_000_000_000) + + f("1.5K", 1_500) + f("1.5M", 1_500_000) + f("1.5G", 1_500_000_000) + f("1.5T", 1_500_000_000_000) + + f("1.5kb", 1_500) + f("1.5mb", 1_500_000) + f("1.5gb", 1_500_000_000) + f("1.5tb", 1_500_000_000_000) + + f("1.5Kb", 1_500) + f("1.5Mb", 1_500_000) + f("1.5Gb", 1_500_000_000) + f("1.5Tb", 1_500_000_000_000) + + f("1.5KB", 1_500) + f("1.5MB", 1_500_000) + f("1.5GB", 1_500_000_000) + 
f("1.5TB", 1_500_000_000_000) + + f("1.5ki", 1.5*(1<<10)) + f("1.5mi", 1.5*(1<<20)) + f("1.5gi", 1.5*(1<<30)) + f("1.5ti", 1.5*(1<<40)) + + f("1.5Ki", 1.5*(1<<10)) + f("1.5Mi", 1.5*(1<<20)) + f("1.5Gi", 1.5*(1<<30)) + f("1.5Ti", 1.5*(1<<40)) + + f("1.5KI", 1.5*(1<<10)) + f("1.5MI", 1.5*(1<<20)) + f("1.5GI", 1.5*(1<<30)) + f("1.5TI", 1.5*(1<<40)) + + f("1.5kib", 1.5*(1<<10)) + f("1.5mib", 1.5*(1<<20)) + f("1.5gib", 1.5*(1<<30)) + f("1.5tib", 1.5*(1<<40)) + + f("1.5kiB", 1.5*(1<<10)) + f("1.5miB", 1.5*(1<<20)) + f("1.5giB", 1.5*(1<<30)) + f("1.5tiB", 1.5*(1<<40)) + + f("1.5KiB", 1.5*(1<<10)) + f("1.5MiB", 1.5*(1<<20)) + f("1.5GiB", 1.5*(1<<30)) + f("1.5TiB", 1.5*(1<<40)) + + f("1MiB500KiB200B", (1<<20)+500*(1<<10)+200) +} + +func TestTryParseBytes_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseBytes(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) + } + } + + // empty string + f("") + + // invalid number + f("foobar") + + // invalid suffix + f("123q") + f("123qs") + f("123qsb") + f("123sqsb") + f("123s5qsb") +} + +func TestTryParseFloat64_Success(t *testing.T) { + f := func(s string, resultExpected float64) { + t.Helper() + + result, ok := tryParseFloat64(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if !float64Equal(result, resultExpected) { + t.Fatalf("unexpected value; got %f; want %f", result, resultExpected) + } + } + + f("0", 0) + f("1", 1) + f("-1", -1) + f("1234567890", 1234567890) + f("1_234_567_890", 1234567890) + f("-1.234_567", -1.234567) + + f("0.345", 0.345) + f("-0.345", -0.345) +} + +func float64Equal(a, b float64) bool { + return math.Abs(a-b)*math.Abs(max(a, b)) < 1e-15 +} + +func TestTryParseFloat64_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseFloat64(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) + } + } // Empty value - f("", 0, false) + f("") // Plus in the value isn't allowed, since it cannot be convered back to the same string 
representation - f("+123", 0, false) + f("+123") // Dot at the beginning and the end of value isn't allowed, since it cannot converted back to the same string representation - f(".123", 0, false) - f("123.", 0, false) + f(".123") + f("123.") // Multiple dots aren't allowed - f("123.434.55", 0, false) + f("123.434.55") // Invalid dots - f("-.123", 0, false) - f(".", 0, false) + f("-.123") + f(".") // Scientific notation isn't allowed, since it cannot be converted back to the same string representation - f("12e5", 0, false) + f("12e5") // Minus in the middle of string isn't allowed - f("12-5", 0, false) + f("12-5") } -func TestTryParseUint64(t *testing.T) { - f := func(s string, valueExpected uint64, okExpected bool) { +func TestMarshalFloat64(t *testing.T) { + f := func(f float64, resultExpected string) { t.Helper() - value, ok := tryParseUint64(s) - if value != valueExpected { - t.Fatalf("unexpected value; got %d; want %d", value, valueExpected) - } - if ok != okExpected { - t.Fatalf("unexpected ok; got %v; want %v", ok, okExpected) + result := marshalFloat64(nil, f) + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) } } - f("0", 0, true) - f("123456789012345678", 123456789012345678, true) + f(0, "0") + f(1234, "1234") + f(-12345678, "-12345678") + f(1.234, "1.234") + f(-1.234567, "-1.234567") +} + +func TestTryParseUint64_Success(t *testing.T) { + f := func(s string, resultExpected uint64) { + t.Helper() + + result, ok := tryParseUint64(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if result != resultExpected { + t.Fatalf("unexpected value; got %d; want %d", result, resultExpected) + } + } + + f("0", 0) + f("123", 123) + f("123456", 123456) + f("123456789", 123456789) + f("123456789012", 123456789012) + f("123456789012345", 123456789012345) + f("123456789012345678", 123456789012345678) + f("12345678901234567890", 12345678901234567890) + f("12_345_678_901_234_567_890", 12345678901234567890) + + // 
the maximum possible value + f("18446744073709551615", 18446744073709551615) +} + +func TestTryParseUint64_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseUint64(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) + } + } // empty value - f("", 0, false) + f("") // too big value - f("1234567890123456789", 0, false) + f("18446744073709551616") // invalid value - f("foo", 0, false) + f("foo") +} + +func TestMarshalUint64(t *testing.T) { + f := func(n uint64, resultExpected string) { + t.Helper() + + result := marshalUint64(nil, n) + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + } + + f(0, "0") + f(123456, "123456") + + // the maximum possible value + f(18446744073709551615, "18446744073709551615") + f(18_446_744_073_709_551_615, "18446744073709551615") +} + +func TestTryParseIPv4Mask_Success(t *testing.T) { + f := func(s string, resultExpected uint64) { + t.Helper() + + result, ok := tryParseIPv4Mask(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if result != resultExpected { + t.Fatalf("unexpected result; got %d; want %d", result, resultExpected) + } + } + + f("/0", 1<<32) + f("/1", 1<<31) + f("/8", 1<<24) + f("/24", 1<<8) + f("/32", 1) +} + +func TestTryParseIPv4Mask_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseIPv4Mask(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) + } + } + + // Empty mask + f("") + + // Invalid prefix + f("foo") + + // Non-numeric mask + f("/foo") + + // Too big mask + f("/33") + + // Negative mask + f("/-1") } diff --git a/lib/logstorage/values_encoder_timing_test.go b/lib/logstorage/values_encoder_timing_test.go index 10059da41..a86d91c23 100644 --- a/lib/logstorage/values_encoder_timing_test.go +++ b/lib/logstorage/values_encoder_timing_test.go @@ -5,12 +5,35 @@ import ( "testing" ) +func BenchmarkTryParseTimestampRFC3339Nano(b *testing.B) { + a := []string{ + 
"2023-01-15T23:45:51Z", + "2023-02-15T23:45:51.123Z", + "2024-02-15T23:45:51.123456Z", + "2025-02-15T22:45:51.123456789Z", + "2023-02-15T22:45:51.000000000Z", + } + + b.SetBytes(int64(len(a))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for _, s := range a { + _, ok := tryParseTimestampRFC3339Nano(s) + if !ok { + panic(fmt.Errorf("cannot parse timestamp %q", s)) + } + } + } + }) +} + func BenchmarkTryParseTimestampISO8601(b *testing.B) { a := []string{ "2023-01-15T23:45:51.123Z", "2023-02-15T23:45:51.123Z", - "2023-02-15T23:45:51.123+01:00", - "2023-02-15T22:45:51.123-10:30", + "2024-02-15T23:45:51.123Z", + "2025-02-15T22:45:51.123Z", "2023-02-15T22:45:51.000Z", }