From 2005e5f93b5c674714c399d4ea26be8dd60b4fbe Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 1 May 2024 01:19:22 +0200
Subject: [PATCH] wip

---
 lib/logstorage/block_search.go |  66 ++++++++++++++++++++
 lib/logstorage/pipe.go         |   3 +-
 lib/logstorage/pipe_fields.go  |  13 ++--
 lib/logstorage/pipe_head.go    |  21 ++++---
 lib/logstorage/pipe_skip.go    |  19 +++---
 lib/logstorage/pipe_stats.go   | 106 ++++++++++++++++++++-------------
 lib/logstorage/stats_count.go  |  14 +----
 lib/logstorage/stats_sum.go    |  13 +---
 lib/logstorage/stats_unique.go |  13 +---
 9 files changed, 173 insertions(+), 95 deletions(-)

diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go
index 94fc1ad07..1a729c343 100644
--- a/lib/logstorage/block_search.go
+++ b/lib/logstorage/block_search.go
@@ -316,6 +316,32 @@ func (br *blockResult) reset() {
 	br.cs = cs[:0]
 }
 
+func (br *blockResult) resetRows() {
+	br.buf = br.buf[:0]
+
+	clear(br.valuesBuf)
+	br.valuesBuf = br.valuesBuf[:0]
+
+	br.timestamps = br.timestamps[:0]
+
+	cs := br.getColumns()
+	for i := range cs {
+		cs[i].resetRows()
+	}
+}
+
+func (br *blockResult) addRow(timestamp int64, values []string) {
+	br.timestamps = append(br.timestamps, timestamp)
+
+	cs := br.getColumns()
+	if len(values) != len(cs) {
+		logger.Panicf("BUG: unexpected number of values in a row; got %d; want %d", len(values), len(cs))
+	}
+	for i := range cs {
+		cs[i].addValue(values[i])
+	}
+}
+
 func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
 	if !br.addStreamColumn(bs) {
 		// Skip the current block, since the associated stream tags are missing.
@@ -573,6 +599,13 @@ func (br *blockResult) addConstColumn(name, value string) {
 	})
 }
 
+func (br *blockResult) addEmptyStringColumn(columnName string) {
+	br.cs = append(br.cs, blockResultColumn{
+		name:      columnName,
+		valueType: valueTypeString,
+	})
+}
+
 func (br *blockResult) updateColumns(columnNames []string) {
 	if br.areSameColumns(columnNames) {
 		// Fast path - nothing to change.
@@ -694,6 +727,10 @@ type blockResultColumn struct {
 	// values contain decoded values after getValues() call for the given column
 	values []string
+
+	// buf and valuesBuf are used by addValue() in order to re-use memory across resetRows().
+	buf       []byte
+	valuesBuf []string
 }
 
 func (c *blockResultColumn) reset() {
@@ -704,6 +741,35 @@ func (c *blockResultColumn) reset() {
 	c.dictValues = nil
 	c.encodedValues = nil
 	c.values = nil
+
+	c.buf = c.buf[:0]
+
+	clear(c.valuesBuf)
+	c.valuesBuf = c.valuesBuf[:0]
+}
+
+func (c *blockResultColumn) resetRows() {
+	c.dictValues = nil
+	c.encodedValues = nil
+	c.values = nil
+
+	c.buf = c.buf[:0]
+
+	clear(c.valuesBuf)
+	c.valuesBuf = c.valuesBuf[:0]
+}
+
+func (c *blockResultColumn) addValue(v string) {
+	if c.valueType != valueTypeString {
+		logger.Panicf("BUG: unexpected column type; got %d; want %d", c.valueType, valueTypeString)
+	}
+
+	bufLen := len(c.buf)
+	c.buf = append(c.buf, v...)
+	c.valuesBuf = append(c.valuesBuf, bytesutil.ToUnsafeString(c.buf[bufLen:]))
+
+	c.encodedValues = c.valuesBuf
+	c.values = c.valuesBuf
 }
 
 // getEncodedValues returns encoded values for the given column.
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index a286e1c1c..1763529ed 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -27,7 +27,8 @@ type pipeProcessor interface {
 	// The workerID is the id of the worker goroutine, which calls the writeBlock.
 	// It is in the range 0 ... workersCount-1 .
 	//
-	// It is forbidden to hold references br after returning from writeBlock, since the caller re-uses it.
+	// It is OK to modify br contents inside writeBlock. The caller mustn't rely on br contents after the writeBlock call.
+	// It is forbidden to hold references to br after returning from writeBlock, since the caller may re-use it.
 	//
 	// If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines
 	// to stop sending new data. The error must then be returned from flush().
diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go
index 3822e2bff..eb6654b50 100644
--- a/lib/logstorage/pipe_fields.go
+++ b/lib/logstorage/pipe_fields.go
@@ -7,6 +7,9 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
+// pipeFields implements '| fields ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
 type pipeFields struct {
 	// fields contains the list of fields to fetch
 	fields []string
@@ -34,14 +37,14 @@ type pipeFieldsProcessor struct {
 	ppBase pipeProcessor
 }
 
-func (fpp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
-	if !fpp.pf.containsStar {
-		br.updateColumns(fpp.pf.fields)
+func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
+	if !pfp.pf.containsStar {
+		br.updateColumns(pfp.pf.fields)
 	}
-	fpp.ppBase.writeBlock(workerID, br)
+	pfp.ppBase.writeBlock(workerID, br)
 }
 
-func (fpp *pipeFieldsProcessor) flush() error {
+func (pfp *pipeFieldsProcessor) flush() error {
 	return nil
 }
diff --git a/lib/logstorage/pipe_head.go b/lib/logstorage/pipe_head.go
index 3eda2ca83..7a82f4b8c 100644
--- a/lib/logstorage/pipe_head.go
+++ b/lib/logstorage/pipe_head.go
@@ -5,6 +5,9 @@ import (
 	"sync/atomic"
 )
 
+// pipeHead implements '| head ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
 type pipeHead struct {
 	n uint64
 }
@@ -33,31 +36,31 @@ type pipeHeadProcessor struct {
 	rowsProcessed atomic.Uint64
 }
 
-func (hpp *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) {
-	rowsProcessed := hpp.rowsProcessed.Add(uint64(len(br.timestamps)))
-	if rowsProcessed <= hpp.ph.n {
+func (php *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) {
+	rowsProcessed := php.rowsProcessed.Add(uint64(len(br.timestamps)))
+	if rowsProcessed <= php.ph.n {
 		// Fast path - write all the rows to ppBase.
-		hpp.ppBase.writeBlock(workerID, br)
+		php.ppBase.writeBlock(workerID, br)
 		return
 	}
 
 	// Slow path - overflow. Write the remaining rows if needed.
 	rowsProcessed -= uint64(len(br.timestamps))
-	if rowsProcessed >= hpp.ph.n {
+	if rowsProcessed >= php.ph.n {
 		// Nothing to write. There is no need to call cancel(), since it has already been called by another goroutine.
 		return
 	}
 
 	// Write remaining rows.
-	keepRows := hpp.ph.n - rowsProcessed
+	keepRows := php.ph.n - rowsProcessed
 	br.truncateRows(int(keepRows))
-	hpp.ppBase.writeBlock(workerID, br)
+	php.ppBase.writeBlock(workerID, br)
 
 	// Notify the caller that it should stop passing more data to writeBlock().
-	hpp.cancel()
+	php.cancel()
 }
 
-func (hpp *pipeHeadProcessor) flush() error {
+func (php *pipeHeadProcessor) flush() error {
 	return nil
 }
diff --git a/lib/logstorage/pipe_skip.go b/lib/logstorage/pipe_skip.go
index 7df995376..a5a8403be 100644
--- a/lib/logstorage/pipe_skip.go
+++ b/lib/logstorage/pipe_skip.go
@@ -5,6 +5,9 @@ import (
 	"sync/atomic"
 )
 
+// pipeSkip implements '| skip ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
 type pipeSkip struct {
 	n uint64
 }
@@ -27,24 +30,24 @@ type pipeSkipProcessor struct {
 	rowsProcessed atomic.Uint64
 }
 
-func (spp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) {
-	rowsProcessed := spp.rowsProcessed.Add(uint64(len(br.timestamps)))
-	if rowsProcessed <= spp.ps.n {
+func (psp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) {
+	rowsProcessed := psp.rowsProcessed.Add(uint64(len(br.timestamps)))
+	if rowsProcessed <= psp.ps.n {
 		return
 	}
 
 	rowsProcessed -= uint64(len(br.timestamps))
-	if rowsProcessed >= spp.ps.n {
-		spp.ppBase.writeBlock(workerID, br)
+	if rowsProcessed >= psp.ps.n {
+		psp.ppBase.writeBlock(workerID, br)
 		return
 	}
 
-	rowsSkip := spp.ps.n - rowsProcessed
+	rowsSkip := psp.ps.n - rowsProcessed
 	br.skipRows(int(rowsSkip))
-	spp.ppBase.writeBlock(workerID, br)
+	psp.ppBase.writeBlock(workerID, br)
 }
 
-func (spp *pipeSkipProcessor) flush() error {
+func (psp *pipeSkipProcessor) flush() error {
 	return nil
 }
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 4c38c28b8..8c8746c98 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -12,9 +12,18 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 )
 
+// pipeStats processes '| stats ...' queries.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#stats
 type pipeStats struct {
+	// byFields contains field names from the 'by(...)' clause.
 	byFields []string
-	funcs    []statsFunc
+
+	// resultNames contains names of output results generated by funcs.
+	resultNames []string
+
+	// funcs contains stats functions to execute.
+	funcs []statsFunc
 }
 
 type statsFunc interface {
@@ -48,8 +57,8 @@ type statsProcessor interface {
 	// mergeState must merge sfp state into statsProcessor state.
 	mergeState(sfp statsProcessor)
 
-	// finalizeStats must return the collected stats from statsProcessor.
-	finalizeStats() (name, value string)
+	// finalizeStats must return the collected stats result from statsProcessor.
+	finalizeStats() string
 }
 
 func (ps *pipeStats) String() string {
@@ -63,7 +72,7 @@
 	}
 	a := make([]string, len(ps.funcs))
 	for i, f := range ps.funcs {
-		a[i] = f.String()
+		a[i] = f.String() + " as " + ps.resultNames[i]
 	}
 	s += strings.Join(a, ", ")
 	return s
@@ -83,7 +92,7 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{},
 		maxStateSize -= stateSizeBudgetChunk
 	}
 
-	spp := &pipeStatsProcessor{
+	psp := &pipeStatsProcessor{
 		ps:     ps,
 		stopCh: stopCh,
 		cancel: cancel,
@@ -93,9 +102,9 @@
 		maxStateSize: maxStateSize,
 	}
 
-	spp.stateSizeBudget.Store(maxStateSize)
+	psp.stateSizeBudget.Store(maxStateSize)
 
-	return spp
+	return psp
 }
 
 type pipeStatsProcessor struct {
@@ -149,24 +158,24 @@ type pipeStatsGroup struct {
 	sfps []statsProcessor
 }
 
-func (spp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
-	shard := &spp.shards[workerID]
+func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
+	shard := &psp.shards[workerID]
 
 	for shard.stateSizeBudget < 0 {
 		// Steal some budget for the state size from the global budget.
-		remaining := spp.stateSizeBudget.Add(-stateSizeBudgetChunk)
+		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
 		if remaining < 0 {
 			// The state size is too big. Stop processing data in order to avoid an OOM crash.
 			if remaining+stateSizeBudgetChunk >= 0 {
 				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
-				spp.cancel()
+				psp.cancel()
 			}
 			return
 		}
 		shard.stateSizeBudget += stateSizeBudgetChunk
 	}
 
-	byFields := spp.ps.byFields
+	byFields := psp.ps.byFields
 	if len(byFields) == 0 {
 		// Fast path - pass all the rows to a single group with empty key.
 		for _, sfp := range shard.getStatsProcessors(nil) {
@@ -255,13 +264,13 @@ func (spp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
 	shard.keyBuf = keyBuf
 }
 
-func (spp *pipeStatsProcessor) flush() error {
-	if n := spp.stateSizeBudget.Load(); n <= 0 {
-		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.ps.String(), spp.maxStateSize/(1<<20))
+func (psp *pipeStatsProcessor) flush() error {
+	if n := psp.stateSizeBudget.Load(); n <= 0 {
+		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
 	}
 
 	// Merge states across shards
-	shards := spp.shards
+	shards := psp.shards
 	m := shards[0].m
 	shards = shards[1:]
 	for i := range shards {
@@ -270,7 +279,7 @@
 		// shard.m may be quite big, so this loop can take a lot of time and CPU.
 		// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
 		select {
-		case <-spp.stopCh:
+		case <-psp.stopCh:
 			return nil
 		default:
 		}
@@ -287,7 +296,7 @@
 	}
 
 	// Write per-group states to ppBase
-	byFields := spp.ps.byFields
+	byFields := psp.ps.byFields
 	if len(byFields) == 0 && len(m) == 0 {
 		// Special case - zero matching rows.
 		_ = shards[0].getStatsProcessors(nil)
@@ -296,12 +305,18 @@
 
 	var values []string
 	var br blockResult
-	zeroTimestamps := []int64{0}
+	for _, f := range byFields {
+		br.addEmptyStringColumn(f)
+	}
+	for _, resultName := range psp.ps.resultNames {
+		br.addEmptyStringColumn(resultName)
+	}
+
 	for key, spg := range m {
 		// m may be quite big, so this loop can take a lot of time and CPU.
 		// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
 		select {
-		case <-spp.stopCh:
+		case <-psp.stopCh:
 			return nil
 		default:
 		}
@@ -321,21 +336,20 @@
 			logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
 		}
 
-		br.reset()
-		br.timestamps = zeroTimestamps
-
-		// construct columns for byFields
-		for i, f := range byFields {
-			br.addConstColumn(f, values[i])
-		}
-
-		// construct columns for stats functions
+		// calculate values for stats functions
 		for _, sfp := range spg.sfps {
-			name, value := sfp.finalizeStats()
-			br.addConstColumn(name, value)
+			value := sfp.finalizeStats()
+			values = append(values, value)
 		}
 
-		spp.ppBase.writeBlock(0, &br)
+		br.addRow(0, values)
+		if len(br.timestamps) >= 1_000 {
+			psp.ppBase.writeBlock(0, &br)
+			br.resetRows()
+		}
+	}
+	if len(br.timestamps) > 0 {
+		psp.ppBase.writeBlock(0, &br)
 	}
 
 	return nil
@@ -378,14 +392,17 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
 		ps.byFields = fields
 	}
 
+	var resultNames []string
 	var funcs []statsFunc
 	for {
-		sf, err := parseStatsFunc(lex)
+		sf, resultName, err := parseStatsFunc(lex)
 		if err != nil {
 			return nil, err
 		}
+		resultNames = append(resultNames, resultName)
 		funcs = append(funcs, sf)
 		if lex.isKeyword("|", ")", "") {
+			ps.resultNames = resultNames
 			ps.funcs = funcs
 			return &ps, nil
 		}
@@ -396,29 +413,36 @@
 	}
 }
 
-func parseStatsFunc(lex *lexer) (statsFunc, error) {
+func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
+	var sf statsFunc
 	switch {
 	case lex.isKeyword("count"):
 		sfc, err := parseStatsCount(lex)
 		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'count' func: %w", err)
+			return nil, "", fmt.Errorf("cannot parse 'count' func: %w", err)
 		}
-		return sfc, nil
+		sf = sfc
 	case lex.isKeyword("uniq"):
 		sfu, err := parseStatsUniq(lex)
 		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err)
+			return nil, "", fmt.Errorf("cannot parse 'uniq' func: %w", err)
 		}
-		return sfu, nil
+		sf = sfu
 	case lex.isKeyword("sum"):
 		sfs, err := parseStatsSum(lex)
 		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
+			return nil, "", fmt.Errorf("cannot parse 'sum' func: %w", err)
 		}
-		return sfs, nil
+		sf = sfs
 	default:
-		return nil, fmt.Errorf("unknown stats func %q", lex.token)
+		return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
 	}
+
+	resultName, err := parseResultName(lex)
+	if err != nil {
+		return nil, "", fmt.Errorf("cannot parse result name: %w", err)
+	}
+	return sf, resultName, nil
 }
 
 func parseResultName(lex *lexer) (string, error) {
diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go
index 04469c5bb..4d066ed7f 100644
--- a/lib/logstorage/stats_count.go
+++ b/lib/logstorage/stats_count.go
@@ -12,12 +12,10 @@ import (
 type statsCount struct {
 	fields       []string
 	containsStar bool
-
-	resultName string
 }
 
 func (sc *statsCount) String() string {
-	return "count(" + fieldNamesString(sc.fields) + ") as " + quoteTokenIfNeeded(sc.resultName)
+	return "count(" + fieldNamesString(sc.fields) + ")"
 }
 
 func (sc *statsCount) neededFields() []string {
@@ -192,9 +190,8 @@ func (scp *statsCountProcessor) mergeState(sfp statsProcessor) {
 	scp.rowsCount += src.rowsCount
 }
 
-func (scp *statsCountProcessor) finalizeStats() (string, string) {
-	value := strconv.FormatUint(scp.rowsCount, 10)
-	return scp.sc.resultName, value
+func (scp *statsCountProcessor) finalizeStats() string {
+	return strconv.FormatUint(scp.rowsCount, 10)
 }
 
 func parseStatsCount(lex *lexer) (*statsCount, error) {
@@ -203,14 +200,9 @@
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
 	}
-	resultName, err := parseResultName(lex)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse result name: %w", err)
-	}
 	sc := &statsCount{
 		fields:       fields,
 		containsStar: slices.Contains(fields, "*"),
-		resultName:   resultName,
 	}
 	return sc, nil
 }
diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go
index 3173fbb8c..1a328f0ef 100644
--- a/lib/logstorage/stats_sum.go
+++ b/lib/logstorage/stats_sum.go
@@ -11,11 +11,10 @@ import (
 type statsSum struct {
 	fields       []string
 	containsStar bool
-	resultName   string
 }
 
 func (ss *statsSum) String() string {
-	return "sum(" + fieldNamesString(ss.fields) + ") as " + quoteTokenIfNeeded(ss.resultName)
+	return "sum(" + fieldNamesString(ss.fields) + ")"
 }
 
 func (ss *statsSum) neededFields() []string {
@@ -80,9 +79,8 @@ func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) {
 	ssp.sum += src.sum
 }
 
-func (ssp *statsSumProcessor) finalizeStats() (string, string) {
-	value := strconv.FormatFloat(ssp.sum, 'f', -1, 64)
-	return ssp.ss.resultName, value
+func (ssp *statsSumProcessor) finalizeStats() string {
+	return strconv.FormatFloat(ssp.sum, 'f', -1, 64)
 }
 
 func parseStatsSum(lex *lexer) (*statsSum, error) {
@@ -94,14 +92,9 @@ func parseStatsSum(lex *lexer) (*statsSum, error) {
 	if len(fields) == 0 {
 		return nil, fmt.Errorf("'sum' must contain at least one arg")
 	}
-	resultName, err := parseResultName(lex)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse result name: %w", err)
-	}
 	ss := &statsSum{
 		fields:       fields,
 		containsStar: slices.Contains(fields, "*"),
-		resultName:   resultName,
 	}
 	return ss, nil
 }
diff --git a/lib/logstorage/stats_unique.go b/lib/logstorage/stats_unique.go
index 721cbd53a..19cfd00a9 100644
--- a/lib/logstorage/stats_unique.go
+++ b/lib/logstorage/stats_unique.go
@@ -13,11 +13,10 @@ import (
 type statsUniq struct {
 	fields       []string
 	containsStar bool
-	resultName   string
 }
 
 func (su *statsUniq) String() string {
-	return "uniq(" + fieldNamesString(su.fields) + ") as " + quoteTokenIfNeeded(su.resultName)
+	return "uniq(" + fieldNamesString(su.fields) + ")"
 }
 
 func (su *statsUniq) neededFields() []string {
@@ -347,10 +346,9 @@ func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
 	}
 }
 
-func (sup *statsUniqProcessor) finalizeStats() (string, string) {
+func (sup *statsUniqProcessor) finalizeStats() string {
 	n := uint64(len(sup.m))
-	value := strconv.FormatUint(n, 10)
-	return sup.su.resultName, value
+	return strconv.FormatUint(n, 10)
 }
 
 func parseStatsUniq(lex *lexer) (*statsUniq, error) {
@@ -359,14 +357,9 @@
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
 	}
-	resultName, err := parseResultName(lex)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse result name: %w", err)
-	}
 	su := &statsUniq{
 		fields:       fields,
 		containsStar: slices.Contains(fields, "*"),
-		resultName:   resultName,
 	}
 	return su, nil
 }
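
Note on the batching pattern introduced in pipeStatsProcessor.flush() above: instead of constructing a one-row blockResult per group via addConstColumn(), the result columns are declared once with addEmptyStringColumn(), per-group rows are appended with addRow(), and the block is handed to ppBase.writeBlock() in 1000-row batches, with resetRows() re-using the buffers between batches. The following is a minimal standalone sketch of that pattern under stated assumptions: the rowBuffer type, the flushEvery constant, the emit callback and the groups map are hypothetical stand-ins for blockResult, the 1_000 threshold, ppBase.writeBlock and the per-group state map m; they are illustrations, not code from this repository.

package main

import "fmt"

// rowBuffer is a hypothetical stand-in for blockResult: it accumulates
// result rows and is handed downstream in batches, so the consumer sees
// a few large blocks instead of one tiny block per group.
type rowBuffer struct {
	timestamps []int64
	rows       [][]string
}

// addRow appends a single row, mirroring blockResult.addRow.
func (rb *rowBuffer) addRow(timestamp int64, values []string) {
	rb.timestamps = append(rb.timestamps, timestamp)
	// Copy values, since the caller may re-use the slice for the next row.
	row := make([]string, len(values))
	copy(row, values)
	rb.rows = append(rb.rows, row)
}

// resetRows drops the accumulated rows while keeping the allocated
// capacity for re-use, mirroring blockResult.resetRows.
func (rb *rowBuffer) resetRows() {
	rb.timestamps = rb.timestamps[:0]
	rb.rows = rb.rows[:0]
}

// flushEvery mirrors the 1_000-row threshold in pipeStatsProcessor.flush().
const flushEvery = 1_000

// writeGroups demonstrates the batching loop; emit plays the role of
// psp.ppBase.writeBlock(0, &br).
func writeGroups(groups map[string][]string, emit func(*rowBuffer)) {
	var rb rowBuffer
	for _, values := range groups {
		rb.addRow(0, values)
		if len(rb.timestamps) >= flushEvery {
			emit(&rb)
			rb.resetRows()
		}
	}
	// Flush the final partial batch, if any.
	if len(rb.timestamps) > 0 {
		emit(&rb)
	}
}

func main() {
	groups := map[string][]string{
		"app1": {"app1", "42"},
		"app2": {"app2", "7"},
	}
	writeGroups(groups, func(rb *rowBuffer) {
		fmt.Printf("flushed a batch of %d rows\n", len(rb.timestamps))
	})
}

One deliberate simplification: the sketch copies each values slice into the buffer, while blockResultColumn.addValue instead appends the bytes to its own buf and stores an unsafe string view over them via bytesutil.ToUnsafeString. Both choices serve the same purpose: the batch must own its data once the caller re-uses the values slice for the next group.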