mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
66b2987f49
This eliminates possible bugs related to forgotten Query.Optimize() calls. This also allows removing optimize() function from pipe interface. While at it, drop filterNoop inside filterAnd.
170 lines
4.2 KiB
Go
170 lines
4.2 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"regexp"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
)
|
|
|
|
// pipeReplaceRegexp processes '| replace_regexp ...' pipe.
|
|
//
|
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe
|
|
type pipeReplaceRegexp struct {
|
|
field string
|
|
re *regexp.Regexp
|
|
replacement string
|
|
|
|
// limit limits the number of replacements, which can be performed
|
|
limit uint64
|
|
|
|
// iff is an optional filter for skipping the replace_regexp operation
|
|
iff *ifFilter
|
|
}
|
|
|
|
func (pr *pipeReplaceRegexp) String() string {
|
|
s := "replace_regexp"
|
|
if pr.iff != nil {
|
|
s += " " + pr.iff.String()
|
|
}
|
|
s += fmt.Sprintf(" (%s, %s)", quoteTokenIfNeeded(pr.re.String()), quoteTokenIfNeeded(pr.replacement))
|
|
if pr.field != "_msg" {
|
|
s += " at " + quoteTokenIfNeeded(pr.field)
|
|
}
|
|
if pr.limit > 0 {
|
|
s += fmt.Sprintf(" limit %d", pr.limit)
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (pr *pipeReplaceRegexp) canLiveTail() bool {
|
|
return true
|
|
}
|
|
|
|
func (pr *pipeReplaceRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
|
updateNeededFieldsForUpdatePipe(neededFields, unneededFields, pr.field, pr.iff)
|
|
}
|
|
|
|
func (pr *pipeReplaceRegexp) hasFilterInWithQuery() bool {
|
|
return pr.iff.hasFilterInWithQuery()
|
|
}
|
|
|
|
func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
|
|
iffNew, err := pr.iff.initFilterInValues(cache, getFieldValuesFunc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
peNew := *pr
|
|
peNew.iff = iffNew
|
|
return &peNew, nil
|
|
}
|
|
|
|
func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
|
updateFunc := func(a *arena, v string) string {
|
|
bLen := len(a.b)
|
|
a.b = appendReplaceRegexp(a.b, v, pr.re, pr.replacement, pr.limit)
|
|
return bytesutil.ToUnsafeString(a.b[bLen:])
|
|
}
|
|
|
|
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
|
|
|
|
}
|
|
|
|
func parsePipeReplaceRegexp(lex *lexer) (*pipeReplaceRegexp, error) {
|
|
if !lex.isKeyword("replace_regexp") {
|
|
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "replace_regexp")
|
|
}
|
|
lex.nextToken()
|
|
|
|
// parse optional if (...)
|
|
var iff *ifFilter
|
|
if lex.isKeyword("if") {
|
|
f, err := parseIfFilter(lex)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
iff = f
|
|
}
|
|
|
|
if !lex.isKeyword("(") {
|
|
return nil, fmt.Errorf("missing '(' after 'replace_regexp'")
|
|
}
|
|
lex.nextToken()
|
|
|
|
reStr, err := getCompoundToken(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse reStr in 'replace_regexp': %w", err)
|
|
}
|
|
re, err := regexp.Compile(reStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse regexp %q in 'replace_regexp': %w", reStr, err)
|
|
}
|
|
if !lex.isKeyword(",") {
|
|
return nil, fmt.Errorf("missing ',' after 'replace_regexp(%q'", reStr)
|
|
}
|
|
lex.nextToken()
|
|
|
|
replacement, err := getCompoundToken(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse replacement in 'replace_regexp(%q': %w", reStr, err)
|
|
}
|
|
|
|
if !lex.isKeyword(")") {
|
|
return nil, fmt.Errorf("missing ')' after 'replace_regexp(%q, %q'", reStr, replacement)
|
|
}
|
|
lex.nextToken()
|
|
|
|
field := "_msg"
|
|
if lex.isKeyword("at") {
|
|
lex.nextToken()
|
|
f, err := parseFieldName(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'at' field after 'replace_regexp(%q, %q)': %w", reStr, replacement, err)
|
|
}
|
|
field = f
|
|
}
|
|
|
|
limit := uint64(0)
|
|
if lex.isKeyword("limit") {
|
|
lex.nextToken()
|
|
n, ok := tryParseUint64(lex.token)
|
|
if !ok {
|
|
return nil, fmt.Errorf("cannot parse 'limit %s' in 'replace_regexp'", lex.token)
|
|
}
|
|
lex.nextToken()
|
|
limit = n
|
|
}
|
|
|
|
pr := &pipeReplaceRegexp{
|
|
field: field,
|
|
re: re,
|
|
replacement: replacement,
|
|
limit: limit,
|
|
iff: iff,
|
|
}
|
|
|
|
return pr, nil
|
|
}
|
|
|
|
// appendReplaceRegexp appends s to dst with matches of re substituted by replacement
// and returns the resulting dst.
//
// The replacement is expanded via re.ExpandString, so it may refer to capturing
// groups with $1, ${name}, etc.
//
// If limit is greater than zero, then at most limit leftmost non-overlapping matches
// are replaced. Zero limit means that all the matches are replaced.
//
// Empty s is never modified, e.g. dst is returned as is even if re matches an empty string.
//
// All the matches are located in a single pass over the original s, so:
//   - regexps, which can match an empty string (such as `a*`), cannot result in an endless loop;
//   - position-dependent assertions such as `^` and `\b` are evaluated against the original
//     string instead of the yet unprocessed tail of the string.
func appendReplaceRegexp(dst []byte, s string, re *regexp.Regexp, replacement string, limit uint64) []byte {
	if len(s) == 0 {
		return dst
	}

	// A string cannot contain more than len(s)+1 non-overlapping matches,
	// so bigger limits are equivalent to the missing limit.
	// This also guarantees that the limit fits int.
	n := -1
	if limit > 0 && limit <= uint64(len(s))+1 {
		n = int(limit)
	}

	prevEnd := 0
	for _, locs := range re.FindAllStringSubmatchIndex(s, n) {
		// Copy the text between the previous match and the current match as is.
		dst = append(dst, s[prevEnd:locs[0]]...)
		dst = re.ExpandString(dst, replacement, s, locs)
		prevEnd = locs[1]
	}

	// Copy the tail after the last replaced match.
	return append(dst, s[prevEnd:]...)
}
|