mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vlinsert/jsonline: code prettifying
This commit is contained in:
parent
bab5e9fbc2
commit
dde9ceed07
10 changed files with 218 additions and 205 deletions
|
@ -1,8 +0,0 @@
|
||||||
package common
|
|
||||||
|
|
||||||
import "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
||||||
|
|
||||||
var (
|
|
||||||
// MaxLineSizeBytes is the maximum size of a single line, which can be read by /insert/* handlers
|
|
||||||
MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
|
|
||||||
)
|
|
|
@ -11,16 +11,15 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
@ -84,54 +83,15 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
bulkRequestsTotal.Inc()
|
bulkRequestsTotal.Inc()
|
||||||
|
|
||||||
// Extract tenantID
|
cp, err := insertutils.GetCommonParams(r)
|
||||||
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
||||||
// Extract time field name from _time_field query arg
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
||||||
var timeField = "_time"
|
|
||||||
if tf := r.FormValue("_time_field"); tf != "" {
|
|
||||||
timeField = tf
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract message field name from _msg_field query arg
|
|
||||||
var msgField = ""
|
|
||||||
if msgf := r.FormValue("_msg_field"); msgf != "" {
|
|
||||||
msgField = msgf
|
|
||||||
}
|
|
||||||
|
|
||||||
streamFields := httputils.GetArray(r, "_stream_fields")
|
|
||||||
ignoreFields := httputils.GetArray(r, "ignore_fields")
|
|
||||||
|
|
||||||
isDebug := httputils.GetBool(r, "debug")
|
|
||||||
debugRequestURI := ""
|
|
||||||
debugRemoteAddr := ""
|
|
||||||
if isDebug {
|
|
||||||
debugRequestURI = httpserver.GetRequestURI(r)
|
|
||||||
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
lr := logstorage.GetLogRows(streamFields, ignoreFields)
|
|
||||||
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
|
||||||
lr.MustAdd(tenantID, timestamp, fields)
|
|
||||||
if isDebug {
|
|
||||||
s := lr.GetRowString(0)
|
|
||||||
lr.ResetKeepSettings()
|
|
||||||
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s)
|
|
||||||
rowsDroppedTotal.Inc()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if lr.NeedFlush() {
|
|
||||||
vlstorage.MustAddRows(lr)
|
|
||||||
lr.ResetKeepSettings()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
isGzip := r.Header.Get("Content-Encoding") == "gzip"
|
isGzip := r.Header.Get("Content-Encoding") == "gzip"
|
||||||
n, err := readBulkRequest(r.Body, isGzip, timeField, msgField, processLogMessage)
|
n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
|
logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
|
||||||
return true
|
return true
|
||||||
|
@ -152,7 +112,6 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
|
bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
|
||||||
rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/elasticsearch/_bulk",reason="debug"}`)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
|
func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
|
||||||
|
@ -161,11 +120,11 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
|
||||||
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
|
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
|
||||||
|
|
||||||
if isGzip {
|
if isGzip {
|
||||||
zr, err := pc.GetGzipReader(r)
|
zr, err := common.GetGzipReader(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
|
return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
|
||||||
}
|
}
|
||||||
defer pc.PutGzipReader(zr)
|
defer common.PutGzipReader(zr)
|
||||||
r = zr
|
r = zr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,7 +134,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
|
||||||
lb := lineBufferPool.Get()
|
lb := lineBufferPool.Get()
|
||||||
defer lineBufferPool.Put(lb)
|
defer lineBufferPool.Put(lb)
|
||||||
|
|
||||||
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN())
|
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
|
||||||
sc := bufio.NewScanner(wcr)
|
sc := bufio.NewScanner(wcr)
|
||||||
sc.Buffer(lb.B, len(lb.B))
|
sc.Buffer(lb.B, len(lb.B))
|
||||||
|
|
||||||
|
@ -211,7 +170,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
if err := sc.Err(); err != nil {
|
if err := sc.Err(); err != nil {
|
||||||
if errors.Is(err, bufio.ErrTooLong) {
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
|
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
|
||||||
common.MaxLineSizeBytes.IntN())
|
insertutils.MaxLineSizeBytes.IntN())
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -228,7 +187,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
if !sc.Scan() {
|
if !sc.Scan() {
|
||||||
if err := sc.Err(); err != nil {
|
if err := sc.Err(); err != nil {
|
||||||
if errors.Is(err, bufio.ErrTooLong) {
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", common.MaxLineSizeBytes.IntN())
|
return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", insertutils.MaxLineSizeBytes.IntN())
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -244,7 +203,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
||||||
}
|
}
|
||||||
updateMessageFieldName(msgField, p.Fields)
|
p.RenameField(msgField, "_msg")
|
||||||
processLogMessage(timestamp, p.Fields)
|
processLogMessage(timestamp, p.Fields)
|
||||||
logjson.PutParser(p)
|
logjson.PutParser(p)
|
||||||
return true, nil
|
return true, nil
|
||||||
|
@ -266,19 +225,6 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
|
||||||
return time.Now().UnixNano(), nil
|
return time.Now().UnixNano(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateMessageFieldName(msgField string, fields []logstorage.Field) {
|
|
||||||
if msgField == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for i := range fields {
|
|
||||||
f := &fields[i]
|
|
||||||
if f.Name == msgField {
|
|
||||||
f.Name = "_msg"
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseElasticsearchTimestamp(s string) (int64, error) {
|
func parseElasticsearchTimestamp(s string) (int64, error) {
|
||||||
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
||||||
// Try parsing timestamp in milliseconds
|
// Try parsing timestamp in milliseconds
|
||||||
|
|
91
app/vlinsert/insertutils/common_params.go
Normal file
91
app/vlinsert/insertutils/common_params.go
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
package insertutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CommonParams contains common HTTP parameters used by log ingestion APIs.
|
||||||
|
//
|
||||||
|
// See https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters
|
||||||
|
type CommonParams struct {
|
||||||
|
TenantID logstorage.TenantID
|
||||||
|
TimeField string
|
||||||
|
MsgField string
|
||||||
|
StreamFields []string
|
||||||
|
IgnoreFields []string
|
||||||
|
|
||||||
|
Debug bool
|
||||||
|
DebugRequestURI string
|
||||||
|
DebugRemoteAddr string
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCommonParams returns CommonParams from r.
|
||||||
|
func GetCommonParams(r *http.Request) (*CommonParams, error) {
|
||||||
|
// Extract tenantID
|
||||||
|
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract time field name from _time_field query arg
|
||||||
|
var timeField = "_time"
|
||||||
|
if tf := r.FormValue("_time_field"); tf != "" {
|
||||||
|
timeField = tf
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract message field name from _msg_field query arg
|
||||||
|
var msgField = ""
|
||||||
|
if msgf := r.FormValue("_msg_field"); msgf != "" {
|
||||||
|
msgField = msgf
|
||||||
|
}
|
||||||
|
|
||||||
|
streamFields := httputils.GetArray(r, "_stream_fields")
|
||||||
|
ignoreFields := httputils.GetArray(r, "ignore_fields")
|
||||||
|
|
||||||
|
debug := httputils.GetBool(r, "debug")
|
||||||
|
debugRequestURI := ""
|
||||||
|
debugRemoteAddr := ""
|
||||||
|
if debug {
|
||||||
|
debugRequestURI = httpserver.GetRequestURI(r)
|
||||||
|
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
cp := &CommonParams{
|
||||||
|
TenantID: tenantID,
|
||||||
|
TimeField: timeField,
|
||||||
|
MsgField: msgField,
|
||||||
|
StreamFields: streamFields,
|
||||||
|
IgnoreFields: ignoreFields,
|
||||||
|
Debug: debug,
|
||||||
|
DebugRequestURI: debugRequestURI,
|
||||||
|
DebugRemoteAddr: debugRemoteAddr,
|
||||||
|
}
|
||||||
|
return cp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
|
||||||
|
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
return func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
lr.MustAdd(cp.TenantID, timestamp, fields)
|
||||||
|
if cp.Debug {
|
||||||
|
s := lr.GetRowString(0)
|
||||||
|
lr.ResetKeepSettings()
|
||||||
|
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
|
||||||
|
rowsDroppedTotal.Inc()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if lr.NeedFlush() {
|
||||||
|
vlstorage.MustAddRows(lr)
|
||||||
|
lr.ResetKeepSettings()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
|
10
app/vlinsert/insertutils/flags.go
Normal file
10
app/vlinsert/insertutils/flags.go
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
package insertutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// MaxLineSizeBytes is the maximum length of a single line for /insert/* handlers
|
||||||
|
MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
|
||||||
|
)
|
|
@ -4,92 +4,48 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
|
||||||
"math"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RequestHandler processes jsonline insert requests
|
// RequestHandler processes jsonline insert requests
|
||||||
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
w.Header().Add("Content-Type", "application/json")
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
|
||||||
if path != "/" {
|
if r.Method != "POST" {
|
||||||
return false
|
|
||||||
}
|
|
||||||
if method := r.Method; method != "POST" {
|
|
||||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
requestsTotal.Inc()
|
requestsTotal.Inc()
|
||||||
|
|
||||||
// Extract tenantID
|
cp, err := insertutils.GetCommonParams(r)
|
||||||
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
||||||
// Extract time field name from _time_field query arg
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
||||||
var timeField = "_time"
|
|
||||||
if tf := r.FormValue("_time_field"); tf != "" {
|
|
||||||
timeField = tf
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract message field name from _msg_field query arg
|
|
||||||
var msgField = ""
|
|
||||||
if msgf := r.FormValue("_msg_field"); msgf != "" {
|
|
||||||
msgField = msgf
|
|
||||||
}
|
|
||||||
|
|
||||||
streamFields := httputils.GetArray(r, "_stream_fields")
|
|
||||||
ignoreFields := httputils.GetArray(r, "ignore_fields")
|
|
||||||
|
|
||||||
isDebug := httputils.GetBool(r, "debug")
|
|
||||||
debugRequestURI := ""
|
|
||||||
debugRemoteAddr := ""
|
|
||||||
if isDebug {
|
|
||||||
debugRequestURI = httpserver.GetRequestURI(r)
|
|
||||||
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
lr := logstorage.GetLogRows(streamFields, ignoreFields)
|
|
||||||
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
|
||||||
lr.MustAdd(tenantID, timestamp, fields)
|
|
||||||
if isDebug {
|
|
||||||
s := lr.GetRowString(0)
|
|
||||||
lr.ResetKeepSettings()
|
|
||||||
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s)
|
|
||||||
rowsDroppedTotal.Inc()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if lr.NeedFlush() {
|
|
||||||
vlstorage.MustAddRows(lr)
|
|
||||||
lr.ResetKeepSettings()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
reader := r.Body
|
reader := r.Body
|
||||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||||
zr, err := pc.GetGzipReader(reader)
|
zr, err := common.GetGzipReader(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
|
logger.Errorf("cannot read gzipped _bulk request: %s", err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
defer pc.PutGzipReader(zr)
|
defer common.PutGzipReader(zr)
|
||||||
reader = zr
|
reader = zr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,16 +55,17 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
lb := lineBufferPool.Get()
|
lb := lineBufferPool.Get()
|
||||||
defer lineBufferPool.Put(lb)
|
defer lineBufferPool.Put(lb)
|
||||||
|
|
||||||
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN())
|
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
|
||||||
sc := bufio.NewScanner(wcr)
|
sc := bufio.NewScanner(wcr)
|
||||||
sc.Buffer(lb.B, len(lb.B))
|
sc.Buffer(lb.B, len(lb.B))
|
||||||
|
|
||||||
n := 0
|
n := 0
|
||||||
for {
|
for {
|
||||||
ok, err := readLine(sc, timeField, msgField, processLogMessage)
|
ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage)
|
||||||
wcr.DecConcurrency()
|
wcr.DecConcurrency()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
|
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
|
@ -124,30 +81,29 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
|
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
|
||||||
if !sc.Scan() {
|
var line []byte
|
||||||
if err := sc.Err(); err != nil {
|
for len(line) == 0 {
|
||||||
if errors.Is(err, bufio.ErrTooLong) {
|
if !sc.Scan() {
|
||||||
return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, common.MaxLineSizeBytes.IntN())
|
if err := sc.Err(); err != nil {
|
||||||
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
|
return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN())
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
}
|
}
|
||||||
return false, err
|
return false, nil
|
||||||
}
|
}
|
||||||
return false, nil
|
line = sc.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
line := sc.Bytes()
|
|
||||||
p := logjson.GetParser()
|
p := logjson.GetParser()
|
||||||
|
|
||||||
if err := p.ParseLogMessage(line); err != nil {
|
if err := p.ParseLogMessage(line); err != nil {
|
||||||
invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err)
|
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp, err := extractTimestampFromFields(timeField, p.Fields)
|
timestamp, err := extractTimestampFromFields(timeField, p.Fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err)
|
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
updateMessageFieldName(msgField, p.Fields)
|
p.RenameField(msgField, "_msg")
|
||||||
processLogMessage(timestamp, p.Fields)
|
processLogMessage(timestamp, p.Fields)
|
||||||
logjson.PutParser(p)
|
logjson.PutParser(p)
|
||||||
return true, nil
|
return true, nil
|
||||||
|
@ -159,7 +115,7 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
|
||||||
if f.Name != timeField {
|
if f.Name != timeField {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
timestamp, err := parseTimestamp(f.Value)
|
timestamp, err := parseISO8601Timestamp(f.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -169,42 +125,7 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
|
||||||
return time.Now().UnixNano(), nil
|
return time.Now().UnixNano(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateMessageFieldName(msgField string, fields []logstorage.Field) {
|
func parseISO8601Timestamp(s string) (int64, error) {
|
||||||
if msgField == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for i := range fields {
|
|
||||||
f := &fields[i]
|
|
||||||
if f.Name == msgField {
|
|
||||||
f.Name = "_msg"
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseTimestamp(s string) (int64, error) {
|
|
||||||
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
|
||||||
// Try parsing timestamp in milliseconds
|
|
||||||
n, err := strconv.ParseInt(s, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err)
|
|
||||||
}
|
|
||||||
if n > int64(math.MaxInt64)/1e6 {
|
|
||||||
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
|
|
||||||
}
|
|
||||||
if n < int64(math.MinInt64)/1e6 {
|
|
||||||
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
|
|
||||||
}
|
|
||||||
n *= 1e6
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
if len(s) == len("YYYY-MM-DD") {
|
|
||||||
t, err := time.Parse("2006-01-02", s)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("cannot parse date %q: %w", s, err)
|
|
||||||
}
|
|
||||||
return t.UnixNano(), nil
|
|
||||||
}
|
|
||||||
t, err := time.Parse(time.RFC3339, s)
|
t, err := time.Parse(time.RFC3339, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
||||||
|
@ -217,10 +138,4 @@ var lineBufferPool bytesutil.ByteBufferPool
|
||||||
var (
|
var (
|
||||||
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
|
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
|
||||||
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
|
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
|
||||||
rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/jsonline",reason="debug"}`)
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second)
|
|
||||||
invalidJSONLineLogger = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second)
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -25,13 +25,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
path = strings.TrimPrefix(path, "/insert")
|
path = strings.TrimPrefix(path, "/insert")
|
||||||
path = strings.ReplaceAll(path, "//", "/")
|
path = strings.ReplaceAll(path, "//", "/")
|
||||||
|
|
||||||
|
if path == "/jsonline" {
|
||||||
|
return jsonline.RequestHandler(w, r)
|
||||||
|
}
|
||||||
switch {
|
switch {
|
||||||
case strings.HasPrefix(path, "/elasticsearch/"):
|
case strings.HasPrefix(path, "/elasticsearch/"):
|
||||||
path = strings.TrimPrefix(path, "/elasticsearch")
|
path = strings.TrimPrefix(path, "/elasticsearch")
|
||||||
return elasticsearch.RequestHandler(path, w, r)
|
return elasticsearch.RequestHandler(path, w, r)
|
||||||
case strings.HasPrefix(path, "/jsonline"):
|
|
||||||
path = strings.TrimPrefix(path, "/jsonline")
|
|
||||||
return jsonline.RequestHandler(path, w, r)
|
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ VictoriaLogs accepts logs in [Elasticsearch bulk API](https://www.elastic.co/gui
|
||||||
/ [OpenSearch Bulk API](http://opensearch.org/docs/1.2/opensearch/rest-api/document-apis/bulk/) format
|
/ [OpenSearch Bulk API](http://opensearch.org/docs/1.2/opensearch/rest-api/document-apis/bulk/) format
|
||||||
at `http://localhost:9428/insert/elasticsearch/_bulk` endpoint.
|
at `http://localhost:9428/insert/elasticsearch/_bulk` endpoint.
|
||||||
|
|
||||||
The following command pushes a single log line to Elasticsearch bulk API at VictoriaLogs:
|
The following command pushes a single log line to VictoriaLogs:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
echo '{"create":{}}
|
echo '{"create":{}}
|
||||||
|
@ -38,7 +38,14 @@ echo '{"create":{}}
|
||||||
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk
|
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk
|
||||||
```
|
```
|
||||||
|
|
||||||
The following command verifies that the data has been successfully pushed to VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
|
It is possible to push thousands of log lines in a single request to this API.
|
||||||
|
|
||||||
|
See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields,
|
||||||
|
which must be present in the ingested log messages.
|
||||||
|
|
||||||
|
The API accepts various http parameters, which can change the data ingestion behavior - [these docs](#http-parameters) for details.
|
||||||
|
|
||||||
|
The following command verifies that the data has been successfully ingested to VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl http://localhost:9428/select/logsql/query -d 'query=host.name:host123'
|
curl http://localhost:9428/select/logsql/query -d 'query=host.name:host123'
|
||||||
|
@ -50,20 +57,57 @@ The command should return the following response:
|
||||||
{"_msg":"cannot open file","_stream":"{}","_time":"2023-06-21T04:24:24Z","host.name":"host123"}
|
{"_msg":"cannot open file","_stream":"{}","_time":"2023-06-21T04:24:24Z","host.name":"host123"}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
- [How to debug data ingestion](#troubleshooting).
|
||||||
|
- [HTTP parameters, which can be passed to the API](#http-parameters).
|
||||||
|
- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html).
|
||||||
|
|
||||||
### JSON stream API
|
### JSON stream API
|
||||||
|
|
||||||
VictoriaLogs supports HTTP API on `/insert/jsonline` endpoint for data ingestion where
|
VictoriaLogs accepts JSON line stream aka [ndjson](http://ndjson.org/) at `http://localhost:9428/insert/jsonline` endpoint.
|
||||||
body contains a JSON object in each line (separated by `\n`).
|
|
||||||
|
|
||||||
Here is an example:
|
The following command pushes multiple log lines to VictoriaLogs:
|
||||||
|
|
||||||
```http request
|
```bash
|
||||||
POST http://localhost:9428/insert/jsonline/?_stream_fields=stream&_msg_field=log&_time_field=date
|
echo '{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:31:23Z", "stream": "stream1" }
|
||||||
Content-Type: application/jsonl
|
{ "log": { "level": "error", "message": "oh no!" }, "date": "2023-06-20T15:32:10.567Z", "stream": "stream1" }
|
||||||
{ "log": { "level": "info", "message": "hello world" }, "date": "2023‐06‐20T15:31:23Z", "stream": "stream1" }
|
{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:35:11.567890+02:00", "stream": "stream2" }
|
||||||
{ "log": { "level": "error", "message": "oh no!" }, "date": "2023‐06‐20T15:32:10Z", "stream": "stream1" }
|
' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
|
||||||
{ "log": { "level": "info", "message": "hello world" }, "date": "2023‐06‐20T15:35:11Z", "stream": "stream2" }
|
'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'
|
||||||
```
|
```
|
||||||
|
|
||||||
|
It is possible to push unlimited number of log lines in a single request to this API.
|
||||||
|
|
||||||
|
The [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) must be
|
||||||
|
in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`.
|
||||||
|
Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`.
|
||||||
|
Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`.
|
||||||
|
|
||||||
|
See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields,
|
||||||
|
which must be present in the ingested log messages.
|
||||||
|
|
||||||
|
The API accepts various http parameters, which can change the data ingestion behavior - [these docs](#http-parameters) for details.
|
||||||
|
|
||||||
|
The following command verifies that the data has been successfully ingested into VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl http://localhost:9428/select/logsql/query -d 'query=log.level:*'
|
||||||
|
```
|
||||||
|
|
||||||
|
The command should return the following response:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
{"_msg":"hello world","_stream":"{stream=\"stream2\"}","_time":"2023-06-20T13:35:11.56789Z","log.level":"info"}
|
||||||
|
{"_msg":"hello world","_stream":"{stream=\"stream1\"}","_time":"2023-06-20T15:31:23Z","log.level":"info"}
|
||||||
|
{"_msg":"oh no!","_stream":"{stream=\"stream1\"}","_time":"2023-06-20T15:32:10.567Z","log.level":"error"}
|
||||||
|
```
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
- [How to debug data ingestion](#troubleshooting).
|
||||||
|
- [HTTP parameters, which can be passed to the API](#http-parameters).
|
||||||
|
- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html).
|
||||||
|
|
||||||
### HTTP parameters
|
### HTTP parameters
|
||||||
|
|
||||||
|
|
|
@ -133,5 +133,5 @@ See also:
|
||||||
|
|
||||||
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting).
|
- [Data ingestion troubleshooting](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting).
|
||||||
- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying/).
|
- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying/).
|
||||||
- [Elasticsearch output docs for Vector.dev](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/).
|
- [Elasticsearch output docs for Vector](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/).
|
||||||
- [Docker-compose demo for Filebeat integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/vector-docker).
|
- [Docker-compose demo for Filebeat integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/vector-docker).
|
||||||
|
|
|
@ -84,6 +84,21 @@ func (p *Parser) ParseLogMessage(msg []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RenameField renames field with the oldName to newName in p.Fields
|
||||||
|
func (p *Parser) RenameField(oldName, newName string) {
|
||||||
|
if oldName == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fields := p.Fields
|
||||||
|
for i := range fields {
|
||||||
|
f := &fields[i]
|
||||||
|
if f.Name == oldName {
|
||||||
|
f.Name = newName
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
|
func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
|
||||||
o := v.GetObject()
|
o := v.GetObject()
|
||||||
o.Visit(func(k []byte, v *fastjson.Value) {
|
o.Visit(func(k []byte, v *fastjson.Value) {
|
||||||
|
|
|
@ -457,7 +457,7 @@ type TimeFormatter int64
|
||||||
func (tf *TimeFormatter) String() string {
|
func (tf *TimeFormatter) String() string {
|
||||||
ts := int64(*tf)
|
ts := int64(*tf)
|
||||||
t := time.Unix(0, ts).UTC()
|
t := time.Unix(0, ts).UTC()
|
||||||
return t.Format(time.RFC3339)
|
return t.Format(time.RFC3339Nano)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
|
func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
|
||||||
|
|
Loading…
Reference in a new issue