app/vlinsert/loki: follow-up after 09df5b66fd

- Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki

- Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header

- Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler

- Check JSON field types more strictly.

- Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients,
  which store timestamps in float64 instead of int64.

- Optimize parsing of Loki labels in Prometheus text exposition format.

- Simplify tests.

- Remove lib/slicesutil, since there are no more users for it.

- Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels
  as stream fields in most Loki setups.

- Allow empty or missing timestamps in the ingested logs.
  The current timestamp at VictoriaLogs side is then used for the ingested logs.
  This simplifies debugging and testing of the provided HTTP-based data ingestion APIs.

The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB
after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 .
This is because of heavyweight protobuf dependencies. They must be replaced with another protobuf implementation
similar to the one used at lib/prompb or lib/prompbmarshal .
This commit is contained in:
Aliaksandr Valialkin 2023-07-20 16:21:47 -07:00
parent 41f0227af2
commit 30098ac8bd
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
17 changed files with 647 additions and 337 deletions

2
.gitignore vendored
View file

@ -8,11 +8,11 @@
*.test *.test
*.swp *.swp
/gocache-for-docker /gocache-for-docker
/victoria-logs-data
/victoria-metrics-data /victoria-metrics-data
/vmagent-remotewrite-data /vmagent-remotewrite-data
/vmstorage-data /vmstorage-data
/vmselect-cache /vmselect-cache
/victoria-logs-data
/package/temp-deb-* /package/temp-deb-*
/package/temp-rpm-* /package/temp-rpm-*
/package/*.deb /package/*.deb

View file

@ -199,12 +199,15 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
} }
timestamp, err := extractTimestampFromFields(timeField, p.Fields) ts, err := extractTimestampFromFields(timeField, p.Fields)
if err != nil { if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err) return false, fmt.Errorf("cannot parse timestamp: %w", err)
} }
if ts == 0 {
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg") p.RenameField(msgField, "_msg")
processLogMessage(timestamp, p.Fields) processLogMessage(ts, p.Fields)
logjson.PutParser(p) logjson.PutParser(p)
return true, nil return true, nil
} }
@ -222,10 +225,15 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
f.Value = "" f.Value = ""
return timestamp, nil return timestamp, nil
} }
return time.Now().UnixNano(), nil return 0, nil
} }
func parseElasticsearchTimestamp(s string) (int64, error) { func parseElasticsearchTimestamp(s string) (int64, error) {
if s == "0" || s == "" {
// Special case - zero or empty timestamp must be substituted
// with the current time by the caller.
return 0, nil
}
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' { if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
// Try parsing timestamp in milliseconds // Try parsing timestamp in milliseconds
n, err := strconv.ParseInt(s, 10, 64) n, err := strconv.ParseInt(s, 10, 64)

View file

@ -99,12 +99,15 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
if err := p.ParseLogMessage(line); err != nil { if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
} }
timestamp, err := extractTimestampFromFields(timeField, p.Fields) ts, err := extractTimestampFromFields(timeField, p.Fields)
if err != nil { if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err) return false, fmt.Errorf("cannot parse timestamp: %w", err)
} }
if ts == 0 {
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg") p.RenameField(msgField, "_msg")
processLogMessage(timestamp, p.Fields) processLogMessage(ts, p.Fields)
logjson.PutParser(p) logjson.PutParser(p)
return true, nil return true, nil
} }
@ -122,10 +125,15 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in
f.Value = "" f.Value = ""
return timestamp, nil return timestamp, nil
} }
return time.Now().UnixNano(), nil return 0, nil
} }
func parseISO8601Timestamp(s string) (int64, error) { func parseISO8601Timestamp(s string) (int64, error) {
if s == "0" || s == "" {
// Special case for returning the current timestamp.
// It must be automatically converted to the current timestamp by the caller.
return 0, nil
}
t, err := time.Parse(time.RFC3339, s) t, err := time.Parse(time.RFC3339, s)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)

View file

@ -4,35 +4,32 @@ import (
"net/http" "net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
const msgField = "_msg"
var ( var (
lokiRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push"}`) lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
) )
// RequestHandler processes ElasticSearch insert requests // RequestHandler processes Loki insert requests
//
// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
switch path { if path != "/api/v1/push" {
case "/api/v1/push":
contentType := r.Header.Get("Content-Type")
lokiRequestsTotal.Inc()
switch contentType {
case "application/x-protobuf":
return handleProtobuf(r, w)
case "application/json", "gzip":
return handleJSON(r, w)
default:
logger.Warnf("unsupported Content-Type=%q for %q request; skipping it", contentType, path)
return false
}
default:
return false return false
} }
contentType := r.Header.Get("Content-Type")
switch contentType {
case "application/json":
lokiRequestsJSONTotal.Inc()
return handleJSON(r, w)
default:
// Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
lokiRequestsProtobufTotal.Inc()
return handleProtobuf(r, w)
}
} }
func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) { func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) {

View file

@ -6,127 +6,185 @@ import (
"math" "math"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/valyala/fastjson" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson"
) )
var ( var (
rowsIngestedTotalJSON = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="json"}`) rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
parserPool fastjson.ParserPool parserPool fastjson.ParserPool
) )
func handleJSON(r *http.Request, w http.ResponseWriter) bool { func handleJSON(r *http.Request, w http.ResponseWriter) bool {
contentType := r.Header.Get("Content-Type")
reader := r.Body reader := r.Body
if contentType == "gzip" { if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader) zr, err := common.GetGzipReader(reader)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot read gzipped request: %s", err) httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
return true return true
} }
defer common.PutGzipReader(zr) defer common.PutGzipReader(zr)
reader = zr reader = zr
} }
wcr := writeconcurrencylimiter.GetReader(reader)
data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr)
if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true
}
cp, err := getCommonParams(r) cp, err := getCommonParams(r)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse request: %s", err) httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true return true
} }
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
defer logstorage.PutLogRows(lr)
processLogMessage := cp.GetProcessLogMessageFunc(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr)
n, err := processJSONRequest(reader, processLogMessage) n, err := parseJSONRequest(data, processLogMessage)
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot decode loki request: %s", err) httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
return true return true
} }
rowsIngestedTotalJSON.Add(n) rowsIngestedJSONTotal.Add(n)
return true return true
} }
func processJSONRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
bytes, err := io.ReadAll(wcr)
if err != nil {
return 0, fmt.Errorf("cannot read request body: %w", err)
}
p := parserPool.Get() p := parserPool.Get()
defer parserPool.Put(p) defer parserPool.Put(p)
v, err := p.ParseBytes(bytes) v, err := p.ParseBytes(data)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot parse request body: %w", err) return 0, fmt.Errorf("cannot parse JSON request body: %w", err)
} }
streamsV := v.Get("streams")
if streamsV == nil {
return 0, fmt.Errorf("missing `streams` item in the parsed JSON: %q", v)
}
streams, err := streamsV.Array()
if err != nil {
return 0, fmt.Errorf("`streams` item in the parsed JSON must contain an array; got %q", streamsV)
}
currentTimestamp := time.Now().UnixNano()
var commonFields []logstorage.Field var commonFields []logstorage.Field
rowsIngested := 0 rowsIngested := 0
for stIdx, st := range v.GetArray("streams") { for _, stream := range streams {
// `stream` contains labels for the stream. // populate common labels from `stream` dict
// Labels are same for all entries in the stream. commonFields = commonFields[:0]
logFields := st.GetObject("stream") labelsV := stream.Get("stream")
if logFields == nil { var labels *fastjson.Object
logger.Warnf("missing streams field from %q", st) if labelsV != nil {
logFields = &fastjson.Object{} o, err := labelsV.Object()
}
commonFields = slicesutil.ResizeNoCopyMayOverallocate(commonFields, logFields.Len()+1)
i := 0
logFields.Visit(func(k []byte, v *fastjson.Value) {
sfName := bytesutil.ToUnsafeString(k)
sfValue := bytesutil.ToUnsafeString(v.GetStringBytes())
commonFields[i].Name = sfName
commonFields[i].Value = sfValue
i++
})
msgFieldIdx := logFields.Len()
commonFields[msgFieldIdx].Name = msgField
for idx, v := range st.GetArray("values") {
vs := v.GetArray()
if len(vs) != 2 {
return rowsIngested, fmt.Errorf("unexpected number of values in stream %d line %d: %q; got %d; want %d", stIdx, idx, v, len(vs), 2)
}
tsString := bytesutil.ToUnsafeString(vs[0].GetStringBytes())
ts, err := parseLokiTimestamp(tsString)
if err != nil { if err != nil {
return rowsIngested, fmt.Errorf("cannot parse timestamp in stream %d line %d: %q: %s", stIdx, idx, vs, err) return rowsIngested, fmt.Errorf("`stream` item in the parsed JSON must contain an object; got %q", labelsV)
}
labels = o
}
labels.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
vStr, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
return
}
commonFields = append(commonFields, logstorage.Field{
Name: bytesutil.ToUnsafeString(k),
Value: bytesutil.ToUnsafeString(vStr),
})
})
if err != nil {
return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err)
}
// populate messages from `values` array
linesV := stream.Get("values")
if linesV == nil {
return rowsIngested, fmt.Errorf("missing `values` item in the parsed JSON %q", stream)
}
lines, err := linesV.Array()
if err != nil {
return rowsIngested, fmt.Errorf("`values` item in the parsed JSON must contain an array; got %q", linesV)
}
fields := commonFields
for _, line := range lines {
lineA, err := line.Array()
if err != nil {
return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line)
}
if len(lineA) != 2 {
return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2", line, len(lineA))
} }
commonFields[msgFieldIdx].Value = bytesutil.ToUnsafeString(vs[1].GetStringBytes()) // parse timestamp
processLogMessage(ts, commonFields) timestamp, err := lineA[0].StringBytes()
if err != nil {
return rowsIngested, fmt.Errorf("unexpected log timestamp type for %q; want string", lineA[0])
}
ts, err := parseLokiTimestamp(bytesutil.ToUnsafeString(timestamp))
if err != nil {
return rowsIngested, fmt.Errorf("cannot parse log timestamp %q: %w", timestamp, err)
}
if ts == 0 {
ts = currentTimestamp
}
// parse log message
msg, err := lineA[1].StringBytes()
if err != nil {
return rowsIngested, fmt.Errorf("unexpected log message type for %q; want string", lineA[1])
}
fields = append(fields[:len(commonFields)], logstorage.Field{
Name: "_msg",
Value: bytesutil.ToUnsafeString(msg),
})
processLogMessage(ts, fields)
rowsIngested++
} }
rowsIngested += len(lines)
} }
return rowsIngested, nil return rowsIngested, nil
} }
func parseLokiTimestamp(s string) (int64, error) { func parseLokiTimestamp(s string) (int64, error) {
// Parsing timestamp in nanoseconds if s == "" {
// Special case - an empty timestamp must be substituted with the current time by the caller.
return 0, nil
}
n, err := strconv.ParseInt(s, 10, 64) n, err := strconv.ParseInt(s, 10, 64)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot parse timestamp in nanoseconds from %q: %w", s, err) // Fall back to parsing floating-point value
} f, err := strconv.ParseFloat(s, 64)
if n > int64(math.MaxInt64) { if err != nil {
return 0, fmt.Errorf("too big timestamp in nanoseconds: %d; mustn't exceed %d", n, math.MaxInt64) return 0, err
}
if f > math.MaxInt64 {
return 0, fmt.Errorf("too big timestamp in nanoseconds: %v; mustn't exceed %v", f, math.MaxInt64)
}
if f < math.MinInt64 {
return 0, fmt.Errorf("too small timestamp in nanoseconds: %v; must be bigger or equal to %v", f, math.MinInt64)
}
n = int64(f)
} }
if n < 0 { if n < 0 {
return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; must be bigger than %d", n, 0) return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; must be bigger than 0", n)
} }
return n, nil return n, nil
} }

View file

@ -1,99 +1,130 @@
package loki package loki
import ( import (
"reflect" "fmt"
"strings" "strings"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) )
func TestProcessJSONRequest(t *testing.T) { func TestParseJSONRequestFailure(t *testing.T) {
type item struct { f := func(s string) {
ts int64
fields []logstorage.Field
}
same := func(s string, expected []item) {
t.Helper() t.Helper()
r := strings.NewReader(s) n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
actual := make([]item, 0) t.Fatalf("unexpected call to parseJSONRequest callback!")
n, err := processJSONRequest(r, func(timestamp int64, fields []logstorage.Field) {
actual = append(actual, item{
ts: timestamp,
fields: fields,
})
}) })
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(actual) != len(expected) || n != len(expected) {
t.Fatalf("unexpected len(actual)=%d; expecting %d", len(actual), len(expected))
}
for i, actualItem := range actual {
expectedItem := expected[i]
if actualItem.ts != expectedItem.ts {
t.Fatalf("unexpected timestamp for item #%d; got %d; expecting %d", i, actualItem.ts, expectedItem.ts)
}
if !reflect.DeepEqual(actualItem.fields, expectedItem.fields) {
t.Fatalf("unexpected fields for item #%d; got %v; expecting %v", i, actualItem.fields, expectedItem.fields)
}
}
}
fail := func(s string) {
t.Helper()
r := strings.NewReader(s)
actual := make([]item, 0)
_, err := processJSONRequest(r, func(timestamp int64, fields []logstorage.Field) {
actual = append(actual, item{
ts: timestamp,
fields: fields,
})
})
if err == nil { if err == nil {
t.Fatalf("expected to fail with body: %q", s) t.Fatalf("expecting non-nil error")
}
if n != 0 {
t.Fatalf("unexpected number of parsed lines: %d; want 0", n)
} }
} }
f(``)
same(`{"streams":[{"stream":{"foo":"bar"},"values":[["1577836800000000000","baz"]]}]}`, []item{ // Invalid json
{ f(`{}`)
ts: 1577836800000000000, f(`[]`)
fields: []logstorage.Field{ f(`"foo"`)
{ f(`123`)
Name: "foo",
Value: "bar",
},
{
Name: "_msg",
Value: "baz",
},
},
},
})
fail(``) // invalid type for `streams` item
fail(`{"streams":[{"stream":{"foo" = "bar"},"values":[["1577836800000000000","baz"]]}]}`) f(`{"streams":123}`)
fail(`{"streams":[{"stream":{"foo": "bar"}`)
// Missing `values` item
f(`{"streams":[{}]}`)
// Invalid type for `values` item
f(`{"streams":[{"values":"foobar"}]}`)
// Invalid type for `stream` item
f(`{"streams":[{"stream":[],"values":[]}]}`)
// Invalid type for `values` individual item
f(`{"streams":[{"values":[123]}]}`)
// Invalid length of `values` individual item
f(`{"streams":[{"values":[[]]}]}`)
f(`{"streams":[{"values":[["123"]]}]}`)
f(`{"streams":[{"values":[["123","456","789"]]}]}`)
// Invalid type for timestamp inside `values` individual item
f(`{"streams":[{"values":[[123,"456"]}]}`)
// Invalid type for log message
f(`{"streams":[{"values":[["123",1234]]}]}`)
} }
func Test_parseLokiTimestamp(t *testing.T) { func TestParseJSONRequestSuccess(t *testing.T) {
f := func(s string, expected int64) { f := func(s string, resultExpected string) {
t.Helper() t.Helper()
actual, err := parseLokiTimestamp(s) var lines []string
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
var a []string
for _, f := range fields {
a = append(a, f.String())
}
line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
lines = append(lines, line)
})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
if actual != expected { if n != len(lines) {
t.Fatalf("unexpected timestamp; got %d; expecting %d", actual, expected) t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines))
}
result := strings.Join(lines, "\n")
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
} }
} }
f("1687510468000000000", 1687510468000000000) // Empty streams
f("1577836800000000000", 1577836800000000000) f(`{"streams":[]}`, ``)
f(`{"streams":[{"values":[]}]}`, ``)
f(`{"streams":[{"stream":{},"values":[]}]}`, ``)
f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``)
// Empty stream labels
f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
// Non-empty stream labels
f(`{"streams":[{"stream":{
"label1": "value1",
"label2": "value2"
},"values":[
["1577836800000000001", "foo bar"],
["1477836900005000002", "abc"],
["147.78369e9", "foobar"]
]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar"
_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc"
_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`)
// Multiple streams
f(`{
"streams": [
{
"stream": {
"foo": "bar",
"a": "b"
},
"values": [
["1577836800000000001", "foo bar"],
["1577836900005000002", "abc"]
]
},
{
"stream": {
"x": "y"
},
"values": [
["1877836900005000002", "yx"]
]
}
]
}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar"
_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc"
_time:1877836900005000002 "x":"y" "_msg":"yx"`)
} }

View file

@ -3,71 +3,76 @@ package loki
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) )
func BenchmarkProcessJSONRequest(b *testing.B) { func BenchmarkParseJSONRequest(b *testing.B) {
for _, streams := range []int{5, 10} { for _, streams := range []int{5, 10} {
for _, rows := range []int{100, 1000} { for _, rows := range []int{100, 1000} {
for _, labels := range []int{10, 50} { for _, labels := range []int{10, 50} {
b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) { b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) {
benchmarkProcessJSONRequest(b, streams, rows, labels) benchmarkParseJSONRequest(b, streams, rows, labels)
}) })
} }
} }
} }
} }
func benchmarkProcessJSONRequest(b *testing.B, streams, rows, labels int) { func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
s := getJSONBody(streams, rows, labels)
b.ReportAllocs() b.ReportAllocs()
b.SetBytes(int64(len(s))) b.SetBytes(int64(streams * rows))
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
data := getJSONBody(streams, rows, labels)
for pb.Next() { for pb.Next() {
_, err := processJSONRequest(strings.NewReader(s), func(timestamp int64, fields []logstorage.Field) {}) _, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {})
if err != nil { if err != nil {
b.Fatalf("unexpected error: %s", err) panic(fmt.Errorf("unexpected error: %s", err))
} }
} }
}) })
} }
func getJSONBody(streams, rows, labels int) string { func getJSONBody(streams, rows, labels int) []byte {
body := `{"streams":[` body := append([]byte{}, `{"streams":[`...)
now := time.Now().UnixNano() now := time.Now().UnixNano()
valuePrefix := fmt.Sprintf(`["%d","value_`, now) valuePrefix := fmt.Sprintf(`["%d","value_`, now)
for i := 0; i < streams; i++ { for i := 0; i < streams; i++ {
body += `{"stream":{` body = append(body, `{"stream":{`...)
for j := 0; j < labels; j++ { for j := 0; j < labels; j++ {
body += `"label_` + strconv.Itoa(j) + `":"value_` + strconv.Itoa(j) + `"` body = append(body, `"label_`...)
body = strconv.AppendInt(body, int64(j), 10)
body = append(body, `":"value_`...)
body = strconv.AppendInt(body, int64(j), 10)
body = append(body, '"')
if j < labels-1 { if j < labels-1 {
body += `,` body = append(body, ',')
} }
} }
body += `}, "values":[` body = append(body, `}, "values":[`...)
for j := 0; j < rows; j++ { for j := 0; j < rows; j++ {
body += valuePrefix + strconv.Itoa(j) + `"]` body = append(body, valuePrefix...)
body = strconv.AppendInt(body, int64(j), 10)
body = append(body, `"]`...)
if j < rows-1 { if j < rows-1 {
body += `,` body = append(body, ',')
} }
} }
body += `]}` body = append(body, `]}`...)
if i < streams-1 { if i < streams-1 {
body += `,` body = append(body, ',')
} }
} }
body += `]}` body = append(body, `]}`...)
return body return body
} }

View file

@ -4,68 +4,66 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"strconv"
"strings"
"sync" "sync"
"time"
"github.com/golang/snappy" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
) )
var ( var (
rowsIngestedTotalProtobuf = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="protobuf"}`) rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
bytesBufPool bytesutil.ByteBufferPool bytesBufPool bytesutil.ByteBufferPool
pushReqsPool sync.Pool pushReqsPool sync.Pool
) )
func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
wcr := writeconcurrencylimiter.GetReader(r.Body) wcr := writeconcurrencylimiter.GetReader(r.Body)
defer writeconcurrencylimiter.PutReader(wcr) data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr)
if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true
}
cp, err := getCommonParams(r) cp, err := getCommonParams(r)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot parse request: %s", err) httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true return true
} }
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
defer logstorage.PutLogRows(lr)
processLogMessage := cp.GetProcessLogMessageFunc(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr)
n, err := processProtobufRequest(wcr, processLogMessage) n, err := parseProtobufRequest(data, processLogMessage)
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
if err != nil { if err != nil {
httpserver.Errorf(w, r, "cannot decode loki request: %s", err) httpserver.Errorf(w, r, "cannot parse loki request: %s", err)
return true return true
} }
rowsIngestedProtobufTotal.Add(n)
rowsIngestedTotalProtobuf.Add(n)
return true return true
} }
func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
bytes, err := io.ReadAll(wcr)
if err != nil {
return 0, fmt.Errorf("cannot read request body: %s", err)
}
bb := bytesBufPool.Get() bb := bytesBufPool.Get()
defer bytesBufPool.Put(bb) defer bytesBufPool.Put(bb)
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], bytes)
if err != nil {
return 0, fmt.Errorf("cannot decode snappy from request body: %s", err)
}
req := getPushReq() buf, err := snappy.Decode(bb.B[:cap(bb.B)], data)
defer putPushReq(req) if err != nil {
return 0, fmt.Errorf("cannot decode snappy-encoded request body: %w", err)
}
bb.B = buf
req := getPushRequest()
defer putPushRequest(req)
err = req.Unmarshal(bb.B) err = req.Unmarshal(bb.B)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot parse request body: %s", err) return 0, fmt.Errorf("cannot parse request body: %s", err)
@ -73,59 +71,93 @@ func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64,
var commonFields []logstorage.Field var commonFields []logstorage.Field
rowsIngested := 0 rowsIngested := 0
for stIdx, st := range req.Streams { streams := req.Streams
currentTimestamp := time.Now().UnixNano()
for i := range streams {
stream := &streams[i]
// st.Labels contains labels for the stream. // st.Labels contains labels for the stream.
// Labels are same for all entries in the stream. // Labels are same for all entries in the stream.
commonFields, err = parseLogFields(st.Labels, commonFields) commonFields, err = parsePromLabels(commonFields[:0], stream.Labels)
if err != nil { if err != nil {
return rowsIngested, fmt.Errorf("failed to unmarshal labels in stream %d: %q; %s", stIdx, st.Labels, err) return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %s", stream.Labels, err)
} }
msgFieldIDx := len(commonFields) - 1 fields := commonFields
commonFields[msgFieldIDx].Name = msgField
for _, v := range st.Entries { entries := stream.Entries
commonFields[msgFieldIDx].Value = v.Line for j := range entries {
processLogMessage(v.Timestamp.UnixNano(), commonFields) entry := &entries[j]
rowsIngested++ fields = append(fields[:len(commonFields)], logstorage.Field{
Name: "_msg",
Value: entry.Line,
})
ts := entry.Timestamp.UnixNano()
if ts == 0 {
ts = currentTimestamp
}
processLogMessage(ts, fields)
} }
rowsIngested += len(stream.Entries)
} }
return rowsIngested, nil return rowsIngested, nil
} }
// Parses logs fields s and returns the corresponding log fields. // parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result.
// Cannot use searchutils.ParseMetricSelector here because its dependencies
// bring flags which clashes with logstorage flags.
// //
// Loki encodes labels in the PromQL labels format.
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go // See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
func parseLogFields(s string, dst []logstorage.Field) ([]logstorage.Field, error) { func parsePromLabels(dst []logstorage.Field, s string) ([]logstorage.Field, error) {
expr, err := metricsql.Parse(s) // Make sure s is wrapped into `{...}`
if err != nil { s = strings.TrimSpace(s)
return nil, err if len(s) < 2 {
return nil, fmt.Errorf("too short string to parse: %q", s)
} }
if s[0] != '{' {
me, ok := expr.(*metricsql.MetricExpr) return nil, fmt.Errorf("missing `{` at the beginning of %q", s)
if !ok {
return nil, fmt.Errorf("failed to parse stream labels; got %q", expr.AppendString(nil))
} }
if s[len(s)-1] != '}' {
// Expecting only label filters without MetricsQL "or" operator. return nil, fmt.Errorf("missing `}` at the end of %q", s)
if len(me.LabelFilterss) != 1 {
return nil, fmt.Errorf("unexpected format of log fields; got %q", s)
} }
s = s[1 : len(s)-1]
// Allocate space for labels + msg field. for len(s) > 0 {
// Msg field is added by caller. // Parse label name
dst = slicesutil.ResizeNoCopyMayOverallocate(dst, len(me.LabelFilterss[0])) n := strings.IndexByte(s, '=')
for i, l := range me.LabelFilterss[0] { if n < 0 {
dst[i].Name = l.Label return nil, fmt.Errorf("cannot find `=` char for label value at %s", s)
dst[i].Value = l.Value }
name := s[:n]
s = s[n+1:]
// Parse label value
qs, err := strconv.QuotedPrefix(s)
if err != nil {
return nil, fmt.Errorf("cannot parse value for label %q at %s: %w", name, s, err)
}
s = s[len(qs):]
value, err := strconv.Unquote(qs)
if err != nil {
return nil, fmt.Errorf("cannot unquote value %q for label %q: %w", qs, name, err)
}
// Append the found field to dst.
dst = append(dst, logstorage.Field{
Name: name,
Value: value,
})
// Check whether there are other labels remaining
if len(s) == 0 {
break
}
if !strings.HasPrefix(s, ",") {
return nil, fmt.Errorf("missing `,` char at %s", s)
}
s = s[1:]
s = strings.TrimPrefix(s, " ")
} }
return dst, nil return dst, nil
} }
func getPushReq() *PushRequest { func getPushRequest() *PushRequest {
v := pushReqsPool.Get() v := pushReqsPool.Get()
if v == nil { if v == nil {
return &PushRequest{} return &PushRequest{}
@ -133,7 +165,7 @@ func getPushReq() *PushRequest {
return v.(*PushRequest) return v.(*PushRequest)
} }
func putPushReq(reqs *PushRequest) { func putPushRequest(req *PushRequest) {
reqs.Reset() req.Reset()
pushReqsPool.Put(reqs) pushReqsPool.Put(req)
} }

View file

@ -1,50 +1,171 @@
package loki package loki
import ( import (
"bytes" "fmt"
"strconv" "strings"
"testing" "testing"
"time" "time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/golang/snappy"
) )
func TestProcessProtobufRequest(t *testing.T) { func TestParseProtobufRequestSuccess(t *testing.T) {
body := getProtobufBody(5, 5, 5) f := func(s string, resultExpected string) {
t.Helper()
reader := bytes.NewReader(body) var pr PushRequest
_, err := processProtobufRequest(reader, func(timestamp int64, fields []logstorage.Field) {}) n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
if err != nil { msg := ""
t.Fatalf("unexpected error: %s", err) for _, f := range fields {
} if f.Name == "_msg" {
} msg = f.Value
}
func getProtobufBody(streams, rows, labels int) []byte {
var pr PushRequest
for i := 0; i < streams; i++ {
var st Stream
st.Labels = `{`
for j := 0; j < labels; j++ {
st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"`
if j < labels-1 {
st.Labels += `,`
} }
var a []string
for _, f := range fields {
if f.Name == "_msg" {
continue
}
item := fmt.Sprintf("%s=%q", f.Name, f.Value)
a = append(a, item)
}
labels := "{" + strings.Join(a, ", ") + "}"
pr.Streams = append(pr.Streams, Stream{
Labels: labels,
Entries: []Entry{
{
Timestamp: time.Unix(0, timestamp),
Line: msg,
},
},
})
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
} }
st.Labels += `}` if n != len(pr.Streams) {
t.Fatalf("unexpected number of streams; got %d; want %d", len(pr.Streams), n)
for j := 0; j < rows; j++ {
st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)})
} }
pr.Streams = append(pr.Streams, st) data, err := pr.Marshal()
if err != nil {
t.Fatalf("unexpected error when marshaling PushRequest: %s", err)
}
encodedData := snappy.Encode(nil, data)
var lines []string
n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) {
var a []string
for _, f := range fields {
a = append(a, f.String())
}
line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
lines = append(lines, line)
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != len(lines) {
t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines))
}
result := strings.Join(lines, "\n")
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
} }
body, _ := pr.Marshal() // Empty streams
encodedBody := snappy.Encode(nil, body) f(`{"streams":[]}`, ``)
f(`{"streams":[{"values":[]}]}`, ``)
f(`{"streams":[{"stream":{},"values":[]}]}`, ``)
f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``)
return encodedBody // Empty stream labels
f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
// Non-empty stream labels
f(`{"streams":[{"stream":{
"label1": "value1",
"label2": "value2"
},"values":[
["1577836800000000001", "foo bar"],
["1477836900005000002", "abc"],
["147.78369e9", "foobar"]
]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar"
_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc"
_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`)
// Multiple streams
f(`{
"streams": [
{
"stream": {
"foo": "bar",
"a": "b"
},
"values": [
["1577836800000000001", "foo bar"],
["1577836900005000002", "abc"]
]
},
{
"stream": {
"x": "y"
},
"values": [
["1877836900005000002", "yx"]
]
}
]
}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar"
_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc"
_time:1877836900005000002 "x":"y" "_msg":"yx"`)
}
func TestParsePromLabelsSuccess(t *testing.T) {
f := func(s string) {
t.Helper()
fields, err := parsePromLabels(nil, s)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
var a []string
for _, f := range fields {
a = append(a, fmt.Sprintf("%s=%q", f.Name, f.Value))
}
result := "{" + strings.Join(a, ", ") + "}"
if result != s {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, s)
}
}
f("{}")
f(`{foo="bar"}`)
f(`{foo="bar", baz="x", y="z"}`)
f(`{foo="ba\"r\\z\n", a="", b="\"\\"}`)
}
func TestParsePromLabelsFailure(t *testing.T) {
f := func(s string) {
t.Helper()
fields, err := parsePromLabels(nil, s)
if err == nil {
t.Fatalf("expecting non-nil error")
}
if len(fields) > 0 {
t.Fatalf("unexpected non-empty fields: %s", fields)
}
}
f("")
f("{")
f(`{foo}`)
f(`{foo=bar}`)
f(`{foo="bar}`)
f(`{foo="ba\",r}`)
f(`{foo="bar" baz="aa"}`)
f(`foobar`)
f(`foo{bar="baz"}`)
} }

View file

@ -1,35 +1,65 @@
package loki package loki
import ( import (
"bytes"
"fmt" "fmt"
"strconv"
"testing" "testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/golang/snappy"
) )
func BenchmarkProcessProtobufRequest(b *testing.B) { func BenchmarkParseProtobufRequest(b *testing.B) {
for _, streams := range []int{5, 10} { for _, streams := range []int{5, 10} {
for _, rows := range []int{100, 1000} { for _, rows := range []int{100, 1000} {
for _, labels := range []int{10, 50} { for _, labels := range []int{10, 50} {
b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) { b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) {
benchmarkProcessProtobufRequest(b, streams, rows, labels) benchmarkParseProtobufRequest(b, streams, rows, labels)
}) })
} }
} }
} }
} }
func benchmarkProcessProtobufRequest(b *testing.B, streams, rows, labels int) { func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
body := getProtobufBody(streams, rows, labels)
b.ReportAllocs() b.ReportAllocs()
b.SetBytes(int64(len(body))) b.SetBytes(int64(streams * rows))
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
body := getProtobufBody(streams, rows, labels)
for pb.Next() { for pb.Next() {
_, err := processProtobufRequest(bytes.NewBuffer(body), func(timestamp int64, fields []logstorage.Field) {}) _, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {})
if err != nil { if err != nil {
b.Fatalf("unexpected error: %s", err) panic(fmt.Errorf("unexpected error: %s", err))
} }
} }
}) })
} }
func getProtobufBody(streams, rows, labels int) []byte {
var pr PushRequest
for i := 0; i < streams; i++ {
var st Stream
st.Labels = `{`
for j := 0; j < labels; j++ {
st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"`
if j < labels-1 {
st.Labels += `,`
}
}
st.Labels += `}`
for j := 0; j < rows; j++ {
st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)})
}
pr.Streams = append(pr.Streams, st)
}
body, _ := pr.Marshal()
encodedBody := snappy.Encode(nil, body)
return encodedBody
}

View file

@ -6,7 +6,7 @@ positions:
filename: /tmp/positions.yaml filename: /tmp/positions.yaml
clients: clients:
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid
tenant_id: "0:0" tenant_id: "0:0"
scrape_configs: scrape_configs:

View file

@ -13,7 +13,7 @@ services:
# Run `make package-victoria-logs` to build victoria-logs image # Run `make package-victoria-logs` to build victoria-logs image
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:latest image: docker.io/victoriametrics/victoria-logs:v0.2.0-victorialogs
volumes: volumes:
- victorialogs-promtail-docker:/vlogs - victorialogs-promtail-docker:/vlogs
ports: ports:

View file

@ -1,16 +1,20 @@
# Promtail setup # Promtail setup
[Promtail](https://grafana.com/docs/loki/latest/clients/promtail/) is a default log shipper for Grafana Loki.
Promtail can be configured to send the collected logs to VictoriaLogs according to the following docs.
Specify [`clients`](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) section in the configuration file Specify [`clients`](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) section in the configuration file
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/): for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/):
```yaml ```yaml
clients: clients:
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid
``` ```
Substitute `vlogs:9428` address inside `clients` with the real TCP address of VictoriaLogs. Substitute `localhost:9428` address inside `clients` with the real TCP address of VictoriaLogs.
See [these docs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters) for details on the used URL query parameter section. See [these docs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters) for details on the used URL query parameter section.
There is no need in specifying `_msg_field` and `_time_field` query args, since VictoriaLogs automatically extracts log message and timestamp from the ingested Loki data.
It is recommended verifying whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) It is recommended verifying whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields). and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields).
@ -19,27 +23,28 @@ and inspecting VictoriaLogs logs then:
```yaml ```yaml
clients: clients:
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1
``` ```
If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped
during data ingestion, then they can be put into `ignore_fields` [parameter](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters). during data ingestion, then they can be put into `ignore_fields` [parameter](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters).
For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs: For example, the following config instructs VictoriaLogs to ignore `filename` and `stream` fields in the ingested logs:
```yaml ```yaml
clients: clients:
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&ignore_fields=filename,stream
``` ```
By default the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy). By default the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy).
If you need storing logs in other tenant, then It is possible to either use `tenant_id` provided by Loki configuration, or to use `headers` and provide If you need storing logs in other tenant, then specify the needed tenant via `tenant_id` field
`AccountID` and `ProjectID` headers. Format for `tenant_id` is `AccountID:ProjectID`. in the [Loki client configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients)
For example, the following config instructs VictoriaLogs to store logs in the `(AccountID=12, ProjectID=12)` tenant: The `tenant_id` must have `AccountID:ProjectID` format, where `AccountID` and `ProjectID` are arbitrary uint32 numbers.
For example, the following config instructs VictoriaLogs to store logs in the `(AccountID=12, ProjectID=34)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy):
```yaml ```yaml
clients: clients:
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1
tenant_id: "12:12" tenant_id: "12:34"
``` ```
The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/).

View file

@ -17,7 +17,7 @@ menu:
- Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Fluentbit.html). - Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Fluentbit.html).
- Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Logstash.html). - Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Logstash.html).
- Vector. See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Vector.html). - Vector. See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Vector.html).
- Promtail. See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Promtail.html). - Promtail (aka Grafana Loki). See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Promtail.html).
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/).
@ -33,7 +33,7 @@ VictoriaLogs supports the following data ingestion HTTP APIs:
- Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api). - Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api).
- JSON stream API aka [ndjson](http://ndjson.org/). See [these docs](#json-stream-api). - JSON stream API aka [ndjson](http://ndjson.org/). See [these docs](#json-stream-api).
- [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-lokiq). See [these docs](#loki-json-api). - Loki JSON API. See [these docs](#loki-json-api).
VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs. VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs.
@ -47,12 +47,18 @@ The following command pushes a single log line to VictoriaLogs:
```bash ```bash
echo '{"create":{}} echo '{"create":{}}
{"_msg":"cannot open file","_time":"2023-06-21T04:24:24Z","host.name":"host123"} {"_msg":"cannot open file","_time":"0","host.name":"host123"}
' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk ' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk
``` ```
It is possible to push thousands of log lines in a single request to this API. It is possible to push thousands of log lines in a single request to this API.
If the [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) is set to `"0"`,
then the current timestamp at VictoriaLogs side is used per each ingested log line.
Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`.
Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`.
Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`.
See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields, See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields,
which must be present in the ingested log messages. which must be present in the ingested log messages.
@ -88,17 +94,18 @@ VictoriaLogs accepts JSON line stream aka [ndjson](http://ndjson.org/) at `http:
The following command pushes multiple log lines to VictoriaLogs: The following command pushes multiple log lines to VictoriaLogs:
```bash ```bash
echo '{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:31:23Z", "stream": "stream1" } echo '{ "log": { "level": "info", "message": "hello world" }, "date": "0", "stream": "stream1" }
{ "log": { "level": "error", "message": "oh no!" }, "date": "2023-06-20T15:32:10.567Z", "stream": "stream1" } { "log": { "level": "error", "message": "oh no!" }, "date": "0", "stream": "stream1" }
{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:35:11.567890+02:00", "stream": "stream2" } { "log": { "level": "info", "message": "hello world" }, "date": "0", "stream": "stream2" }
' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \ ' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message' 'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'
``` ```
It is possible to push unlimited number of log lines in a single request to this API. It is possible to push unlimited number of log lines in a single request to this API.
The [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) must be If the [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) is set to `"0"`,
in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`. then the current timestamp at VictoriaLogs side is used per each ingested log line.
Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`.
Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`. Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`.
Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`. Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`.
@ -134,15 +141,42 @@ See also:
### Loki JSON API ### Loki JSON API
VictoriaLogs accepts logs in [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-lokiq) format at `http://localhost:9428/insert/loki/api/v1/push` endpoint. VictoriaLogs accepts logs in [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki) format at `http://localhost:9428/insert/loki/api/v1/push` endpoint.
The following command pushes a single log line to Loki JSON API at VictoriaLogs: The following command pushes a single log line to Loki JSON API at VictoriaLogs:
```bash ```bash
curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:9428/insert/loki/api/v1/push?_stream_fields=foo" --data-raw \ curl -H "Content-Type: application/json" -XPOST "http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job" --data-raw \
'{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}' '{"streams": [{ "stream": { "instance": "host123", "job": "app42" }, "values": [ [ "0", "foo fizzbuzz bar" ] ] }]}'
``` ```
It is possible to push thousands of log streams and log lines in a single request to this API.
The API accepts various http parameters, which can change the data ingestion behavior - [these docs](#http-parameters) for details.
The following command verifies that the data has been successfully ingested into VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it:
```bash
curl http://localhost:9428/select/logsql/query -d 'query=fizzbuzz'
```
The command should return the following response:
```bash
{"_msg":"foo fizzbuzz bar","_stream":"{instance=\"host123\",job=\"app42\"}","_time":"2023-07-20T23:01:19.288676497Z"}
```
The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field),
[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and
[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields.
See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details.
See also:
- [How to debug data ingestion](#troubleshooting).
- [HTTP parameters, which can be passed to the API](#http-parameters).
- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html).
### HTTP parameters ### HTTP parameters
VictoriaLogs accepts the following parameters at [data ingestion HTTP APIs](#http-apis): VictoriaLogs accepts the following parameters at [data ingestion HTTP APIs](#http-apis):

View file

@ -230,6 +230,7 @@ func (lr *LogRows) GetRowString(idx int) string {
// GetLogRows returns LogRows from the pool for the given streamFields. // GetLogRows returns LogRows from the pool for the given streamFields.
// //
// streamFields is a set of field names, which must be associated with the stream. // streamFields is a set of field names, which must be associated with the stream.
// ignoreFields is a set of field names, which must be ignored during data ingestion.
// //
// Return back it to the pool with PutLogRows() when it is no longer needed. // Return back it to the pool with PutLogRows() when it is no longer needed.
func GetLogRows(streamFields, ignoreFields []string) *LogRows { func GetLogRows(streamFields, ignoreFields []string) *LogRows {

View file

@ -87,7 +87,7 @@ func GetTenantIDFromString(s string) (TenantID, error) {
if colon < 0 { if colon < 0 {
account, err := getUint32FromString(s) account, err := getUint32FromString(s)
if err != nil { if err != nil {
return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) return tenantID, fmt.Errorf("cannot parse accountID from %q: %w", s, err)
} }
tenantID.AccountID = account tenantID.AccountID = account
@ -96,13 +96,13 @@ func GetTenantIDFromString(s string) (TenantID, error) {
account, err := getUint32FromString(s[:colon]) account, err := getUint32FromString(s[:colon])
if err != nil { if err != nil {
return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) return tenantID, fmt.Errorf("cannot parse accountID part from %q: %w", s, err)
} }
tenantID.AccountID = account tenantID.AccountID = account
project, err := getUint32FromString(s[colon+1:]) project, err := getUint32FromString(s[colon+1:])
if err != nil { if err != nil {
return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) return tenantID, fmt.Errorf("cannot parse projectID part from %q: %w", s, err)
} }
tenantID.ProjectID = project tenantID.ProjectID = project

View file

@ -1,20 +0,0 @@
package slicesutil
import "math/bits"
// ResizeNoCopyMayOverallocate resizes dst to minimum n bytes and returns the resized buffer (which may be newly allocated).
//
// If newly allocated buffer is returned then b contents isn't copied to it.
func ResizeNoCopyMayOverallocate[T any](dst []T, n int) []T {
if n <= cap(dst) {
return dst[:n]
}
nNew := roundToNearestPow2(n)
dstNew := make([]T, nNew)
return dstNew[:n]
}
func roundToNearestPow2(n int) int {
pow2 := uint8(bits.Len(uint(n - 1)))
return 1 << pow2
}