mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
16ee470da6
In this case the _msg field is set to the value specified in the -defaultMsgValue command-line flag. This should simplify first-time migration to VictoriaLogs from other systems.
262 lines
7 KiB
Go
262 lines
7 KiB
Go
package insertutils
|
|
|
|
import (
|
|
"flag"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
|
)
|
|
|
|
var (
|
|
defaultMsgValue = flag.String("defaultMsgValue", "missing _msg field; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field",
|
|
"Default value for _msg field if the ingested log entry doesn't contain it; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field")
|
|
)
|
|
|
|
// CommonParams contains common HTTP parameters used by log ingestion APIs.
|
|
//
|
|
// See https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters
|
|
type CommonParams struct {
|
|
TenantID logstorage.TenantID
|
|
TimeField string
|
|
MsgFields []string
|
|
StreamFields []string
|
|
IgnoreFields []string
|
|
|
|
Debug bool
|
|
DebugRequestURI string
|
|
DebugRemoteAddr string
|
|
}
|
|
|
|
// GetCommonParams returns CommonParams from r.
|
|
func GetCommonParams(r *http.Request) (*CommonParams, error) {
|
|
// Extract tenantID
|
|
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Extract time field name from _time_field query arg or header
|
|
timeField := "_time"
|
|
if tf := r.FormValue("_time_field"); tf != "" {
|
|
timeField = tf
|
|
} else if tf = r.Header.Get("VL-Time-Field"); tf != "" {
|
|
timeField = tf
|
|
}
|
|
|
|
// Extract message field name from _msg_field query arg or header
|
|
msgField := ""
|
|
if msgf := r.FormValue("_msg_field"); msgf != "" {
|
|
msgField = msgf
|
|
} else if msgf = r.Header.Get("VL-Msg-Field"); msgf != "" {
|
|
msgField = msgf
|
|
}
|
|
var msgFields []string
|
|
if msgField != "" {
|
|
msgFields = strings.Split(msgField, ",")
|
|
}
|
|
|
|
streamFields := httputils.GetArray(r, "_stream_fields")
|
|
if len(streamFields) == 0 {
|
|
if sf := r.Header.Get("VL-Stream-Fields"); len(sf) > 0 {
|
|
streamFields = strings.Split(sf, ",")
|
|
}
|
|
}
|
|
ignoreFields := httputils.GetArray(r, "ignore_fields")
|
|
if len(ignoreFields) == 0 {
|
|
if f := r.Header.Get("VL-Ignore-Fields"); len(f) > 0 {
|
|
ignoreFields = strings.Split(f, ",")
|
|
}
|
|
}
|
|
|
|
debug := httputils.GetBool(r, "debug")
|
|
if !debug {
|
|
if dh := r.Header.Get("VL-Debug"); len(dh) > 0 {
|
|
hv := strings.ToLower(dh)
|
|
switch hv {
|
|
case "", "0", "f", "false", "no":
|
|
default:
|
|
debug = true
|
|
}
|
|
}
|
|
}
|
|
debugRequestURI := ""
|
|
debugRemoteAddr := ""
|
|
if debug {
|
|
debugRequestURI = httpserver.GetRequestURI(r)
|
|
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
|
|
}
|
|
|
|
cp := &CommonParams{
|
|
TenantID: tenantID,
|
|
TimeField: timeField,
|
|
MsgFields: msgFields,
|
|
StreamFields: streamFields,
|
|
IgnoreFields: ignoreFields,
|
|
Debug: debug,
|
|
DebugRequestURI: debugRequestURI,
|
|
DebugRemoteAddr: debugRemoteAddr,
|
|
}
|
|
|
|
return cp, nil
|
|
}
|
|
|
|
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
|
|
func GetCommonParamsForSyslog(tenantID logstorage.TenantID) *CommonParams {
|
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
|
|
cp := &CommonParams{
|
|
TenantID: tenantID,
|
|
TimeField: "timestamp",
|
|
MsgFields: []string{
|
|
"message",
|
|
},
|
|
StreamFields: []string{
|
|
"hostname",
|
|
"app_name",
|
|
"proc_id",
|
|
},
|
|
}
|
|
|
|
return cp
|
|
}
|
|
|
|
// LogMessageProcessor is an interface for log message processors.
|
|
type LogMessageProcessor interface {
|
|
// AddRow must add row to the LogMessageProcessor with the given timestamp and the given fields.
|
|
//
|
|
// The LogMessageProcessor implementation cannot hold references to fields, since the caller can re-use them.
|
|
AddRow(timestamp int64, fields []logstorage.Field)
|
|
|
|
// MustClose() must flush all the remaining fields and free up resources occupied by LogMessageProcessor.
|
|
MustClose()
|
|
}
|
|
|
|
type logMessageProcessor struct {
|
|
mu sync.Mutex
|
|
wg sync.WaitGroup
|
|
stopCh chan struct{}
|
|
lastFlushTime time.Time
|
|
|
|
tmpFields []logstorage.Field
|
|
|
|
cp *CommonParams
|
|
lr *logstorage.LogRows
|
|
}
|
|
|
|
func (lmp *logMessageProcessor) initPeriodicFlush() {
|
|
lmp.lastFlushTime = time.Now()
|
|
|
|
lmp.wg.Add(1)
|
|
go func() {
|
|
defer lmp.wg.Done()
|
|
|
|
d := timeutil.AddJitterToDuration(time.Second)
|
|
ticker := time.NewTicker(d)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-lmp.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
lmp.mu.Lock()
|
|
if time.Since(lmp.lastFlushTime) >= d {
|
|
lmp.flushLocked()
|
|
}
|
|
lmp.mu.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// AddRow adds new log message to lmp with the given timestamp and fields.
|
|
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field) {
|
|
lmp.mu.Lock()
|
|
defer lmp.mu.Unlock()
|
|
|
|
if len(fields) > *MaxFieldsPerLine {
|
|
rf := logstorage.RowFormatter(fields)
|
|
logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
|
|
rowsDroppedTotalTooManyFields.Inc()
|
|
return
|
|
}
|
|
|
|
if *defaultMsgValue != "" && !hasMsgField(fields) {
|
|
// The log entry doesn't contain mandatory _msg field. Add _msg field with default value then
|
|
// according to https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field .
|
|
lmp.tmpFields = append(lmp.tmpFields[:0], fields...)
|
|
lmp.tmpFields = append(lmp.tmpFields, logstorage.Field{
|
|
Name: "_msg",
|
|
Value: *defaultMsgValue,
|
|
})
|
|
fields = lmp.tmpFields
|
|
}
|
|
|
|
lmp.lr.MustAdd(lmp.cp.TenantID, timestamp, fields)
|
|
if lmp.cp.Debug {
|
|
s := lmp.lr.GetRowString(0)
|
|
lmp.lr.ResetKeepSettings()
|
|
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` arg: %s", lmp.cp.DebugRemoteAddr, lmp.cp.DebugRequestURI, s)
|
|
rowsDroppedTotalDebug.Inc()
|
|
return
|
|
}
|
|
if lmp.lr.NeedFlush() {
|
|
lmp.flushLocked()
|
|
}
|
|
}
|
|
|
|
func hasMsgField(fields []logstorage.Field) bool {
|
|
for _, f := range fields {
|
|
if f.Name == "_msg" {
|
|
return len(f.Value) > 0
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// flushLocked must be called under locked lmp.mu.
|
|
func (lmp *logMessageProcessor) flushLocked() {
|
|
lmp.lastFlushTime = time.Now()
|
|
vlstorage.MustAddRows(lmp.lr)
|
|
lmp.lr.ResetKeepSettings()
|
|
}
|
|
|
|
// MustClose flushes the remaining data to the underlying storage and closes lmp.
|
|
func (lmp *logMessageProcessor) MustClose() {
|
|
close(lmp.stopCh)
|
|
lmp.wg.Wait()
|
|
|
|
lmp.flushLocked()
|
|
logstorage.PutLogRows(lmp.lr)
|
|
lmp.lr = nil
|
|
}
|
|
|
|
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
|
|
//
|
|
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
|
|
func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor {
|
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
|
lmp := &logMessageProcessor{
|
|
cp: cp,
|
|
lr: lr,
|
|
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
lmp.initPeriodicFlush()
|
|
|
|
return lmp
|
|
}
|
|
|
|
var (
|
|
rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
|
|
rowsDroppedTotalTooManyFields = metrics.NewCounter(`vl_rows_dropped_total{reason="too_many_fields"}`)
|
|
)
|