2023-06-22 02:39:22 +00:00
package insertutils
import (
2024-10-30 13:59:03 +00:00
"flag"
2023-06-22 02:39:22 +00:00
"net/http"
2024-09-03 17:16:10 +00:00
"strings"
2024-07-01 23:28:02 +00:00
"sync"
"time"
2023-06-22 02:39:22 +00:00
2023-09-15 13:18:38 +00:00
"github.com/VictoriaMetrics/metrics"
2023-06-22 02:39:22 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
2024-07-01 23:28:02 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
2023-06-22 02:39:22 +00:00
)
2024-10-30 13:59:03 +00:00
// defaultMsgValue holds the value of the -defaultMsgValue command-line flag.
//
// AddRow appends a _msg field with this value to every log entry, which has no non-empty _msg field.
// Setting the flag to an empty string disables this behavior.
var defaultMsgValue = flag.String(
	"defaultMsgValue",
	"missing _msg field; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field",
	"Default value for _msg field if the ingested log entry doesn't contain it; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field",
)
2023-06-22 02:39:22 +00:00
// CommonParams contains common HTTP parameters used by log ingestion APIs.
//
2024-05-24 22:30:58 +00:00
// See https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters
2023-06-22 02:39:22 +00:00
type CommonParams struct {
TenantID logstorage . TenantID
TimeField string
2024-10-30 13:13:56 +00:00
MsgFields [ ] string
2023-06-22 02:39:22 +00:00
StreamFields [ ] string
IgnoreFields [ ] string
Debug bool
DebugRequestURI string
DebugRemoteAddr string
}
// GetCommonParams returns CommonParams from r.
func GetCommonParams ( r * http . Request ) ( * CommonParams , error ) {
// Extract tenantID
tenantID , err := logstorage . GetTenantIDFromRequest ( r )
if err != nil {
return nil , err
}
2024-09-03 15:43:26 +00:00
// Extract time field name from _time_field query arg or header
2024-09-03 17:16:10 +00:00
timeField := "_time"
2023-06-22 02:39:22 +00:00
if tf := r . FormValue ( "_time_field" ) ; tf != "" {
timeField = tf
2024-09-03 15:43:26 +00:00
} else if tf = r . Header . Get ( "VL-Time-Field" ) ; tf != "" {
timeField = tf
2023-06-22 02:39:22 +00:00
}
2024-09-03 15:43:26 +00:00
// Extract message field name from _msg_field query arg or header
2024-09-03 17:16:10 +00:00
msgField := ""
2023-06-22 02:39:22 +00:00
if msgf := r . FormValue ( "_msg_field" ) ; msgf != "" {
msgField = msgf
2024-09-03 15:43:26 +00:00
} else if msgf = r . Header . Get ( "VL-Msg-Field" ) ; msgf != "" {
msgField = msgf
2023-06-22 02:39:22 +00:00
}
2024-10-30 13:13:56 +00:00
var msgFields [ ] string
if msgField != "" {
msgFields = strings . Split ( msgField , "," )
}
2023-06-22 02:39:22 +00:00
streamFields := httputils . GetArray ( r , "_stream_fields" )
2024-09-03 15:43:26 +00:00
if len ( streamFields ) == 0 {
2024-09-03 17:16:10 +00:00
if sf := r . Header . Get ( "VL-Stream-Fields" ) ; len ( sf ) > 0 {
streamFields = strings . Split ( sf , "," )
2024-09-03 15:43:26 +00:00
}
}
2023-06-22 02:39:22 +00:00
ignoreFields := httputils . GetArray ( r , "ignore_fields" )
2024-09-03 15:43:26 +00:00
if len ( ignoreFields ) == 0 {
2024-09-03 17:16:10 +00:00
if f := r . Header . Get ( "VL-Ignore-Fields" ) ; len ( f ) > 0 {
ignoreFields = strings . Split ( f , "," )
2024-09-03 15:43:26 +00:00
}
}
2023-06-22 02:39:22 +00:00
debug := httputils . GetBool ( r , "debug" )
2024-09-03 17:16:10 +00:00
if ! debug {
if dh := r . Header . Get ( "VL-Debug" ) ; len ( dh ) > 0 {
hv := strings . ToLower ( dh )
switch hv {
case "" , "0" , "f" , "false" , "no" :
default :
debug = true
}
}
}
2023-06-22 02:39:22 +00:00
debugRequestURI := ""
debugRemoteAddr := ""
if debug {
debugRequestURI = httpserver . GetRequestURI ( r )
debugRemoteAddr = httpserver . GetQuotedRemoteAddr ( r )
}
cp := & CommonParams {
TenantID : tenantID ,
TimeField : timeField ,
2024-10-30 13:13:56 +00:00
MsgFields : msgFields ,
2023-06-22 02:39:22 +00:00
StreamFields : streamFields ,
IgnoreFields : ignoreFields ,
Debug : debug ,
DebugRequestURI : debugRequestURI ,
DebugRemoteAddr : debugRemoteAddr ,
}
2024-09-03 15:43:26 +00:00
2023-06-22 02:39:22 +00:00
return cp , nil
}
2024-06-17 10:13:18 +00:00
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
func GetCommonParamsForSyslog ( tenantID logstorage . TenantID ) * CommonParams {
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
cp := & CommonParams {
TenantID : tenantID ,
TimeField : "timestamp" ,
2024-10-30 13:13:56 +00:00
MsgFields : [ ] string {
"message" ,
} ,
2024-06-17 10:13:18 +00:00
StreamFields : [ ] string {
"hostname" ,
"app_name" ,
"proc_id" ,
} ,
}
return cp
}
2024-06-17 20:28:15 +00:00
// LogMessageProcessor is an interface for log message processors.
//
// It accepts parsed log entries one by one via AddRow and must be closed via MustClose
// when there are no more entries to add.
type LogMessageProcessor interface {
	// AddRow must add row to the LogMessageProcessor with the given timestamp and the given fields.
	//
	// The LogMessageProcessor implementation cannot hold references to fields, since the caller can re-use them.
	AddRow(timestamp int64, fields []logstorage.Field)

	// MustClose must flush all the remaining fields and free up resources occupied by LogMessageProcessor.
	MustClose()
}
type logMessageProcessor struct {
2024-07-01 23:28:02 +00:00
mu sync . Mutex
wg sync . WaitGroup
stopCh chan struct { }
lastFlushTime time . Time
2024-10-30 13:59:03 +00:00
tmpFields [ ] logstorage . Field
2024-06-17 20:28:15 +00:00
cp * CommonParams
lr * logstorage . LogRows
}
2024-07-01 23:28:02 +00:00
// initPeriodicFlush starts a background goroutine, which flushes the pending rows
// if no flush has happened during roughly the last second.
//
// The goroutine runs until lmp.stopCh is closed; lmp.wg can be used for waiting until it exits.
func (lmp *logMessageProcessor) initPeriodicFlush() {
	lmp.lastFlushTime = time.Now()

	lmp.wg.Add(1)
	go func() {
		defer lmp.wg.Done()

		// The jitter is added to the one-second interval by timeutil.AddJitterToDuration.
		interval := timeutil.AddJitterToDuration(time.Second)
		t := time.NewTicker(interval)
		defer t.Stop()

		// flushIfStale flushes the pending rows if the last flush happened at least interval ago.
		flushIfStale := func() {
			lmp.mu.Lock()
			defer lmp.mu.Unlock()

			if time.Since(lmp.lastFlushTime) < interval {
				return
			}
			lmp.flushLocked()
		}

		for {
			select {
			case <-t.C:
				flushIfStale()
			case <-lmp.stopCh:
				return
			}
		}
	}()
}
2024-06-17 20:28:15 +00:00
// AddRow adds new log message to lmp with the given timestamp and fields.
func ( lmp * logMessageProcessor ) AddRow ( timestamp int64 , fields [ ] logstorage . Field ) {
2024-07-01 23:28:02 +00:00
lmp . mu . Lock ( )
defer lmp . mu . Unlock ( )
2024-06-17 20:28:15 +00:00
if len ( fields ) > * MaxFieldsPerLine {
rf := logstorage . RowFormatter ( fields )
logger . Warnf ( "dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s" , len ( fields ) , * MaxFieldsPerLine , rf )
rowsDroppedTotalTooManyFields . Inc ( )
return
}
2024-10-30 13:59:03 +00:00
if * defaultMsgValue != "" && ! hasMsgField ( fields ) {
// The log entry doesn't contain mandatory _msg field. Add _msg field with default value then
// according to https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field .
lmp . tmpFields = append ( lmp . tmpFields [ : 0 ] , fields ... )
lmp . tmpFields = append ( lmp . tmpFields , logstorage . Field {
Name : "_msg" ,
Value : * defaultMsgValue ,
} )
fields = lmp . tmpFields
2024-09-26 07:35:28 +00:00
}
2024-06-17 20:28:15 +00:00
lmp . lr . MustAdd ( lmp . cp . TenantID , timestamp , fields )
if lmp . cp . Debug {
s := lmp . lr . GetRowString ( 0 )
lmp . lr . ResetKeepSettings ( )
2024-09-03 17:16:10 +00:00
logger . Infof ( "remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` arg: %s" , lmp . cp . DebugRemoteAddr , lmp . cp . DebugRequestURI , s )
2024-06-17 20:28:15 +00:00
rowsDroppedTotalDebug . Inc ( )
return
}
if lmp . lr . NeedFlush ( ) {
2024-07-01 23:28:02 +00:00
lmp . flushLocked ( )
2024-06-17 20:28:15 +00:00
}
}
2024-10-30 13:59:03 +00:00
// hasMsgField reports whether fields contain a non-empty _msg field.
//
// Only the first field named _msg is inspected.
func hasMsgField(fields []logstorage.Field) bool {
	for i := range fields {
		if fields[i].Name != "_msg" {
			continue
		}
		return fields[i].Value != ""
	}
	return false
}
2024-07-01 23:28:02 +00:00
// flushLocked must be called under locked lmp.mu.
func ( lmp * logMessageProcessor ) flushLocked ( ) {
lmp . lastFlushTime = time . Now ( )
2024-06-17 20:28:15 +00:00
vlstorage . MustAddRows ( lmp . lr )
lmp . lr . ResetKeepSettings ( )
}
// MustClose flushes the remaining data to the underlying storage and closes lmp.
func ( lmp * logMessageProcessor ) MustClose ( ) {
2024-07-01 23:28:02 +00:00
close ( lmp . stopCh )
lmp . wg . Wait ( )
lmp . flushLocked ( )
2024-06-17 20:28:15 +00:00
logstorage . PutLogRows ( lmp . lr )
lmp . lr = nil
}
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
2024-07-01 23:28:02 +00:00
//
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
2024-06-17 20:28:15 +00:00
func ( cp * CommonParams ) NewLogMessageProcessor ( ) LogMessageProcessor {
lr := logstorage . GetLogRows ( cp . StreamFields , cp . IgnoreFields )
2024-07-01 23:28:02 +00:00
lmp := & logMessageProcessor {
2024-06-17 20:28:15 +00:00
cp : cp ,
lr : lr ,
2024-07-01 23:28:02 +00:00
stopCh : make ( chan struct { } ) ,
2023-06-22 02:39:22 +00:00
}
2024-07-01 23:28:02 +00:00
lmp . initPeriodicFlush ( )
return lmp
2023-06-22 02:39:22 +00:00
}
2024-09-03 17:16:10 +00:00
// Counters for the log rows dropped by logMessageProcessor.AddRow.
//
// Both counters share the vl_rows_dropped_total name and differ only in the reason label.
// The names are written in the plain name{label="value"} form without any whitespace.
var (
	// rowsDroppedTotalDebug counts the rows dropped because debug mode is enabled.
	rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)

	// rowsDroppedTotalTooManyFields counts the rows dropped because they contain more than -insert.maxFieldsPerLine fields.
	rowsDroppedTotalTooManyFields = metrics.NewCounter(`vl_rows_dropped_total{reason="too_many_fields"}`)
)