mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-19 15:30:17 +00:00
jsonline support for data ingestion in vlinsert (#4487)
added json lines / json stream format for ingestion to vlinsert
This commit is contained in:
parent
b16a5ee705
commit
d9d759bc90
6 changed files with 329 additions and 12 deletions
8
app/vlinsert/common/flags.go
Normal file
8
app/vlinsert/common/flags.go
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
|
|
||||||
|
var (
|
||||||
|
// MaxLineSizeBytes is the maximum size of a single line, which can be read by /insert/* handlers
|
||||||
|
MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
|
||||||
|
)
|
|
@ -11,24 +11,20 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
maxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
|
|
||||||
)
|
|
||||||
|
|
||||||
// RequestHandler processes ElasticSearch insert requests
|
// RequestHandler processes ElasticSearch insert requests
|
||||||
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
w.Header().Add("Content-Type", "application/json")
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
@ -165,11 +161,11 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
|
||||||
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
|
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
|
||||||
|
|
||||||
if isGzip {
|
if isGzip {
|
||||||
zr, err := common.GetGzipReader(r)
|
zr, err := pc.GetGzipReader(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
|
return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
|
||||||
}
|
}
|
||||||
defer common.PutGzipReader(zr)
|
defer pc.PutGzipReader(zr)
|
||||||
r = zr
|
r = zr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +175,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
|
||||||
lb := lineBufferPool.Get()
|
lb := lineBufferPool.Get()
|
||||||
defer lineBufferPool.Put(lb)
|
defer lineBufferPool.Put(lb)
|
||||||
|
|
||||||
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, maxLineSizeBytes.IntN())
|
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN())
|
||||||
sc := bufio.NewScanner(wcr)
|
sc := bufio.NewScanner(wcr)
|
||||||
sc.Buffer(lb.B, len(lb.B))
|
sc.Buffer(lb.B, len(lb.B))
|
||||||
|
|
||||||
|
@ -215,7 +211,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
if err := sc.Err(); err != nil {
|
if err := sc.Err(); err != nil {
|
||||||
if errors.Is(err, bufio.ErrTooLong) {
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
|
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
|
||||||
maxLineSizeBytes.IntN())
|
common.MaxLineSizeBytes.IntN())
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -232,7 +228,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
if !sc.Scan() {
|
if !sc.Scan() {
|
||||||
if err := sc.Err(); err != nil {
|
if err := sc.Err(); err != nil {
|
||||||
if errors.Is(err, bufio.ErrTooLong) {
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", maxLineSizeBytes.IntN())
|
return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", common.MaxLineSizeBytes.IntN())
|
||||||
}
|
}
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
229
app/vlinsert/jsonline/jsonline.go
Normal file
229
app/vlinsert/jsonline/jsonline.go
Normal file
|
@ -0,0 +1,229 @@
|
||||||
|
package jsonline
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||||
|
"math"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/common"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
pc "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RequestHandler processes jsonline insert requests
|
||||||
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
if path != "/" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if method := r.Method; method != "POST" {
|
||||||
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
requestsTotal.Inc()
|
||||||
|
|
||||||
|
// Extract tenantID
|
||||||
|
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract time field name from _time_field query arg
|
||||||
|
var timeField = "_time"
|
||||||
|
if tf := r.FormValue("_time_field"); tf != "" {
|
||||||
|
timeField = tf
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract message field name from _msg_field query arg
|
||||||
|
var msgField = ""
|
||||||
|
if msgf := r.FormValue("_msg_field"); msgf != "" {
|
||||||
|
msgField = msgf
|
||||||
|
}
|
||||||
|
|
||||||
|
streamFields := httputils.GetArray(r, "_stream_fields")
|
||||||
|
ignoreFields := httputils.GetArray(r, "ignore_fields")
|
||||||
|
|
||||||
|
isDebug := httputils.GetBool(r, "debug")
|
||||||
|
debugRequestURI := ""
|
||||||
|
debugRemoteAddr := ""
|
||||||
|
if isDebug {
|
||||||
|
debugRequestURI = httpserver.GetRequestURI(r)
|
||||||
|
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
lr := logstorage.GetLogRows(streamFields, ignoreFields)
|
||||||
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
lr.MustAdd(tenantID, timestamp, fields)
|
||||||
|
if isDebug {
|
||||||
|
s := lr.GetRowString(0)
|
||||||
|
lr.ResetKeepSettings()
|
||||||
|
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s)
|
||||||
|
rowsDroppedTotal.Inc()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if lr.NeedFlush() {
|
||||||
|
vlstorage.MustAddRows(lr)
|
||||||
|
lr.ResetKeepSettings()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reader := r.Body
|
||||||
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||||
|
zr, err := pc.GetGzipReader(reader)
|
||||||
|
if err != nil {
|
||||||
|
//return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
defer pc.PutGzipReader(zr)
|
||||||
|
reader = zr
|
||||||
|
}
|
||||||
|
|
||||||
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||||
|
defer writeconcurrencylimiter.PutReader(wcr)
|
||||||
|
|
||||||
|
lb := lineBufferPool.Get()
|
||||||
|
defer lineBufferPool.Put(lb)
|
||||||
|
|
||||||
|
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, common.MaxLineSizeBytes.IntN())
|
||||||
|
sc := bufio.NewScanner(wcr)
|
||||||
|
sc.Buffer(lb.B, len(lb.B))
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
for {
|
||||||
|
ok, err := readLine(sc, timeField, msgField, processLogMessage)
|
||||||
|
wcr.DecConcurrency()
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
rowsIngestedTotal.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
vlstorage.MustAddRows(lr)
|
||||||
|
logstorage.PutLogRows(lr)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
|
||||||
|
if !sc.Scan() {
|
||||||
|
if err := sc.Err(); err != nil {
|
||||||
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
|
return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, common.MaxLineSizeBytes.IntN())
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
line := sc.Bytes()
|
||||||
|
p := logjson.GetParser()
|
||||||
|
|
||||||
|
llll.Warnf("\n----\n%s\n----\n", line)
|
||||||
|
|
||||||
|
if err := p.ParseLogMessage(line); err != nil {
|
||||||
|
invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
timestamp, err := extractTimestampFromFields(timeField, p.Fields)
|
||||||
|
if err != nil {
|
||||||
|
invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
updateMessageFieldName(msgField, p.Fields)
|
||||||
|
processLogMessage(timestamp, p.Fields)
|
||||||
|
logjson.PutParser(p)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
|
||||||
|
for i := range fields {
|
||||||
|
f := &fields[i]
|
||||||
|
if f.Name != timeField {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
timestamp, err := parseTimestamp(f.Value)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
f.Value = ""
|
||||||
|
return timestamp, nil
|
||||||
|
}
|
||||||
|
return time.Now().UnixNano(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateMessageFieldName(msgField string, fields []logstorage.Field) {
|
||||||
|
if msgField == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range fields {
|
||||||
|
f := &fields[i]
|
||||||
|
if f.Name == msgField {
|
||||||
|
f.Name = "_msg"
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTimestamp(s string) (int64, error) {
|
||||||
|
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
||||||
|
// Try parsing timestamp in milliseconds
|
||||||
|
n, err := strconv.ParseInt(s, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err)
|
||||||
|
}
|
||||||
|
if n > int64(math.MaxInt64)/1e6 {
|
||||||
|
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
|
||||||
|
}
|
||||||
|
if n < int64(math.MinInt64)/1e6 {
|
||||||
|
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
|
||||||
|
}
|
||||||
|
n *= 1e6
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
if len(s) == len("YYYY-MM-DD") {
|
||||||
|
t, err := time.Parse("2006-01-02", s)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("cannot parse date %q: %w", s, err)
|
||||||
|
}
|
||||||
|
return t.UnixNano(), nil
|
||||||
|
}
|
||||||
|
t, err := time.Parse(time.RFC3339, s)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
||||||
|
}
|
||||||
|
return t.UnixNano(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var lineBufferPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
|
var (
|
||||||
|
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
|
||||||
|
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
|
||||||
|
rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/jsonline",reason="debug"}`)
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second)
|
||||||
|
invalidJSONLineLogger = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second)
|
||||||
|
llll = logger.WithThrottler("llll", 2*time.Second)
|
||||||
|
)
|
70
app/vlinsert/jsonline/jsonline_test.go
Normal file
70
app/vlinsert/jsonline/jsonline_test.go
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
package jsonline
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReadBulkRequestSuccess(t *testing.T) {
|
||||||
|
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
var timestamps []int64
|
||||||
|
var result string
|
||||||
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
timestamps = append(timestamps, timestamp)
|
||||||
|
|
||||||
|
a := make([]string, len(fields))
|
||||||
|
for i, f := range fields {
|
||||||
|
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
|
||||||
|
}
|
||||||
|
s := "{" + strings.Join(a, ",") + "}\n"
|
||||||
|
result += s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the request without compression
|
||||||
|
r := bytes.NewBufferString(data)
|
||||||
|
sc := bufio.NewScanner(r)
|
||||||
|
rows := 0
|
||||||
|
for {
|
||||||
|
ok, err := readLine(sc, timeField, msgField, processLogMessage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
rows++
|
||||||
|
}
|
||||||
|
if rows != rowsExpected {
|
||||||
|
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(timestamps, timestampsExpected) {
|
||||||
|
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
|
||||||
|
}
|
||||||
|
if result != resultExpected {
|
||||||
|
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify non-empty data
|
||||||
|
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
||||||
|
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
||||||
|
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
||||||
|
`
|
||||||
|
timeField := "@timestamp"
|
||||||
|
msgField := "message"
|
||||||
|
rowsExpected := 3
|
||||||
|
timestampsExpected := []int64{1686026891735000000, 1686026892735000000, 1686026893735000000}
|
||||||
|
resultExpected := `{"@timestamp":"","log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
|
||||||
|
{"@timestamp":"","_msg":"baz"}
|
||||||
|
{"_msg":"xyz","@timestamp":"","x":"y"}
|
||||||
|
`
|
||||||
|
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Init initializes vlinsert
|
// Init initializes vlinsert
|
||||||
|
@ -28,6 +29,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
case strings.HasPrefix(path, "/elasticsearch/"):
|
case strings.HasPrefix(path, "/elasticsearch/"):
|
||||||
path = strings.TrimPrefix(path, "/elasticsearch")
|
path = strings.TrimPrefix(path, "/elasticsearch")
|
||||||
return elasticsearch.RequestHandler(path, w, r)
|
return elasticsearch.RequestHandler(path, w, r)
|
||||||
|
case strings.HasPrefix(path, "/jsonline"):
|
||||||
|
path = strings.TrimPrefix(path, "/jsonline")
|
||||||
|
return jsonline.RequestHandler(path, w, r)
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,8 +45,18 @@ The command should return the following response:
|
||||||
|
|
||||||
### JSON stream API
|
### JSON stream API
|
||||||
|
|
||||||
TODO: document JSON stream API
|
VictoriaLogs supports HTTP API on `/insert/jsonline` endpoint for data ingestion where
|
||||||
|
body contains a JSON object in each line (separated by `\n`).
|
||||||
|
|
||||||
|
Here is an example:
|
||||||
|
|
||||||
|
```http request
|
||||||
|
POST http://localhost:9428/insert/jsonline/?_stream_fields=stream&_msg_field=log&_time_field=date
|
||||||
|
Content-Type: application/jsonl
|
||||||
|
{ "log": { "level": "info", "message": "hello world" }, "date": "2023‐06‐20T15:31:23Z", "stream": "stream1" }
|
||||||
|
{ "log": { "level": "error", "message": "oh no!" }, "date": "2023‐06‐20T15:32:10Z", "stream": "stream1" }
|
||||||
|
{ "log": { "level": "info", "message": "hello world" }, "date": "2023‐06‐20T15:35:11Z", "stream": "stream2" }
|
||||||
|
```
|
||||||
|
|
||||||
### HTTP parameters
|
### HTTP parameters
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue