2023-06-21 13:31:28 +00:00
|
|
|
package jsonline
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2024-06-17 20:28:15 +00:00
|
|
|
"io"
|
2023-06-21 13:31:28 +00:00
|
|
|
"net/http"
|
|
|
|
"time"
|
|
|
|
|
2023-06-22 02:39:22 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
2023-06-21 13:31:28 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
2025-03-18 15:24:48 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
2023-06-21 13:31:28 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
)
|
|
|
|
|
|
|
|
// RequestHandler processes jsonline insert requests
|
2024-06-17 10:13:18 +00:00
|
|
|
func RequestHandler(w http.ResponseWriter, r *http.Request) {
|
2023-09-18 21:58:32 +00:00
|
|
|
startTime := time.Now()
|
2023-06-21 13:31:28 +00:00
|
|
|
w.Header().Add("Content-Type", "application/json")
|
|
|
|
|
2023-06-22 02:39:22 +00:00
|
|
|
if r.Method != "POST" {
|
2023-06-21 13:31:28 +00:00
|
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
2024-06-17 10:13:18 +00:00
|
|
|
return
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
requestsTotal.Inc()
|
|
|
|
|
2023-06-22 02:39:22 +00:00
|
|
|
cp, err := insertutils.GetCommonParams(r)
|
2023-06-21 13:31:28 +00:00
|
|
|
if err != nil {
|
|
|
|
httpserver.Errorf(w, r, "%s", err)
|
2024-06-17 10:13:18 +00:00
|
|
|
return
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
2023-10-02 14:26:02 +00:00
|
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
|
|
httpserver.Errorf(w, r, "%s", err)
|
2024-06-17 10:13:18 +00:00
|
|
|
return
|
2023-10-02 14:26:02 +00:00
|
|
|
}
|
2023-06-21 13:31:28 +00:00
|
|
|
|
2025-03-14 17:10:37 +00:00
|
|
|
encoding := r.Header.Get("Content-Encoding")
|
2025-03-18 15:24:48 +00:00
|
|
|
reader, err := protoparserutil.GetUncompressedReader(r.Body, encoding)
|
2025-03-14 17:10:37 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Errorf("cannot decode jsonline request: %s", err)
|
|
|
|
return
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
2025-03-18 15:24:48 +00:00
|
|
|
defer protoparserutil.PutUncompressedReader(reader)
|
2023-06-21 13:31:28 +00:00
|
|
|
|
2025-03-18 10:13:21 +00:00
|
|
|
lmp := cp.NewLogMessageProcessor("jsonline", true)
|
2024-12-03 15:06:56 +00:00
|
|
|
streamName := fmt.Sprintf("remoteAddr=%s, requestURI=%q", httpserver.GetQuotedRemoteAddr(r), r.RequestURI)
|
2025-02-10 13:31:56 +00:00
|
|
|
processStreamInternal(streamName, reader, cp.TimeField, cp.MsgFields, lmp)
|
2024-06-17 20:28:15 +00:00
|
|
|
lmp.MustClose()
|
|
|
|
|
2025-02-10 13:31:56 +00:00
|
|
|
requestDuration.UpdateDuration(startTime)
|
2024-06-17 20:28:15 +00:00
|
|
|
}
|
|
|
|
|
2025-02-10 13:31:56 +00:00
|
|
|
func processStreamInternal(streamName string, r io.Reader, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) {
|
2024-06-17 20:28:15 +00:00
|
|
|
wcr := writeconcurrencylimiter.GetReader(r)
|
2023-06-21 13:31:28 +00:00
|
|
|
defer writeconcurrencylimiter.PutReader(wcr)
|
|
|
|
|
2024-12-03 15:06:56 +00:00
|
|
|
lr := insertutils.NewLineReader(streamName, wcr)
|
2023-06-21 13:31:28 +00:00
|
|
|
|
|
|
|
n := 0
|
|
|
|
for {
|
2024-12-03 15:06:56 +00:00
|
|
|
ok, err := readLine(lr, timeField, msgFields, lmp)
|
2023-06-21 13:31:28 +00:00
|
|
|
wcr.DecConcurrency()
|
|
|
|
if err != nil {
|
2024-06-17 10:13:18 +00:00
|
|
|
errorsTotal.Inc()
|
2025-02-10 13:31:56 +00:00
|
|
|
logger.Warnf("jsonline: cannot read line #%d in /jsonline request: %s", n, err)
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
|
|
|
if !ok {
|
2025-02-10 13:31:56 +00:00
|
|
|
return
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
|
|
|
n++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-12-03 15:06:56 +00:00
|
|
|
func readLine(lr *insertutils.LineReader, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (bool, error) {
|
2023-06-22 02:39:22 +00:00
|
|
|
var line []byte
|
|
|
|
for len(line) == 0 {
|
2024-12-03 15:06:56 +00:00
|
|
|
if !lr.NextLine() {
|
|
|
|
err := lr.Err()
|
|
|
|
return false, err
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
2024-12-03 15:06:56 +00:00
|
|
|
line = lr.Line
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
|
|
|
|
2024-05-20 02:08:30 +00:00
|
|
|
p := logstorage.GetJSONParser()
|
2025-02-10 13:31:56 +00:00
|
|
|
defer logstorage.PutJSONParser(p)
|
|
|
|
|
2024-05-22 19:01:20 +00:00
|
|
|
if err := p.ParseLogMessage(line); err != nil {
|
2025-02-10 13:31:56 +00:00
|
|
|
return true, fmt.Errorf("cannot parse json-encoded line: %w; line contents: %q", err, line)
|
2023-06-21 13:31:28 +00:00
|
|
|
}
|
2025-02-09 21:36:22 +00:00
|
|
|
ts, err := insertutils.ExtractTimestampFromFields(timeField, p.Fields)
|
2023-06-21 13:31:28 +00:00
|
|
|
if err != nil {
|
2025-02-10 13:31:56 +00:00
|
|
|
return true, fmt.Errorf("cannot get timestamp from json-encoded line: %w; line contents: %q", err, line)
|
2023-07-20 23:21:47 +00:00
|
|
|
}
|
2024-10-30 13:13:56 +00:00
|
|
|
logstorage.RenameField(p.Fields, msgFields, "_msg")
|
2024-12-04 12:57:20 +00:00
|
|
|
lmp.AddRow(ts, p.Fields, nil)
|
2023-09-29 09:55:38 +00:00
|
|
|
|
2023-06-21 13:31:28 +00:00
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
2024-06-17 10:13:18 +00:00
|
|
|
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
|
|
|
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/jsonline"}`)
|
|
|
|
|
|
|
|
requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
|
2023-06-21 13:31:28 +00:00
|
|
|
)
|