VictoriaMetrics/app/vlinsert/jsonline/jsonline.go

136 lines
3.8 KiB
Go
Raw Normal View History

package jsonline
import (
"bufio"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// RequestHandler processes jsonline insert requests
2024-06-17 10:13:18 +00:00
func RequestHandler(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
w.Header().Add("Content-Type", "application/json")
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
2024-06-17 10:13:18 +00:00
return
}
requestsTotal.Inc()
cp, err := insertutils.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
2024-06-17 10:13:18 +00:00
return
}
lib/logstorage: follow-up for 8a23d08c210c7c2440c224debcff266de3353a64 - Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes directly inside the Storage.IsReadOnly(). This should work fast in most cases. This simplifies the logic at lib/storage. - Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes. The background merge logic uses another mechanism for determining whether there is enough disk space for the merge - it reserves the needed disk space before the merge and releases it after the merge. This prevents from out of disk space errors during background merge. - Properly handle corner cases for flushing in-memory data to disk when the storage enters read-only mode. This is better than losing the in-memory data. - Return back Storage.MustAddRows() instead of Storage.AddRows(), since the only case when AddRows() can return error is when the storage is in read-only mode. This case must be handled by the caller by calling Storage.IsReadOnly() before adding rows to the storage. This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle errors returned by Storage.AddRows(). - Properly store parsed logs to Storage if parts of the request contain invalid log lines. Previously the parsed logs could be lost in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
2023-10-02 14:26:02 +00:00
if err := vlstorage.CanWriteData(); err != nil {
httpserver.Errorf(w, r, "%s", err)
2024-06-17 10:13:18 +00:00
return
lib/logstorage: follow-up for 8a23d08c210c7c2440c224debcff266de3353a64 - Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes directly inside the Storage.IsReadOnly(). This should work fast in most cases. This simplifies the logic at lib/storage. - Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes. The background merge logic uses another mechanism for determining whether there is enough disk space for the merge - it reserves the needed disk space before the merge and releases it after the merge. This prevents from out of disk space errors during background merge. - Properly handle corner cases for flushing in-memory data to disk when the storage enters read-only mode. This is better than losing the in-memory data. - Return back Storage.MustAddRows() instead of Storage.AddRows(), since the only case when AddRows() can return error is when the storage is in read-only mode. This case must be handled by the caller by calling Storage.IsReadOnly() before adding rows to the storage. This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle errors returned by Storage.AddRows(). - Properly store parsed logs to Storage if parts of the request contain invalid log lines. Previously the parsed logs could be lost in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
2023-10-02 14:26:02 +00:00
}
reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader)
if err != nil {
2024-06-17 10:13:18 +00:00
logger.Errorf("cannot read gzipped jsonline request: %s", err)
return
}
defer common.PutGzipReader(zr)
reader = zr
}
lmp := cp.NewLogMessageProcessor()
err = processStreamInternal(reader, cp.TimeField, cp.MsgField, lmp)
lmp.MustClose()
if err != nil {
logger.Errorf("jsonline: %s", err)
} else {
// update requestDuration only for successfully parsed requests.
// There is no need in updating requestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
requestDuration.UpdateDuration(startTime)
}
}
func processStreamInternal(r io.Reader, timeField, msgField string, lmp insertutils.LogMessageProcessor) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
lb := lineBufferPool.Get()
defer lineBufferPool.Put(lb)
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
sc := bufio.NewScanner(wcr)
sc.Buffer(lb.B, len(lb.B))
n := 0
for {
ok, err := readLine(sc, timeField, msgField, lmp)
wcr.DecConcurrency()
if err != nil {
2024-06-17 10:13:18 +00:00
errorsTotal.Inc()
return fmt.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
}
if !ok {
return nil
}
n++
rowsIngestedTotal.Inc()
}
}
func readLine(sc *bufio.Scanner, timeField, msgField string, lmp insertutils.LogMessageProcessor) (bool, error) {
var line []byte
for len(line) == 0 {
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, nil
}
line = sc.Bytes()
}
2024-05-20 02:08:30 +00:00
p := logstorage.GetJSONParser()
2024-05-22 19:01:20 +00:00
if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
ts, err := insertutils.ExtractTimestampRFC3339NanoFromFields(timeField, p.Fields)
if err != nil {
2024-06-17 10:13:18 +00:00
return false, fmt.Errorf("cannot get timestamp: %w", err)
app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty of missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal .
2023-07-20 23:21:47 +00:00
}
2024-06-17 10:13:18 +00:00
logstorage.RenameField(p.Fields, msgField, "_msg")
lmp.AddRow(ts, p.Fields)
2024-05-20 02:08:30 +00:00
logstorage.PutJSONParser(p)
return true, nil
}
var lineBufferPool bytesutil.ByteBufferPool
var (
2024-06-17 10:13:18 +00:00
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/jsonline"}`)
requestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
)