From f548adce0bdecb28ea0b07e9cc8e8b65d27ffc21 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 20 Jul 2023 16:21:47 -0700 Subject: [PATCH] app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty or missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal . 
--- .gitignore | 2 +- app/vlinsert/elasticsearch/elasticsearch.go | 14 +- app/vlinsert/jsonline/jsonline.go | 14 +- app/vlinsert/loki/loki.go | 35 ++-- app/vlinsert/loki/loki_json.go | 188 ++++++++++++------ app/vlinsert/loki/loki_json_test.go | 177 ++++++++++------- app/vlinsert/loki/loki_json_timing_test.go | 43 ++-- app/vlinsert/loki/loki_protobuf.go | 166 +++++++++------- app/vlinsert/loki/loki_protobuf_test.go | 187 ++++++++++++++--- .../loki/loki_protobuf_timing_test.go | 46 ++++- .../docker/victorialogs/promtail/config.yml | 2 +- .../victorialogs/promtail/docker-compose.yml | 2 +- docs/VictoriaLogs/data-ingestion/Promtail.md | 25 ++- docs/VictoriaLogs/data-ingestion/README.md | 56 +++++- lib/logstorage/log_rows.go | 1 + lib/logstorage/tenant_id.go | 6 +- lib/slicesutil/resize.go | 20 -- 17 files changed, 647 insertions(+), 337 deletions(-) delete mode 100644 lib/slicesutil/resize.go diff --git a/.gitignore b/.gitignore index f06c7eeea..96d97c523 100644 --- a/.gitignore +++ b/.gitignore @@ -8,11 +8,11 @@ *.test *.swp /gocache-for-docker +/victoria-logs-data /victoria-metrics-data /vmagent-remotewrite-data /vmstorage-data /vmselect-cache -/victoria-logs-data /package/temp-deb-* /package/temp-rpm-* /package/*.deb diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index 7f1a22956..c11d8b398 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -199,12 +199,15 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } - timestamp, err := extractTimestampFromFields(timeField, p.Fields) + ts, err := extractTimestampFromFields(timeField, p.Fields) if err != nil { return false, fmt.Errorf("cannot parse timestamp: %w", err) } + if ts == 0 { + ts = time.Now().UnixNano() + } p.RenameField(msgField, "_msg") - processLogMessage(timestamp, p.Fields) + processLogMessage(ts, p.Fields) 
logjson.PutParser(p) return true, nil } @@ -222,10 +225,15 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in f.Value = "" return timestamp, nil } - return time.Now().UnixNano(), nil + return 0, nil } func parseElasticsearchTimestamp(s string) (int64, error) { + if s == "0" || s == "" { + // Special case - zero or empty timestamp must be substituted + // with the current time by the caller. + return 0, nil + } if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' { // Try parsing timestamp in milliseconds n, err := strconv.ParseInt(s, 10, 64) diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 373bc0129..1b901db36 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -99,12 +99,15 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f if err := p.ParseLogMessage(line); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } - timestamp, err := extractTimestampFromFields(timeField, p.Fields) + ts, err := extractTimestampFromFields(timeField, p.Fields) if err != nil { return false, fmt.Errorf("cannot parse timestamp: %w", err) } + if ts == 0 { + ts = time.Now().UnixNano() + } p.RenameField(msgField, "_msg") - processLogMessage(timestamp, p.Fields) + processLogMessage(ts, p.Fields) logjson.PutParser(p) return true, nil } @@ -122,10 +125,15 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in f.Value = "" return timestamp, nil } - return time.Now().UnixNano(), nil + return 0, nil } func parseISO8601Timestamp(s string) (int64, error) { + if s == "0" || s == "" { + // Special case for returning the current timestamp. + // It must be automatically converted to the current timestamp by the caller. 
+ return 0, nil + } t, err := time.Parse(time.RFC3339, s) if err != nil { return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) diff --git a/app/vlinsert/loki/loki.go b/app/vlinsert/loki/loki.go index b7450b0b5..878616328 100644 --- a/app/vlinsert/loki/loki.go +++ b/app/vlinsert/loki/loki.go @@ -4,35 +4,32 @@ import ( "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/metrics" ) -const msgField = "_msg" - var ( - lokiRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push"}`) + lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) + lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) ) -// RequestHandler processes ElasticSearch insert requests +// RequestHandler processes Loki insert requests +// +// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { - switch path { - case "/api/v1/push": - contentType := r.Header.Get("Content-Type") - lokiRequestsTotal.Inc() - switch contentType { - case "application/x-protobuf": - return handleProtobuf(r, w) - case "application/json", "gzip": - return handleJSON(r, w) - default: - logger.Warnf("unsupported Content-Type=%q for %q request; skipping it", contentType, path) - return false - } - default: + if path != "/api/v1/push" { return false } + contentType := r.Header.Get("Content-Type") + switch contentType { + case "application/json": + lokiRequestsJSONTotal.Inc() + return handleJSON(r, w) + default: + // Protobuf request body should be handled by default accoring to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki + lokiRequestsProtobufTotal.Inc() + return 
handleProtobuf(r, w) + } } func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) { diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 874f92cf4..5ace832b0 100644 --- a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -6,127 +6,185 @@ import ( "math" "net/http" "strconv" + "time" - "github.com/valyala/fastjson" - - "github.com/VictoriaMetrics/metrics" - + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson" ) var ( - rowsIngestedTotalJSON = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="json"}`) + rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) parserPool fastjson.ParserPool ) func handleJSON(r *http.Request, w http.ResponseWriter) bool { - contentType := r.Header.Get("Content-Type") reader := r.Body - if contentType == "gzip" { + if r.Header.Get("Content-Encoding") == "gzip" { zr, err := common.GetGzipReader(reader) if err != nil { - httpserver.Errorf(w, r, "cannot read gzipped request: %s", err) + httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err) return true } defer common.PutGzipReader(zr) reader = zr } + wcr := writeconcurrencylimiter.GetReader(reader) + data, err := io.ReadAll(wcr) + writeconcurrencylimiter.PutReader(wcr) + if err != nil { + httpserver.Errorf(w, r, "cannot read request body: %s", err) + return true + } + cp, err := getCommonParams(r) if err != nil { - httpserver.Errorf(w, r, "cannot 
parse request: %s", err) + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) return true } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) - defer logstorage.PutLogRows(lr) - processLogMessage := cp.GetProcessLogMessageFunc(lr) - n, err := processJSONRequest(reader, processLogMessage) + n, err := parseJSONRequest(data, processLogMessage) + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) if err != nil { - httpserver.Errorf(w, r, "cannot decode loki request: %s", err) + httpserver.Errorf(w, r, "cannot parse Loki request: %s", err) return true } - rowsIngestedTotalJSON.Add(n) + rowsIngestedJSONTotal.Add(n) return true } -func processJSONRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { - wcr := writeconcurrencylimiter.GetReader(r) - defer writeconcurrencylimiter.PutReader(wcr) - - bytes, err := io.ReadAll(wcr) - if err != nil { - return 0, fmt.Errorf("cannot read request body: %w", err) - } - +func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { p := parserPool.Get() defer parserPool.Put(p) - v, err := p.ParseBytes(bytes) + v, err := p.ParseBytes(data) if err != nil { - return 0, fmt.Errorf("cannot parse request body: %w", err) + return 0, fmt.Errorf("cannot parse JSON request body: %w", err) } + streamsV := v.Get("streams") + if streamsV == nil { + return 0, fmt.Errorf("missing `streams` item in the parsed JSON: %q", v) + } + streams, err := streamsV.Array() + if err != nil { + return 0, fmt.Errorf("`streams` item in the parsed JSON must contain an array; got %q", streamsV) + } + + currentTimestamp := time.Now().UnixNano() var commonFields []logstorage.Field rowsIngested := 0 - for stIdx, st := range v.GetArray("streams") { - // `stream` contains labels for the stream. - // Labels are same for all entries in the stream. 
- logFields := st.GetObject("stream") - if logFields == nil { - logger.Warnf("missing streams field from %q", st) - logFields = &fastjson.Object{} - } - commonFields = slicesutil.ResizeNoCopyMayOverallocate(commonFields, logFields.Len()+1) - i := 0 - logFields.Visit(func(k []byte, v *fastjson.Value) { - sfName := bytesutil.ToUnsafeString(k) - sfValue := bytesutil.ToUnsafeString(v.GetStringBytes()) - commonFields[i].Name = sfName - commonFields[i].Value = sfValue - i++ - }) - msgFieldIdx := logFields.Len() - commonFields[msgFieldIdx].Name = msgField - - for idx, v := range st.GetArray("values") { - vs := v.GetArray() - if len(vs) != 2 { - return rowsIngested, fmt.Errorf("unexpected number of values in stream %d line %d: %q; got %d; want %d", stIdx, idx, v, len(vs), 2) - } - - tsString := bytesutil.ToUnsafeString(vs[0].GetStringBytes()) - ts, err := parseLokiTimestamp(tsString) + for _, stream := range streams { + // populate common labels from `stream` dict + commonFields = commonFields[:0] + labelsV := stream.Get("stream") + var labels *fastjson.Object + if labelsV != nil { + o, err := labelsV.Object() if err != nil { - return rowsIngested, fmt.Errorf("cannot parse timestamp in stream %d line %d: %q: %s", stIdx, idx, vs, err) + return rowsIngested, fmt.Errorf("`stream` item in the parsed JSON must contain an object; got %q", labelsV) + } + labels = o + } + labels.Visit(func(k []byte, v *fastjson.Value) { + if err != nil { + return + } + vStr, errLocal := v.StringBytes() + if errLocal != nil { + err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v) + return + } + commonFields = append(commonFields, logstorage.Field{ + Name: bytesutil.ToUnsafeString(k), + Value: bytesutil.ToUnsafeString(vStr), + }) + }) + if err != nil { + return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err) + } + + // populate messages from `values` array + linesV := stream.Get("values") + if linesV == nil { + return rowsIngested, fmt.Errorf("missing 
`values` item in the parsed JSON %q", stream) + } + lines, err := linesV.Array() + if err != nil { + return rowsIngested, fmt.Errorf("`values` item in the parsed JSON must contain an array; got %q", linesV) + } + + fields := commonFields + for _, line := range lines { + lineA, err := line.Array() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line) + } + if len(lineA) != 2 { + return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2", line, len(lineA)) } - commonFields[msgFieldIdx].Value = bytesutil.ToUnsafeString(vs[1].GetStringBytes()) - processLogMessage(ts, commonFields) + // parse timestamp + timestamp, err := lineA[0].StringBytes() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected log timestamp type for %q; want string", lineA[0]) + } + ts, err := parseLokiTimestamp(bytesutil.ToUnsafeString(timestamp)) + if err != nil { + return rowsIngested, fmt.Errorf("cannot parse log timestamp %q: %w", timestamp, err) + } + if ts == 0 { + ts = currentTimestamp + } + + // parse log message + msg, err := lineA[1].StringBytes() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected log message type for %q; want string", lineA[1]) + } + + fields = append(fields[:len(commonFields)], logstorage.Field{ + Name: "_msg", + Value: bytesutil.ToUnsafeString(msg), + }) + processLogMessage(ts, fields) - rowsIngested++ } + rowsIngested += len(lines) } return rowsIngested, nil } func parseLokiTimestamp(s string) (int64, error) { - // Parsing timestamp in nanoseconds + if s == "" { + // Special case - an empty timestamp must be substituted with the current time by the caller. 
+ return 0, nil + } n, err := strconv.ParseInt(s, 10, 64) if err != nil { - return 0, fmt.Errorf("cannot parse timestamp in nanoseconds from %q: %w", s, err) - } - if n > int64(math.MaxInt64) { - return 0, fmt.Errorf("too big timestamp in nanoseconds: %d; mustn't exceed %d", n, math.MaxInt64) + // Fall back to parsing floating-point value + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0, err + } + if f > math.MaxInt64 { + return 0, fmt.Errorf("too big timestamp in nanoseconds: %v; mustn't exceed %v", f, math.MaxInt64) + } + if f < math.MinInt64 { + return 0, fmt.Errorf("too small timestamp in nanoseconds: %v; must be bigger or equal to %v", f, math.MinInt64) + } + n = int64(f) } if n < 0 { - return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; must be bigger than %d", n, 0) + return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; must be bigger than 0", n) } return n, nil } diff --git a/app/vlinsert/loki/loki_json_test.go b/app/vlinsert/loki/loki_json_test.go index 5588035c6..93cf8652a 100644 --- a/app/vlinsert/loki/loki_json_test.go +++ b/app/vlinsert/loki/loki_json_test.go @@ -1,99 +1,130 @@ package loki import ( - "reflect" + "fmt" "strings" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -func TestProcessJSONRequest(t *testing.T) { - type item struct { - ts int64 - fields []logstorage.Field - } - - same := func(s string, expected []item) { +func TestParseJSONRequestFailure(t *testing.T) { + f := func(s string) { t.Helper() - r := strings.NewReader(s) - actual := make([]item, 0) - n, err := processJSONRequest(r, func(timestamp int64, fields []logstorage.Field) { - actual = append(actual, item{ - ts: timestamp, - fields: fields, - }) + n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) { + t.Fatalf("unexpected call to parseJSONRequest callback!") }) - - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - if len(actual) != len(expected) || n != len(expected) { 
- t.Fatalf("unexpected len(actual)=%d; expecting %d", len(actual), len(expected)) - } - - for i, actualItem := range actual { - expectedItem := expected[i] - if actualItem.ts != expectedItem.ts { - t.Fatalf("unexpected timestamp for item #%d; got %d; expecting %d", i, actualItem.ts, expectedItem.ts) - } - if !reflect.DeepEqual(actualItem.fields, expectedItem.fields) { - t.Fatalf("unexpected fields for item #%d; got %v; expecting %v", i, actualItem.fields, expectedItem.fields) - } - } - } - - fail := func(s string) { - t.Helper() - r := strings.NewReader(s) - actual := make([]item, 0) - _, err := processJSONRequest(r, func(timestamp int64, fields []logstorage.Field) { - actual = append(actual, item{ - ts: timestamp, - fields: fields, - }) - }) - if err == nil { - t.Fatalf("expected to fail with body: %q", s) + t.Fatalf("expecting non-nil error") + } + if n != 0 { + t.Fatalf("unexpected number of parsed lines: %d; want 0", n) } - } + f(``) - same(`{"streams":[{"stream":{"foo":"bar"},"values":[["1577836800000000000","baz"]]}]}`, []item{ - { - ts: 1577836800000000000, - fields: []logstorage.Field{ - { - Name: "foo", - Value: "bar", - }, - { - Name: "_msg", - Value: "baz", - }, - }, - }, - }) + // Invalid json + f(`{}`) + f(`[]`) + f(`"foo"`) + f(`123`) - fail(``) - fail(`{"streams":[{"stream":{"foo" = "bar"},"values":[["1577836800000000000","baz"]]}]}`) - fail(`{"streams":[{"stream":{"foo": "bar"}`) + // invalid type for `streams` item + f(`{"streams":123}`) + + // Missing `values` item + f(`{"streams":[{}]}`) + + // Invalid type for `values` item + f(`{"streams":[{"values":"foobar"}]}`) + + // Invalid type for `stream` item + f(`{"streams":[{"stream":[],"values":[]}]}`) + + // Invalid type for `values` individual item + f(`{"streams":[{"values":[123]}]}`) + + // Invalid length of `values` individual item + f(`{"streams":[{"values":[[]]}]}`) + f(`{"streams":[{"values":[["123"]]}]}`) + f(`{"streams":[{"values":[["123","456","789"]]}]}`) + + // Invalid type for timestamp 
inside `values` individual item + f(`{"streams":[{"values":[[123,"456"]}]}`) + + // Invalid type for log message + f(`{"streams":[{"values":[["123",1234]]}]}`) } -func Test_parseLokiTimestamp(t *testing.T) { - f := func(s string, expected int64) { +func TestParseJSONRequestSuccess(t *testing.T) { + f := func(s string, resultExpected string) { t.Helper() - actual, err := parseLokiTimestamp(s) + var lines []string + n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) { + var a []string + for _, f := range fields { + a = append(a, f.String()) + } + line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " ")) + lines = append(lines, line) + }) if err != nil { t.Fatalf("unexpected error: %s", err) } - if actual != expected { - t.Fatalf("unexpected timestamp; got %d; expecting %d", actual, expected) + if n != len(lines) { + t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines)) + } + result := strings.Join(lines, "\n") + if result != resultExpected { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected) } } - f("1687510468000000000", 1687510468000000000) - f("1577836800000000000", 1577836800000000000) + // Empty streams + f(`{"streams":[]}`, ``) + f(`{"streams":[{"values":[]}]}`, ``) + f(`{"streams":[{"stream":{},"values":[]}]}`, ``) + f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``) + + // Empty stream labels + f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + + // Non-empty stream labels + f(`{"streams":[{"stream":{ + "label1": "value1", + "label2": "value2" +},"values":[ + ["1577836800000000001", "foo bar"], + ["1477836900005000002", "abc"], + ["147.78369e9", "foobar"] +]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar" +_time:1477836900005000002 
"label1":"value1" "label2":"value2" "_msg":"abc" +_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`) + + // Multiple streams + f(`{ + "streams": [ + { + "stream": { + "foo": "bar", + "a": "b" + }, + "values": [ + ["1577836800000000001", "foo bar"], + ["1577836900005000002", "abc"] + ] + }, + { + "stream": { + "x": "y" + }, + "values": [ + ["1877836900005000002", "yx"] + ] + } + ] +}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar" +_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc" +_time:1877836900005000002 "x":"y" "_msg":"yx"`) } diff --git a/app/vlinsert/loki/loki_json_timing_test.go b/app/vlinsert/loki/loki_json_timing_test.go index 7da50b2fe..9c51f593a 100644 --- a/app/vlinsert/loki/loki_json_timing_test.go +++ b/app/vlinsert/loki/loki_json_timing_test.go @@ -3,71 +3,76 @@ package loki import ( "fmt" "strconv" - "strings" "testing" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -func BenchmarkProcessJSONRequest(b *testing.B) { +func BenchmarkParseJSONRequest(b *testing.B) { for _, streams := range []int{5, 10} { for _, rows := range []int{100, 1000} { for _, labels := range []int{10, 50} { b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) { - benchmarkProcessJSONRequest(b, streams, rows, labels) + benchmarkParseJSONRequest(b, streams, rows, labels) }) } } } } -func benchmarkProcessJSONRequest(b *testing.B, streams, rows, labels int) { - s := getJSONBody(streams, rows, labels) +func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) { b.ReportAllocs() - b.SetBytes(int64(len(s))) + b.SetBytes(int64(streams * rows)) b.RunParallel(func(pb *testing.PB) { + data := getJSONBody(streams, rows, labels) for pb.Next() { - _, err := processJSONRequest(strings.NewReader(s), func(timestamp int64, fields []logstorage.Field) {}) + _, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {}) if err != nil { - 
b.Fatalf("unexpected error: %s", err) + panic(fmt.Errorf("unexpected error: %s", err)) } } }) } -func getJSONBody(streams, rows, labels int) string { - body := `{"streams":[` +func getJSONBody(streams, rows, labels int) []byte { + body := append([]byte{}, `{"streams":[`...) now := time.Now().UnixNano() valuePrefix := fmt.Sprintf(`["%d","value_`, now) for i := 0; i < streams; i++ { - body += `{"stream":{` + body = append(body, `{"stream":{`...) for j := 0; j < labels; j++ { - body += `"label_` + strconv.Itoa(j) + `":"value_` + strconv.Itoa(j) + `"` + body = append(body, `"label_`...) + body = strconv.AppendInt(body, int64(j), 10) + body = append(body, `":"value_`...) + body = strconv.AppendInt(body, int64(j), 10) + body = append(body, '"') if j < labels-1 { - body += `,` + body = append(body, ',') } } - body += `}, "values":[` + body = append(body, `}, "values":[`...) for j := 0; j < rows; j++ { - body += valuePrefix + strconv.Itoa(j) + `"]` + body = append(body, valuePrefix...) + body = strconv.AppendInt(body, int64(j), 10) + body = append(body, `"]`...) if j < rows-1 { - body += `,` + body = append(body, ',') } } - body += `]}` + body = append(body, `]}`...) if i < streams-1 { - body += `,` + body = append(body, ',') } } - body += `]}` + body = append(body, `]}`...) 
return body } diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index a55cad138..a75bf93d0 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -4,68 +4,66 @@ import ( "fmt" "io" "net/http" + "strconv" + "strings" "sync" + "time" - "github.com/golang/snappy" - - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/metricsql" - + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" + "github.com/golang/snappy" ) var ( - rowsIngestedTotalProtobuf = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) bytesBufPool bytesutil.ByteBufferPool pushReqsPool sync.Pool ) func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { wcr := writeconcurrencylimiter.GetReader(r.Body) - defer writeconcurrencylimiter.PutReader(wcr) + data, err := io.ReadAll(wcr) + writeconcurrencylimiter.PutReader(wcr) + if err != nil { + httpserver.Errorf(w, r, "cannot read request body: %s", err) + return true + } cp, err := getCommonParams(r) if err != nil { - httpserver.Errorf(w, r, "cannot parse request: %s", err) + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) return true } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) - defer logstorage.PutLogRows(lr) - processLogMessage := cp.GetProcessLogMessageFunc(lr) - n, err := processProtobufRequest(wcr, processLogMessage) + n, err := parseProtobufRequest(data, processLogMessage) + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) 
if err != nil { - httpserver.Errorf(w, r, "cannot decode loki request: %s", err) + httpserver.Errorf(w, r, "cannot parse loki request: %s", err) return true } - - rowsIngestedTotalProtobuf.Add(n) - + rowsIngestedProtobufTotal.Add(n) return true } -func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { - wcr := writeconcurrencylimiter.GetReader(r) - defer writeconcurrencylimiter.PutReader(wcr) - - bytes, err := io.ReadAll(wcr) - if err != nil { - return 0, fmt.Errorf("cannot read request body: %s", err) - } - +func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { bb := bytesBufPool.Get() defer bytesBufPool.Put(bb) - bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], bytes) - if err != nil { - return 0, fmt.Errorf("cannot decode snappy from request body: %s", err) - } - req := getPushReq() - defer putPushReq(req) + buf, err := snappy.Decode(bb.B[:cap(bb.B)], data) + if err != nil { + return 0, fmt.Errorf("cannot decode snappy-encoded request body: %w", err) + } + bb.B = buf + + req := getPushRequest() + defer putPushRequest(req) + err = req.Unmarshal(bb.B) if err != nil { return 0, fmt.Errorf("cannot parse request body: %s", err) @@ -73,59 +71,93 @@ func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64, var commonFields []logstorage.Field rowsIngested := 0 - for stIdx, st := range req.Streams { + streams := req.Streams + currentTimestamp := time.Now().UnixNano() + for i := range streams { + stream := &streams[i] // st.Labels contains labels for the stream. // Labels are same for all entries in the stream. 
- commonFields, err = parseLogFields(st.Labels, commonFields) + commonFields, err = parsePromLabels(commonFields[:0], stream.Labels) if err != nil { - return rowsIngested, fmt.Errorf("failed to unmarshal labels in stream %d: %q; %s", stIdx, st.Labels, err) + return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %s", stream.Labels, err) } - msgFieldIDx := len(commonFields) - 1 - commonFields[msgFieldIDx].Name = msgField + fields := commonFields - for _, v := range st.Entries { - commonFields[msgFieldIDx].Value = v.Line - processLogMessage(v.Timestamp.UnixNano(), commonFields) - rowsIngested++ + entries := stream.Entries + for j := range entries { + entry := &entries[j] + fields = append(fields[:len(commonFields)], logstorage.Field{ + Name: "_msg", + Value: entry.Line, + }) + ts := entry.Timestamp.UnixNano() + if ts == 0 { + ts = currentTimestamp + } + processLogMessage(ts, fields) } + rowsIngested += len(stream.Entries) } return rowsIngested, nil } -// Parses logs fields s and returns the corresponding log fields. -// Cannot use searchutils.ParseMetricSelector here because its dependencies -// bring flags which clashes with logstorage flags. +// parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result. // -// Loki encodes labels in the PromQL labels format. 
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go -func parseLogFields(s string, dst []logstorage.Field) ([]logstorage.Field, error) { - expr, err := metricsql.Parse(s) - if err != nil { - return nil, err +func parsePromLabels(dst []logstorage.Field, s string) ([]logstorage.Field, error) { + // Make sure s is wrapped into `{...}` + s = strings.TrimSpace(s) + if len(s) < 2 { + return nil, fmt.Errorf("too short string to parse: %q", s) } - - me, ok := expr.(*metricsql.MetricExpr) - if !ok { - return nil, fmt.Errorf("failed to parse stream labels; got %q", expr.AppendString(nil)) + if s[0] != '{' { + return nil, fmt.Errorf("missing `{` at the beginning of %q", s) } - - // Expecting only label filters without MetricsQL "or" operator. - if len(me.LabelFilterss) != 1 { - return nil, fmt.Errorf("unexpected format of log fields; got %q", s) + if s[len(s)-1] != '}' { + return nil, fmt.Errorf("missing `}` at the end of %q", s) } + s = s[1 : len(s)-1] - // Allocate space for labels + msg field. - // Msg field is added by caller. - dst = slicesutil.ResizeNoCopyMayOverallocate(dst, len(me.LabelFilterss[0])) - for i, l := range me.LabelFilterss[0] { - dst[i].Name = l.Label - dst[i].Value = l.Value + for len(s) > 0 { + // Parse label name + n := strings.IndexByte(s, '=') + if n < 0 { + return nil, fmt.Errorf("cannot find `=` char for label value at %s", s) + } + name := s[:n] + s = s[n+1:] + + // Parse label value + qs, err := strconv.QuotedPrefix(s) + if err != nil { + return nil, fmt.Errorf("cannot parse value for label %q at %s: %w", name, s, err) + } + s = s[len(qs):] + value, err := strconv.Unquote(qs) + if err != nil { + return nil, fmt.Errorf("cannot unquote value %q for label %q: %w", qs, name, err) + } + + // Append the found field to dst. 
+ dst = append(dst, logstorage.Field{ + Name: name, + Value: value, + }) + + // Check whether there are other labels remaining + if len(s) == 0 { + break + } + if !strings.HasPrefix(s, ",") { + return nil, fmt.Errorf("missing `,` char at %s", s) + } + s = s[1:] + s = strings.TrimPrefix(s, " ") } - return dst, nil } -func getPushReq() *PushRequest { +func getPushRequest() *PushRequest { v := pushReqsPool.Get() if v == nil { return &PushRequest{} @@ -133,7 +165,7 @@ func getPushReq() *PushRequest { return v.(*PushRequest) } -func putPushReq(reqs *PushRequest) { - reqs.Reset() - pushReqsPool.Put(reqs) +func putPushRequest(req *PushRequest) { + req.Reset() + pushReqsPool.Put(req) } diff --git a/app/vlinsert/loki/loki_protobuf_test.go b/app/vlinsert/loki/loki_protobuf_test.go index e590668ee..f6eb5f0ec 100644 --- a/app/vlinsert/loki/loki_protobuf_test.go +++ b/app/vlinsert/loki/loki_protobuf_test.go @@ -1,50 +1,171 @@ package loki import ( - "bytes" - "strconv" + "fmt" + "strings" "testing" "time" - "github.com/golang/snappy" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/golang/snappy" ) -func TestProcessProtobufRequest(t *testing.T) { - body := getProtobufBody(5, 5, 5) - - reader := bytes.NewReader(body) - _, err := processProtobufRequest(reader, func(timestamp int64, fields []logstorage.Field) {}) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } -} - -func getProtobufBody(streams, rows, labels int) []byte { - var pr PushRequest - - for i := 0; i < streams; i++ { - var st Stream - - st.Labels = `{` - for j := 0; j < labels; j++ { - st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"` - if j < labels-1 { - st.Labels += `,` +func TestParseProtobufRequestSuccess(t *testing.T) { + f := func(s string, resultExpected string) { + t.Helper() + var pr PushRequest + n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) { + msg := "" + for _, f := range fields { + if f.Name == 
"_msg" { + msg = f.Value + } } + var a []string + for _, f := range fields { + if f.Name == "_msg" { + continue + } + item := fmt.Sprintf("%s=%q", f.Name, f.Value) + a = append(a, item) + } + labels := "{" + strings.Join(a, ", ") + "}" + pr.Streams = append(pr.Streams, Stream{ + Labels: labels, + Entries: []Entry{ + { + Timestamp: time.Unix(0, timestamp), + Line: msg, + }, + }, + }) + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) } - st.Labels += `}` - - for j := 0; j < rows; j++ { - st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)}) + if n != len(pr.Streams) { + t.Fatalf("unexpected number of streams; got %d; want %d", len(pr.Streams), n) } - pr.Streams = append(pr.Streams, st) + data, err := pr.Marshal() + if err != nil { + t.Fatalf("unexpected error when marshaling PushRequest: %s", err) + } + encodedData := snappy.Encode(nil, data) + + var lines []string + n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) { + var a []string + for _, f := range fields { + a = append(a, f.String()) + } + line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " ")) + lines = append(lines, line) + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if n != len(lines) { + t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines)) + } + result := strings.Join(lines, "\n") + if result != resultExpected { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected) + } } - body, _ := pr.Marshal() - encodedBody := snappy.Encode(nil, body) + // Empty streams + f(`{"streams":[]}`, ``) + f(`{"streams":[{"values":[]}]}`, ``) + f(`{"streams":[{"stream":{},"values":[]}]}`, ``) + f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``) - return encodedBody + // Empty stream labels + f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + 
f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + + // Non-empty stream labels + f(`{"streams":[{"stream":{ + "label1": "value1", + "label2": "value2" +},"values":[ + ["1577836800000000001", "foo bar"], + ["1477836900005000002", "abc"], + ["147.78369e9", "foobar"] +]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar" +_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc" +_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`) + + // Multiple streams + f(`{ + "streams": [ + { + "stream": { + "foo": "bar", + "a": "b" + }, + "values": [ + ["1577836800000000001", "foo bar"], + ["1577836900005000002", "abc"] + ] + }, + { + "stream": { + "x": "y" + }, + "values": [ + ["1877836900005000002", "yx"] + ] + } + ] +}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar" +_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc" +_time:1877836900005000002 "x":"y" "_msg":"yx"`) +} + +func TestParsePromLabelsSuccess(t *testing.T) { + f := func(s string) { + t.Helper() + fields, err := parsePromLabels(nil, s) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + var a []string + for _, f := range fields { + a = append(a, fmt.Sprintf("%s=%q", f.Name, f.Value)) + } + result := "{" + strings.Join(a, ", ") + "}" + if result != s { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, s) + } + } + + f("{}") + f(`{foo="bar"}`) + f(`{foo="bar", baz="x", y="z"}`) + f(`{foo="ba\"r\\z\n", a="", b="\"\\"}`) +} + +func TestParsePromLabelsFailure(t *testing.T) { + f := func(s string) { + t.Helper() + fields, err := parsePromLabels(nil, s) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if len(fields) > 0 { + t.Fatalf("unexpected non-empty fields: %s", fields) + } + } + + f("") + f("{") + f(`{foo}`) + f(`{foo=bar}`) + f(`{foo="bar}`) + f(`{foo="ba\",r}`) + f(`{foo="bar" baz="aa"}`) + f(`foobar`) + 
f(`foo{bar="baz"}`) } diff --git a/app/vlinsert/loki/loki_protobuf_timing_test.go b/app/vlinsert/loki/loki_protobuf_timing_test.go index 0da5fe741..18f5b89ef 100644 --- a/app/vlinsert/loki/loki_protobuf_timing_test.go +++ b/app/vlinsert/loki/loki_protobuf_timing_test.go @@ -1,35 +1,65 @@ package loki import ( - "bytes" "fmt" + "strconv" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/golang/snappy" ) -func BenchmarkProcessProtobufRequest(b *testing.B) { +func BenchmarkParseProtobufRequest(b *testing.B) { for _, streams := range []int{5, 10} { for _, rows := range []int{100, 1000} { for _, labels := range []int{10, 50} { b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) { - benchmarkProcessProtobufRequest(b, streams, rows, labels) + benchmarkParseProtobufRequest(b, streams, rows, labels) }) } } } } -func benchmarkProcessProtobufRequest(b *testing.B, streams, rows, labels int) { - body := getProtobufBody(streams, rows, labels) +func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) { b.ReportAllocs() - b.SetBytes(int64(len(body))) + b.SetBytes(int64(streams * rows)) b.RunParallel(func(pb *testing.PB) { + body := getProtobufBody(streams, rows, labels) for pb.Next() { - _, err := processProtobufRequest(bytes.NewBuffer(body), func(timestamp int64, fields []logstorage.Field) {}) + _, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {}) if err != nil { - b.Fatalf("unexpected error: %s", err) + panic(fmt.Errorf("unexpected error: %s", err)) } } }) } + +func getProtobufBody(streams, rows, labels int) []byte { + var pr PushRequest + + for i := 0; i < streams; i++ { + var st Stream + + st.Labels = `{` + for j := 0; j < labels; j++ { + st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"` + if j < labels-1 { + st.Labels += `,` + } + } + st.Labels += `}` + + for j := 0; j < rows; j++ { + st.Entries = 
append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)}) + } + + pr.Streams = append(pr.Streams, st) + } + + body, _ := pr.Marshal() + encodedBody := snappy.Encode(nil, body) + + return encodedBody +} diff --git a/deployment/docker/victorialogs/promtail/config.yml b/deployment/docker/victorialogs/promtail/config.yml index 3587c8dae..4a5974a7d 100644 --- a/deployment/docker/victorialogs/promtail/config.yml +++ b/deployment/docker/victorialogs/promtail/config.yml @@ -6,7 +6,7 @@ positions: filename: /tmp/positions.yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid + - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid tenant_id: "0:0" scrape_configs: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index cd9e6c0ad..1e05a2f8f 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -13,7 +13,7 @@ services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:latest + image: docker.io/victoriametrics/victoria-logs:v0.2.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/docs/VictoriaLogs/data-ingestion/Promtail.md b/docs/VictoriaLogs/data-ingestion/Promtail.md index 7aa3850b4..7d72e9a99 100644 --- a/docs/VictoriaLogs/data-ingestion/Promtail.md +++ b/docs/VictoriaLogs/data-ingestion/Promtail.md @@ -1,16 +1,20 @@ # Promtail setup +[Promtail](https://grafana.com/docs/loki/latest/clients/promtail/) is a default log shipper for Grafana Loki. +Promtail can be configured to send the collected logs to VictoriaLogs according to the following docs. 
+ Specify [`clients`](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) section in the configuration file for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/): ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid ``` -Substitute `vlogs:9428` address inside `clients` with the real TCP address of VictoriaLogs. +Substitute `localhost:9428` address inside `clients` with the real TCP address of VictoriaLogs. See [these docs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters) for details on the used URL query parameter section. +There is no need to specify `_msg_field` and `_time_field` query args, since VictoriaLogs automatically extracts log message and timestamp from the ingested Loki data. It is recommended verifying whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields). @@ -19,27 +23,28 @@ and inspecting VictoriaLogs logs then: ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1 ``` If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped during data ingestion, then they can be put into `ignore_fields` [parameter](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters). 
-For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs: +For example, the following config instructs VictoriaLogs to ignore `filename` and `stream` fields in the ingested logs: ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&ignore_fields=filename,stream ``` By default the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy). -If you need storing logs in other tenant, then It is possible to either use `tenant_id` provided by Loki configuration, or to use `headers` and provide -`AccountID` and `ProjectID` headers. Format for `tenant_id` is `AccountID:ProjectID`. -For example, the following config instructs VictoriaLogs to store logs in the `(AccountID=12, ProjectID=12)` tenant: +If you need to store logs in another tenant, then specify the needed tenant via `tenant_id` field +in the [Loki client configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients). +The `tenant_id` must have `AccountID:ProjectID` format, where `AccountID` and `ProjectID` are arbitrary uint32 numbers. +For example, the following config instructs VictoriaLogs to store logs in the `(AccountID=12, ProjectID=34)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy): ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 - tenant_id: "12:12" + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1 + tenant_id: "12:34" ``` The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). 
diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 6b02566cd..925106100 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -17,7 +17,7 @@ menu: - Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Fluentbit.html). - Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Logstash.html). - Vector. See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Vector.html). -- Promtail. See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Promtail.html). +- Promtail (aka Grafana Loki). See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Promtail.html). The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). @@ -33,7 +33,7 @@ VictoriaLogs supports the following data ingestion HTTP APIs: - Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api). - JSON stream API aka [ndjson](http://ndjson.org/). See [these docs](#json-stream-api). -- [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-lokiq). See [these docs](#loki-json-api). +- Loki JSON API. See [these docs](#loki-json-api). VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs. 
@@ -47,12 +47,18 @@ The following command pushes a single log line to VictoriaLogs: ```bash echo '{"create":{}} -{"_msg":"cannot open file","_time":"2023-06-21T04:24:24Z","host.name":"host123"} +{"_msg":"cannot open file","_time":"0","host.name":"host123"} ' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk ``` It is possible to push thousands of log lines in a single request to this API. +If the [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) is set to `"0"`, +then the current timestamp at VictoriaLogs side is used per each ingested log line. +Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`. +Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`. +Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`. + See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields, which must be present in the ingested log messages. @@ -88,17 +94,18 @@ VictoriaLogs accepts JSON line stream aka [ndjson](http://ndjson.org/) at `http: The following command pushes multiple log lines to VictoriaLogs: ```bash -echo '{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:31:23Z", "stream": "stream1" } -{ "log": { "level": "error", "message": "oh no!" }, "date": "2023-06-20T15:32:10.567Z", "stream": "stream1" } -{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:35:11.567890+02:00", "stream": "stream2" } +echo '{ "log": { "level": "info", "message": "hello world" }, "date": "0", "stream": "stream1" } +{ "log": { "level": "error", "message": "oh no!" 
}, "date": "0", "stream": "stream1" } +{ "log": { "level": "info", "message": "hello world" }, "date": "0", "stream": "stream2" } ' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \ 'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message' ``` It is possible to push unlimited number of log lines in a single request to this API. -The [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) must be -in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`. +If the [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) is set to `"0"`, +then the current timestamp at VictoriaLogs side is used per each ingested log line. +Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`. Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`. Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`. @@ -134,15 +141,42 @@ See also: ### Loki JSON API -VictoriaLogs accepts logs in [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-lokiq) format at `http://localhost:9428/insert/loki/api/v1/push` endpoint. +VictoriaLogs accepts logs in [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki) format at `http://localhost:9428/insert/loki/api/v1/push` endpoint. 
The following command pushes a single log line to Loki JSON API at VictoriaLogs: ```bash -curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:9428/insert/loki/api/v1/push?_stream_fields=foo" --data-raw \ - '{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}' +curl -H "Content-Type: application/json" -XPOST "http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job" --data-raw \ + '{"streams": [{ "stream": { "instance": "host123", "job": "app42" }, "values": [ [ "0", "foo fizzbuzz bar" ] ] }]}' ``` +It is possible to push thousands of log streams and log lines in a single request to this API. + +The API accepts various HTTP parameters, which can change the data ingestion behavior - see [these docs](#http-parameters) for details. + +The following command verifies that the data has been successfully ingested into VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it: + +```bash +curl http://localhost:9428/select/logsql/query -d 'query=fizzbuzz' +``` + +The command should return the following response: + +```bash +{"_msg":"foo fizzbuzz bar","_stream":"{instance=\"host123\",job=\"app42\"}","_time":"2023-07-20T23:01:19.288676497Z"} +``` + +The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), +[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and +[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. +See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. + +See also: + +- [How to debug data ingestion](#troubleshooting). +- [HTTP parameters, which can be passed to the API](#http-parameters). +- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html). 
+ ### HTTP parameters VictoriaLogs accepts the following parameters at [data ingestion HTTP APIs](#http-apis): diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go index 6efbc8a3d..ce759b85a 100644 --- a/lib/logstorage/log_rows.go +++ b/lib/logstorage/log_rows.go @@ -230,6 +230,7 @@ func (lr *LogRows) GetRowString(idx int) string { // GetLogRows returns LogRows from the pool for the given streamFields. // // streamFields is a set of field names, which must be associated with the stream. +// ignoreFields is a set of field names, which must be ignored during data ingestion. // // Return back it to the pool with PutLogRows() when it is no longer needed. func GetLogRows(streamFields, ignoreFields []string) *LogRows { diff --git a/lib/logstorage/tenant_id.go b/lib/logstorage/tenant_id.go index 49873dcee..1d406b875 100644 --- a/lib/logstorage/tenant_id.go +++ b/lib/logstorage/tenant_id.go @@ -87,7 +87,7 @@ func GetTenantIDFromString(s string) (TenantID, error) { if colon < 0 { account, err := getUint32FromString(s) if err != nil { - return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) + return tenantID, fmt.Errorf("cannot parse accountID from %q: %w", s, err) } tenantID.AccountID = account @@ -96,13 +96,13 @@ func GetTenantIDFromString(s string) (TenantID, error) { account, err := getUint32FromString(s[:colon]) if err != nil { - return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) + return tenantID, fmt.Errorf("cannot parse accountID part from %q: %w", s, err) } tenantID.AccountID = account project, err := getUint32FromString(s[colon+1:]) if err != nil { - return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) + return tenantID, fmt.Errorf("cannot parse projectID part from %q: %w", s, err) } tenantID.ProjectID = project diff --git a/lib/slicesutil/resize.go b/lib/slicesutil/resize.go deleted file mode 100644 index b650b6ca9..000000000 --- a/lib/slicesutil/resize.go +++ /dev/null @@ -1,20 +0,0 @@ -package 
slicesutil - -import "math/bits" - -// ResizeNoCopyMayOverallocate resizes dst to minimum n bytes and returns the resized buffer (which may be newly allocated). -// -// If newly allocated buffer is returned then b contents isn't copied to it. -func ResizeNoCopyMayOverallocate[T any](dst []T, n int) []T { - if n <= cap(dst) { - return dst[:n] - } - nNew := roundToNearestPow2(n) - dstNew := make([]T, nNew) - return dstNew[:n] -} - -func roundToNearestPow2(n int) int { - pow2 := uint8(bits.Len(uint(n - 1))) - return 1 << pow2 -}