VictoriaMetrics/lib/logstorage/pipe_len.go
Aliaksandr Valialkin a4ea3b87d7
lib/logstorage: optimize query immediately after its parsing
This eliminates possible bugs related to forgotten Query.Optimize() calls.

This also allows removing optimize() function from pipe interface.

While at it, drop filterNoop inside filterAnd.

(cherry picked from commit 66b2987f49)
2024-11-08 17:07:56 +01:00

170 lines
3.8 KiB
Go

package logstorage
import (
"fmt"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipeLen processes '| len ...' pipe.
//
// For every row it takes Go's len() of the fieldName value (i.e. its size in bytes)
// and stores the result, formatted by marshalUint64String, into resultField.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#len-pipe
type pipeLen struct {
// fieldName is the name of the field whose value length is calculated.
fieldName string
// resultField is the name of the field the calculated length is written to.
// parsePipeLen sets it to "_msg" when the query doesn't name a result field.
resultField string
}
// String returns the textual form of the pipe: `len(<field>)`, followed by
// ` as <resultField>` unless isMsgFieldName reports true for the result field.
//
// Both field names are passed through quoteTokenIfNeeded.
func (pl *pipeLen) String() string {
	expr := "len(" + quoteTokenIfNeeded(pl.fieldName) + ")"
	if isMsgFieldName(pl.resultField) {
		// The result goes to the message field, so the 'as' part is omitted.
		return expr
	}
	return expr + " as " + quoteTokenIfNeeded(pl.resultField)
}
// canLiveTail always returns true for the len pipe.
//
// NOTE(review): presumably this tells the caller that the pipe may be used in
// live tailing queries; the pipe interface is declared outside this file - confirm.
func (pl *pipeLen) canLiveTail() bool {
return true
}
// updateNeededFields adjusts neededFields and unneededFields according to the fields
// the pipe reads (fieldName) and overwrites (resultField).
//
// If neededFields contains "*", the bookkeeping is done via unneededFields:
// unless resultField is already listed there, it is added to unneededFields
// and fieldName is removed from unneededFields.
//
// Otherwise, if resultField is listed in neededFields, it is replaced there with fieldName.
func (pl *pipeLen) updateNeededFields(neededFields, unneededFields fieldsSet) {
	if !neededFields.contains("*") {
		// Explicit list of needed fields.
		if neededFields.contains(pl.resultField) {
			neededFields.remove(pl.resultField)
			neededFields.add(pl.fieldName)
		}
		return
	}

	// "*" is in neededFields, so track the exceptions in unneededFields.
	if unneededFields.contains(pl.resultField) {
		return
	}
	unneededFields.add(pl.resultField)
	unneededFields.remove(pl.fieldName)
}
// hasFilterInWithQuery always returns false: pipeLen consists of two field names only
// and holds no filters.
func (pl *pipeLen) hasFilterInWithQuery() bool {
return false
}
// initFilterInValues returns pl itself and a nil error.
//
// Both arguments are ignored, since hasFilterInWithQuery reports false for this pipe.
func (pl *pipeLen) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pl, nil
}
// newPipeProcessor returns a processor for pl, which passes the processed blocks to ppNext.
//
// The processor gets workersCount shards; writeBlock picks the shard by workerID.
// The second and the third args are unused by this pipe.
func (pl *pipeLen) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
	plp := &pipeLenProcessor{
		pl:     pl,
		ppNext: ppNext,
	}
	plp.shards = make([]pipeLenProcessorShard, workersCount)
	return plp
}
// pipeLenProcessor applies the pipeLen to the incoming blocks and passes them to ppNext.
type pipeLenProcessor struct {
// pl is the pipe this processor executes.
pl *pipeLen
// ppNext receives every non-empty block after the result column is added to it.
ppNext pipeProcessor
// shards holds per-worker state; writeBlock indexes it by workerID.
shards []pipeLenProcessorShard
}
// pipeLenProcessorShard is the per-worker state of pipeLenProcessor,
// padded with extra bytes after the embedded pipeLenProcessorShardNopad.
type pipeLenProcessorShard struct {
pipeLenProcessorShardNopad
// The padding extends the shard up to the next multiple of 128 bytes.
// This prevents false sharing between adjacent shards on widespread platforms,
// where 128 is a multiple of the cache line size.
//
// Note that full 128 bytes are added when the size of pipeLenProcessorShardNopad
// is already a multiple of 128.
_ [128 - unsafe.Sizeof(pipeLenProcessorShardNopad{})%128]byte
}
// pipeLenProcessorShardNopad contains the actual per-worker state without the padding.
//
// Both fields are reset by writeBlock after every processed block.
type pipeLenProcessorShardNopad struct {
// a is the buffer holding the marshaled lengths for the currently processed block.
// The values added to rc are unsafe strings referring to a.b.
a arena
// rc is the result column, which is filled for the currently processed block.
rc resultColumn
}
// writeBlock adds the result column with the lengths of the source field values to br
// and passes br to the next pipe processor.
//
// Empty blocks are dropped without being passed further.
// workerID selects the shard to use for the temporary state.
func (plp *pipeLenProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := &plp.shards[workerID]
	rc := &shard.rc
	rc.name = plp.pl.resultField

	col := br.getColumnByName(plp.pl.fieldName)
	if !col.isConst {
		// Slow path - calculate the length individually for every row.
		for i := 0; i < br.rowsLen; i++ {
			value := col.getValueAtRow(br, i)
			// Remember where the new marshaled length starts in the arena buffer,
			// so only this part is referred by the added value.
			start := len(shard.a.b)
			shard.a.b = marshalUint64String(shard.a.b, uint64(len(value)))
			rc.addValue(bytesutil.ToUnsafeString(shard.a.b[start:]))
		}
	} else {
		// Fast path for const column - marshal the length once
		// and share the resulting string among all the rows.
		shard.a.b = marshalUint64String(shard.a.b[:0], uint64(len(col.valuesEncoded[0])))
		lenStr := bytesutil.ToUnsafeString(shard.a.b)
		for i := 0; i < br.rowsLen; i++ {
			rc.addValue(lenStr)
		}
	}

	// Pass the block with the result column to ppNext.
	br.addResultColumn(rc)
	plp.ppNext.writeBlock(workerID, br)

	// The shard state is reset only after ppNext has processed the block,
	// since the values in rc refer to the arena buffer.
	shard.a.reset()
	rc.reset()
}
// flush always returns nil.
//
// There is nothing to flush, since writeBlock passes every block to ppNext immediately
// and resets the shard state afterwards.
func (plp *pipeLenProcessor) flush() error {
return nil
}
// parsePipeLen parses the 'len' pipe from lex.
//
// The expected form is `len <field>` or `len(<field>)`, optionally followed
// by the result field name, which may be preceded by the 'as' keyword.
// The result field is set to "_msg" if it is missing.
//
// lex must be positioned at the 'len' keyword; otherwise an error is returned.
func parsePipeLen(lex *lexer) (*pipeLen, error) {
	if !lex.isKeyword("len") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "len")
	}
	lex.nextToken()

	srcField, err := parseFieldNameWithOptionalParens(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse field name for 'len' pipe: %w", err)
	}

	pl := &pipeLen{
		fieldName:   srcField,
		resultField: "_msg",
	}

	// The 'as' keyword in front of the result field is optional.
	if lex.isKeyword("as") {
		lex.nextToken()
	}
	if lex.isKeyword("|", ")", "") {
		// The pipe ends here, so the default result field is used.
		return pl, nil
	}

	dstField, err := parseFieldName(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(srcField), err)
	}
	pl.resultField = dstField
	return pl, nil
}
// parseFieldNameWithOptionalParens parses a field name from lex,
// which may be optionally enclosed in parens, e.g. `foo` or `(foo)`.
//
// An error is returned if the field name cannot be parsed
// or if the opening paren has no matching closing paren after the field name.
func parseFieldNameWithOptionalParens(lex *lexer) (string, error) {
	parenthesized := lex.isKeyword("(")
	if parenthesized {
		lex.nextToken()
	}

	name, err := parseFieldName(lex)
	if err != nil {
		return "", err
	}
	if !parenthesized {
		return name, nil
	}

	// The opening paren must be closed right after the field name.
	if !lex.isKeyword(")") {
		return "", fmt.Errorf("missing ')' after '%s'", quoteTokenIfNeeded(name))
	}
	lex.nextToken()
	return name, nil
}