Aliaksandr Valialkin 2024-05-01 01:19:22 +02:00
parent a38345c6b4
commit 2005e5f93b
9 changed files with 173 additions and 95 deletions

View file

@@ -316,6 +316,32 @@ func (br *blockResult) reset() {
br.cs = cs[:0]
}
func (br *blockResult) resetRows() {
br.buf = br.buf[:0]
clear(br.valuesBuf)
br.valuesBuf = br.valuesBuf[:0]
br.timestamps = br.timestamps[:0]
cs := br.getColumns()
for i := range cs {
cs[i].resetRows()
}
}
func (br *blockResult) addRow(timestamp int64, values []string) {
br.timestamps = append(br.timestamps, timestamp)
cs := br.getColumns()
if len(values) != len(cs) {
logger.Panicf("BUG: unexpected number of values in a row; got %d; want %d", len(values), len(cs))
}
for i := range cs {
cs[i].addValue(values[i])
}
}
func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
@@ -573,6 +599,13 @@ func (br *blockResult) addConstColumn(name, value string) {
})
}
func (br *blockResult) addEmptyStringColumn(columnName string) {
br.cs = append(br.cs, blockResultColumn{
name: columnName,
valueType: valueTypeString,
})
}
func (br *blockResult) updateColumns(columnNames []string) {
if br.areSameColumns(columnNames) {
// Fast path - nothing to change.
@@ -694,6 +727,10 @@ type blockResultColumn struct {
// values contain decoded values after getValues() call for the given column
values []string
// buf and valuesBuf are used by addValue() in order to re-use memory across resetRows().
buf []byte
valuesBuf []string
}
func (c *blockResultColumn) reset() {
@@ -704,6 +741,35 @@ func (c *blockResultColumn) reset() {
c.dictValues = nil
c.encodedValues = nil
c.values = nil
c.buf = c.buf[:0]
clear(c.valuesBuf)
c.valuesBuf = c.valuesBuf[:0]
}
func (c *blockResultColumn) resetRows() {
c.dictValues = nil
c.encodedValues = nil
c.values = nil
c.buf = c.buf[:0]
clear(c.valuesBuf)
c.valuesBuf = c.valuesBuf[:0]
}
func (c *blockResultColumn) addValue(v string) {
if c.valueType != valueTypeString {
logger.Panicf("BUG: unexpected column type; got %d; want %d", c.valueType, valueTypeString)
}
bufLen := len(c.buf)
c.buf = append(c.buf, v...)
c.valuesBuf = append(c.valuesBuf, bytesutil.ToUnsafeString(c.buf[bufLen:]))
c.encodedValues = c.valuesBuf
c.values = c.valuesBuf
}
// getEncodedValues returns encoded values for the given column.
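Taken together, these additions let a caller declare a block's string columns once, append rows one at a time, and recycle the underlying buffers between batches. A minimal sketch of the intended call pattern (column names and values here are illustrative, not from the commit):

	// Declare the output schema: one valueTypeString column per field.
	var br blockResult
	br.addEmptyStringColumn("level")
	br.addEmptyStringColumn("hits")

	// addValue() copies each value into the column's buf, so the caller
	// may reuse its own slices after addRow() returns.
	br.addRow(0, []string{"error", "42"})
	br.addRow(0, []string{"info", "1053"})

	// Hand the batch downstream, then drop the rows while keeping the
	// allocated buf/valuesBuf capacity for the next batch.
	br.resetRows()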

View file

@@ -27,7 +27,8 @@ type pipeProcessor interface {
// The workerID is the id of the worker goroutine, which calls the writeBlock.
// It is in the range 0 ... workersCount-1 .
//
-// It is forbidden to hold references br after returning from writeBlock, since the caller re-uses it.
+// It is OK to modify br contents inside writeBlock. The caller mustn't rely on br contents after writeBlock call.
+// It is forbidden to hold references to br after returning from writeBlock, since the caller may re-use it.
//
// If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines
// to stop sending new data. The occurred error must be returned from flush().

View file

@@ -7,6 +7,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// pipeFields implements '| fields ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
type pipeFields struct {
// fields contains list of fields to fetch
fields []string
@@ -34,14 +37,14 @@ type pipeFieldsProcessor struct {
ppBase pipeProcessor
}
-func (fpp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
-if !fpp.pf.containsStar {
-br.updateColumns(fpp.pf.fields)
+func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
+if !pfp.pf.containsStar {
+br.updateColumns(pfp.pf.fields)
}
-fpp.ppBase.writeBlock(workerID, br)
+pfp.ppBase.writeBlock(workerID, br)
}
-func (fpp *pipeFieldsProcessor) flush() error {
+func (pfp *pipeFieldsProcessor) flush() error {
return nil
}

View file

@@ -5,6 +5,9 @@ import (
"sync/atomic"
)
// pipeHead implements '| head ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
type pipeHead struct {
n uint64
}
@@ -33,31 +36,31 @@ type pipeHeadProcessor struct {
rowsProcessed atomic.Uint64
}
-func (hpp *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) {
-rowsProcessed := hpp.rowsProcessed.Add(uint64(len(br.timestamps)))
-if rowsProcessed <= hpp.ph.n {
+func (php *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) {
+rowsProcessed := php.rowsProcessed.Add(uint64(len(br.timestamps)))
+if rowsProcessed <= php.ph.n {
// Fast path - write all the rows to ppBase.
-hpp.ppBase.writeBlock(workerID, br)
+php.ppBase.writeBlock(workerID, br)
return
}
// Slow path - overflow. Write the remaining rows if needed.
rowsProcessed -= uint64(len(br.timestamps))
-if rowsProcessed >= hpp.ph.n {
+if rowsProcessed >= php.ph.n {
// Nothing to write. There is no need for a cancel() call, since it has been called by another goroutine.
return
}
// Write remaining rows.
-keepRows := hpp.ph.n - rowsProcessed
+keepRows := php.ph.n - rowsProcessed
br.truncateRows(int(keepRows))
-hpp.ppBase.writeBlock(workerID, br)
+php.ppBase.writeBlock(workerID, br)
// Notify the caller that it should stop passing more data to writeBlock().
-hpp.cancel()
+php.cancel()
}
-func (hpp *pipeHeadProcessor) flush() error {
+func (php *pipeHeadProcessor) flush() error {
return nil
}
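The overflow arithmetic deserves a worked example: with n = 10, 7 rows already processed and a 5-row block arriving, Add(5) returns 12 > 10, so the slow path recovers the pre-block count 12 - 5 = 7, sees 7 < 10, and truncates the block to 10 - 7 = 3 rows before forwarding it and calling cancel(). The same bookkeeping as a standalone helper (illustrative, not part of the commit):

	// keepRowsForBlock returns how many leading rows of a blockRows-sized
	// block may still be emitted under the limit n.
	func keepRowsForBlock(total *atomic.Uint64, blockRows, n uint64) uint64 {
		processed := total.Add(blockRows)
		if processed <= n {
			return blockRows // fast path: the whole block fits
		}
		processedBefore := processed - blockRows
		if processedBefore >= n {
			return 0 // the limit was already reached by earlier blocks
		}
		return n - processedBefore // only the remainder fits
	}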

View file

@@ -5,6 +5,9 @@ import (
"sync/atomic"
)
// pipeSkip implements '| skip ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
type pipeSkip struct {
n uint64
}
@@ -27,24 +30,24 @@ type pipeSkipProcessor struct {
rowsProcessed atomic.Uint64
}
-func (spp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) {
-rowsProcessed := spp.rowsProcessed.Add(uint64(len(br.timestamps)))
-if rowsProcessed <= spp.ps.n {
+func (psp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) {
+rowsProcessed := psp.rowsProcessed.Add(uint64(len(br.timestamps)))
+if rowsProcessed <= psp.ps.n {
return
}
rowsProcessed -= uint64(len(br.timestamps))
-if rowsProcessed >= spp.ps.n {
-spp.ppBase.writeBlock(workerID, br)
+if rowsProcessed >= psp.ps.n {
+psp.ppBase.writeBlock(workerID, br)
return
}
-rowsSkip := spp.ps.n - rowsProcessed
+rowsSkip := psp.ps.n - rowsProcessed
br.skipRows(int(rowsSkip))
-spp.ppBase.writeBlock(workerID, br)
+psp.ppBase.writeBlock(workerID, br)
}
-func (spp *pipeSkipProcessor) flush() error {
+func (psp *pipeSkipProcessor) flush() error {
return nil
}
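pipeSkip is the mirror image of pipeHead: it drops the first n rows instead of keeping them. Reusing the numbers above (n = 10, 7 rows seen, a 5-row block): Add(5) = 12 > 10 and 12 - 5 = 7 < 10, so skipRows(3) drops the first 10 - 7 = 3 rows and the remaining 2 are forwarded. A complementary sketch (illustrative):

	// skipRowsForBlock returns how many leading rows of the block to drop;
	// the branches are the exact duals of keepRowsForBlock above.
	func skipRowsForBlock(total *atomic.Uint64, blockRows, n uint64) uint64 {
		processed := total.Add(blockRows)
		if processed <= n {
			return blockRows // still within the skip window: drop everything
		}
		processedBefore := processed - blockRows
		if processedBefore >= n {
			return 0 // skipping finished earlier: forward the whole block
		}
		return n - processedBefore // drop only the first rows
	}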

View file

@@ -12,8 +12,17 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeStats processes '| stats ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stats
type pipeStats struct {
// byFields contains field names from 'by(...)' clause.
byFields []string
// resultNames contains names of output results generated by funcs.
resultNames []string
// funcs contains stats functions to execute.
funcs []statsFunc
}
@@ -48,8 +57,8 @@ type statsProcessor interface {
// mergeState must merge sfp state into statsProcessor state.
mergeState(sfp statsProcessor)
-// finalizeStats must return the collected stats from statsProcessor.
-finalizeStats() (name, value string)
+// finalizeStats must return the collected stats result from statsProcessor.
+finalizeStats() string
}
func (ps *pipeStats) String() string {
@@ -63,7 +72,7 @@ func (ps *pipeStats) String() string {
}
a := make([]string, len(ps.funcs))
for i, f := range ps.funcs {
-a[i] = f.String()
+a[i] = f.String() + " as " + ps.resultNames[i]
}
s += strings.Join(a, ", ")
return s
@@ -83,7 +92,7 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{},
maxStateSize -= stateSizeBudgetChunk
}
-spp := &pipeStatsProcessor{
+psp := &pipeStatsProcessor{
ps: ps,
stopCh: stopCh,
cancel: cancel,
@@ -93,9 +102,9 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{},
maxStateSize: maxStateSize,
}
-spp.stateSizeBudget.Store(maxStateSize)
+psp.stateSizeBudget.Store(maxStateSize)
-return spp
+return psp
}
type pipeStatsProcessor struct {
@@ -149,24 +158,24 @@ type pipeStatsGroup struct {
sfps []statsProcessor
}
-func (spp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
-shard := &spp.shards[workerID]
+func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
+shard := &psp.shards[workerID]
for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
-remaining := spp.stateSizeBudget.Add(-stateSizeBudgetChunk)
+remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
-spp.cancel()
+psp.cancel()
}
return
}
shard.stateSizeBudget += stateSizeBudgetChunk
}
-byFields := spp.ps.byFields
+byFields := psp.ps.byFields
if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key.
for _, sfp := range shard.getStatsProcessors(nil) {
@@ -255,13 +264,13 @@ func (spp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
shard.keyBuf = keyBuf
}
-func (spp *pipeStatsProcessor) flush() error {
-if n := spp.stateSizeBudget.Load(); n <= 0 {
-return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.ps.String(), spp.maxStateSize/(1<<20))
+func (psp *pipeStatsProcessor) flush() error {
+if n := psp.stateSizeBudget.Load(); n <= 0 {
+return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
// Merge states across shards
-shards := spp.shards
+shards := psp.shards
m := shards[0].m
shards = shards[1:]
for i := range shards {
@@ -270,7 +279,7 @@ func (spp *pipeStatsProcessor) flush() error {
// shard.m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
select {
-case <-spp.stopCh:
+case <-psp.stopCh:
return nil
default:
}
@@ -287,7 +296,7 @@ func (spp *pipeStatsProcessor) flush() error {
}
// Write per-group states to ppBase
-byFields := spp.ps.byFields
+byFields := psp.ps.byFields
if len(byFields) == 0 && len(m) == 0 {
// Special case - zero matching rows.
_ = shards[0].getStatsProcessors(nil)
@@ -296,12 +305,18 @@ func (spp *pipeStatsProcessor) flush() error {
var values []string
var br blockResult
-zeroTimestamps := []int64{0}
+for _, f := range byFields {
+br.addEmptyStringColumn(f)
+}
+for _, resultName := range psp.ps.resultNames {
+br.addEmptyStringColumn(resultName)
+}
for key, spg := range m {
// m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
select {
-case <-spp.stopCh:
+case <-psp.stopCh:
return nil
default:
}
@@ -321,21 +336,20 @@ func (spp *pipeStatsProcessor) flush() error {
logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
}
-br.reset()
-br.timestamps = zeroTimestamps
-// construct columns for byFields
-for i, f := range byFields {
-br.addConstColumn(f, values[i])
-}
-// construct columns for stats functions
+// calculate values for stats functions
for _, sfp := range spg.sfps {
-name, value := sfp.finalizeStats()
-br.addConstColumn(name, value)
+value := sfp.finalizeStats()
+values = append(values, value)
}
-spp.ppBase.writeBlock(0, &br)
+br.addRow(0, values)
+if len(br.timestamps) >= 1_000 {
+psp.ppBase.writeBlock(0, &br)
+br.resetRows()
+}
}
+if len(br.timestamps) > 0 {
+psp.ppBase.writeBlock(0, &br)
+}
return nil
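Note how the rewritten flush() emits results: instead of building a fresh single-row blockResult per group, rows accumulate in br and are pushed to ppBase once 1_000 are buffered, with a final flush for the tail. The batching pattern on its own (an illustrative helper, not from the commit):

	// writeBatched forwards one result row per group in fixed-size chunks,
	// reusing br's buffers between chunks via resetRows().
	func writeBatched(br *blockResult, ppBase pipeProcessor, rows [][]string) {
		const rowsPerBatch = 1_000
		for _, values := range rows {
			br.addRow(0, values)
			if len(br.timestamps) >= rowsPerBatch {
				ppBase.writeBlock(0, br)
				br.resetRows() // drop rows, keep allocated buffers
			}
		}
		if len(br.timestamps) > 0 {
			ppBase.writeBlock(0, br) // flush the remainder
		}
	}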
@@ -378,14 +392,17 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
ps.byFields = fields
}
+var resultNames []string
var funcs []statsFunc
for {
-sf, err := parseStatsFunc(lex)
+sf, resultName, err := parseStatsFunc(lex)
if err != nil {
return nil, err
}
+resultNames = append(resultNames, resultName)
funcs = append(funcs, sf)
if lex.isKeyword("|", ")", "") {
+ps.resultNames = resultNames
ps.funcs = funcs
return &ps, nil
}
@@ -396,29 +413,36 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
}
}
-func parseStatsFunc(lex *lexer) (statsFunc, error) {
+func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
+var sf statsFunc
switch {
case lex.isKeyword("count"):
sfc, err := parseStatsCount(lex)
if err != nil {
-return nil, fmt.Errorf("cannot parse 'count' func: %w", err)
+return nil, "", fmt.Errorf("cannot parse 'count' func: %w", err)
}
-return sfc, nil
+sf = sfc
case lex.isKeyword("uniq"):
sfu, err := parseStatsUniq(lex)
if err != nil {
-return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err)
+return nil, "", fmt.Errorf("cannot parse 'uniq' func: %w", err)
}
-return sfu, nil
+sf = sfu
case lex.isKeyword("sum"):
sfs, err := parseStatsSum(lex)
if err != nil {
-return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
+return nil, "", fmt.Errorf("cannot parse 'sum' func: %w", err)
}
-return sfs, nil
+sf = sfs
default:
-return nil, fmt.Errorf("unknown stats func %q", lex.token)
+return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
}
+resultName, err := parseResultName(lex)
+if err != nil {
+return nil, "", fmt.Errorf("cannot parse result name: %w", err)
+}
+return sf, resultName, nil
}
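With result names hoisted out of the individual stats funcs, a pipe such as stats by (level) count() as hits, uniq(ip) as ips (field and result names here are illustrative) parses into parallel slices, and String() reassembles each pair:

	// After parsing (illustrative values):
	//
	//	ps.byFields    = []string{"level"}
	//	ps.resultNames = []string{"hits", "ips"}
	//	ps.funcs       = []statsFunc{statsCount, statsUniq}
	//
	// ps.String() renders every func back as "<func> as <name>",
	// e.g. "count() as hits, uniq(ip) as ips".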
func parseResultName(lex *lexer) (string, error) {

View file

@@ -12,12 +12,10 @@ import (
type statsCount struct {
fields []string
containsStar bool
-resultName string
}
func (sc *statsCount) String() string {
-return "count(" + fieldNamesString(sc.fields) + ") as " + quoteTokenIfNeeded(sc.resultName)
+return "count(" + fieldNamesString(sc.fields) + ")"
}
func (sc *statsCount) neededFields() []string {
@@ -192,9 +190,8 @@ func (scp *statsCountProcessor) mergeState(sfp statsProcessor) {
scp.rowsCount += src.rowsCount
}
-func (scp *statsCountProcessor) finalizeStats() (string, string) {
-value := strconv.FormatUint(scp.rowsCount, 10)
-return scp.sc.resultName, value
+func (scp *statsCountProcessor) finalizeStats() string {
+return strconv.FormatUint(scp.rowsCount, 10)
}
func parseStatsCount(lex *lexer) (*statsCount, error) {
@@ -203,14 +200,9 @@ func parseStatsCount(lex *lexer) (*statsCount, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
}
-resultName, err := parseResultName(lex)
-if err != nil {
-return nil, fmt.Errorf("cannot parse result name: %w", err)
-}
sc := &statsCount{
fields: fields,
containsStar: slices.Contains(fields, "*"),
-resultName: resultName,
}
return sc, nil
}

View file

@@ -11,11 +11,10 @@ import (
type statsSum struct {
fields []string
containsStar bool
-resultName string
}
func (ss *statsSum) String() string {
-return "sum(" + fieldNamesString(ss.fields) + ") as " + quoteTokenIfNeeded(ss.resultName)
+return "sum(" + fieldNamesString(ss.fields) + ")"
}
func (ss *statsSum) neededFields() []string {
@@ -80,9 +79,8 @@ func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) {
ssp.sum += src.sum
}
-func (ssp *statsSumProcessor) finalizeStats() (string, string) {
-value := strconv.FormatFloat(ssp.sum, 'f', -1, 64)
-return ssp.ss.resultName, value
+func (ssp *statsSumProcessor) finalizeStats() string {
+return strconv.FormatFloat(ssp.sum, 'f', -1, 64)
}
func parseStatsSum(lex *lexer) (*statsSum, error) {
@@ -94,14 +92,9 @@ func parseStatsSum(lex *lexer) (*statsSum, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("'sum' must contain at least one arg")
}
-resultName, err := parseResultName(lex)
-if err != nil {
-return nil, fmt.Errorf("cannot parse result name: %w", err)
-}
ss := &statsSum{
fields: fields,
containsStar: slices.Contains(fields, "*"),
-resultName: resultName,
}
return ss, nil
}

View file

@@ -13,11 +13,10 @@ import (
type statsUniq struct {
fields []string
containsStar bool
-resultName string
}
func (su *statsUniq) String() string {
-return "uniq(" + fieldNamesString(su.fields) + ") as " + quoteTokenIfNeeded(su.resultName)
+return "uniq(" + fieldNamesString(su.fields) + ")"
}
func (su *statsUniq) neededFields() []string {
@@ -347,10 +346,9 @@ func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
}
}
-func (sup *statsUniqProcessor) finalizeStats() (string, string) {
+func (sup *statsUniqProcessor) finalizeStats() string {
n := uint64(len(sup.m))
-value := strconv.FormatUint(n, 10)
-return sup.su.resultName, value
+return strconv.FormatUint(n, 10)
}
func parseStatsUniq(lex *lexer) (*statsUniq, error) {
@@ -359,14 +357,9 @@ func parseStatsUniq(lex *lexer) (*statsUniq, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
}
-resultName, err := parseResultName(lex)
-if err != nil {
-return nil, fmt.Errorf("cannot parse result name: %w", err)
-}
su := &statsUniq{
fields: fields,
containsStar: slices.Contains(fields, "*"),
-resultName: resultName,
}
return su, nil
}