From 7fd9d31e90d038f01d14b8eeec30e4c2b16e1f63 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 3 May 2024 12:10:45 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/LogsQL.md | 7 ++ lib/logstorage/block_result.go | 111 ++++++++++++++++++------------ lib/logstorage/parser_test.go | 5 ++ lib/logstorage/pipe_stats.go | 65 ++++++++++++++--- lib/logstorage/pipe_stats_test.go | 64 ++++++++++++++++- lib/logstorage/values_encoder.go | 12 +++- 6 files changed, 209 insertions(+), 55 deletions(-) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 09432a7f4..99ce31d66 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1128,6 +1128,13 @@ For example, the following query returns per-minute number of log messages with _time:10m error | stats by (_time:1m) count() errors_per_minute ``` +It is possible to add offset (for example, [timezone offset](https://en.wikipedia.org/wiki/UTC_offset)) when bucketing by `_time`. For example, the following query calculates +the number of per-day log entries for the last week at '2h' offset aka `UTC+02:00` offset: + +```logsql +_time:1w | stats by (_time:1d offset 2h) count() logs_per_day_kyiv_offset +``` + #### Numeric buckets Stats can be bucketed by any numeric [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the `field_name:bucket_size` syntax inside `by(...)` clause. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 80ed98596..7c3673176 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -338,40 +338,40 @@ func (br *blockResult) addConstColumn(name, value string) { }) } -func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize float64) []string { +func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize, bucketOffset float64) []string { if c.isConst { - return br.getBucketedConstValues(c.encodedValues[0], bucketSize) + return br.getBucketedConstValues(c.encodedValues[0], bucketSize, bucketOffset) } if c.isTime { - return br.getBucketedTimestampValues(bucketSize) + return br.getBucketedTimestampValues(bucketSize, bucketOffset) } switch c.valueType { case valueTypeString: - return br.getBucketedStringValues(c.encodedValues, bucketSize) + return br.getBucketedStringValues(c.encodedValues, bucketSize, bucketOffset) case valueTypeDict: - return br.getBucketedDictValues(c.encodedValues, c.dictValues, bucketSize) + return br.getBucketedDictValues(c.encodedValues, c.dictValues, bucketSize, bucketOffset) case valueTypeUint8: - return br.getBucketedUint8Values(c.encodedValues, bucketSize) + return br.getBucketedUint8Values(c.encodedValues, bucketSize, bucketOffset) case valueTypeUint16: - return br.getBucketedUint16Values(c.encodedValues, bucketSize) + return br.getBucketedUint16Values(c.encodedValues, bucketSize, bucketOffset) case valueTypeUint32: - return br.getBucketedUint32Values(c.encodedValues, bucketSize) + return br.getBucketedUint32Values(c.encodedValues, bucketSize, bucketOffset) case valueTypeUint64: - return br.getBucketedUint64Values(c.encodedValues, bucketSize) + return br.getBucketedUint64Values(c.encodedValues, bucketSize, bucketOffset) case valueTypeFloat64: - return br.getBucketedFloat64Values(c.encodedValues, bucketSize) + return br.getBucketedFloat64Values(c.encodedValues, bucketSize, bucketOffset) case valueTypeIPv4: - return br.getBucketedIPv4Values(c.encodedValues, bucketSize) + return br.getBucketedIPv4Values(c.encodedValues, bucketSize, bucketOffset) case valueTypeTimestampISO8601: - return br.getBucketedTimestampISO8601Values(c.encodedValues, bucketSize) + return br.getBucketedTimestampISO8601Values(c.encodedValues, bucketSize, bucketOffset) default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) return nil } } -func (br *blockResult) getBucketedConstValues(v string, bucketSize float64) []string { +func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset float64) []string { if v == "" { // Fast path - return a slice of empty strings without constructing the slice. return getEmptyStrings(len(br.timestamps)) @@ -382,7 +382,7 @@ func (br *blockResult) getBucketedConstValues(v string, bucketSize float64) []st valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) - v = br.getBucketedValue(v, bucketSize) + v = br.getBucketedValue(v, bucketSize, bucketOffset) for range br.timestamps { valuesBuf = append(valuesBuf, v) } @@ -392,7 +392,7 @@ func (br *blockResult) getBucketedConstValues(v string, bucketSize float64) []st return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string { +func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) @@ -400,7 +400,7 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string { timestamps := br.timestamps var s string - if bucketSize <= 1 { + if bucketSize <= 1 && bucketOffset == 0 { for i := range timestamps { if i > 0 && timestamps[i-1] == timestamps[i] { valuesBuf = append(valuesBuf, s) @@ -414,6 +414,7 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string { } } else { bucketSizeInt := int64(bucketSize) + bucketOffsetInt := int64(bucketOffset) var prevTimestamp int64 for i := range timestamps { if i > 0 && timestamps[i-1] == timestamps[i] { @@ -422,6 +423,7 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string { } timestamp := timestamps[i] + timestamp -= bucketOffsetInt timestamp -= timestamp % bucketSizeInt if i > 0 && timestamp == prevTimestamp { valuesBuf = append(valuesBuf, s) @@ -442,8 +444,8 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize float64) []string { return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedStringValues(values []string, bucketSize float64) []string { - if bucketSize <= 0 { +func (br *blockResult) getBucketedStringValues(values []string, bucketSize, bucketOffset float64) []string { + if bucketSize <= 0 && bucketOffset == 0 { return values } @@ -457,7 +459,7 @@ func (br *blockResult) getBucketedStringValues(values []string, bucketSize float continue } - s = br.getBucketedValue(values[i], bucketSize) + s = br.getBucketedValue(values[i], bucketSize, bucketOffset) valuesBuf = append(valuesBuf, s) } @@ -466,11 +468,11 @@ func (br *blockResult) getBucketedStringValues(values []string, bucketSize float return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bucketSize, bucketOffset float64) []string { valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) - dictValues = br.getBucketedStringValues(dictValues, bucketSize) + dictValues = br.getBucketedStringValues(dictValues, bucketSize, bucketOffset) for _, v := range encodedValues { dictIdx := v[0] valuesBuf = append(valuesBuf, dictValues[dictIdx]) @@ -481,14 +483,14 @@ func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 || bucketSize >= (1<<8) { + if (bucketSize <= 1 || bucketSize >= (1<<8)) && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -503,6 +505,7 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize } } else { bucketSizeInt := uint64(bucketSize) + bucketOffsetInt := uint64(int64(bucketOffset)) var nPrev uint64 for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { @@ -511,6 +514,7 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize } n := uint64(v[0]) + n -= bucketOffsetInt n -= n % bucketSizeInt if i > 0 && n == nPrev { valuesBuf = append(valuesBuf, s) @@ -531,14 +535,14 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 || bucketSize >= (1<<16) { + if (bucketSize <= 1 || bucketSize >= (1<<16)) && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -554,6 +558,7 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz } } else { bucketSizeInt := uint64(bucketSize) + bucketOffsetInt := uint64(int64(bucketOffset)) var nPrev uint64 for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { @@ -563,6 +568,7 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz b := bytesutil.ToUnsafeBytes(v) n := uint64(encoding.UnmarshalUint16(b)) + n -= bucketOffsetInt n -= n % bucketSizeInt if i > 0 && n == nPrev { valuesBuf = append(valuesBuf, s) @@ -583,14 +589,14 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 || bucketSize >= (1<<32) { + if (bucketSize <= 1 || bucketSize >= (1<<32)) && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -606,6 +612,7 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz } } else { bucketSizeInt := uint64(bucketSize) + bucketOffsetInt := uint64(int64(bucketOffset)) var nPrev uint64 for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { @@ -615,6 +622,7 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz b := bytesutil.ToUnsafeBytes(v) n := uint64(encoding.UnmarshalUint32(b)) + n -= bucketOffsetInt n -= n % bucketSizeInt if i > 0 && n == nPrev { valuesBuf = append(valuesBuf, s) @@ -635,14 +643,14 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 || bucketSize >= (1<<64) { + if (bucketSize <= 1 || bucketSize >= (1<<64)) && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -658,6 +666,7 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz } } else { bucketSizeInt := uint64(bucketSize) + bucketOffsetInt := uint64(int64(bucketOffset)) var nPrev uint64 for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { @@ -667,6 +676,7 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz b := bytesutil.ToUnsafeBytes(v) n := encoding.UnmarshalUint64(b) + n -= bucketOffsetInt n -= n % bucketSizeInt if i > 0 && n == nPrev { valuesBuf = append(valuesBuf, s) @@ -687,14 +697,14 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 0 { + if bucketSize <= 0 && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -725,6 +735,8 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi n := encoding.UnmarshalUint64(b) f := math.Float64frombits(n) + f -= bucketOffset + // emulate f % bucketSize for float64 values fP10 := int64(f * p10) fP10 -= fP10 % bucketSizeP10 @@ -749,14 +761,14 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 { + if bucketSize <= 1 && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -770,6 +782,7 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize } } else { bucketSizeInt := uint32(bucketSize) + bucketOffsetInt := uint32(int32(bucketOffset)) var nPrev uint32 for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { @@ -779,6 +792,7 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize b := bytesutil.ToUnsafeBytes(v) n := binary.BigEndian.Uint32(b) + n -= bucketOffsetInt n -= n % bucketSizeInt if i > 0 && n == nPrev { valuesBuf = append(valuesBuf, s) @@ -799,14 +813,14 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bucketSize float64) []string { +func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bucketSize, bucketOffset float64) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 { + if bucketSize <= 1 && bucketOffset == 0 { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -823,6 +837,7 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, } } else { bucketSizeInt := uint64(bucketSize) + bucketOffsetInt := uint64(int64(bucketOffset)) var nPrev uint64 bb := bbPool.Get() for i, v := range encodedValues { @@ -833,6 +848,7 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, b := bytesutil.ToUnsafeBytes(v) n := encoding.UnmarshalUint64(b) + n -= bucketOffsetInt n -= n % bucketSizeInt if i > 0 && n == nPrev { valuesBuf = append(valuesBuf, s) @@ -854,9 +870,9 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, return valuesBuf[valuesBufLen:] } -// getBucketedValue returns bucketed s according to the given bucketSize. -func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { - if bucketSize <= 0 { +// getBucketedValue returns bucketed s according to the given bucketSize and bucketOffset +func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float64) string { + if bucketSize <= 0 && bucketOffset == 0 { return s } if len(s) == 0 { @@ -870,6 +886,7 @@ func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { } if f, ok := tryParseFloat64(s); ok { + f -= bucketOffset // emulate f % bucketSize for float64 values _, e := decimal.FromFloat(bucketSize) p10 := math.Pow10(int(-e)) @@ -883,6 +900,7 @@ func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { } if nsecs, ok := tryParseTimestampISO8601(s); ok { + nsecs -= int64(bucketOffset) nsecs -= nsecs % int64(bucketSize) bufLen := len(br.buf) br.buf = marshalTimestampISO8601(br.buf, nsecs) @@ -890,6 +908,7 @@ func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { } if nsecs, ok := tryParseTimestampRFC3339Nano(s); ok { + nsecs -= int64(bucketOffset) nsecs -= nsecs % int64(bucketSize) bufLen := len(br.buf) br.buf = marshalTimestampRFC3339Nano(br.buf, nsecs) @@ -897,6 +916,7 @@ func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { } if n, ok := tryParseIPv4(s); ok { + n -= uint32(int32(bucketOffset)) n -= n % uint32(bucketSize) bufLen := len(br.buf) br.buf = marshalIPv4(br.buf, n) @@ -904,6 +924,7 @@ func (br *blockResult) getBucketedValue(s string, bucketSize float64) string { } if nsecs, ok := tryParseDuration(s); ok { + nsecs -= int64(bucketOffset) nsecs -= nsecs % int64(bucketSize) bufLen := len(br.buf) br.buf = marshalDuration(br.buf, nsecs) @@ -1042,6 +1063,9 @@ type blockResultColumn struct { // bucketSize contains bucketSize for bucketedValues bucketSize float64 + // bucketOffset contains bucketOffset for bucketedValues + bucketOffset float64 + // buf and valuesBuf are used by addValue() in order to re-use memory across resetRows(). buf []byte valuesBuf []string @@ -1135,19 +1159,20 @@ func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string { return values[rowIdx] } -// getValues returns values for the given column, bucketed according to bucketSize. +// getValues returns values for the given column, bucketed according to bucketSize and bucketOffset. // // The returned values are valid until br.reset() is called. -func (c *blockResultColumn) getBucketedValues(br *blockResult, bucketSize float64) []string { +func (c *blockResultColumn) getBucketedValues(br *blockResult, bucketSize, bucketOffset float64) []string { if bucketSize <= 0 { return c.getValues(br) } - if values := c.bucketedValues; values != nil && c.bucketSize == bucketSize { + if values := c.bucketedValues; values != nil && c.bucketSize == bucketSize && c.bucketOffset == bucketOffset { return values } - c.bucketedValues = br.getBucketedColumnValues(c, bucketSize) + c.bucketedValues = br.getBucketedColumnValues(c, bucketSize, bucketOffset) c.bucketSize = bucketSize + c.bucketOffset = bucketOffset return c.bucketedValues } @@ -1159,7 +1184,7 @@ func (c *blockResultColumn) getValues(br *blockResult) []string { return values } - c.values = br.getBucketedColumnValues(c, 0) + c.values = br.getBucketedColumnValues(c, 0, 0) return c.values } diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 25f2c5688..7df246af9 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -865,6 +865,8 @@ func TestParseQuerySuccess(t *testing.T) { // stats pipe with grouping buckets f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`) f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count() as foo`) + f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count() as foo`) + f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count() as foo`) // multiple different pipes f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`) @@ -1137,6 +1139,9 @@ func TestParseQueryFailure(t *testing.T) { // invalid grouping fields f(`foo | stats by(foo:bar) count() baz`) f(`foo | stats by(foo:/bar) count() baz`) + f(`foo | stats by(foo:-1h) count() baz`) + f(`foo | stats by (foo:1h offset) count() baz`) + f(`foo | stats by (foo:1h offset bar) count() baz`) // invalid by clause f(`foo | stats by`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 2b43bc3d9..37f67a99b 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -193,7 +193,7 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { c := br.getColumnByName(bf.name) if c.isConst { // Fast path for column with constant value. - v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize) + v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize, bf.bucketOffset) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) @@ -201,7 +201,7 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { return } - values := c.getBucketedValues(br, bf.bucketSize) + values := c.getBucketedValues(br, bf.bucketSize, bf.bucketOffset) if areConstValues(values) { // Fast path for column with constant values. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) @@ -231,7 +231,7 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { columnValues := shard.columnValues[:0] for _, bf := range byFields { c := br.getColumnByName(bf.name) - values := c.getBucketedValues(br, bf.bucketSize) + values := c.getBucketedValues(br, bf.bucketSize, bf.bucketOffset) columnValues = append(columnValues, values) } shard.columnValues = columnValues @@ -509,14 +509,23 @@ type byField struct { // bucketSizeStr is string representation of the bucket size bucketSizeStr string - // bucketSize is the bucket for grouping the given field values with value/bucketSize calculations. + // bucketSize is the bucket for grouping the given field values with value/bucketSize calculations bucketSize float64 + + // bucketOffsetStr is string representation of the offset for bucketSize + bucketOffsetStr string + + // bucketOffset is the offset for bucketSize + bucketOffset float64 } func (bf *byField) String() string { s := quoteTokenIfNeeded(bf.name) if bf.bucketSizeStr != "" { s += ":" + bf.bucketSizeStr + if bf.bucketOffsetStr != "" { + s += " offset " + bf.bucketOffsetStr + } } return s } @@ -545,6 +554,7 @@ func parseByFields(lex *lexer) ([]*byField, error) { name: fieldName, } if lex.isKeyword(":") { + // Parse bucket size lex.nextToken() bucketSizeStr := lex.token lex.nextToken() @@ -556,11 +566,25 @@ func parseByFields(lex *lexer) ([]*byField, error) { if !ok { return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr) } - if bucketSize < 0 { - return nil, fmt.Errorf("bucketSize for the field %q cannot be negative; got %q", fieldName, bucketSizeStr) - } bf.bucketSizeStr = bucketSizeStr bf.bucketSize = bucketSize + + // Parse bucket offset + if lex.isKeyword("offset") { + lex.nextToken() + bucketOffsetStr := lex.token + lex.nextToken() + if bucketOffsetStr == "-" { + bucketOffsetStr += lex.token + lex.nextToken() + } + bucketOffset, ok := tryParseBucketOffset(bucketOffsetStr) + if !ok { + return nil, fmt.Errorf("cannot parse bucket offset for field %q: %q", fieldName, bucketOffsetStr) + } + bf.bucketOffsetStr = bucketOffsetStr + bf.bucketOffset = bucketOffset + } } bfs = append(bfs, bf) switch { @@ -574,11 +598,36 @@ func parseByFields(lex *lexer) ([]*byField, error) { } } +// tryParseBucketOffset tries parsing bucket offset, which can have the following formats: +// +// - integer number: 12345 +// - floating-point number: 1.2345 +// - duration: 1.5s - it is converted to nanoseconds +// - bytes: 1.5KiB +func tryParseBucketOffset(s string) (float64, bool) { + // Try parsing s as floating point number + if f, ok := tryParseFloat64(s); ok { + return f, true + } + + // Try parsing s as duration (1s, 5m, etc.) + if nsecs, ok := tryParseDuration(s); ok { + return float64(nsecs), true + } + + // Try parsing s as bytes (KiB, MB, etc.) + if n, ok := tryParseBytes(s); ok { + return float64(n), true + } + + return 0, false +} + // tryParseBucketSize tries parsing bucket size, which can have the following formats: // // - integer number: 12345 // - floating-point number: 1.2345 -// - duration: 1.5s - it is converted to the number of nanoseconds +// - duration: 1.5s - it is converted to nanoseconds // - bytes: 1.5KiB // - ipv4 mask: /24 func tryParseBucketSize(s string) (float64, bool) { diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go index 1b409bd62..eba8f2e4b 100644 --- a/lib/logstorage/pipe_stats_test.go +++ b/lib/logstorage/pipe_stats_test.go @@ -41,6 +41,13 @@ func TestTryParseBucketSize_Success(t *testing.T) { f("5.5KiB", 5.5*(1<<10)) f("10MB500KB10B", 10*1_000_000+500*1_000+10) f("10m0k", 10*1_000_000) + f("-10MB", -10*1_000_000) + + // ipv4 mask + f("/0", 1<<32) + f("/32", 1) + f("/16", 1<<16) + f("/8", 1<<24) } func TestTryParseBucketSize_Failure(t *testing.T) { @@ -55,7 +62,58 @@ func TestTryParseBucketSize_Failure(t *testing.T) { f("") f("foo") - - // negative bytes are forbidden - f("-10MB") +} + +func TestTryParseBucketOffset_Success(t *testing.T) { + f := func(s string, resultExpected float64) { + t.Helper() + + result, ok := tryParseBucketOffset(s) + if !ok { + t.Fatalf("cannot parse %q", s) + } + if result != resultExpected { + t.Fatalf("unexpected result; got %f; want %f", result, resultExpected) + } + } + + // integers + f("0", 0) + f("123", 123) + f("1_234_678", 1234678) + f("-1_234_678", -1234678) + + // floating-point numbers + f("0.0", 0) + f("123.435", 123.435) + f("1_000.433_344", 1000.433344) + f("-1_000.433_344", -1000.433344) + + // durations + f("5m", 5*nsecsPerMinute) + f("1h5m3.5s", nsecsPerHour+5*nsecsPerMinute+3.5*nsecsPerSecond) + f("-1h5m3.5s", -(nsecsPerHour + 5*nsecsPerMinute + 3.5*nsecsPerSecond)) + + // bytes + f("1b", 1) + f("1k", 1_000) + f("1Kb", 1_000) + f("5.5KiB", 5.5*(1<<10)) + f("10MB500KB10B", 10*1_000_000+500*1_000+10) + f("10m0k", 10*1_000_000) + f("-10mb", -10*1_000_000) +} + +func TestTryParseBucketOffset_Failure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, ok := tryParseBucketOffset(s) + if ok { + t.Fatalf("expecting error when parsing %q", s) + } + } + + f("") + f("foo") } diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 50cb68760..d5f65b037 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -719,6 +719,12 @@ func tryParseBytes(s string) (int64, bool) { if len(s) == 0 { return 0, false } + + isMinus := s[0] == '-' + if isMinus { + s = s[1:] + } + n := int64(0) for len(s) > 0 { f, ok, tail := tryParseFloat64Prefix(s) @@ -812,6 +818,10 @@ func tryParseBytes(s string) (int64, bool) { continue } } + + if isMinus { + n = -n + } return n, true } @@ -973,7 +983,7 @@ func marshalDuration(dst []byte, nsecs int64) []byte { } const ( - nsecsPerYear = 365 * 24 * 3600 * 1e9 + nsecsPerYear = 365 * 24 * 3600 * 1e9 nsecsPerWeek = 7 * 24 * 3600 * 1e9 nsecsPerDay = 24 * 3600 * 1e9 nsecsPerHour = 3600 * 1e9