VictoriaMetrics/lib/logstorage/pipe_math.go
Aliaksandr Valialkin e25b2e7f05
wip
2024-05-28 14:21:29 +02:00

736 lines
16 KiB
Go

package logstorage
import (
"fmt"
"math"
"strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeMath processes '| math ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe
type pipeMath struct {
entries []*mathEntry
}
type mathEntry struct {
// The calculated expr result is stored in resultField.
resultField string
// expr is the expression to calculate.
expr *mathExpr
}
type mathExpr struct {
// if isConst is set, then the given mathExpr returns the given constValue.
isConst bool
constValue float64
// constValueStr is the original string representation of constValue.
//
// It is used in String() method for returning the original representation of the given constValue.
constValueStr string
// if fieldName isn't empty, then the given mathExpr fetches numeric values from the given fieldName.
fieldName string
// args are args for the given mathExpr.
args []*mathExpr
// op is the operation name (aka function name) for the given mathExpr.
op string
// f is the function for calculating results for the given mathExpr.
f mathFunc
// whether the mathExpr was wrapped in parens.
wrappedInParens bool
}
// mathFunc must fill result with calculated results based on the given args.
type mathFunc func(result []float64, args [][]float64)
func (pm *pipeMath) String() string {
s := "math"
a := make([]string, len(pm.entries))
for i, e := range pm.entries {
a[i] = e.String()
}
s += " " + strings.Join(a, ", ")
return s
}
func (me *mathEntry) String() string {
s := me.expr.String()
if isMathBinaryOp(me.expr.op) {
s = "(" + s + ")"
}
s += " as " + quoteTokenIfNeeded(me.resultField)
return s
}
func (me *mathExpr) String() string {
if me.isConst {
return me.constValueStr
}
if me.fieldName != "" {
return quoteTokenIfNeeded(me.fieldName)
}
args := me.args
if isMathBinaryOp(me.op) {
opPriority := getMathBinaryOpPriority(me.op)
left := me.args[0]
right := me.args[1]
leftStr := left.String()
rightStr := right.String()
if isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > opPriority {
leftStr = "(" + leftStr + ")"
}
if isMathBinaryOp(right.op) && getMathBinaryOpPriority(right.op) > opPriority {
rightStr = "(" + rightStr + ")"
}
return fmt.Sprintf("%s %s %s", leftStr, me.op, rightStr)
}
if me.op == "unary_minus" {
argStr := args[0].String()
if isMathBinaryOp(args[0].op) {
argStr = "(" + argStr + ")"
}
return "-" + argStr
}
a := make([]string, len(args))
for i, arg := range args {
a[i] = arg.String()
}
argsStr := strings.Join(a, ", ")
return fmt.Sprintf("%s(%s)", me.op, argsStr)
}
func isMathBinaryOp(op string) bool {
_, ok := mathBinaryOps[op]
return ok
}
func getMathBinaryOpPriority(op string) int {
bo, ok := mathBinaryOps[op]
if !ok {
logger.Panicf("BUG: unexpected binary op: %q", op)
}
return bo.priority
}
func getMathFuncForBinaryOp(op string) (mathFunc, error) {
bo, ok := mathBinaryOps[op]
if !ok {
return nil, fmt.Errorf("unsupported binary operation: %q", op)
}
return bo.f, nil
}
var mathBinaryOps = map[string]mathBinaryOp{
"^": {
priority: 1,
f: mathFuncPow,
},
"*": {
priority: 2,
f: mathFuncMul,
},
"/": {
priority: 2,
f: mathFuncDiv,
},
"%": {
priority: 2,
f: mathFuncMod,
},
"+": {
priority: 3,
f: mathFuncPlus,
},
"-": {
priority: 3,
f: mathFuncMinus,
},
}
type mathBinaryOp struct {
priority int
f mathFunc
}
func (pm *pipeMath) updateNeededFields(neededFields, unneededFields fieldsSet) {
for i := len(pm.entries) - 1; i >= 0; i-- {
e := pm.entries[i]
if neededFields.contains("*") {
if !unneededFields.contains(e.resultField) {
unneededFields.add(e.resultField)
entryFields := e.getNeededFields()
unneededFields.removeFields(entryFields)
}
} else {
if neededFields.contains(e.resultField) {
neededFields.remove(e.resultField)
entryFields := e.getNeededFields()
neededFields.addFields(entryFields)
}
}
}
}
func (me *mathEntry) getNeededFields() []string {
neededFields := newFieldsSet()
me.expr.updateNeededFields(neededFields)
return neededFields.getAll()
}
func (me *mathExpr) updateNeededFields(neededFields fieldsSet) {
if me.isConst {
return
}
if me.fieldName != "" {
neededFields.add(me.fieldName)
return
}
for _, arg := range me.args {
arg.updateNeededFields(neededFields)
}
}
func (pm *pipeMath) optimize() {
// nothing to do
}
func (pm *pipeMath) hasFilterInWithQuery() bool {
return false
}
func (pm *pipeMath) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pm, nil
}
func (pm *pipeMath) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
pmp := &pipeMathProcessor{
pm: pm,
ppNext: ppNext,
shards: make([]pipeMathProcessorShard, workersCount),
}
return pmp
}
type pipeMathProcessor struct {
pm *pipeMath
ppNext pipeProcessor
shards []pipeMathProcessorShard
}
type pipeMathProcessorShard struct {
pipeMathProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeMathProcessorShardNopad{})%128]byte
}
type pipeMathProcessorShardNopad struct {
// a holds all the data for rcs.
a arena
// rcs is used for storing calculated results before they are written to ppNext.
rcs []resultColumn
// rs is storage for temporary results
rs [][]float64
// rsBuf is backing storage for rs slices
rsBuf []float64
}
func (shard *pipeMathProcessorShard) executeMathEntry(e *mathEntry, rc *resultColumn, br *blockResult) {
clear(shard.rs)
shard.rs = shard.rs[:0]
shard.rsBuf = shard.rsBuf[:0]
shard.executeExpr(e.expr, br)
r := shard.rs[0]
b := shard.a.b
for _, f := range r {
bLen := len(b)
b = marshalFloat64String(b, f)
v := bytesutil.ToUnsafeString(b[bLen:])
rc.addValue(v)
}
shard.a.b = b
}
func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) {
rIdx := len(shard.rs)
shard.rs = slicesutil.SetLength(shard.rs, len(shard.rs)+1)
shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+len(br.timestamps))
shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-len(br.timestamps):]
if me.isConst {
r := shard.rs[rIdx]
for i := range br.timestamps {
r[i] = me.constValue
}
return
}
if me.fieldName != "" {
c := br.getColumnByName(me.fieldName)
values := c.getValues(br)
r := shard.rs[rIdx]
var f float64
for i, v := range values {
if i == 0 || v != values[i-1] {
var ok bool
f, ok = tryParseFloat64(v)
if !ok {
f = nan
}
}
r[i] = f
}
return
}
rsBufLen := len(shard.rsBuf)
for _, arg := range me.args {
shard.executeExpr(arg, br)
}
me.f(shard.rs[rIdx], shard.rs[rIdx+1:])
shard.rs = shard.rs[:rIdx+1]
shard.rsBuf = shard.rsBuf[:rsBufLen]
}
func (pmp *pipeMathProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pmp.shards[workerID]
entries := pmp.pm.entries
shard.rcs = slicesutil.SetLength(shard.rcs, len(entries))
rcs := shard.rcs
for i, e := range entries {
rc := &rcs[i]
rc.name = e.resultField
shard.executeMathEntry(e, rc, br)
br.addResultColumn(rc)
}
pmp.ppNext.writeBlock(workerID, br)
for i := range rcs {
rcs[i].resetValues()
}
shard.a.reset()
}
func (pmp *pipeMathProcessor) flush() error {
return nil
}
func parsePipeMath(lex *lexer, needMathKeyword bool) (*pipeMath, error) {
if needMathKeyword {
if !lex.isKeyword("math") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "math")
}
lex.nextToken()
}
var mes []*mathEntry
for {
me, err := parseMathEntry(lex)
if err != nil {
return nil, err
}
mes = append(mes, me)
switch {
case lex.isKeyword(","):
lex.nextToken()
case lex.isKeyword("|", ")", ""):
if len(mes) == 0 {
return nil, fmt.Errorf("missing 'math' expressions")
}
pm := &pipeMath{
entries: mes,
}
return pm, nil
default:
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token)
}
}
}
func parseMathEntry(lex *lexer) (*mathEntry, error) {
me, err := parseMathExpr(lex)
if err != nil {
return nil, err
}
if !lex.isKeyword("as") {
return nil, fmt.Errorf("missing 'as' after [%s]", me)
}
lex.nextToken()
resultField, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
e := &mathEntry{
resultField: resultField,
expr: me,
}
return e, nil
}
func parseMathExpr(lex *lexer) (*mathExpr, error) {
// parse left operand
left, err := parseMathExprOperand(lex)
if err != nil {
return nil, err
}
if lex.isKeyword("as", "|", ")", ",", "") {
// There is no right operand
return left, nil
}
for {
// parse operator
op := lex.token
lex.nextToken()
f, err := getMathFuncForBinaryOp(op)
if err != nil {
return nil, fmt.Errorf("cannot parse operator after [%s]: %w", left, err)
}
// parse right operand
right, err := parseMathExprOperand(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse operand after [%s %s]: %w", left, op, err)
}
me := &mathExpr{
args: []*mathExpr{left, right},
op: op,
f: f,
}
// balance operands according to their priority
if !left.wrappedInParens && isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > getMathBinaryOpPriority(op) {
me.args[0] = left.args[1]
left.args[1] = me
me = left
}
if !lex.isKeyword("as", "|", ")", ",", "") {
left = me
continue
}
return me, nil
}
}
func parseMathExprInParens(lex *lexer) (*mathExpr, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing '('")
}
lex.nextToken()
me, err := parseMathExpr(lex)
if err != nil {
return nil, err
}
me.wrappedInParens = true
if !lex.isKeyword(")") {
return nil, fmt.Errorf("missing ')'; got %q instead", lex.token)
}
lex.nextToken()
return me, nil
}
func parseMathExprOperand(lex *lexer) (*mathExpr, error) {
if lex.isKeyword("(") {
return parseMathExprInParens(lex)
}
switch {
case lex.isKeyword("abs"):
return parseMathExprAbs(lex)
case lex.isKeyword("min"):
return parseMathExprMin(lex)
case lex.isKeyword("max"):
return parseMathExprMax(lex)
case lex.isKeyword("-"):
return parseMathExprUnaryMinus(lex)
case lex.isKeyword("+"):
// just skip unary plus
lex.nextToken()
return parseMathExprOperand(lex)
case lex.isNumber():
return parseMathExprConstNumber(lex)
default:
return parseMathExprFieldName(lex)
}
}
func parseMathExprAbs(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "abs", mathFuncAbs)
if err != nil {
return nil, err
}
if len(me.args) != 1 {
return nil, fmt.Errorf("'abs' function accepts only one arg; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprMin(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "min", mathFuncMin)
if err != nil {
return nil, err
}
if len(me.args) < 2 {
return nil, fmt.Errorf("'min' function needs at least 2 args; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprMax(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "max", mathFuncMax)
if err != nil {
return nil, err
}
if len(me.args) < 2 {
return nil, fmt.Errorf("'max' function needs at least 2 args; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) {
if !lex.isKeyword(funcName) {
return nil, fmt.Errorf("missing %q keyword", funcName)
}
lex.nextToken()
args, err := parseMathFuncArgs(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse args for %q function: %w", funcName, err)
}
if len(args) == 0 {
return nil, fmt.Errorf("%q function needs at least one org", funcName)
}
me := &mathExpr{
args: args,
op: funcName,
f: f,
}
return me, nil
}
func parseMathFuncArgs(lex *lexer) ([]*mathExpr, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing '('")
}
lex.nextToken()
var args []*mathExpr
for {
if lex.isKeyword(")") {
lex.nextToken()
return args, nil
}
me, err := parseMathExpr(lex)
if err != nil {
return nil, err
}
args = append(args, me)
switch {
case lex.isKeyword(")"):
case lex.isKeyword(","):
lex.nextToken()
default:
return nil, fmt.Errorf("unexpected token after [%s]: %q; want ',' or ')'", me, lex.token)
}
}
}
func parseMathExprUnaryMinus(lex *lexer) (*mathExpr, error) {
if !lex.isKeyword("-") {
return nil, fmt.Errorf("missing '-'")
}
lex.nextToken()
expr, err := parseMathExprOperand(lex)
if err != nil {
return nil, err
}
me := &mathExpr{
args: []*mathExpr{expr},
op: "unary_minus",
f: mathFuncUnaryMinus,
}
return me, nil
}
func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) {
if !lex.isNumber() {
return nil, fmt.Errorf("cannot parse number from %q", lex.token)
}
numStr, err := getCompoundMathToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse number: %w", err)
}
f, ok := tryParseNumber(numStr)
if !ok {
return nil, fmt.Errorf("cannot parse number from %q", numStr)
}
me := &mathExpr{
isConst: true,
constValue: f,
constValueStr: numStr,
}
return me, nil
}
func parseMathExprFieldName(lex *lexer) (*mathExpr, error) {
fieldName, err := parseFieldName(lex)
if err != nil {
return nil, err
}
fieldName = getCanonicalColumnName(fieldName)
me := &mathExpr{
fieldName: fieldName,
}
return me, nil
}
func getCompoundMathToken(lex *lexer) (string, error) {
stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", ""}
if lex.isKeyword(stopTokens...) {
return "", fmt.Errorf("compound token cannot start with '%s'", lex.token)
}
s := lex.token
rawS := lex.rawToken
lex.nextToken()
suffix := ""
for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) {
s += lex.token
lex.nextToken()
}
if suffix == "" {
return s, nil
}
return rawS + suffix, nil
}
func mathFuncPlus(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] + b[i]
}
}
func mathFuncMinus(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] - b[i]
}
}
func mathFuncMul(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] * b[i]
}
}
func mathFuncDiv(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] / b[i]
}
}
func mathFuncMod(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = math.Mod(a[i], b[i])
}
}
func mathFuncPow(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = math.Pow(a[i], b[i])
}
}
func mathFuncAbs(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
result[i] = math.Abs(arg[i])
}
}
func mathFuncUnaryMinus(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
result[i] = -arg[i]
}
}
func mathFuncMin(result []float64, args [][]float64) {
for i := range result {
f := nan
for _, arg := range args {
if math.IsNaN(f) || arg[i] < f {
f = arg[i]
}
}
result[i] = f
}
}
func mathFuncMax(result []float64, args [][]float64) {
for i := range result {
f := nan
for _, arg := range args {
if math.IsNaN(f) || arg[i] > f {
f = arg[i]
}
}
result[i] = f
}
}