diff --git a/lib/logstorage/arena.go b/lib/logstorage/arena.go index e11d2b4dd..6466a53d2 100644 --- a/lib/logstorage/arena.go +++ b/lib/logstorage/arena.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) func getArena() *arena { @@ -29,8 +30,12 @@ func (a *arena) reset() { a.b = a.b[:0] } +func (a *arena) preallocate(n int) { + a.b = slicesutil.ExtendCapacity(a.b, n) +} + func (a *arena) sizeBytes() int { - return len(a.b) + return cap(a.b) } func (a *arena) copyBytes(b []byte) []byte { @@ -41,9 +46,8 @@ func (a *arena) copyBytes(b []byte) []byte { ab := a.b abLen := len(ab) ab = append(ab, b...) - result := ab[abLen:] a.b = ab - return result + return ab[abLen:] } func (a *arena) copyBytesToString(b []byte) string { diff --git a/lib/logstorage/arena_test.go b/lib/logstorage/arena_test.go index 0e2072bb1..f27a020a7 100644 --- a/lib/logstorage/arena_test.go +++ b/lib/logstorage/arena_test.go @@ -35,7 +35,7 @@ func TestArena(t *testing.T) { } } - if n := a.sizeBytes(); n != valuesLen { + if n := a.sizeBytes(); n < valuesLen { t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen) } @@ -47,7 +47,7 @@ func TestArena(t *testing.T) { t.Fatalf("unexpected len(b); got %d; want %d", len(b), j) } valuesLen += j - if n := a.sizeBytes(); n != valuesLen { + if n := a.sizeBytes(); n < valuesLen { t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen) } for k := range b { diff --git a/lib/logstorage/bitmap.go b/lib/logstorage/bitmap.go index ad35f4e46..01f573b0f 100644 --- a/lib/logstorage/bitmap.go +++ b/lib/logstorage/bitmap.go @@ -3,6 +3,7 @@ package logstorage import ( "math/bits" "sync" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" @@ -37,6 +38,10 @@ func (bm *bitmap) reset() { bm.bitsLen = 0 } +func (bm *bitmap) sizeBytes() int { + return int(unsafe.Sizeof(*bm)) + cap(bm.a)*int(unsafe.Sizeof(bm.a[0])) +} + func (bm *bitmap) copyFrom(src *bitmap) { bm.reset() @@ -149,7 +154,8 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) { // forEachSetBitReadonly calls f for each set bit func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) { if bm.areAllBitsSet() { - for i := range bm.bitsLen { + n := bm.bitsLen + for i := 0; i < n; i++ { f(i) } return diff --git a/lib/logstorage/bitmap_timing_test.go b/lib/logstorage/bitmap_timing_test.go index 0430e1aaf..1665d9050 100644 --- a/lib/logstorage/bitmap_timing_test.go +++ b/lib/logstorage/bitmap_timing_test.go @@ -5,7 +5,7 @@ import ( ) func BenchmarkBitmapForEachSetBitReadonly(b *testing.B) { - const bitsLen = 64*1024 + const bitsLen = 64 * 1024 b.Run("no-zero-bits", func(b *testing.B) { bm := getBitmap(bitsLen) @@ -34,7 +34,7 @@ func BenchmarkBitmapForEachSetBitReadonly(b *testing.B) { } func BenchmarkBitmapForEachSetBit(b *testing.B) { - const bitsLen = 64*1024 + const bitsLen = 64 * 1024 b.Run("no-zero-bits-noclear", func(b *testing.B) { bm := getBitmap(bitsLen) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index d14b2aba1..1c16b85a5 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -19,8 +19,14 @@ import ( // // It is expected that its contents is accessed only from a single goroutine at a time. type blockResult struct { - // buf holds all the bytes behind the requested column values in the block. - buf []byte + // bs is the blockSearch used for fetching block data. + bs *blockSearch + + // bm is bitamp for fetching the needed rows from bs. + bm *bitmap + + // a holds all the bytes behind the requested column values in the block. + a arena // values holds all the requested column values in the block. valuesBuf []string @@ -48,7 +54,10 @@ type blockResult struct { } func (br *blockResult) reset() { - br.buf = br.buf[:0] + br.bs = nil + br.bm = nil + + br.a.reset() clear(br.valuesBuf) br.valuesBuf = br.valuesBuf[:0] @@ -72,13 +81,18 @@ func (br *blockResult) reset() { func (br *blockResult) clone() *blockResult { brNew := &blockResult{} + brNew.bs = br.bs + + brNew.bm = getBitmap(br.bm.bitsLen) + brNew.bm.copyFrom(br.bm) + cs := br.getColumns() bufLen := 0 for _, c := range cs { bufLen += c.neededBackingBufLen() } - brNew.buf = make([]byte, 0, bufLen) + brNew.a.preallocate(bufLen) valuesBufLen := 0 for _, c := range cs { @@ -102,37 +116,29 @@ func (br *blockResult) clone() *blockResult { // cloneValues clones the given values into br and returns the cloned values. func (br *blockResult) cloneValues(values []string) []string { - buf := br.buf - valuesBuf := br.valuesBuf - valuesBufLen := len(valuesBuf) - + valuesBufLen := len(br.valuesBuf) for _, v := range values { - if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { - valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1]) - } else { - bufLen := len(buf) - buf = append(buf, v...) - valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:])) - } + br.addValue(v) } - - br.valuesBuf = valuesBuf - br.buf = buf - - return valuesBuf[valuesBufLen:] + return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) copyString(s string) string { - bufLen := len(br.buf) - br.buf = append(br.buf, s...) - return bytesutil.ToUnsafeString(br.buf[bufLen:]) +func (br *blockResult) addValue(v string) { + valuesBuf := br.valuesBuf + if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { + v = valuesBuf[len(valuesBuf)-1] + } else { + v = br.a.copyString(v) + } + br.valuesBuf = append(br.valuesBuf, v) } // sizeBytes returns the size of br in bytes. func (br *blockResult) sizeBytes() int { n := int(unsafe.Sizeof(*br)) - n += cap(br.buf) + n += br.bm.sizeBytes() + n += br.a.sizeBytes() n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0])) n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0])) n += cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0])) @@ -157,15 +163,15 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { if areConstValues(rc.values) { // This optimization allows reducing memory usage after br cloning csBuf = append(csBuf, blockResultColumn{ - name: br.copyString(rc.name), + name: br.a.copyString(rc.name), isConst: true, - encodedValues: rc.values[:1], + valuesEncoded: rc.values[:1], }) } else { csBuf = append(csBuf, blockResultColumn{ - name: br.copyString(rc.name), + name: br.a.copyString(rc.name), valueType: valueTypeString, - encodedValues: rc.values, + valuesEncoded: rc.values, }) } } @@ -173,7 +179,9 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { br.csInitialized = false } -func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { +func (br *blockResult) initAllColumns() { + bs := br.bs + unneededColumnNames := bs.bsw.so.unneededColumnNames if !slices.Contains(unneededColumnNames, "_time") { @@ -196,7 +204,7 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { if v != "" { br.addConstColumn("_msg", v) } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { - br.addColumn(bs, ch, bm) + br.addColumn(ch) } else { br.addConstColumn("_msg", "") } @@ -220,12 +228,14 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { continue } if !slices.Contains(unneededColumnNames, ch.name) { - br.addColumn(bs, ch, bm) + br.addColumn(ch) } } } -func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *bitmap) { +func (br *blockResult) initRequestedColumns() { + bs := br.bs + for _, columnName := range bs.bsw.so.neededColumnNames { switch columnName { case "_stream": @@ -241,7 +251,7 @@ func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *bitmap) { if v != "" { br.addConstColumn(columnName, v) } else if ch := bs.csh.getColumnHeader(columnName); ch != nil { - br.addColumn(bs, ch, bm) + br.addColumn(ch) } else { br.addConstColumn(columnName, "") } @@ -252,6 +262,8 @@ func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *bitmap) { func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.reset() + br.bs = bs + br.bm = bm br.streamID = bs.bsw.bh.streamID if bm.isZero() { @@ -285,125 +297,104 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { br.timestamps = dstTimestamps } -func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap) { - buf := br.buf - valuesBuf := br.valuesBuf - valuesBufLen := len(valuesBuf) - var dictValues []string - - appendValue := func(v string) { - if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { - valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1]) - } else { - bufLen := len(buf) - buf = append(buf, v...) - valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:])) - } +func (br *blockResult) newValuesEncodedForColumn(c *blockResultColumn) []string { + if c.isConst { + logger.Panicf("BUG: newValuesEncodedForColumn() musn't be called for const column") + } + if c.isTime { + logger.Panicf("BUG: newValuesEncodedForColumn() musn't be called for time column") } - switch ch.valueType { + valuesBufLen := len(br.valuesBuf) + + bs := br.bs + bm := br.bm + ch := &c.ch + + switch c.valueType { case valueTypeString: - visitValues(bs, ch, bm, func(v string) bool { - appendValue(v) - return true - }) + visitValuesReadonly(bs, ch, bm, br.addValue) case valueTypeDict: - dictValues = ch.valuesDict.values - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 1 { logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v)) } dictIdx := v[0] - if int(dictIdx) >= len(dictValues) { - logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(dictValues)) + if int(dictIdx) >= len(c.dictValues) { + logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(c.dictValues)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeUint8: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 1 { logger.Panicf("FATAL: %s: unexpected size for uint8 column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeUint16: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 2 { logger.Panicf("FATAL: %s: unexpected size for uint16 column %q; got %d bytes; want 2 bytes", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeUint32: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 4 { logger.Panicf("FATAL: %s: unexpected size for uint32 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeUint64: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected size for uint64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeFloat64: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected size for float64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeIPv4: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 4 { logger.Panicf("FATAL: %s: unexpected size for ipv4 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) case valueTypeTimestampISO8601: - visitValues(bs, ch, bm, func(v string) bool { + visitValuesReadonly(bs, ch, bm, func(v string) { if len(v) != 8 { logger.Panicf("FATAL: %s: unexpected size for timestmap column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v)) } - appendValue(v) - return true + br.addValue(v) }) default: logger.Panicf("FATAL: %s: unknown valueType=%d for column %q", bs.partPath(), ch.valueType, ch.name) } - encodedValues := valuesBuf[valuesBufLen:] - - valuesBufLen = len(valuesBuf) - for _, v := range dictValues { - appendValue(v) - } - dictValues = valuesBuf[valuesBufLen:] - - // copy ch.name to buf - bufLen := len(buf) - buf = append(buf, ch.name...) - name := bytesutil.ToUnsafeString(buf[bufLen:]) + return br.valuesBuf[valuesBufLen:] +} +// addColumn adds column for the given ch to br. +// +// The added column is valid until ch is changed. +func (br *blockResult) addColumn(ch *columnHeader) { br.csBuf = append(br.csBuf, blockResultColumn{ - name: getCanonicalColumnName(name), - minValue: ch.minValue, - maxValue: ch.maxValue, - valueType: ch.valueType, - dictValues: dictValues, - encodedValues: encodedValues, + name: getCanonicalColumnName(ch.name), + ch: *ch, + valueType: ch.valueType, + dictValues: ch.valuesDict.values, }) br.csInitialized = false - br.buf = buf - br.valuesBuf = valuesBuf + + c := &br.csBuf[len(br.csBuf)-1] + c.ch.valuesDict.values = nil } func (br *blockResult) addTimeColumn() { @@ -438,49 +429,50 @@ func (br *blockResult) addStreamColumn(bs *blockSearch) bool { } func (br *blockResult) addConstColumn(name, value string) { - value = br.copyString(value) + nameCopy := br.a.copyString(name) - valuesBuf := br.valuesBuf - valuesBufLen := len(valuesBuf) - valuesBuf = append(valuesBuf, value) - br.valuesBuf = valuesBuf - encodedValues := valuesBuf[valuesBufLen:] + valuesBufLen := len(br.valuesBuf) + br.addValue(value) + valuesEncoded := br.valuesBuf[valuesBufLen:] br.csBuf = append(br.csBuf, blockResultColumn{ - name: br.copyString(name), + name: nameCopy, isConst: true, - encodedValues: encodedValues, + valuesEncoded: valuesEncoded, }) br.csInitialized = false } -func (br *blockResult) getBucketedColumnValues(c *blockResultColumn, bf *byStatsField) []string { +func (br *blockResult) newValuesBucketedForColumn(c *blockResultColumn, bf *byStatsField) []string { if c.isConst { - return br.getBucketedConstValues(c.encodedValues[0], bf) + v := c.valuesEncoded[0] + return br.getBucketedConstValues(v, bf) } if c.isTime { return br.getBucketedTimestampValues(bf) } + valuesEncoded := c.getValuesEncoded(br) + switch c.valueType { case valueTypeString: - return br.getBucketedStringValues(c.encodedValues, bf) + return br.getBucketedStringValues(valuesEncoded, bf) case valueTypeDict: - return br.getBucketedDictValues(c.encodedValues, c.dictValues, bf) + return br.getBucketedDictValues(valuesEncoded, c.dictValues, bf) case valueTypeUint8: - return br.getBucketedUint8Values(c.encodedValues, bf) + return br.getBucketedUint8Values(valuesEncoded, bf) case valueTypeUint16: - return br.getBucketedUint16Values(c.encodedValues, bf) + return br.getBucketedUint16Values(valuesEncoded, bf) case valueTypeUint32: - return br.getBucketedUint32Values(c.encodedValues, bf) + return br.getBucketedUint32Values(valuesEncoded, bf) case valueTypeUint64: - return br.getBucketedUint64Values(c.encodedValues, bf) + return br.getBucketedUint64Values(valuesEncoded, bf) case valueTypeFloat64: - return br.getBucketedFloat64Values(c.encodedValues, bf) + return br.getBucketedFloat64Values(valuesEncoded, bf) case valueTypeIPv4: - return br.getBucketedIPv4Values(c.encodedValues, bf) + return br.getBucketedIPv4Values(valuesEncoded, bf) case valueTypeTimestampISO8601: - return br.getBucketedTimestampISO8601Values(c.encodedValues, bf) + return br.getBucketedTimestampISO8601Values(valuesEncoded, bf) default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) return nil @@ -509,7 +501,7 @@ func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []stri } func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { - buf := br.buf + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) @@ -566,7 +558,7 @@ func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string { } } - br.buf = buf + br.a.b = buf br.valuesBuf = valuesBuf return valuesBuf[valuesBufLen:] @@ -596,12 +588,12 @@ func (br *blockResult) getBucketedStringValues(values []string, bf *byStatsField return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, bf *byStatsField) []string { +func (br *blockResult) getBucketedDictValues(valuesEncoded, dictValues []string, bf *byStatsField) []string { valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) dictValues = br.getBucketedStringValues(dictValues, bf) - for _, v := range encodedValues { + for _, v := range valuesEncoded { dictIdx := v[0] valuesBuf = append(valuesBuf, dictValues[dictIdx]) } @@ -611,16 +603,16 @@ func (br *blockResult) getBucketedDictValues(encodedValues, dictValues []string, return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedUint8Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -639,8 +631,8 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStat bucketOffsetInt := uint64(int64(bf.bucketOffset)) nPrev := uint64(0) - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -664,21 +656,21 @@ func (br *blockResult) getBucketedUint8Values(encodedValues []string, bf *byStat } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedUint16Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -697,8 +689,8 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *bySta bucketOffsetInt := uint64(int64(bf.bucketOffset)) nPrev := uint64(0) - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -722,21 +714,21 @@ func (br *blockResult) getBucketedUint16Values(encodedValues []string, bf *bySta } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedUint32Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -755,8 +747,8 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *bySta bucketOffsetInt := uint64(int64(bf.bucketOffset)) nPrev := uint64(0) - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -780,21 +772,21 @@ func (br *blockResult) getBucketedUint32Values(encodedValues []string, bf *bySta } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedUint64Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -813,8 +805,8 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *bySta bucketOffsetInt := uint64(int64(bf.bucketOffset)) nPrev := uint64(0) - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -838,21 +830,21 @@ func (br *blockResult) getBucketedUint64Values(encodedValues []string, bf *bySta } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedFloat64Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -875,8 +867,8 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *bySt bucketSizeP10 := int64(bucketSize * p10) fPrev := float64(0) - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -906,21 +898,21 @@ func (br *blockResult) getBucketedFloat64Values(encodedValues []string, bf *bySt } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return br.valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedIPv4Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -939,8 +931,8 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStats bucketOffsetInt := uint32(int32(bf.bucketOffset)) nPrev := uint32(0) - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -964,21 +956,21 @@ func (br *blockResult) getBucketedIPv4Values(encodedValues []string, bf *byStats } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return valuesBuf[valuesBufLen:] } -func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, bf *byStatsField) []string { - buf := br.buf +func (br *blockResult) getBucketedTimestampISO8601Values(valuesEncoded []string, bf *byStatsField) []string { + buf := br.a.b valuesBuf := br.valuesBuf valuesBufLen := len(valuesBuf) var s string if !bf.hasBucketConfig() { - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -999,8 +991,8 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, timestampPrev := int64(0) bb := bbPool.Get() - for i, v := range encodedValues { - if i > 0 && encodedValues[i-1] == encodedValues[i] { + for i, v := range valuesEncoded { + if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] { valuesBuf = append(valuesBuf, s) continue } @@ -1032,7 +1024,7 @@ func (br *blockResult) getBucketedTimestampISO8601Values(encodedValues []string, } br.valuesBuf = valuesBuf - br.buf = buf + br.a.b = buf return valuesBuf[valuesBufLen:] } @@ -1069,9 +1061,11 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { f += bf.bucketOffset - bufLen := len(br.buf) - br.buf = marshalFloat64String(br.buf, f) - return bytesutil.ToUnsafeString(br.buf[bufLen:]) + buf := br.a.b + bufLen := len(buf) + buf = marshalFloat64String(buf, f) + br.a.b = buf + return bytesutil.ToUnsafeString(buf[bufLen:]) } if timestamp, ok := tryParseTimestampISO8601(s); ok { @@ -1091,9 +1085,11 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { } timestamp += bucketOffset - bufLen := len(br.buf) - br.buf = marshalTimestampISO8601String(br.buf, timestamp) - return bytesutil.ToUnsafeString(br.buf[bufLen:]) + buf := br.a.b + bufLen := len(buf) + buf = marshalTimestampISO8601String(buf, timestamp) + br.a.b = buf + return bytesutil.ToUnsafeString(buf[bufLen:]) } if timestamp, ok := tryParseTimestampRFC3339Nano(s); ok { @@ -1113,9 +1109,11 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { } timestamp += bucketOffset - bufLen := len(br.buf) - br.buf = marshalTimestampRFC3339NanoString(br.buf, timestamp) - return bytesutil.ToUnsafeString(br.buf[bufLen:]) + buf := br.a.b + bufLen := len(buf) + buf = marshalTimestampRFC3339NanoString(buf, timestamp) + br.a.b = buf + return bytesutil.ToUnsafeString(buf[bufLen:]) } if n, ok := tryParseIPv4(s); ok { @@ -1129,9 +1127,11 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { n -= n % bucketSizeInt n += bucketOffset - bufLen := len(br.buf) - br.buf = marshalIPv4String(br.buf, n) - return bytesutil.ToUnsafeString(br.buf[bufLen:]) + buf := br.a.b + bufLen := len(buf) + buf = marshalIPv4String(buf, n) + br.a.b = buf + return bytesutil.ToUnsafeString(buf[bufLen:]) } if nsecs, ok := tryParseDuration(s); ok { @@ -1145,9 +1145,11 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { nsecs -= nsecs % bucketSizeInt nsecs += bucketOffset - bufLen := len(br.buf) - br.buf = marshalDuration(br.buf, nsecs) - return bytesutil.ToUnsafeString(br.buf[bufLen:]) + buf := br.a.b + bufLen := len(buf) + buf = marshalDuration(buf, nsecs) + br.a.b = buf + return bytesutil.ToUnsafeString(buf[bufLen:]) } // Couldn't parse s, so return it as is. @@ -1305,8 +1307,11 @@ func (br *blockResult) skipRows(skipRows int) { if c.isConst { continue } - if c.encodedValues != nil { - c.encodedValues = append(c.encodedValues[:0], c.encodedValues[skipRows:]...) + if c.valuesEncoded != nil { + c.valuesEncoded = append(c.valuesEncoded[:0], c.valuesEncoded[skipRows:]...) + } + if c.valuesBucketed != nil { + c.valuesBucketed = append(c.valuesBucketed[:0], c.valuesBucketed[skipRows:]...) } } } @@ -1320,8 +1325,11 @@ func (br *blockResult) truncateRows(keepRows int) { if c.isConst { continue } - if c.encodedValues != nil { - c.encodedValues = c.encodedValues[:keepRows] + if c.valuesEncoded != nil { + c.valuesEncoded = c.valuesEncoded[:keepRows] + } + if c.valuesBucketed != nil { + c.valuesBucketed = append(c.valuesBucketed[:0], c.valuesBucketed[keepRows:]...) } } } @@ -1334,55 +1342,65 @@ type blockResultColumn struct { // name is column name. name string - // minValue is the minimum value in the block for uint*, float64, ipv4 and timestamp valueType - minValue uint64 - - // maxValue is the maximum value in the block for uint*, float64, ipv4 and timestamp valueType - maxValue uint64 + // ch is is used for initializing valuesEncoded for non-time and non-const columns. + // + // ch.valuesDict.values must be set to nil, since dict values for valueTypeDict are stored at dictValues. + ch columnHeader // isConst is set to true if the column is const. // - // The column value is stored in encodedValues[0] + // The column value is stored in valuesEncoded[0] isConst bool // isTime is set to true if the column contains _time values. // - // The column values are stored in blockResult.timestamps + // The column values are stored in blockResult.timestamps, while valuesEncoded is nil. isTime bool // valueType is the type of non-cost value valueType valueType - // dictValues contains dictionary values for valueTypeDict column + // dictValues contains dict values for valueType=valueTypeDict. dictValues []string - // encodedValues contains encoded values for non-const column - encodedValues []string + // valuesEncoded contains encoded values for non-const and non-time column after getValuesEncoded() call + valuesEncoded []string // values contains decoded values after getValues() call values []string - // bucketedValues contains values after getBucketedValues() call - bucketedValues []string + // valuesBucketed contains values after getValuesBucketed() call + valuesBucketed []string - // bucketSizeStr contains bucketSizeStr for bucketedValues + // bucketSizeStr contains bucketSizeStr for valuesBucketed bucketSizeStr string - // bucketOffsetStr contains bucketOffset for bucketedValues + // bucketOffsetStr contains bucketOffset for valuesBucketed bucketOffsetStr string } // clone returns a clone of c backed by data from br. +// +// the clone is valid until br is reset. func (c *blockResultColumn) clone(br *blockResult) blockResultColumn { var cNew blockResultColumn - cNew.name = br.copyString(c.name) + cNew.name = br.a.copyString(c.name) + + cNew.ch = c.ch + cNew.ch.valuesDict.values = nil + cNew.isConst = c.isConst cNew.isTime = c.isTime cNew.valueType = c.valueType + cNew.dictValues = br.cloneValues(c.dictValues) - cNew.encodedValues = br.cloneValues(c.encodedValues) - // do not copy c.values and c.bucketedValues - they should be re-created from scrach if needed + cNew.valuesEncoded = br.cloneValues(c.valuesEncoded) + if c.valueType != valueTypeString { + cNew.values = br.cloneValues(c.values) + } + cNew.valuesBucketed = br.cloneValues(c.valuesBucketed) + cNew.bucketSizeStr = c.bucketSizeStr cNew.bucketOffsetStr = c.bucketOffsetStr @@ -1390,22 +1408,24 @@ func (c *blockResultColumn) clone(br *blockResult) blockResultColumn { } func (c *blockResultColumn) neededBackingBufLen() int { - n := 0 - + n := len(c.name) n += valuesSizeBytes(c.dictValues) - n += valuesSizeBytes(c.encodedValues) - // do not take into account c.values and c.bucketedValues, since they should be re-created from scratch if needed - + n += valuesSizeBytes(c.valuesEncoded) + if c.valueType != valueTypeString { + n += valuesSizeBytes(c.values) + } + n += valuesSizeBytes(c.valuesBucketed) return n } func (c *blockResultColumn) neededBackingValuesBufLen() int { n := 0 - n += len(c.dictValues) - n += len(c.encodedValues) - // do not take into account c.values and c.bucketedValues, since they should be re-created from scratch if needed - + n += len(c.valuesEncoded) + if c.valueType != valueTypeString { + n += len(c.values) + } + n += len(c.valuesBucketed) return n } @@ -1423,7 +1443,7 @@ func valuesSizeBytes(values []string) int { func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string { if c.isConst { // Fast path for const column. - return c.encodedValues[0] + return c.valuesEncoded[0] } if c.values != nil { // Fast path, which avoids call overhead for getValues(). @@ -1438,18 +1458,18 @@ func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string { // getValues returns values for the given column, bucketed according to bf. // // The returned values are valid until br.reset() is called. -func (c *blockResultColumn) getBucketedValues(br *blockResult, bf *byStatsField) []string { +func (c *blockResultColumn) getValuesBucketed(br *blockResult, bf *byStatsField) []string { if !bf.hasBucketConfig() { return c.getValues(br) } - if values := c.bucketedValues; values != nil && c.bucketSizeStr == bf.bucketSizeStr && c.bucketOffsetStr == bf.bucketOffsetStr { + if values := c.valuesBucketed; values != nil && c.bucketSizeStr == bf.bucketSizeStr && c.bucketOffsetStr == bf.bucketOffsetStr { return values } - c.bucketedValues = br.getBucketedColumnValues(c, bf) + c.valuesBucketed = br.newValuesBucketedForColumn(c, bf) c.bucketSizeStr = bf.bucketSizeStr c.bucketOffsetStr = bf.bucketOffsetStr - return c.bucketedValues + return c.valuesBucketed } // getValues returns values for the given column. @@ -1460,41 +1480,58 @@ func (c *blockResultColumn) getValues(br *blockResult) []string { return values } - c.values = br.getBucketedColumnValues(c, zeroByStatsField) + c.values = br.newValuesBucketedForColumn(c, zeroByStatsField) return c.values } -func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) (float64, bool) { +// getValuesEncoded returns encoded values for the given column. +// +// The returned values are valid until br.reset() is called. +func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string { + if c.isTime { + return nil + } + if values := c.valuesEncoded; values != nil { + return values + } + + c.valuesEncoded = br.newValuesEncodedForColumn(c) + return c.valuesEncoded +} + +func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (float64, bool) { if c.isConst { - v := c.encodedValues[0] + v := c.valuesEncoded[0] return tryParseFloat64(v) } if c.isTime { return 0, false } + valuesEncoded := c.getValuesEncoded(br) + switch c.valueType { case valueTypeString: - v := c.encodedValues[rowIdx] + v := valuesEncoded[rowIdx] return tryParseFloat64(v) case valueTypeDict: - dictIdx := c.encodedValues[rowIdx][0] + dictIdx := valuesEncoded[rowIdx][0] v := c.dictValues[dictIdx] return tryParseFloat64(v) case valueTypeUint8: - v := c.encodedValues[rowIdx] + v := valuesEncoded[rowIdx] return float64(unmarshalUint8(v)), true case valueTypeUint16: - v := c.encodedValues[rowIdx] + v := valuesEncoded[rowIdx] return float64(unmarshalUint16(v)), true case valueTypeUint32: - v := c.encodedValues[rowIdx] + v := valuesEncoded[rowIdx] return float64(unmarshalUint32(v)), true case valueTypeUint64: - v := c.encodedValues[rowIdx] + v := valuesEncoded[rowIdx] return float64(unmarshalUint64(v)), true case valueTypeFloat64: - v := c.encodedValues[rowIdx] + v := valuesEncoded[rowIdx] f := unmarshalFloat64(v) return f, !math.IsNaN(f) case valueTypeIPv4: @@ -1509,7 +1546,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) (float64, bool) { func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { if c.isConst { - v := c.encodedValues[0] + v := c.valuesEncoded[0] return uint64(len(v)) * uint64(len(br.timestamps)) } if c.isTime { @@ -1522,7 +1559,7 @@ func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 { case valueTypeDict: n := uint64(0) dictValues := c.dictValues - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { idx := v[0] v := dictValues[idx] n += uint64(len(v)) @@ -1558,7 +1595,7 @@ func (c *blockResultColumn) sumLenStringValues(br *blockResult) uint64 { func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { if c.isConst { - v := c.encodedValues[0] + v := c.valuesEncoded[0] f, ok := tryParseFloat64(v) if !ok { return 0, 0 @@ -1575,7 +1612,7 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { count := 0 f := float64(0) ok := false - values := c.encodedValues + values := c.getValuesEncoded(br) for i := range values { if i == 0 || values[i-1] != values[i] { f, ok = tryParseFloat64(values[i]) @@ -1587,9 +1624,10 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { } return sum, count case valueTypeDict: - a := encoding.GetFloat64s(len(c.dictValues)) + dictValues := c.dictValues + a := encoding.GetFloat64s(len(dictValues)) dictValuesFloat := a.A - for i, v := range c.dictValues { + for i, v := range dictValues { f, ok := tryParseFloat64(v) if !ok { f = nan @@ -1598,7 +1636,7 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { } sum := float64(0) count := 0 - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { dictIdx := v[0] f := dictValuesFloat[dictIdx] if !math.IsNaN(f) { @@ -1610,31 +1648,31 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { return sum, count case valueTypeUint8: sum := uint64(0) - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint8(v)) } return float64(sum), len(br.timestamps) case valueTypeUint16: sum := uint64(0) - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint16(v)) } return float64(sum), len(br.timestamps) case valueTypeUint32: sum := uint64(0) - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { sum += uint64(unmarshalUint32(v)) } return float64(sum), len(br.timestamps) case valueTypeUint64: sum := float64(0) - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { sum += float64(unmarshalUint64(v)) } return sum, len(br.timestamps) case valueTypeFloat64: sum := float64(0) - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { f := unmarshalFloat64(v) if !math.IsNaN(f) { sum += f @@ -1656,16 +1694,19 @@ type resultColumn struct { // name is column name. name string - // buf contains values data. - buf []byte + // a contains values data. + a arena - // values is the result values. They are backed by data inside buf. + // values is the result values. They are backed by data inside a. values []string } -func (rc *resultColumn) resetKeepName() { - rc.buf = rc.buf[:0] +func (rc *resultColumn) sizeBytes() int { + return len(rc.name) + rc.a.sizeBytes() + len(rc.values)*int(unsafe.Sizeof(rc.values[0])) +} +func (rc *resultColumn) resetKeepName() { + rc.a.reset() clear(rc.values) rc.values = rc.values[:0] } @@ -1674,12 +1715,11 @@ func (rc *resultColumn) resetKeepName() { func (rc *resultColumn) addValue(v string) { values := rc.values if len(values) > 0 && string(v) == values[len(values)-1] { - rc.values = append(values, values[len(values)-1]) + v = values[len(values)-1] } else { - bufLen := len(rc.buf) - rc.buf = append(rc.buf, v...) - rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:])) + v = rc.a.copyString(v) } + rc.values = append(values, v) } func truncateTimestampToMonth(timestamp int64) int64 { @@ -1705,5 +1745,16 @@ func getEmptyStrings(rowsCount int) []string { var emptyStrings atomic.Pointer[[]string] +func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string)) { + if bm.isZero() { + // Fast path - nothing to visit + return + } + values := bs.getValuesForColumn(ch) + bm.forEachSetBitReadonly(func(idx int) { + f(values[idx]) + }) +} + var nan = math.NaN() var inf = math.Inf(1) diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 6845f9c33..fe5cc56aa 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -168,9 +168,9 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { // fetch the requested columns to bs.br. if bs.bsw.so.needAllColumns { - bs.br.fetchAllColumns(bs, bm) + bs.br.initAllColumns() } else { - bs.br.fetchRequestedColumns(bs, bm) + bs.br.initRequestedColumns() } } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 4787b56bb..55b74b5dd 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -232,13 +232,13 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { { c: &blockResultColumn{ valueType: valueTypeString, - encodedValues: rc.values, + valuesEncoded: rc.values, }, i64Values: i64Values, f64Values: f64Values, }, } - shard.stateSizeBudget -= len(rc.buf) + int(unsafe.Sizeof(byColumns[0])+unsafe.Sizeof(*byColumns[0].c)) + shard.stateSizeBudget -= rc.sizeBytes() + int(unsafe.Sizeof(byColumns[0])+unsafe.Sizeof(*byColumns[0].c)) // Append br to shard.blocks. shard.blocks = append(shard.blocks, sortBlock{ @@ -260,8 +260,8 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { continue } if c.isConst { - bc.i64Values = shard.createInt64Values(c.encodedValues) - bc.f64Values = shard.createFloat64Values(c.encodedValues) + bc.i64Values = shard.createInt64Values(c.valuesEncoded) + bc.f64Values = shard.createFloat64Values(c.valuesEncoded) continue } @@ -610,8 +610,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort if cA.c.isConst && cB.c.isConst { // Fast path - compare const values - ccA := cA.c.encodedValues[0] - ccB := cB.c.encodedValues[0] + ccA := cA.c.valuesEncoded[0] + ccB := cB.c.valuesEncoded[0] if ccA == ccB { continue } diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 56e086460..efb5a9cfd 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -182,14 +182,14 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { c := br.getColumnByName(bf.name) if c.isConst { // Fast path for column with constant value. - v := br.getBucketedValue(c.encodedValues[0], bf) + v := br.getBucketedValue(c.valuesEncoded[0], bf) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) psg := shard.getPipeStatsGroup(shard.keyBuf) shard.stateSizeBudget -= psg.updateStatsForAllRows(br) return } - values := c.getBucketedValues(br, bf) + values := c.getValuesBucketed(br, bf) if areConstValues(values) { // Fast path for column with constant values. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) @@ -216,7 +216,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { columnValues := shard.columnValues[:0] for _, bf := range byFields { c := br.getColumnByName(bf.name) - values := c.getBucketedValues(br, bf) + values := c.getValuesBucketed(br, bf) columnValues = append(columnValues, values) } shard.columnValues = columnValues diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index 5cd0ec364..f0afcfee8 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -57,7 +57,7 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int if sap.sa.containsStar { // Scan all the fields for the given row for _, c := range br.getColumns() { - f, ok := c.getFloatValueAtRow(rowIdx) + f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { sap.sum += f sap.count++ @@ -67,7 +67,7 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int // Scan only the given fields for the given row for _, field := range sap.sa.fields { c := br.getColumnByName(field) - f, ok := c.getFloatValueAtRow(rowIdx) + f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { sap.sum += f sap.count++ diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index fc7219211..dcf2be2a7 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -49,7 +49,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { // Fast path for count(single_column) c := br.getColumnByName(fields[0]) if c.isConst { - if c.encodedValues[0] != "" { + if c.valuesEncoded[0] != "" { scp.rowsCount += uint64(len(br.timestamps)) } return 0 @@ -60,7 +60,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } switch c.valueType { case valueTypeString: - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { if v != "" { scp.rowsCount++ } @@ -72,7 +72,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { scp.rowsCount += uint64(len(br.timestamps)) return 0 } - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { if int(v[0]) != zeroDictIdx { scp.rowsCount++ } @@ -95,7 +95,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { for _, f := range fields { c := br.getColumnByName(f) if c.isConst { - if c.encodedValues[0] != "" { + if c.valuesEncoded[0] != "" { scp.rowsCount += uint64(len(br.timestamps)) return 0 } @@ -105,18 +105,21 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { scp.rowsCount += uint64(len(br.timestamps)) return 0 } + switch c.valueType { case valueTypeString: + valuesEncoded := c.getValuesEncoded(br) bm.forEachSetBit(func(i int) bool { - return c.encodedValues[i] == "" + return valuesEncoded[i] == "" }) case valueTypeDict: if !slices.Contains(c.dictValues, "") { scp.rowsCount += uint64(len(br.timestamps)) return 0 } + valuesEncoded := c.getValuesEncoded(br) bm.forEachSetBit(func(i int) bool { - dictIdx := c.encodedValues[i][0] + dictIdx := valuesEncoded[i][0] return c.dictValues[dictIdx] == "" }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: @@ -144,7 +147,7 @@ func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) i // Fast path for count(single_column) c := br.getColumnByName(fields[0]) if c.isConst { - if c.encodedValues[0] != "" { + if c.valuesEncoded[0] != "" { scp.rowsCount++ } return 0 @@ -155,12 +158,14 @@ func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) i } switch c.valueType { case valueTypeString: - if v := c.encodedValues[rowIdx]; v != "" { + valuesEncoded := c.getValuesEncoded(br) + if v := valuesEncoded[rowIdx]; v != "" { scp.rowsCount++ } return 0 case valueTypeDict: - dictIdx := c.encodedValues[rowIdx][0] + valuesEncoded := c.getValuesEncoded(br) + dictIdx := valuesEncoded[rowIdx][0] if v := c.dictValues[dictIdx]; v != "" { scp.rowsCount++ } diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go index 62581d817..9904057f5 100644 --- a/lib/logstorage/stats_count_empty.go +++ b/lib/logstorage/stats_count_empty.go @@ -53,7 +53,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int // Fast path for count_empty(single_column) c := br.getColumnByName(fields[0]) if c.isConst { - if c.encodedValues[0] == "" { + if c.valuesEncoded[0] == "" { scp.rowsCount += uint64(len(br.timestamps)) } return 0 @@ -63,7 +63,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int } switch c.valueType { case valueTypeString: - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { if v == "" { scp.rowsCount++ } @@ -74,7 +74,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int if zeroDictIdx < 0 { return 0 } - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { if int(v[0]) == zeroDictIdx { scp.rowsCount++ } @@ -96,7 +96,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int for _, f := range fields { c := br.getColumnByName(f) if c.isConst { - if c.encodedValues[0] == "" { + if c.valuesEncoded[0] == "" { scp.rowsCount += uint64(len(br.timestamps)) return 0 } @@ -107,15 +107,17 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int } switch c.valueType { case valueTypeString: + valuesEncoded := c.getValuesEncoded(br) bm.forEachSetBit(func(i int) bool { - return c.encodedValues[i] == "" + return valuesEncoded[i] == "" }) case valueTypeDict: if !slices.Contains(c.dictValues, "") { return 0 } + valuesEncoded := c.getValuesEncoded(br) bm.forEachSetBit(func(i int) bool { - dictIdx := c.encodedValues[i][0] + dictIdx := valuesEncoded[i][0] return c.dictValues[dictIdx] == "" }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: @@ -145,7 +147,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx i // Fast path for count_empty(single_column) c := br.getColumnByName(fields[0]) if c.isConst { - if c.encodedValues[0] == "" { + if c.valuesEncoded[0] == "" { scp.rowsCount++ } return 0 @@ -155,12 +157,14 @@ func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx i } switch c.valueType { case valueTypeString: - if v := c.encodedValues[rowIdx]; v == "" { + valuesEncoded := c.getValuesEncoded(br) + if v := valuesEncoded[rowIdx]; v == "" { scp.rowsCount++ } return 0 case valueTypeDict: - dictIdx := c.encodedValues[rowIdx][0] + valuesEncoded := c.getValuesEncoded(br) + dictIdx := valuesEncoded[rowIdx][0] if v := c.dictValues[dictIdx]; v == "" { scp.rowsCount++ } diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 354e25813..d4f1bc486 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -122,7 +122,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { } if c.isConst { // count unique const values - v := c.encodedValues[0] + v := c.valuesEncoded[0] if v == "" { // Do not count empty values return stateSizeIncrease @@ -156,7 +156,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } - // Count unique values across encodedValues + // Count unique values across values values := c.getValues(br) keyBuf := sup.keyBuf[:0] for i, v := range values { @@ -278,7 +278,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in } if c.isConst { // count unique const values - v := c.encodedValues[0] + v := c.valuesEncoded[0] if v == "" { // Do not count empty values return stateSizeIncrease @@ -295,7 +295,8 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in } if c.valueType == valueTypeDict { // count unique non-zero c.dictValues - dictIdx := c.encodedValues[rowIdx][0] + valuesEncoded := c.getValuesEncoded(br) + dictIdx := valuesEncoded[rowIdx][0] v := c.dictValues[dictIdx] if v == "" { // Do not count empty values diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index df917abaf..45289b895 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -108,14 +108,14 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu } if c.isConst { // Special case for const column - v := c.encodedValues[0] + v := c.valuesEncoded[0] smp.updateStateString(v) return } switch c.valueType { case valueTypeString: - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { smp.updateStateString(v) } case valueTypeDict: @@ -124,23 +124,23 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu } case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() - bb.B = marshalUint64String(bb.B[:0], c.maxValue) + bb.B = marshalUint64String(bb.B[:0], c.ch.maxValue) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeFloat64: - f := math.Float64frombits(c.maxValue) + f := math.Float64frombits(c.ch.maxValue) bb := bbPool.Get() bb.B = marshalFloat64String(bb.B[:0], f) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeIPv4: bb := bbPool.Get() - bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue)) + bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.maxValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeTimestampISO8601: bb := bbPool.Get() - bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue)) + bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.maxValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) default: diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 298b1c92a..8344087f6 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -108,14 +108,14 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu } if c.isConst { // Special case for const column - v := c.encodedValues[0] + v := c.valuesEncoded[0] smp.updateStateString(v) return } switch c.valueType { case valueTypeString: - for _, v := range c.encodedValues { + for _, v := range c.getValuesEncoded(br) { smp.updateStateString(v) } case valueTypeDict: @@ -124,23 +124,23 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu } case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() - bb.B = marshalUint64String(bb.B[:0], c.minValue) + bb.B = marshalUint64String(bb.B[:0], c.ch.minValue) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeFloat64: - f := math.Float64frombits(c.minValue) + f := math.Float64frombits(c.ch.minValue) bb := bbPool.Get() bb.B = marshalFloat64String(bb.B[:0], f) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeIPv4: bb := bbPool.Get() - bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue)) + bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.minValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) case valueTypeTimestampISO8601: bb := bbPool.Get() - bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue)) + bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.minValue)) smp.updateStateBytes(bb.B) bbPool.Put(bb) default: diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go index 2d6552d5d..310020082 100644 --- a/lib/logstorage/stats_quantile.go +++ b/lib/logstorage/stats_quantile.go @@ -2,11 +2,15 @@ package logstorage import ( "fmt" + "math" "slices" "strconv" "unsafe" "github.com/valyala/fastrand" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type statsQuantile struct { @@ -38,27 +42,16 @@ type statsQuantileProcessor struct { } func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int { - h := &sqp.h stateSizeIncrease := 0 if sqp.sq.containsStar { for _, c := range br.getColumns() { - for _, v := range c.getValues(br) { - f, ok := tryParseFloat64(v) - if ok { - stateSizeIncrease += h.update(f) - } - } + stateSizeIncrease += sqp.updateStateForColumn(br, c) } } else { for _, field := range sqp.sq.fields { c := br.getColumnByName(field) - for _, v := range c.getValues(br) { - f, ok := tryParseFloat64(v) - if ok { - stateSizeIncrease += h.update(f) - } - } + stateSizeIncrease += sqp.updateStateForColumn(br, c) } } @@ -71,7 +64,7 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int if sqp.sq.containsStar { for _, c := range br.getColumns() { - f, ok := c.getFloatValueAtRow(rowIdx) + f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { stateSizeIncrease += h.update(f) } @@ -79,7 +72,7 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int } else { for _, field := range sqp.sq.fields { c := br.getColumnByName(field) - f, ok := c.getFloatValueAtRow(rowIdx) + f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { stateSizeIncrease += h.update(f) } @@ -89,6 +82,85 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int return stateSizeIncrease } +func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) int { + h := &sqp.h + stateSizeIncrease := 0 + + if c.isConst { + f, ok := tryParseFloat64(c.valuesEncoded[0]) + if ok { + for range br.timestamps { + stateSizeIncrease += h.update(f) + } + } + return stateSizeIncrease + } + if c.isTime { + return 0 + } + + switch c.valueType { + case valueTypeString: + for _, v := range c.getValues(br) { + f, ok := tryParseFloat64(v) + if ok { + stateSizeIncrease += h.update(f) + } + } + case valueTypeDict: + dictValues := c.dictValues + a := encoding.GetFloat64s(len(dictValues)) + for i, v := range dictValues { + f, ok := tryParseFloat64(v) + if !ok { + f = nan + } + a.A[i] = f + } + for _, v := range c.getValuesEncoded(br) { + idx := v[0] + f := a.A[idx] + if !math.IsNaN(f) { + h.update(f) + } + } + encoding.PutFloat64s(a) + case valueTypeUint8: + for _, v := range c.getValuesEncoded(br) { + n := unmarshalUint8(v) + h.update(float64(n)) + } + case valueTypeUint16: + for _, v := range c.getValuesEncoded(br) { + n := unmarshalUint16(v) + h.update(float64(n)) + } + case valueTypeUint32: + for _, v := range c.getValuesEncoded(br) { + n := unmarshalUint32(v) + h.update(float64(n)) + } + case valueTypeUint64: + for _, v := range c.getValuesEncoded(br) { + n := unmarshalUint64(v) + h.update(float64(n)) + } + case valueTypeFloat64: + for _, v := range c.getValuesEncoded(br) { + f := unmarshalFloat64(v) + if !math.IsNaN(f) { + h.update(f) + } + } + case valueTypeIPv4: + case valueTypeTimestampISO8601: + default: + logger.Panicf("BUG: unexpected valueType=%d", c.valueType) + } + + return stateSizeIncrease +} + func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsQuantileProcessor) sqp.h.mergeState(&src.h) diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index 4480214df..e70eb05dd 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -38,27 +38,13 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { if ssp.ss.containsStar { // Sum all the columns for _, c := range br.getColumns() { - f, count := c.sumValues(br) - if count > 0 { - if math.IsNaN(ssp.sum) { - ssp.sum = f - } else { - ssp.sum += f - } - } + ssp.updateStateForColumn(br, c) } } else { // Sum the requested columns for _, field := range ssp.ss.fields { c := br.getColumnByName(field) - f, count := c.sumValues(br) - if count > 0 { - if math.IsNaN(ssp.sum) { - ssp.sum = f - } else { - ssp.sum += f - } - } + ssp.updateStateForColumn(br, c) } } return 0 @@ -68,32 +54,39 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int if ssp.ss.containsStar { // Sum all the fields for the given row for _, c := range br.getColumns() { - f, ok := c.getFloatValueAtRow(rowIdx) + f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { - if math.IsNaN(ssp.sum) { - ssp.sum = f - } else { - ssp.sum += f - } + ssp.updateState(f) } } } else { // Sum only the given fields for the given row for _, field := range ssp.ss.fields { c := br.getColumnByName(field) - f, ok := c.getFloatValueAtRow(rowIdx) + f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { - if math.IsNaN(ssp.sum) { - ssp.sum = f - } else { - ssp.sum += f - } + ssp.updateState(f) } } } return 0 } +func (ssp *statsSumProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { + f, count := c.sumValues(br) + if count > 0 { + ssp.updateState(f) + } +} + +func (ssp *statsSumProcessor) updateState(f float64) { + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } +} + func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsSumProcessor) ssp.sum += src.sum diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index d11e61387..6118f844a 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -68,7 +68,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC stateSizeIncrease := 0 if c.isConst { // collect unique const values - v := c.encodedValues[0] + v := c.valuesEncoded[0] if v == "" { // skip empty values return stateSizeIncrease @@ -141,7 +141,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum stateSizeIncrease := 0 if c.isConst { // collect unique const values - v := c.encodedValues[0] + v := c.valuesEncoded[0] if v == "" { // skip empty values return stateSizeIncrease @@ -155,7 +155,8 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum } if c.valueType == valueTypeDict { // collect unique non-zero c.dictValues - dictIdx := c.encodedValues[rowIdx][0] + valuesEncoded := c.getValuesEncoded(br) + dictIdx := valuesEncoded[rowIdx][0] v := c.dictValues[dictIdx] if v == "" { // skip empty values diff --git a/lib/logstorage/stats_values.go b/lib/logstorage/stats_values.go index 2dd62f2a9..816727ba1 100644 --- a/lib/logstorage/stats_values.go +++ b/lib/logstorage/stats_values.go @@ -61,7 +61,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRows(br *blockResult) int { func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int { stateSizeIncrease := 0 if c.isConst { - v := strings.Clone(c.encodedValues[0]) + v := strings.Clone(c.valuesEncoded[0]) stateSizeIncrease += len(v) values := svp.values @@ -81,7 +81,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum } values := svp.values - for _, encodedValue := range c.encodedValues { + for _, encodedValue := range c.getValuesEncoded(br) { idx := encodedValue[0] values = append(values, dictValues[idx]) } @@ -128,7 +128,7 @@ func (svp *statsValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int { stateSizeIncrease := 0 if c.isConst { - v := strings.Clone(c.encodedValues[0]) + v := strings.Clone(c.valuesEncoded[0]) stateSizeIncrease += len(v) svp.values = append(svp.values, v) @@ -138,7 +138,8 @@ func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, b } if c.valueType == valueTypeDict { // collect unique non-zero c.dictValues - dictIdx := c.encodedValues[rowIdx][0] + valuesEncoded := c.getValuesEncoded(br) + dictIdx := valuesEncoded[rowIdx][0] v := strings.Clone(c.dictValues[dictIdx]) stateSizeIncrease += len(v)