diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index ece64fba2..f7c761b36 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1280,6 +1280,19 @@ over the last 5 minutes: _time:5m | stats by (_time:1m) count() logs_total, count_uniq(ip) ips_total ``` +Additionally, the following `step` values are supported: + +- `nanosecond` - equals to `1ns` [duration](#duration-values). +- `microsecond` - equals to `1µs` [duration](#duration-values). +- `millisecond` - equals to `1ms` [duration](#duration-values). +- `second` - equals to `1s` [duration](#duration-values). +- `minute` - equals to `1m` [duration](#duration-values). +- `hour` - equalst to `1h` [duration](#duration-values). +- `day` - equals to `1d` [duration](#duration-values). +- `week` - equals to `1w` [duration](#duration-values). +- `month` - equals to one month. It properly takes into account the number of days per each month. +- `year` - equals to one year. It properly takes into account the number of days per each year. + #### Stats by time buckets with timezone offset VictoriaLogs stores [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) values as [Unix time](https://en.wikipedia.org/wiki/Unix_time) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 94071eaff..7126f485a 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "math" "slices" + "time" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -443,40 +444,40 @@ func (br *blockResult) addConstColumn(name, value string) { br.csInitialized = false } -func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bf *byStatsField) []string { if c.isConst { - return br.getBucketedConstValues(c.encodedValues[0], bucketSize, bucketOffset) + return br.getBucketedConstValues(c.encodedValues[0], bf) } if c.isTime { - return br.getBucketedTimestampValues(bucketSize, bucketOffset) + return br.getBucketedTimestampValues(bf) } switch c.valueType { case valueTypeString: - return br.getBucketedStringValues(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedStringValues(c.encodedValues, bf) case valueTypeDict: - return br.getBucketedDictValues(c.encodedValues, c.dictValues, bucketSize, bucketOffset) + return br.getBucketedDictValues(c.encodedValues, c.dictValues, bf) case valueTypeUint8: - return br.getBucketedUint8Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedUint8Values(c.encodedValues, bf) case valueTypeUint16: - return br.getBucketedUint16Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedUint16Values(c.encodedValues, bf) case valueTypeUint32: - return br.getBucketedUint32Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedUint32Values(c.encodedValues, bf) case valueTypeUint64: - return br.getBucketedUint64Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedUint64Values(c.encodedValues, bf) case valueTypeFloat64: - return br.getBucketedFloat64Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedFloat64Values(c.encodedValues, bf) case valueTypeIPv4: - return br.getBucketedIPv4Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedIPv4Values(c.encodedValues, bf) case valueTypeTimestampISO8601: - return br.getBucketedTimestampISO8601Values(c.encodedValues, bucketSize, bucketOffset) + return br.getBucketedTimestampISO8601Values(c.encodedValues, bf) default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) return nil } } -func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []string { if v == "" { // Fast path - return a slice of empty strings without constructing the slice. return getEmptyStrings(len(br.timestamps)) @@ -487,7 +488,7 @@ func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) - v = br.getBucketedValue(v, bucketSize, bucketOffset) + v = br.getBucketedValue(v, bf) for range br.timestamps { valuesBuf = append(valuesBuf, v) } @@ -497,7 +498,7 @@ func (br *blockResult) getBucketedConstValues(v string, bucketSize, bucketOffset return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) @@ -505,7 +506,7 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float timestamps := br.timestamps var s string - if bucketSize <= 1 && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i := range timestamps { if i > 0 && timestamps[i-1] == timestamps[i] { valuesBuf = append(valuesBuf, s) @@ -518,9 +519,11 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := int64(bucketSize) - bucketOffsetInt := int64(bucketOffset) - var prevTimestamp int64 + bucketSizeInt := int64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := int64(bf.bucketOffset) for i := range timestamps { if i > 0 && timestamps[i-1] == timestamps[i] { valuesBuf = append(valuesBuf, s) @@ -529,14 +532,15 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float timestamp := timestamps[i] timestamp -= bucketOffsetInt - timestamp -= timestamp % bucketSizeInt - timestamp += bucketOffsetInt - if i > 0 && timestamp == prevTimestamp { - valuesBuf = append(valuesBuf, s) - continue + if bf.bucketSizeStr == "month" { + timestamp = truncateTimestampToMonth(timestamp) + } else if bf.bucketSizeStr == "year" { + timestamp = truncateTimestampToYear(timestamp) + } else { + timestamp -= timestamp % bucketSizeInt } + timestamp += bucketOffsetInt - prevTimestamp = timestamp bufLen := len(buf) buf = marshalTimestampRFC3339Nano(buf, timestamp) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -550,8 +554,8 @@ func (br *blockResult) getBucketedTimestampValues(bucketSize, bucketOffset float return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedStringValues(values []string, bucketSize, bucketOffset float64) []string { - if bucketSize <= 0 && bucketOffset == 0 { +func (br *blockResult) getBucketedStringValues(values []string, bf *byStatsField) []string { + if !bf.hasBucketConfig() { return values } @@ -565,7 +569,7 @@ func (br *blockResult) getBucketedStringValues(values []string, bucketSize, buck continue } - s = br.getBucketedValue(values[i], bucketSize, bucketOffset) + s = br.getBucketedValue(values[i], bf) valuesBuf = append(valuesBuf, s) } @@ -574,11 +578,11 @@ func (br *blockResult) getBucketedStringValues(values []string, bucketSize, buck return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bf *byStatsField) []string { valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) - dictValues = br.getBucketedStringValues(dictValues, bucketSize, bucketOffset) + dictValues = br.getBucketedStringValues(dictValues, bf) for _, v := range encodedValues { dictIdx := v[0] valuesBuf = append(valuesBuf, dictValues[dictIdx]) @@ -589,14 +593,14 @@ func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if (bucketSize <= 1 || bucketSize >= (1<<8)) && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -610,9 +614,12 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := uint64(bucketSize) - bucketOffsetInt := uint64(int64(bucketOffset)) - var nPrev uint64 + bucketSizeInt := uint64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := uint64(int64(bf.bucketOffset)) + for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -623,12 +630,7 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt - if i > 0 && n == nPrev { - valuesBuf = append(valuesBuf, s) - continue - } - nPrev = n bufLen := len(buf) buf = marshalUint64(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -642,14 +644,14 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bucketSize return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if (bucketSize <= 1 || bucketSize >= (1<<16)) && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -664,9 +666,12 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := uint64(bucketSize) - bucketOffsetInt := uint64(int64(bucketOffset)) - var nPrev uint64 + bucketSizeInt := uint64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := uint64(int64(bf.bucketOffset)) + for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -678,12 +683,7 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt - if i > 0 && n == nPrev { - valuesBuf = append(valuesBuf, s) - continue - } - nPrev = n bufLen := len(buf) buf = marshalUint64(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -697,14 +697,14 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bucketSiz return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if (bucketSize <= 1 || bucketSize >= (1<<32)) && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -719,9 +719,12 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := uint64(bucketSize) - bucketOffsetInt := uint64(int64(bucketOffset)) - var nPrev uint64 + bucketSizeInt := uint64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := uint64(int64(bf.bucketOffset)) + for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -733,12 +736,7 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt - if i > 0 && n == nPrev { - valuesBuf = append(valuesBuf, s) - continue - } - nPrev = n bufLen := len(buf) buf = marshalUint64(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -752,14 +750,14 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bucketSiz return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if (bucketSize <= 1 || bucketSize >= (1<<64)) && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -774,9 +772,12 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := uint64(bucketSize) - bucketOffsetInt := uint64(int64(bucketOffset)) - var nPrev uint64 + bucketSizeInt := uint64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := uint64(int64(bf.bucketOffset)) + for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -788,12 +789,7 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt - if i > 0 && n == nPrev { - valuesBuf = append(valuesBuf, s) - continue - } - nPrev = n bufLen := len(buf) buf = marshalUint64(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -807,14 +803,14 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bucketSiz return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 0 && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -831,10 +827,14 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi valuesBuf = append(valuesBuf, s) } } else { + bucketSize := bf.bucketSize + if bucketSize <= 0 { + bucketSize = 1 + } + _, e := decimal.FromFloat(bucketSize) p10 := math.Pow10(int(-e)) bucketSizeP10 := int64(bucketSize * p10) - var fPrev float64 for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -845,21 +845,15 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi n := encoding.UnmarshalUint64(b) f := math.Float64frombits(n) - f -= bucketOffset + f -= bf.bucketOffset // emulate f % bucketSize for float64 values fP10 := int64(f * p10) fP10 -= fP10 % bucketSizeP10 f = float64(fP10) / p10 - f += bucketOffset + f += bf.bucketOffset - if i > 0 && f == fPrev { - valuesBuf = append(valuesBuf, s) - continue - } - - fPrev = f bufLen := len(buf) buf = marshalFloat64(buf, f) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -873,14 +867,14 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bucketSi return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -893,9 +887,12 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize, valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := uint32(bucketSize) - bucketOffsetInt := uint32(int32(bucketOffset)) - var nPrev uint32 + bucketSizeInt := uint32(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := uint32(int32(bf.bucketOffset)) + for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -907,12 +904,7 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize, n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt - if i > 0 && n == nPrev { - valuesBuf = append(valuesBuf, s) - continue - } - nPrev = n bufLen := len(buf) buf = marshalIPv4(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) @@ -926,14 +918,14 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bucketSize, return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bucketSize, bucketOffset float64) []string { +func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bf *byStatsField) []string { buf := br.buf valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string - if bucketSize <= 1 && bucketOffset == 0 { + if !bf.hasBucketConfig() { for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { valuesBuf = append(valuesBuf, s) @@ -949,9 +941,12 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, valuesBuf = append(valuesBuf, s) } } else { - bucketSizeInt := uint64(bucketSize) - bucketOffsetInt := uint64(int64(bucketOffset)) - var nPrev uint64 + bucketSizeInt := int64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffsetInt := int64(bf.bucketOffset) + bb := bbPool.Get() for i, v := range encodedValues { if i > 0 && encodedValues[i-1] == encodedValues[i] { @@ -960,18 +955,20 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, } b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - n -= bucketOffsetInt - n -= n % bucketSizeInt - n += bucketOffsetInt - if i > 0 && n == nPrev { - valuesBuf = append(valuesBuf, s) - continue + timestamp := int64(encoding.UnmarshalUint64(b)) + timestamp -= bucketOffsetInt + if bf.bucketSizeStr == "month" { + timestamp = truncateTimestampToMonth(timestamp) + } else if bf.bucketSizeStr == "year" { + timestamp = truncateTimestampToYear(timestamp) + } else { + timestamp -= timestamp % bucketSizeInt } + timestamp -= timestamp % bucketSizeInt + timestamp += bucketOffsetInt - nPrev = n bufLen := len(buf) - buf = marshalTimestampISO8601(buf, int64(n)) + buf = marshalTimestampISO8601(buf, int64(timestamp)) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -984,9 +981,9 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, return valuesBuf[valuesBufLen:] } -// getBucketedValue returns bucketed s according to the given bucketSize and bucketOffset -func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float64) string { - if bucketSize <= 0 && bucketOffset == 0 { +// getBucketedValue returns bucketed s according to the given bf +func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { + if !bf.hasBucketConfig() { return s } if len(s) == 0 { @@ -1000,7 +997,12 @@ func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float } if f, ok := tryParseFloat64(s); ok { - f -= bucketOffset + bucketSize := bf.bucketSize + if bucketSize <= 0 { + bucketSize = 1 + } + + f -= bf.bucketOffset // emulate f % bucketSize for float64 values _, e := decimal.FromFloat(bucketSize) @@ -1009,44 +1011,84 @@ func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float fP10 -= fP10 % int64(bucketSize*p10) f = float64(fP10) / p10 - f += bucketOffset + f += bf.bucketOffset bufLen := len(br.buf) br.buf = marshalFloat64(br.buf, f) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } - if nsecs, ok := tryParseTimestampISO8601(s); ok { - nsecs -= int64(bucketOffset) - nsecs -= nsecs % int64(bucketSize) - nsecs += int64(bucketOffset) + if timestamp, ok := tryParseTimestampISO8601(s); ok { + bucketSizeInt := int64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffset := int64(bf.bucketOffset) + + timestamp -= bucketOffset + if bf.bucketSizeStr == "month" { + timestamp = truncateTimestampToMonth(timestamp) + } else if bf.bucketSizeStr == "year" { + timestamp = truncateTimestampToYear(timestamp) + } else { + timestamp -= timestamp % bucketSizeInt + } + timestamp += bucketOffset + bufLen := len(br.buf) - br.buf = marshalTimestampISO8601(br.buf, nsecs) + br.buf = marshalTimestampISO8601(br.buf, timestamp) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } - if nsecs, ok := tryParseTimestampRFC3339Nano(s); ok { - nsecs -= int64(bucketOffset) - nsecs -= nsecs % int64(bucketSize) - nsecs += int64(bucketOffset) + if timestamp, ok := tryParseTimestampRFC3339Nano(s); ok { + bucketSizeInt := int64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffset := int64(bf.bucketOffset) + + timestamp -= bucketOffset + if bf.bucketSizeStr == "month" { + timestamp = truncateTimestampToMonth(timestamp) + } else if bf.bucketSizeStr == "year" { + timestamp = truncateTimestampToYear(timestamp) + } else { + timestamp -= timestamp % bucketSizeInt + } + timestamp += bucketOffset + bufLen := len(br.buf) - br.buf = marshalTimestampRFC3339Nano(br.buf, nsecs) + br.buf = marshalTimestampRFC3339Nano(br.buf, timestamp) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } if n, ok := tryParseIPv4(s); ok { - n -= uint32(int32(bucketOffset)) - n -= n % uint32(bucketSize) - n += uint32(int32(bucketOffset)) + bucketSizeInt := uint32(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffset := uint32(int32(bf.bucketOffset)) + + n -= bucketOffset + n -= n % bucketSizeInt + n += bucketOffset + bufLen := len(br.buf) br.buf = marshalIPv4(br.buf, n) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } if nsecs, ok := tryParseDuration(s); ok { - nsecs -= int64(bucketOffset) - nsecs -= nsecs % int64(bucketSize) - nsecs += int64(bucketOffset) + bucketSizeInt := int64(bf.bucketSize) + if bucketSizeInt <= 0 { + bucketSizeInt = 1 + } + bucketOffset := int64(bf.bucketOffset) + + nsecs -= bucketOffset + nsecs -= nsecs % bucketSizeInt + nsecs += bucketOffset + bufLen := len(br.buf) br.buf = marshalDuration(br.buf, nsecs) return bytesutil.ToUnsafeString(br.buf[bufLen:]) @@ -1261,11 +1303,11 @@ type blockResultColumn struct { // bucketedValues contains values after getBucketedValues() call bucketedValues []string - // bucketSize contains bucketSize for bucketedValues - bucketSize float64 + // bucketSizeStr contains bucketSizeStr for bucketedValues + bucketSizeStr string - // bucketOffset contains bucketOffset for bucketedValues - bucketOffset float64 + // bucketOffsetStr contains bucketOffset for bucketedValues + bucketOffsetStr string } // clone returns a clone of c backed by data from br. @@ -1279,8 +1321,8 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn { cNew.dictValues = br.cloneValues(c.dictValues) cNew.encodedValues = br.cloneValues(c.encodedValues) // do not copy c.values and c.bucketedValues - they should be re-created from scrach if needed - cNew.bucketSize = c.bucketSize - cNew.bucketOffset = c.bucketOffset + cNew.bucketSizeStr = c.bucketSizeStr + cNew.bucketOffsetStr = c.bucketOffsetStr return cNew } @@ -1331,20 +1373,20 @@ func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string { return values[rowIdx] } -// getValues returns values for the given column, bucketed according to bucketSize and bucketOffset. +// getValues returns values for the given column, bucketed according to bf. // // The returned values are valid until br.reset() is called. -func (c *blockResultColumn) getBucketedValues(br *blockResult, bucketSize, bucketOffset float64) []string { - if bucketSize <= 0 { +func (c *blockResultColumn) getBucketedValues(br *blockResult, bf *byStatsField) []string { + if !bf.hasBucketConfig() { return c.getValues(br) } - if values := c.bucketedValues; values != nil && c.bucketSize == bucketSize && c.bucketOffset == bucketOffset { + if values := c.bucketedValues; values != nil && c.bucketSizeStr == bf.bucketSizeStr && c.bucketOffsetStr == bf.bucketOffsetStr { return values } - c.bucketedValues = br.getBucketedColumnValues(c, bucketSize, bucketOffset) - c.bucketSize = bucketSize - c.bucketOffset = bucketOffset + c.bucketedValues = br.getBucketedColumnValues(c, bf) + c.bucketSizeStr = bf.bucketSizeStr + c.bucketOffsetStr = bf.bucketOffsetStr return c.bucketedValues } @@ -1356,7 +1398,7 @@ func (c *blockResultColumn) getValues(br *blockResult) []string { return values } - c.values = br.getBucketedColumnValues(c, 0, 0) + c.values = br.getBucketedColumnValues(c, zeroByStatsField) return c.values } @@ -1738,5 +1780,15 @@ func (rc *resultColumn) addValue(v string) { rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:])) } +func truncateTimestampToMonth(timestamp int64) int64 { + t := time.Unix(0, timestamp).UTC() + return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC).UnixNano() +} + +func truncateTimestampToYear(timestamp int64) int64 { + t := time.Unix(0, timestamp).UTC() + return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano() +} + var nan = math.NaN() var inf = math.Inf(1) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 6b01168db..5b535d750 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -927,6 +927,16 @@ func TestParseQuerySuccess(t *testing.T) { f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`) f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`) f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`) + f(`* | stats by (_time:nanosecond) count() foo`, `* | stats by (_time:nanosecond) count(*) as foo`) + f(`* | stats by (_time:microsecond) count() foo`, `* | stats by (_time:microsecond) count(*) as foo`) + f(`* | stats by (_time:millisecond) count() foo`, `* | stats by (_time:millisecond) count(*) as foo`) + f(`* | stats by (_time:second) count() foo`, `* | stats by (_time:second) count(*) as foo`) + f(`* | stats by (_time:minute) count() foo`, `* | stats by (_time:minute) count(*) as foo`) + f(`* | stats by (_time:hour) count() foo`, `* | stats by (_time:hour) count(*) as foo`) + f(`* | stats by (_time:day) count() foo`, `* | stats by (_time:day) count(*) as foo`) + f(`* | stats by (_time:week) count() foo`, `* | stats by (_time:week) count(*) as foo`) + f(`* | stats by (_time:month) count() foo`, `* | stats by (_time:month) count(*) as foo`) + f(`* | stats by (_time:year offset 6.5h) count() foo`, `* | stats by (_time:year offset 6.5h) count(*) as foo`) // sort pipe f(`* | sort`, `* | sort`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index cf3115ed9..e1355c265 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -176,14 +176,14 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { c := br.getColumnByName(bf.name) if c.isConst { // Fast path for column with constant value. - v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize, bf.bucketOffset) + v := br.getBucketedValue(c.encodedValues[0], bf) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) psg := shard.getPipeStatsGroup(shard.keyBuf) shard.stateSizeBudget -= psg.updateStatsForAllRows(br) return } - values := c.getBucketedValues(br, bf.bucketSize, bf.bucketOffset) + values := c.getBucketedValues(br, bf) if areConstValues(values) { // Fast path for column with constant values. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) @@ -210,7 +210,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { columnValues := shard.columnValues[:0] for _, bf := range byFields { c := br.getColumnByName(bf.name) - values := c.getBucketedValues(br, bf.bucketSize, bf.bucketOffset) + values := c.getBucketedValues(br, bf) columnValues = append(columnValues, values) } shard.columnValues = columnValues @@ -544,6 +544,8 @@ func parseResultName(lex *lexer) (string, error) { return resultName, nil } +var zeroByStatsField = &byStatsField{} + // byStatsField represents 'by (...)' part of the pipeStats. // // It can have either 'name' representation or 'name:bucket' or 'name:buket offset off' representation, @@ -576,6 +578,10 @@ func (bf *byStatsField) String() string { return s } +func (bf *byStatsField) hasBucketConfig() bool { + return len(bf.bucketSizeStr) > 0 || len(bf.bucketOffsetStr) > 0 +} + func parseByStatsFields(lex *lexer) ([]*byStatsField, error) { if !lex.isKeyword("(") { return nil, fmt.Errorf("missing `(`") @@ -603,12 +609,14 @@ func parseByStatsFields(lex *lexer) ([]*byStatsField, error) { bucketSizeStr += lex.token lex.nextToken() } - bucketSize, ok := tryParseBucketSize(bucketSizeStr) - if !ok { - return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr) + if bucketSizeStr != "year" && bucketSizeStr != "month" { + bucketSize, ok := tryParseBucketSize(bucketSizeStr) + if !ok { + return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr) + } + bf.bucketSize = bucketSize } bf.bucketSizeStr = bucketSizeStr - bf.bucketSize = bucketSize // Parse bucket offset if lex.isKeyword("offset") { @@ -672,6 +680,25 @@ func tryParseBucketOffset(s string) (float64, bool) { // - bytes: 1.5KiB // - ipv4 mask: /24 func tryParseBucketSize(s string) (float64, bool) { + switch s { + case "nanosecond": + return 1, true + case "microsecond": + return nsecsPerMicrosecond, true + case "millisecond": + return nsecsPerMillisecond, true + case "second": + return nsecsPerSecond, true + case "minute": + return nsecsPerMinute, true + case "hour": + return nsecsPerHour, true + case "day": + return nsecsPerDay, true + case "week": + return nsecsPerWeek, true + } + // Try parsing s as floating point number if f, ok := tryParseFloat64(s); ok { return f, true