From 639b3091b5a47bccb36d14d12f4ede8f73497b70 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 15 May 2024 15:46:42 +0200 Subject: [PATCH] wip --- lib/logstorage/block_result.go | 116 +++++------ lib/logstorage/filter_exact_prefix.go | 6 +- lib/logstorage/filter_ipv4_range.go | 11 +- lib/logstorage/filter_len_range.go | 15 +- lib/logstorage/filter_phrase.go | 21 +- lib/logstorage/filter_prefix.go | 26 +-- lib/logstorage/filter_range.go | 17 +- lib/logstorage/filter_regexp.go | 6 +- lib/logstorage/filter_sequence.go | 6 +- lib/logstorage/filter_string_range.go | 6 +- lib/logstorage/parser.go | 4 +- lib/logstorage/stats_max.go | 126 ++++++++++-- lib/logstorage/stats_min.go | 126 ++++++++++-- lib/logstorage/values_encoder.go | 199 +++++++++++-------- lib/logstorage/values_encoder_test.go | 92 +++++++-- lib/logstorage/values_encoder_timing_test.go | 44 +++- 16 files changed, 562 insertions(+), 259 deletions(-) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 90c88ce56..ed0899ef4 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1,7 +1,6 @@ package logstorage import ( - "encoding/binary" "math" "slices" "sync/atomic" @@ -524,7 +523,7 @@ func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { } bufLen := len(buf) - buf = marshalTimestampRFC3339Nano(buf, timestamps[i]) + buf = marshalTimestampRFC3339NanoString(buf, timestamps[i]) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -560,7 +559,7 @@ func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { timestampPrev = timestamp bufLen := len(buf) - buf = marshalTimestampRFC3339Nano(buf, timestamp) + buf = marshalTimestampRFC3339NanoString(buf, timestamp) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -625,9 +624,9 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStat continue } - n := uint64(v[0]) + n := unmarshalUint8(v) bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint8String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -645,7 +644,7 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStat continue } - n := uint64(v[0]) + n := uint64(unmarshalUint8(v)) n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt @@ -657,7 +656,7 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStat nPrev = n bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint64String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -683,10 +682,9 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *bySta continue } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint16(b)) + n := unmarshalUint16(v) bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint16String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -704,8 +702,7 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *bySta continue } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint16(b)) + n := uint64(unmarshalUint16(v)) n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt @@ -717,7 +714,7 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *bySta nPrev = n bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint64String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -743,10 +740,9 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *bySta continue } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint32(b)) + n := unmarshalUint32(v) bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint32String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -764,8 +760,7 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *bySta continue } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint32(b)) + n := uint64(unmarshalUint32(v)) n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt @@ -777,7 +772,7 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *bySta nPrev = n bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint64String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -803,10 +798,9 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *bySta continue } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) + n := unmarshalUint64(v) bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint64String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -824,8 +818,7 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *bySta continue } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) + n := unmarshalUint64(v) n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt @@ -837,7 +830,7 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *bySta nPrev = n bufLen := len(buf) - buf = marshalUint64(buf, n) + buf = marshalUint64String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -863,12 +856,10 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *bySt continue } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) + f := unmarshalFloat64(v) bufLen := len(buf) - buf = marshalFloat64(buf, f) + buf = marshalFloat64String(buf, f) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -889,9 +880,7 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *bySt continue } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) + f := unmarshalFloat64(v) f -= bf.bucketOffset @@ -909,7 +898,7 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *bySt fPrev = f bufLen := len(buf) - buf = marshalFloat64(buf, f) + buf = marshalFloat64String(buf, f) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -935,8 +924,9 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStats continue } + ip := unmarshalIPv4(v) bufLen := len(buf) - buf = toIPv4String(buf, v) + buf = marshalIPv4String(buf, ip) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -954,8 +944,7 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStats continue } - b := bytesutil.ToUnsafeBytes(v) - n := binary.BigEndian.Uint32(b) + n := unmarshalIPv4(v) n -= bucketOffsetInt n -= n % bucketSizeInt n += bucketOffsetInt @@ -967,7 +956,7 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStats nPrev = n bufLen := len(buf) - buf = marshalIPv4(buf, n) + buf = marshalIPv4String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -993,11 +982,10 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, continue } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) + n := unmarshalTimestampISO8601(v) bufLen := len(buf) - buf = marshalTimestampISO8601(buf, int64(n)) + buf = marshalTimestampISO8601String(buf, n) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -1016,8 +1004,7 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, continue } - b := bytesutil.ToUnsafeBytes(v) - timestamp := int64(encoding.UnmarshalUint64(b)) + timestamp := unmarshalTimestampISO8601(v) timestamp -= bucketOffsetInt if bf.bucketSizeStr == "month" { timestamp = truncateTimestampToMonth(timestamp) @@ -1036,7 +1023,7 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, timestampPrev = timestamp bufLen := len(buf) - buf = marshalTimestampISO8601(buf, int64(timestamp)) + buf = marshalTimestampISO8601String(buf, timestamp) s = bytesutil.ToUnsafeString(buf[bufLen:]) valuesBuf = append(valuesBuf, s) } @@ -1082,7 +1069,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { f += bf.bucketOffset bufLen := len(br.buf) - br.buf = marshalFloat64(br.buf, f) + br.buf = marshalFloat64String(br.buf, f) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } @@ -1104,7 +1091,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { timestamp += bucketOffset bufLen := len(br.buf) - br.buf = marshalTimestampISO8601(br.buf, timestamp) + br.buf = marshalTimestampISO8601String(br.buf, timestamp) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } @@ -1126,7 +1113,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { timestamp += bucketOffset bufLen := len(br.buf) - br.buf = marshalTimestampRFC3339Nano(br.buf, timestamp) + br.buf = marshalTimestampRFC3339NanoString(br.buf, timestamp) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } @@ -1142,7 +1129,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { n += bucketOffset bufLen := len(br.buf) - br.buf = marshalIPv4(br.buf, n) + br.buf = marshalIPv4String(br.buf, n) return bytesutil.ToUnsafeString(br.buf[bufLen:]) } @@ -1488,20 +1475,20 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) (float64, bool) { v := c.dictValues[dictIdx] return tryParseFloat64(v) case valueTypeUint8: - return float64(c.encodedValues[rowIdx][0]), true + v := c.encodedValues[rowIdx] + return float64(unmarshalUint8(v)), true case valueTypeUint16: - b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - return float64(encoding.UnmarshalUint16(b)), true + v := c.encodedValues[rowIdx] + return float64(unmarshalUint16(v)), true case valueTypeUint32: - b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - return float64(encoding.UnmarshalUint32(b)), true + v := c.encodedValues[rowIdx] + return float64(unmarshalUint32(v)), true case valueTypeUint64: - b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - return float64(encoding.UnmarshalUint64(b)), true + v := c.encodedValues[rowIdx] + return float64(unmarshalUint64(v)), true case valueTypeFloat64: - b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) + v := c.encodedValues[rowIdx] + f := unmarshalFloat64(v) return f, !math.IsNaN(f) case valueTypeIPv4: return 0, false @@ -1617,36 +1604,31 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { case valueTypeUint8: sum := uint64(0) for _, v := range c.encodedValues { - sum += uint64(v[0]) + sum += uint64(unmarshalUint8(v)) } return float64(sum), len(br.timestamps) case valueTypeUint16: sum := uint64(0) for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - sum += uint64(encoding.UnmarshalUint16(b)) + sum += uint64(unmarshalUint16(v)) } return float64(sum), len(br.timestamps) case valueTypeUint32: sum := uint64(0) for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - sum += uint64(encoding.UnmarshalUint32(b)) + sum += uint64(unmarshalUint32(v)) } return float64(sum), len(br.timestamps) case valueTypeUint64: sum := float64(0) for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - sum += float64(encoding.UnmarshalUint64(b)) + sum += float64(unmarshalUint64(v)) } return sum, len(br.timestamps) case valueTypeFloat64: sum := float64(0) for _, v := range c.encodedValues { - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) + f := unmarshalFloat64(v) if !math.IsNaN(f) { sum += f } diff --git a/lib/logstorage/filter_exact_prefix.go b/lib/logstorage/filter_exact_prefix.go index 8ee4b23c1..1fa6475c4 100644 --- a/lib/logstorage/filter_exact_prefix.go +++ b/lib/logstorage/filter_exact_prefix.go @@ -91,7 +91,7 @@ func matchTimestampISO8601ByExactPrefix(bs *blockSearch, ch *columnHeader, bm *b bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toTimestampISO8601StringExt(bs, bb, v) + s := toTimestampISO8601String(bs, bb, v) return matchExactPrefix(s, prefix) }) bbPool.Put(bb) @@ -108,7 +108,7 @@ func matchIPv4ByExactPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefi bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return matchExactPrefix(s, prefix) }) bbPool.Put(bb) @@ -126,7 +126,7 @@ func matchFloat64ByExactPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, pr bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return matchExactPrefix(s, prefix) }) bbPool.Put(bb) diff --git a/lib/logstorage/filter_ipv4_range.go b/lib/logstorage/filter_ipv4_range.go index 83d30761c..c952eb1da 100644 --- a/lib/logstorage/filter_ipv4_range.go +++ b/lib/logstorage/filter_ipv4_range.go @@ -3,8 +3,6 @@ package logstorage import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -18,9 +16,9 @@ type filterIPv4Range struct { } func (fr *filterIPv4Range) String() string { - minValue := string(encoding.MarshalUint32(nil, fr.minValue)) - maxValue := string(encoding.MarshalUint32(nil, fr.maxValue)) - return fmt.Sprintf("%sipv4_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), toIPv4String(nil, minValue), toIPv4String(nil, maxValue)) + minValue := marshalIPv4String(nil, fr.minValue) + maxValue := marshalIPv4String(nil, fr.maxValue) + return fmt.Sprintf("%sipv4_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), minValue, maxValue) } func (fr *filterIPv4Range) apply(bs *blockSearch, bm *bitmap) { @@ -108,8 +106,7 @@ func matchIPv4ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, m if len(v) != 4 { logger.Panicf("FATAL: %s: unexpected length for binary representation of IPv4: got %d; want 4", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint32(b) + n := unmarshalIPv4(v) return n >= minValue && n <= maxValue }) } diff --git a/lib/logstorage/filter_len_range.go b/lib/logstorage/filter_len_range.go index b287dcf24..b372d7b0e 100644 --- a/lib/logstorage/filter_len_range.go +++ b/lib/logstorage/filter_len_range.go @@ -3,7 +3,6 @@ package logstorage import ( "unicode/utf8" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -89,7 +88,7 @@ func matchIPv4ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLen, bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return matchLenRange(s, minLen, maxLen) }) bbPool.Put(bb) @@ -103,7 +102,7 @@ func matchFloat64ByLenRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minLe bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return matchLenRange(s, minLen, maxLen) }) bbPool.Put(bb) @@ -191,12 +190,10 @@ func matchMinMaxValueLen(ch *columnHeader, minLen, maxLen uint64) bool { bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalUint64(bb.B[:0], ch.minValue) - s := bytesutil.ToUnsafeString(bb.B) - if maxLen < uint64(len(s)) { + bb.B = marshalUint64String(bb.B[:0], ch.minValue) + if maxLen < uint64(len(bb.B)) { return false } - bb.B = marshalUint64(bb.B[:0], ch.maxValue) - s = bytesutil.ToUnsafeString(bb.B) - return minLen <= uint64(len(s)) + bb.B = marshalUint64String(bb.B[:0], ch.maxValue) + return minLen <= uint64(len(bb.B)) } diff --git a/lib/logstorage/filter_phrase.go b/lib/logstorage/filter_phrase.go index fc396668e..a2dbd996b 100644 --- a/lib/logstorage/filter_phrase.go +++ b/lib/logstorage/filter_phrase.go @@ -107,7 +107,7 @@ func matchTimestampISO8601ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toTimestampISO8601StringExt(bs, bb, v) + s := toTimestampISO8601String(bs, bb, v) return matchPhrase(s, phrase) }) bbPool.Put(bb) @@ -131,7 +131,7 @@ func matchIPv4ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase str bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return matchPhrase(s, phrase) }) bbPool.Put(bb) @@ -160,7 +160,7 @@ func matchFloat64ByPhrase(bs *blockSearch, ch *columnHeader, bm *bitmap, phrase bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return matchPhrase(s, phrase) }) bbPool.Put(bb) @@ -294,26 +294,29 @@ func isMsgFieldName(fieldName string) bool { return fieldName == "" || fieldName == "_msg" } -func toFloat64StringExt(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { +func toFloat64String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected length for binary representation of floating-point number: got %d; want 8", bs.partPath(), len(v)) } - bb.B = toFloat64String(bb.B[:0], v) + f := unmarshalFloat64(v) + bb.B = marshalFloat64String(bb.B[:0], f) return bytesutil.ToUnsafeString(bb.B) } -func toIPv4StringExt(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { +func toIPv4String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { if len(v) != 4 { logger.Panicf("FATAL: %s: unexpected length for binary representation of IPv4: got %d; want 4", bs.partPath(), len(v)) } - bb.B = toIPv4String(bb.B[:0], v) + ip := unmarshalIPv4(v) + bb.B = marshalIPv4String(bb.B[:0], ip) return bytesutil.ToUnsafeString(bb.B) } -func toTimestampISO8601StringExt(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { +func toTimestampISO8601String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected length for binary representation of ISO8601 timestamp: got %d; want 8", bs.partPath(), len(v)) } - bb.B = toTimestampISO8601String(bb.B[:0], v) + timestamp := unmarshalTimestampISO8601(v) + bb.B = marshalTimestampISO8601String(bb.B[:0], timestamp) return bytesutil.ToUnsafeString(bb.B) } diff --git a/lib/logstorage/filter_prefix.go b/lib/logstorage/filter_prefix.go index ea22711fd..953c3e2b6 100644 --- a/lib/logstorage/filter_prefix.go +++ b/lib/logstorage/filter_prefix.go @@ -7,7 +7,6 @@ import ( "unicode/utf8" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -102,7 +101,7 @@ func matchTimestampISO8601ByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toTimestampISO8601StringExt(bs, bb, v) + s := toTimestampISO8601String(bs, bb, v) return matchPrefix(s, prefix) }) bbPool.Put(bb) @@ -123,7 +122,7 @@ func matchIPv4ByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix str bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return matchPrefix(s, prefix) }) bbPool.Put(bb) @@ -151,7 +150,7 @@ func matchFloat64ByPrefix(bs *blockSearch, ch *columnHeader, bm *bitmap, prefix bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return matchPrefix(s, prefix) }) bbPool.Put(bb) @@ -321,8 +320,8 @@ func toUint8String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string { if len(v) != 1 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 1", bs.partPath(), len(v)) } - n := uint64(v[0]) - bb.B = marshalUint64(bb.B[:0], n) + n := unmarshalUint8(v) + bb.B = marshalUint8String(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } @@ -330,9 +329,8 @@ func toUint16String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string if len(v) != 2 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint16 number: got %d; want 2", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint16(b)) - bb.B = marshalUint64(bb.B[:0], n) + n := unmarshalUint16(v) + bb.B = marshalUint16String(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } @@ -340,9 +338,8 @@ func toUint32String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string if len(v) != 4 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint32 number: got %d; want 4", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint32(b)) - bb.B = marshalUint64(bb.B[:0], n) + n := unmarshalUint32(v) + bb.B = marshalUint32String(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } @@ -350,8 +347,7 @@ func toUint64String(bs *blockSearch, bb *bytesutil.ByteBuffer, v string) string if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint64 number: got %d; want 8", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - bb.B = marshalUint64(bb.B[:0], n) + n := unmarshalUint64(v) + bb.B = marshalUint64String(bb.B[:0], n) return bytesutil.ToUnsafeString(bb.B) } diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index 66b3bc692..29f8eb78f 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -3,8 +3,6 @@ package logstorage import ( "math" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -83,9 +81,7 @@ func matchFloat64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected length for binary representation of floating-point number: got %d; want 8", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) + f := unmarshalFloat64(v) return f >= minValue && f <= maxValue }) } @@ -118,7 +114,7 @@ func matchUint8ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, if len(v) != 1 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 1", bs.partPath(), len(v)) } - n := uint64(v[0]) + n := uint64(unmarshalUint8(v)) return n >= minValueUint && n <= maxValueUint }) bbPool.Put(bb) @@ -135,8 +131,7 @@ func matchUint16ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, if len(v) != 2 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint16 number: got %d; want 2", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint16(b)) + n := uint64(unmarshalUint16(v)) return n >= minValueUint && n <= maxValueUint }) bbPool.Put(bb) @@ -153,8 +148,7 @@ func matchUint32ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, if len(v) != 4 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 4", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint32(b)) + n := uint64(unmarshalUint32(v)) return n >= minValueUint && n <= maxValueUint }) bbPool.Put(bb) @@ -171,8 +165,7 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 8", bs.partPath(), len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) + n := unmarshalUint64(v) return n >= minValueUint && n <= maxValueUint }) bbPool.Put(bb) diff --git a/lib/logstorage/filter_regexp.go b/lib/logstorage/filter_regexp.go index 0217d0aaa..990d62ecf 100644 --- a/lib/logstorage/filter_regexp.go +++ b/lib/logstorage/filter_regexp.go @@ -69,7 +69,7 @@ func (fr *filterRegexp) apply(bs *blockSearch, bm *bitmap) { func matchTimestampISO8601ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *regexp.Regexp) { bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toTimestampISO8601StringExt(bs, bb, v) + s := toTimestampISO8601String(bs, bb, v) return re.MatchString(s) }) bbPool.Put(bb) @@ -78,7 +78,7 @@ func matchTimestampISO8601ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap func matchIPv4ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *regexp.Regexp) { bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return re.MatchString(s) }) bbPool.Put(bb) @@ -87,7 +87,7 @@ func matchIPv4ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *regexp func matchFloat64ByRegexp(bs *blockSearch, ch *columnHeader, bm *bitmap, re *regexp.Regexp) { bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return re.MatchString(s) }) bbPool.Put(bb) diff --git a/lib/logstorage/filter_sequence.go b/lib/logstorage/filter_sequence.go index 075999593..ff9dc5314 100644 --- a/lib/logstorage/filter_sequence.go +++ b/lib/logstorage/filter_sequence.go @@ -124,7 +124,7 @@ func matchTimestampISO8601BySequence(bs *blockSearch, ch *columnHeader, bm *bitm // Slow path - phrases contain incomplete timestamp. Search over string representation of the timestamp. bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toTimestampISO8601StringExt(bs, bb, v) + s := toTimestampISO8601String(bs, bb, v) return matchSequence(s, phrases) }) bbPool.Put(bb) @@ -145,7 +145,7 @@ func matchIPv4BySequence(bs *blockSearch, ch *columnHeader, bm *bitmap, phrases, // the ip to string before searching for prefix there. bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return matchSequence(s, phrases) }) bbPool.Put(bb) @@ -163,7 +163,7 @@ func matchFloat64BySequence(bs *blockSearch, ch *columnHeader, bm *bitmap, phras // of floating-point numbers :( bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return matchSequence(s, phrases) }) bbPool.Put(bb) diff --git a/lib/logstorage/filter_string_range.go b/lib/logstorage/filter_string_range.go index 0bb77e16f..c360501c0 100644 --- a/lib/logstorage/filter_string_range.go +++ b/lib/logstorage/filter_string_range.go @@ -81,7 +81,7 @@ func matchTimestampISO8601ByStringRange(bs *blockSearch, ch *columnHeader, bm *b bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toTimestampISO8601StringExt(bs, bb, v) + s := toTimestampISO8601String(bs, bb, v) return matchStringRange(s, minValue, maxValue) }) bbPool.Put(bb) @@ -95,7 +95,7 @@ func matchIPv4ByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minVa bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toIPv4StringExt(bs, bb, v) + s := toIPv4String(bs, bb, v) return matchStringRange(s, minValue, maxValue) }) bbPool.Put(bb) @@ -109,7 +109,7 @@ func matchFloat64ByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, mi bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { - s := toFloat64StringExt(bs, bb, v) + s := toFloat64String(bs, bb, v) return matchStringRange(s, minValue, maxValue) }) bbPool.Put(bb) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index a1f1422a8..41a81fd3b 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -208,8 +208,8 @@ func (q *Query) String() string { // AddTimeFilter adds global filter _time:[start ... end] to q. func (q *Query) AddTimeFilter(start, end int64) { - startStr := marshalTimestampRFC3339Nano(nil, start) - endStr := marshalTimestampRFC3339Nano(nil, end) + startStr := marshalTimestampRFC3339NanoString(nil, start) + endStr := marshalTimestampRFC3339NanoString(nil, end) ft := &filterTime{ minTimestamp: start, maxTimestamp: end, diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 1617db3ce..e477a010f 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -1,11 +1,13 @@ package logstorage import ( + "math" "slices" "strings" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type statsMax struct { @@ -61,14 +63,14 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Find the minimum value across all the fields for the given row for _, c := range br.getColumns() { v := c.getValueAtRow(br, rowIdx) - smp.updateState(v) + smp.updateStateString(v) } } else { // Find the minimum value across the requested fields for the given row for _, field := range smp.sm.fields { c := br.getColumnByName(field) v := c.getValueAtRow(br, rowIdx) - smp.updateState(v) + smp.updateStateString(v) } } @@ -78,17 +80,18 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsMaxProcessor) if src.hasMax { - smp.updateState(src.max) + smp.updateStateString(src.max) } } func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { + if len(br.timestamps) == 0 { + return + } + if c.isTime { // Special case for time column timestamps := br.timestamps - if len(timestamps) == 0 { - return - } maxTimestamp := timestamps[len(timestamps)-1] for _, timestamp := range timestamps[:len(timestamps)-1] { if timestamp > maxTimestamp { @@ -97,9 +100,8 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu } bb := bbPool.Get() - bb.B = marshalTimestampRFC3339Nano(bb.B[:0], maxTimestamp) - v := bytesutil.ToUnsafeString(bb.B) - smp.updateState(v) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) + smp.updateStateBytes(bb.B) bbPool.Put(bb) return @@ -107,16 +109,114 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu if c.isConst { // Special case for const column v := c.encodedValues[0] - smp.updateState(v) + smp.updateStateString(v) return } - for _, v := range c.getValues(br) { - smp.updateState(v) + switch c.valueType { + case valueTypeString: + for _, v := range c.encodedValues { + smp.updateStateString(v) + } + case valueTypeDict: + for _, v := range c.dictValues { + smp.updateStateString(v) + } + case valueTypeUint8: + maxN := unmarshalUint8(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint8(v) + if n > maxN { + maxN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint8String(bb.B[:0], maxN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeUint16: + maxN := unmarshalUint16(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint16(v) + if n > maxN { + maxN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint16String(bb.B[:0], maxN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeUint32: + maxN := unmarshalUint32(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint32(v) + if n > maxN { + maxN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint32String(bb.B[:0], maxN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeUint64: + maxN := unmarshalUint64(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint64(v) + if n > maxN { + maxN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint64String(bb.B[:0], maxN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeFloat64: + maxF := unmarshalFloat64(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + f := unmarshalFloat64(v) + if math.IsNaN(maxF) || f > maxF { + maxF = f + } + } + bb := bbPool.Get() + bb.B = marshalFloat64String(bb.B[:0], maxF) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeIPv4: + maxIP := unmarshalIPv4(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + ip := unmarshalIPv4(v) + if ip > maxIP { + maxIP = ip + } + } + bb := bbPool.Get() + bb.B = marshalIPv4String(bb.B[:0], maxIP) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeTimestampISO8601: + maxTimestamp := unmarshalTimestampISO8601(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + timestamp := unmarshalTimestampISO8601(v) + if timestamp > maxTimestamp { + maxTimestamp = timestamp + } + } + bb := bbPool.Get() + bb.B = marshalTimestampISO8601String(bb.B[:0], maxTimestamp) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) } } -func (smp *statsMaxProcessor) updateState(v string) { +func (smp *statsMaxProcessor) updateStateBytes(b []byte) { + v := bytesutil.ToUnsafeString(b) + smp.updateStateString(v) +} + +func (smp *statsMaxProcessor) updateStateString(v string) { if smp.hasMax && !lessString(smp.max, v) { return } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 9634243e5..9b614b386 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -1,11 +1,13 @@ package logstorage import ( + "math" "slices" "strings" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type statsMin struct { @@ -61,14 +63,14 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Find the minimum value across all the fields for the given row for _, c := range br.getColumns() { v := c.getValueAtRow(br, rowIdx) - smp.updateState(v) + smp.updateStateString(v) } } else { // Find the minimum value across the requested fields for the given row for _, field := range smp.sm.fields { c := br.getColumnByName(field) v := c.getValueAtRow(br, rowIdx) - smp.updateState(v) + smp.updateStateString(v) } } @@ -78,17 +80,18 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsMinProcessor) if src.hasMin { - smp.updateState(src.min) + smp.updateStateString(src.min) } } func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { + if len(br.timestamps) == 0 { + return + } + if c.isTime { // Special case for time column timestamps := br.timestamps - if len(timestamps) == 0 { - return - } minTimestamp := timestamps[0] for _, timestamp := range timestamps[1:] { if timestamp < minTimestamp { @@ -97,9 +100,8 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu } bb := bbPool.Get() - bb.B = marshalTimestampRFC3339Nano(bb.B[:0], minTimestamp) - v := bytesutil.ToUnsafeString(bb.B) - smp.updateState(v) + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) + smp.updateStateBytes(bb.B) bbPool.Put(bb) return @@ -107,16 +109,114 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu if c.isConst { // Special case for const column v := c.encodedValues[0] - smp.updateState(v) + smp.updateStateString(v) return } - for _, v := range c.getValues(br) { - smp.updateState(v) + switch c.valueType { + case valueTypeString: + for _, v := range c.encodedValues { + smp.updateStateString(v) + } + case valueTypeDict: + for _, v := range c.dictValues { + smp.updateStateString(v) + } + case valueTypeUint8: + minN := unmarshalUint8(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint8(v) + if n < minN { + minN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint8String(bb.B[:0], minN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeUint16: + minN := unmarshalUint16(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint16(v) + if n < minN { + minN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint16String(bb.B[:0], minN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeUint32: + minN := unmarshalUint32(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint32(v) + if n < minN { + minN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint32String(bb.B[:0], minN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeUint64: + minN := unmarshalUint64(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + n := unmarshalUint64(v) + if n < minN { + minN = n + } + } + bb := bbPool.Get() + bb.B = marshalUint64String(bb.B[:0], minN) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeFloat64: + minF := unmarshalFloat64(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + f := unmarshalFloat64(v) + if math.IsNaN(minF) || f < minF { + minF = f + } + } + bb := bbPool.Get() + bb.B = marshalFloat64String(bb.B[:0], minF) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeIPv4: + minIP := unmarshalIPv4(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + ip := unmarshalIPv4(v) + if ip < minIP { + minIP = ip + } + } + bb := bbPool.Get() + bb.B = marshalIPv4String(bb.B[:0], minIP) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeTimestampISO8601: + minTimestamp := unmarshalTimestampISO8601(c.encodedValues[0]) + for _, v := range c.encodedValues[1:] { + timestamp := unmarshalTimestampISO8601(v) + if timestamp < minTimestamp { + minTimestamp = timestamp + } + } + bb := bbPool.Get() + bb.B = marshalTimestampISO8601String(bb.B[:0], minTimestamp) + smp.updateStateBytes(bb.B) + bbPool.Put(bb) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) } } -func (smp *statsMinProcessor) updateState(v string) { +func (smp *statsMinProcessor) updateStateBytes(b []byte) { + v := bytesutil.ToUnsafeString(b) + smp.updateStateString(v) +} + +func (smp *statsMinProcessor) updateStateString(v string) { if smp.hasMin && !lessString(v, smp.min) { return } diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 8d323aae3..56ac11381 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -168,9 +168,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 1 { return fmt.Errorf("unexpected value length for uint8; got %d; want 1", len(v)) } - n := uint64(v[0]) + n := unmarshalUint8(v) dstLen := len(dstBuf) - dstBuf = marshalUint64(dstBuf, n) + dstBuf = marshalUint8String(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeUint16: @@ -178,10 +178,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 2 { return fmt.Errorf("unexpected value length for uint16; got %d; want 2", len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint16(b)) + n := unmarshalUint16(v) dstLen := len(dstBuf) - dstBuf = marshalUint64(dstBuf, n) + dstBuf = marshalUint16String(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeUint32: @@ -189,10 +188,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 4 { return fmt.Errorf("unexpected value length for uint32; got %d; want 4", len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := uint64(encoding.UnmarshalUint32(b)) + n := unmarshalUint32(v) dstLen := len(dstBuf) - dstBuf = marshalUint64(dstBuf, n) + dstBuf = marshalUint32String(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeUint64: @@ -200,10 +198,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 8 { return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v)) } - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) + n := unmarshalUint64(v) dstLen := len(dstBuf) - dstBuf = marshalUint64(dstBuf, n) + dstBuf = marshalUint64String(dstBuf, n) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeFloat64: @@ -211,8 +208,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 8 { return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v)) } + f := unmarshalFloat64(v) dstLen := len(dstBuf) - dstBuf = toFloat64String(dstBuf, v) + dstBuf = marshalFloat64String(dstBuf, f) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeIPv4: @@ -220,8 +218,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 4 { return fmt.Errorf("unexpected value length for ipv4; got %d; want 4", len(v)) } + ip := unmarshalIPv4(v) dstLen := len(dstBuf) - dstBuf = toIPv4String(dstBuf, v) + dstBuf = marshalIPv4String(dstBuf, ip) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } case valueTypeTimestampISO8601: @@ -229,8 +228,9 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues if len(v) != 8 { return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v)) } + timestamp := unmarshalTimestampISO8601(v) dstLen := len(dstBuf) - dstBuf = toTimestampISO8601String(dstBuf, v) + dstBuf = marshalTimestampISO8601String(dstBuf, timestamp) values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:]) } default: @@ -241,32 +241,6 @@ func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dictValues return nil } -func toTimestampISO8601String(dst []byte, v string) []byte { - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - dst = marshalTimestampISO8601(dst, int64(n)) - return dst -} - -func toIPv4String(dst []byte, v string) []byte { - dst = marshalUint64(dst, uint64(v[0])) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64(v[1])) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64(v[2])) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64(v[3])) - return dst -} - -func toFloat64String(dst []byte, v string) []byte { - b := bytesutil.ToUnsafeBytes(v) - n := encoding.UnmarshalUint64(b) - f := math.Float64frombits(n) - dst = marshalFloat64(dst, f) - return dst -} - func getValuesDecoder() *valuesDecoder { v := valuesDecoderPool.Get() if v == nil { @@ -356,11 +330,6 @@ func tryParseTimestampRFC3339Nano(s string) (int64, bool) { return nsecs, true } -// marshalTimestampRFC3339Nano appends RFC3339Nano-formatted nsecs to dst and returns the result. -func marshalTimestampRFC3339Nano(dst []byte, nsecs int64) []byte { - return time.Unix(0, nsecs).UTC().AppendFormat(dst, time.RFC3339Nano) -} - // tryParseTimestampISO8601 parses 'YYYY-MM-DDThh:mm:ss.mssZ' and returns unix timestamp in nanoseconds. // // The returned timestamp can be negative if s is smaller than 1970 year. @@ -404,13 +373,6 @@ func tryParseTimestampISO8601(s string) (int64, bool) { return nsecs, true } -// marshalTimestampISO8601 appends ISO8601-formatted nsecs to dst and returns the result. -func marshalTimestampISO8601(dst []byte, nsecs int64) []byte { - return time.Unix(0, nsecs).UTC().AppendFormat(dst, iso8601Timestamp) -} - -const iso8601Timestamp = "2006-01-02T15:04:05.000Z" - // tryParseTimestampSecs parses YYYY-MM-DDTHH:mm:ss into unix timestamp in seconds. func tryParseTimestampSecs(s string) (int64, bool, string) { // Parse year @@ -517,11 +479,6 @@ func tryParseUint64(s string) (uint64, bool) { return n, true } -// marshalUint64 appends string representation of n to dst and returns the result. -func marshalUint64(dst []byte, n uint64) []byte { - return strconv.AppendUint(dst, n, 10) -} - func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) { u32s := encoding.GetUint32s(len(srcValues)) defer encoding.PutUint32s(u32s) @@ -607,18 +564,6 @@ func tryParseIPv4(s string) (uint32, bool) { return ipv4, true } -// marshalIPv4 appends string representation of IPv4 address in n to dst and returns the result. -func marshalIPv4(dst []byte, n uint32) []byte { - dst = marshalUint64(dst, uint64(n>>24)) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64((n>>16)&0xff)) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64((n>>8)&0xff)) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64(n&0xff)) - return dst -} - func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) { u64s := encoding.GetUint64s(len(srcValues)) defer encoding.PutUint64s(u64s) @@ -710,11 +655,6 @@ func tryParseFloat64(s string) (float64, bool) { return f, true } -// marshalFloat64 appends formatted f to dst and returns the result. -func marshalFloat64(dst []byte, f float64) []byte { - return strconv.AppendFloat(dst, f, 'f', -1, 64) -} - // tryParseBytes parses user-readable bytes representation in s. // // Supported suffixes: @@ -935,53 +875,53 @@ func marshalDuration(dst []byte, nsecs int64) []byte { if nsecs >= nsecsPerWeek { weeks := nsecs / nsecsPerWeek nsecs -= weeks * nsecsPerWeek - dst = marshalUint64(dst, uint64(weeks)) + dst = marshalUint64String(dst, uint64(weeks)) dst = append(dst, 'w') } if nsecs >= nsecsPerDay { days := nsecs / nsecsPerDay nsecs -= days * nsecsPerDay - dst = marshalUint64(dst, uint64(days)) + dst = marshalUint8String(dst, uint8(days)) dst = append(dst, 'd') } if nsecs >= nsecsPerHour { hours := nsecs / nsecsPerHour nsecs -= hours * nsecsPerHour - dst = marshalUint64(dst, uint64(hours)) + dst = marshalUint8String(dst, uint8(hours)) dst = append(dst, 'h') } if nsecs >= nsecsPerMinute { minutes := nsecs / nsecsPerMinute nsecs -= minutes * nsecsPerMinute - dst = marshalUint64(dst, uint64(minutes)) + dst = marshalUint8String(dst, uint8(minutes)) dst = append(dst, 'm') } if nsecs >= nsecsPerSecond { if formatFloat64Seconds { seconds := float64(nsecs) / nsecsPerSecond - dst = marshalFloat64(dst, seconds) + dst = marshalFloat64String(dst, seconds) dst = append(dst, 's') return dst } seconds := nsecs / nsecsPerSecond nsecs -= seconds * nsecsPerSecond - dst = marshalUint64(dst, uint64(seconds)) + dst = marshalUint8String(dst, uint8(seconds)) dst = append(dst, 's') } if nsecs >= nsecsPerMillisecond { msecs := nsecs / nsecsPerMillisecond nsecs -= msecs * nsecsPerMillisecond - dst = marshalUint64(dst, uint64(msecs)) + dst = marshalUint16String(dst, uint16(msecs)) dst = append(dst, "ms"...) } if nsecs >= nsecsPerMicrosecond { usecs := nsecs / nsecsPerMicrosecond nsecs -= usecs * nsecsPerMicrosecond - dst = marshalUint64(dst, uint64(usecs)) + dst = marshalUint16String(dst, uint16(usecs)) dst = append(dst, "µs"...) } if nsecs > 0 { - dst = marshalUint64(dst, uint64(nsecs)) + dst = marshalUint16String(dst, uint16(nsecs)) dst = append(dst, "ns"...) } return dst @@ -1156,3 +1096,96 @@ func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) { } return src, nil } + +func unmarshalUint8(v string) uint8 { + return uint8(v[0]) +} + +func unmarshalUint16(v string) uint16 { + b := bytesutil.ToUnsafeBytes(v) + return encoding.UnmarshalUint16(b) +} + +func unmarshalUint32(v string) uint32 { + b := bytesutil.ToUnsafeBytes(v) + return encoding.UnmarshalUint32(b) +} + +func unmarshalUint64(v string) uint64 { + b := bytesutil.ToUnsafeBytes(v) + return encoding.UnmarshalUint64(b) +} + +func unmarshalFloat64(v string) float64 { + n := unmarshalUint64(v) + return math.Float64frombits(n) +} + +func unmarshalIPv4(v string) uint32 { + return unmarshalUint32(v) +} + +func unmarshalTimestampISO8601(v string) int64 { + n := unmarshalUint64(v) + return int64(n) +} + +func marshalUint8String(dst []byte, n uint8) []byte { + if n < 10 { + return append(dst, '0'+n) + } + if n < 100 { + return append(dst, '0'+n/10, '0'+n%10) + } + + if n < 200 { + dst = append(dst, '1') + n -= 100 + } else { + dst = append(dst, '2') + n -= 200 + } + if n < 10 { + return append(dst, '0', '0'+n) + } + return append(dst, '0'+n/10, '0'+n%10) +} + +func marshalUint16String(dst []byte, n uint16) []byte { + return marshalUint64String(dst, uint64(n)) +} + +func marshalUint32String(dst []byte, n uint32) []byte { + return marshalUint64String(dst, uint64(n)) +} + +func marshalUint64String(dst []byte, n uint64) []byte { + return strconv.AppendUint(dst, n, 10) +} + +func marshalFloat64String(dst []byte, f float64) []byte { + return strconv.AppendFloat(dst, f, 'f', -1, 64) +} + +func marshalIPv4String(dst []byte, n uint32) []byte { + dst = marshalUint8String(dst, uint8(n>>24)) + dst = append(dst, '.') + dst = marshalUint8String(dst, uint8(n>>16)) + dst = append(dst, '.') + dst = marshalUint8String(dst, uint8(n>>8)) + dst = append(dst, '.') + dst = marshalUint8String(dst, uint8(n)) + return dst +} + +// marshalTimestampISO8601String appends ISO8601-formatted nsecs to dst and returns the result. +func marshalTimestampISO8601String(dst []byte, nsecs int64) []byte { + return time.Unix(0, nsecs).UTC().AppendFormat(dst, iso8601Timestamp) +} + +const iso8601Timestamp = "2006-01-02T15:04:05.000Z" + +// marshalTimestampRFC3339NanoString appends RFC3339Nano-formatted nsecs to dst and returns the result. +func marshalTimestampRFC3339NanoString(dst []byte, nsecs int64) []byte { + return time.Unix(0, nsecs).UTC().AppendFormat(dst, time.RFC3339Nano) +} diff --git a/lib/logstorage/values_encoder_test.go b/lib/logstorage/values_encoder_test.go index e5b73b3c5..f6d2d26da 100644 --- a/lib/logstorage/values_encoder_test.go +++ b/lib/logstorage/values_encoder_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "reflect" + "strconv" "testing" ) @@ -96,7 +97,7 @@ func TestValuesEncoder(t *testing.T) { f(values, valueTypeTimestampISO8601, 1303184641000000000, 1303184641008000000) } -func TestTryParseIPv4_Success(t *testing.T) { +func TestTryParseIPv4String_Success(t *testing.T) { f := func(s string) { t.Helper() @@ -104,7 +105,7 @@ func TestTryParseIPv4_Success(t *testing.T) { if !ok { t.Fatalf("cannot parse %q", s) } - data := marshalIPv4(nil, n) + data := marshalIPv4String(nil, n) if string(data) != s { t.Fatalf("unexpected ip; got %q; want %q", data, s) } @@ -147,14 +148,14 @@ func TestTryParseIPv4_Failure(t *testing.T) { f("127.127.127.-1") } -func TestTryParseTimestampRFC3339Nano_Success(t *testing.T) { +func TestTryParseTimestampRFC3339NanoString_Success(t *testing.T) { f := func(s string) { t.Helper() nsecs, ok := tryParseTimestampRFC3339Nano(s) if !ok { t.Fatalf("cannot parse timestamp %q", s) } - data := marshalTimestampRFC3339Nano(nil, nsecs) + data := marshalTimestampRFC3339NanoString(nil, nsecs) if string(data) != s { t.Fatalf("unexpected timestamp; got %q; want %q", data, s) } @@ -236,14 +237,14 @@ func TestTryParseTimestampRFC3339Nano_Failure(t *testing.T) { f("2023-01-23T23:33:ssZ") } -func TestTryParseTimestampISO8601_Success(t *testing.T) { +func TestTryParseTimestampISO8601String_Success(t *testing.T) { f := func(s string) { t.Helper() nsecs, ok := tryParseTimestampISO8601(s) if !ok { t.Fatalf("cannot parse timestamp %q", s) } - data := marshalTimestampISO8601(nil, nsecs) + data := marshalTimestampISO8601String(nil, nsecs) if string(data) != s { t.Fatalf("unexpected timestamp; got %q; want %q", data, s) } @@ -554,11 +555,11 @@ func TestTryParseFloat64_Failure(t *testing.T) { f("12-5") } -func TestMarshalFloat64(t *testing.T) { +func TestMarshalFloat64String(t *testing.T) { f := func(f float64, resultExpected string) { t.Helper() - result := marshalFloat64(nil, f) + result := marshalFloat64String(nil, f) if string(result) != resultExpected { t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) } @@ -618,11 +619,79 @@ func TestTryParseUint64_Failure(t *testing.T) { f("foo") } -func TestMarshalUint64(t *testing.T) { +func TestMarshalUint8String(t *testing.T) { + f := func(n uint8, resultExpected string) { + t.Helper() + + result := marshalUint8String(nil, n) + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + } + + for i := 0; i < 256; i++ { + resultExpected := strconv.Itoa(i) + f(uint8(i), resultExpected) + } + + // the maximum possible value + f(math.MaxUint8, "255") +} + +func TestMarshalUint16String(t *testing.T) { + f := func(n uint16, resultExpected string) { + t.Helper() + + result := marshalUint16String(nil, n) + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + } + + f(0, "0") + f(1, "1") + f(10, "10") + f(12, "12") + f(120, "120") + f(1203, "1203") + f(12345, "12345") + + // the maximum possible value + f(math.MaxUint16, "65535") +} + +func TestMarshalUint32String(t *testing.T) { + f := func(n uint32, resultExpected string) { + t.Helper() + + result := marshalUint32String(nil, n) + if string(result) != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + } + + f(0, "0") + f(1, "1") + f(10, "10") + f(12, "12") + f(120, "120") + f(1203, "1203") + f(12034, "12034") + f(123456, "123456") + f(1234567, "1234567") + f(12345678, "12345678") + f(123456789, "123456789") + f(1234567890, "1234567890") + + // the maximum possible value + f(math.MaxUint32, "4294967295") +} + +func TestMarshalUint64String(t *testing.T) { f := func(n uint64, resultExpected string) { t.Helper() - result := marshalUint64(nil, n) + result := marshalUint64String(nil, n) if string(result) != resultExpected { t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) } @@ -632,8 +701,7 @@ func TestMarshalUint64(t *testing.T) { f(123456, "123456") // the maximum possible value - f(18446744073709551615, "18446744073709551615") - f(18_446_744_073_709_551_615, "18446744073709551615") + f(math.MaxUint64, "18446744073709551615") } func TestTryParseIPv4Mask_Success(t *testing.T) { diff --git a/lib/logstorage/values_encoder_timing_test.go b/lib/logstorage/values_encoder_timing_test.go index a86d91c23..d80e6f411 100644 --- a/lib/logstorage/values_encoder_timing_test.go +++ b/lib/logstorage/values_encoder_timing_test.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "sync/atomic" "testing" ) @@ -17,14 +18,17 @@ func BenchmarkTryParseTimestampRFC3339Nano(b *testing.B) { b.SetBytes(int64(len(a))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { + nSum := int64(0) for pb.Next() { for _, s := range a { - _, ok := tryParseTimestampRFC3339Nano(s) + n, ok := tryParseTimestampRFC3339Nano(s) if !ok { panic(fmt.Errorf("cannot parse timestamp %q", s)) } + nSum += n } } + GlobalSink.Add(uint64(nSum)) }) } @@ -40,14 +44,17 @@ func BenchmarkTryParseTimestampISO8601(b *testing.B) { b.SetBytes(int64(len(a))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { + nSum := int64(0) for pb.Next() { for _, s := range a { - _, ok := tryParseTimestampISO8601(s) + n, ok := tryParseTimestampISO8601(s) if !ok { panic(fmt.Errorf("cannot parse timestamp %q", s)) } + nSum += n } } + GlobalSink.Add(uint64(nSum)) }) } @@ -63,14 +70,17 @@ func BenchmarkTryParseIPv4(b *testing.B) { b.SetBytes(int64(len(a))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { + nSum := uint32(0) for pb.Next() { for _, s := range a { - _, ok := tryParseIPv4(s) + n, ok := tryParseIPv4(s) if !ok { panic(fmt.Errorf("cannot parse ipv4 %q", s)) } + nSum += n } } + GlobalSink.Add(uint64(nSum)) }) } @@ -86,14 +96,17 @@ func BenchmarkTryParseUint64(b *testing.B) { b.SetBytes(int64(len(a))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { + var nSum uint64 for pb.Next() { for _, s := range a { - _, ok := tryParseUint64(s) + n, ok := tryParseUint64(s) if !ok { panic(fmt.Errorf("cannot parse uint %q", s)) } + nSum += n } } + GlobalSink.Add(nSum) }) } @@ -109,13 +122,34 @@ func BenchmarkTryParseFloat64(b *testing.B) { b.SetBytes(int64(len(a))) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { + var fSum float64 for pb.Next() { for _, s := range a { - _, ok := tryParseFloat64(s) + f, ok := tryParseFloat64(s) if !ok { panic(fmt.Errorf("cannot parse float64 %q", s)) } + fSum += f } } + GlobalSink.Add(uint64(fSum)) }) } + +func BenchmarkMarshalUint8String(b *testing.B) { + b.SetBytes(256) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var buf []byte + n := 0 + for pb.Next() { + for i := 0; i < 256; i++ { + buf = marshalUint8String(buf[:0], uint8(i)) + n += len(buf) + } + } + GlobalSink.Add(uint64(n)) + }) +} + +var GlobalSink atomic.Uint64