mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
66b2987f49
This eliminates possible bugs related to forgotten Query.Optimize() calls. This also allows removing optimize() function from pipe interface. While at it, drop filterNoop inside filterAnd.
172 lines
4.1 KiB
Go
172 lines
4.1 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// pipeUnpackSyslog processes '| unpack_syslog ...' pipe.
|
|
//
|
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
|
|
type pipeUnpackSyslog struct {
|
|
// fromField is the field to unpack syslog fields from
|
|
fromField string
|
|
|
|
// the timezone to use when parsing rfc3164 timestamps
|
|
offsetStr string
|
|
offsetTimezone *time.Location
|
|
|
|
// resultPrefix is prefix to add to unpacked field names
|
|
resultPrefix string
|
|
|
|
keepOriginalFields bool
|
|
|
|
// iff is an optional filter for skipping unpacking syslog
|
|
iff *ifFilter
|
|
}
|
|
|
|
func (pu *pipeUnpackSyslog) String() string {
|
|
s := "unpack_syslog"
|
|
if pu.iff != nil {
|
|
s += " " + pu.iff.String()
|
|
}
|
|
if !isMsgFieldName(pu.fromField) {
|
|
s += " from " + quoteTokenIfNeeded(pu.fromField)
|
|
}
|
|
if pu.offsetStr != "" {
|
|
s += " offset " + pu.offsetStr
|
|
}
|
|
if pu.resultPrefix != "" {
|
|
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
|
|
}
|
|
if pu.keepOriginalFields {
|
|
s += " keep_original_fields"
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (pu *pipeUnpackSyslog) canLiveTail() bool {
|
|
return true
|
|
}
|
|
|
|
func (pu *pipeUnpackSyslog) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
|
updateNeededFieldsForUnpackPipe(pu.fromField, nil, pu.keepOriginalFields, false, pu.iff, neededFields, unneededFields)
|
|
}
|
|
|
|
func (pu *pipeUnpackSyslog) hasFilterInWithQuery() bool {
|
|
return pu.iff.hasFilterInWithQuery()
|
|
}
|
|
|
|
func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
|
|
iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
puNew := *pu
|
|
puNew.iff = iffNew
|
|
return &puNew, nil
|
|
}
|
|
|
|
func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
|
unpackSyslog := func(uctx *fieldsUnpackerContext, s string) {
|
|
year := currentYear.Load()
|
|
p := GetSyslogParser(int(year), pu.offsetTimezone)
|
|
|
|
p.Parse(s)
|
|
for _, f := range p.Fields {
|
|
uctx.addField(f.Name, f.Value)
|
|
}
|
|
|
|
PutSyslogParser(p)
|
|
}
|
|
|
|
return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff)
|
|
}
|
|
|
|
// currentYear holds the current year in UTC.
//
// It is initialized at package init time and then refreshed by a background goroutine.
// The unpack_syslog pipe passes it to the syslog parser on every unpack call.
var currentYear atomic.Int64

func init() {
	currentYear.Store(int64(time.Now().UTC().Year()))

	// The goroutine below runs for the whole lifetime of the process.
	go func() {
		// maxSleep bounds a single sleep, so the wall clock is re-read at least once per hour.
		//
		// time.Sleep doesn't notice wall clock changes (an NTP step after a boot with a wrong clock,
		// a manual adjustment, a suspended VM). Sleeping in one shot until the next New Year
		// could therefore leave currentYear stale for a very long time.
		const maxSleep = time.Hour

		for {
			now := time.Now().UTC()
			nextYear := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC)

			// Sleep until the New Year when it is close, otherwise for maxSleep only.
			d := nextYear.Sub(now)
			if d > maxSleep {
				d = maxSleep
			}
			time.Sleep(d)

			currentYear.Store(int64(time.Now().UTC().Year()))
		}
	}()
}
|
|
|
|
func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
|
|
if !lex.isKeyword("unpack_syslog") {
|
|
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_syslog")
|
|
}
|
|
lex.nextToken()
|
|
|
|
var iff *ifFilter
|
|
if lex.isKeyword("if") {
|
|
f, err := parseIfFilter(lex)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
iff = f
|
|
}
|
|
|
|
fromField := "_msg"
|
|
if lex.isKeyword("from") {
|
|
lex.nextToken()
|
|
f, err := parseFieldName(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
|
|
}
|
|
fromField = f
|
|
}
|
|
|
|
offsetStr := ""
|
|
offsetTimezone := time.Local
|
|
if lex.isKeyword("offset") {
|
|
lex.nextToken()
|
|
s, err := getCompoundToken(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read 'offset': %w", err)
|
|
}
|
|
offsetStr = s
|
|
nsecs, ok := tryParseDuration(offsetStr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("cannot parse 'offset' from %q", offsetStr)
|
|
}
|
|
secs := nsecs / nsecsPerSecond
|
|
offsetTimezone = time.FixedZone("custom", int(secs))
|
|
}
|
|
|
|
resultPrefix := ""
|
|
if lex.isKeyword("result_prefix") {
|
|
lex.nextToken()
|
|
p, err := getCompoundToken(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err)
|
|
}
|
|
resultPrefix = p
|
|
}
|
|
|
|
keepOriginalFields := false
|
|
if lex.isKeyword("keep_original_fields") {
|
|
lex.nextToken()
|
|
keepOriginalFields = true
|
|
}
|
|
|
|
pu := &pipeUnpackSyslog{
|
|
fromField: fromField,
|
|
offsetStr: offsetStr,
|
|
offsetTimezone: offsetTimezone,
|
|
resultPrefix: resultPrefix,
|
|
keepOriginalFields: keepOriginalFields,
|
|
iff: iff,
|
|
}
|
|
|
|
return pu, nil
|
|
}
|