mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
218 lines
5.6 KiB
Go
218 lines
5.6 KiB
Go
package loki
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/golang/snappy"
|
|
)
|
|
|
|
var (
|
|
bytesBufPool bytesutil.ByteBufferPool
|
|
pushReqsPool sync.Pool
|
|
)
|
|
|
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
|
|
startTime := time.Now()
|
|
requestsProtobufTotal.Inc()
|
|
wcr := writeconcurrencylimiter.GetReader(r.Body)
|
|
data, err := io.ReadAll(wcr)
|
|
writeconcurrencylimiter.PutReader(wcr)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
|
return
|
|
}
|
|
|
|
cp, err := getCommonParams(r)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
|
return
|
|
}
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return
|
|
}
|
|
lmp := cp.NewLogMessageProcessor()
|
|
n, err := parseProtobufRequest(data, lmp)
|
|
lmp.MustClose()
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
|
|
return
|
|
}
|
|
|
|
rowsIngestedProtobufTotal.Add(n)
|
|
|
|
// update requestProtobufDuration only for successfully parsed requests
|
|
// There is no need in updating requestProtobufDuration for request errors,
|
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
|
requestProtobufDuration.UpdateDuration(startTime)
|
|
}
|
|
|
|
var (
|
|
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
|
|
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
|
|
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
|
|
)
|
|
|
|
func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
|
|
bb := bytesBufPool.Get()
|
|
defer bytesBufPool.Put(bb)
|
|
|
|
buf, err := snappy.Decode(bb.B[:cap(bb.B)], data)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot decode snappy-encoded request body: %w", err)
|
|
}
|
|
bb.B = buf
|
|
|
|
req := getPushRequest()
|
|
defer putPushRequest(req)
|
|
|
|
err = req.UnmarshalProtobuf(bb.B)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse request body: %w", err)
|
|
}
|
|
|
|
fields := getFields()
|
|
defer putFields(fields)
|
|
|
|
rowsIngested := 0
|
|
streams := req.Streams
|
|
currentTimestamp := time.Now().UnixNano()
|
|
for i := range streams {
|
|
stream := &streams[i]
|
|
// st.Labels contains labels for the stream.
|
|
// Labels are same for all entries in the stream.
|
|
fields.fields, err = parsePromLabels(fields.fields[:0], stream.Labels)
|
|
if err != nil {
|
|
return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err)
|
|
}
|
|
commonFieldsLen := len(fields.fields)
|
|
|
|
entries := stream.Entries
|
|
for j := range entries {
|
|
e := &entries[j]
|
|
fields.fields = fields.fields[:commonFieldsLen]
|
|
|
|
for _, lp := range e.StructuredMetadata {
|
|
fields.fields = append(fields.fields, logstorage.Field{
|
|
Name: lp.Name,
|
|
Value: lp.Value,
|
|
})
|
|
}
|
|
|
|
fields.fields = append(fields.fields, logstorage.Field{
|
|
Name: "_msg",
|
|
Value: e.Line,
|
|
})
|
|
|
|
ts := e.Timestamp.UnixNano()
|
|
if ts == 0 {
|
|
ts = currentTimestamp
|
|
}
|
|
|
|
lmp.AddRow(ts, fields.fields)
|
|
}
|
|
rowsIngested += len(stream.Entries)
|
|
}
|
|
return rowsIngested, nil
|
|
}
|
|
|
|
func getFields() *fields {
|
|
v := fieldsPool.Get()
|
|
if v == nil {
|
|
return &fields{}
|
|
}
|
|
return v.(*fields)
|
|
}
|
|
|
|
func putFields(f *fields) {
|
|
f.fields = f.fields[:0]
|
|
fieldsPool.Put(f)
|
|
}
|
|
|
|
var fieldsPool sync.Pool
|
|
|
|
type fields struct {
|
|
fields []logstorage.Field
|
|
}
|
|
|
|
// parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result.
|
|
//
|
|
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
|
|
func parsePromLabels(dst []logstorage.Field, s string) ([]logstorage.Field, error) {
|
|
// Make sure s is wrapped into `{...}`
|
|
s = strings.TrimSpace(s)
|
|
if len(s) < 2 {
|
|
return nil, fmt.Errorf("too short string to parse: %q", s)
|
|
}
|
|
if s[0] != '{' {
|
|
return nil, fmt.Errorf("missing `{` at the beginning of %q", s)
|
|
}
|
|
if s[len(s)-1] != '}' {
|
|
return nil, fmt.Errorf("missing `}` at the end of %q", s)
|
|
}
|
|
s = s[1 : len(s)-1]
|
|
|
|
for len(s) > 0 {
|
|
// Parse label name
|
|
n := strings.IndexByte(s, '=')
|
|
if n < 0 {
|
|
return nil, fmt.Errorf("cannot find `=` char for label value at %s", s)
|
|
}
|
|
name := s[:n]
|
|
s = s[n+1:]
|
|
|
|
// Parse label value
|
|
qs, err := strconv.QuotedPrefix(s)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse value for label %q at %s: %w", name, s, err)
|
|
}
|
|
s = s[len(qs):]
|
|
value, err := strconv.Unquote(qs)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot unquote value %q for label %q: %w", qs, name, err)
|
|
}
|
|
|
|
// Append the found field to dst.
|
|
dst = append(dst, logstorage.Field{
|
|
Name: name,
|
|
Value: value,
|
|
})
|
|
|
|
// Check whether there are other labels remaining
|
|
if len(s) == 0 {
|
|
break
|
|
}
|
|
if !strings.HasPrefix(s, ",") {
|
|
return nil, fmt.Errorf("missing `,` char at %s", s)
|
|
}
|
|
s = s[1:]
|
|
s = strings.TrimPrefix(s, " ")
|
|
}
|
|
return dst, nil
|
|
}
|
|
|
|
func getPushRequest() *PushRequest {
|
|
v := pushReqsPool.Get()
|
|
if v == nil {
|
|
return &PushRequest{}
|
|
}
|
|
return v.(*PushRequest)
|
|
}
|
|
|
|
func putPushRequest(req *PushRequest) {
|
|
req.reset()
|
|
pushReqsPool.Put(req)
|
|
}
|