diff --git a/.gitignore b/.gitignore index f06c7eeea..96d97c523 100644 --- a/.gitignore +++ b/.gitignore @@ -8,11 +8,11 @@ *.test *.swp /gocache-for-docker +/victoria-logs-data /victoria-metrics-data /vmagent-remotewrite-data /vmstorage-data /vmselect-cache -/victoria-logs-data /package/temp-deb-* /package/temp-rpm-* /package/*.deb diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index 7f1a22956..c11d8b398 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -199,12 +199,15 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } - timestamp, err := extractTimestampFromFields(timeField, p.Fields) + ts, err := extractTimestampFromFields(timeField, p.Fields) if err != nil { return false, fmt.Errorf("cannot parse timestamp: %w", err) } + if ts == 0 { + ts = time.Now().UnixNano() + } p.RenameField(msgField, "_msg") - processLogMessage(timestamp, p.Fields) + processLogMessage(ts, p.Fields) logjson.PutParser(p) return true, nil } @@ -222,10 +225,15 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in f.Value = "" return timestamp, nil } - return time.Now().UnixNano(), nil + return 0, nil } func parseElasticsearchTimestamp(s string) (int64, error) { + if s == "0" || s == "" { + // Special case - zero or empty timestamp must be substituted + // with the current time by the caller. + return 0, nil + } if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' { // Try parsing timestamp in milliseconds n, err := strconv.ParseInt(s, 10, 64) diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 373bc0129..1b901db36 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -99,12 +99,15 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f if err := p.ParseLogMessage(line); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } - timestamp, err := extractTimestampFromFields(timeField, p.Fields) + ts, err := extractTimestampFromFields(timeField, p.Fields) if err != nil { return false, fmt.Errorf("cannot parse timestamp: %w", err) } + if ts == 0 { + ts = time.Now().UnixNano() + } p.RenameField(msgField, "_msg") - processLogMessage(timestamp, p.Fields) + processLogMessage(ts, p.Fields) logjson.PutParser(p) return true, nil } @@ -122,10 +125,15 @@ func extractTimestampFromFields(timeField string, fields []logstorage.Field) (in f.Value = "" return timestamp, nil } - return time.Now().UnixNano(), nil + return 0, nil } func parseISO8601Timestamp(s string) (int64, error) { + if s == "0" || s == "" { + // Special case - zero or empty timestamp must be substituted + // with the current time by the caller.
+ return 0, nil + } t, err := time.Parse(time.RFC3339, s) if err != nil { return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err) } diff --git a/app/vlinsert/loki/loki.go b/app/vlinsert/loki/loki.go index b7450b0b5..878616328 100644 --- a/app/vlinsert/loki/loki.go +++ b/app/vlinsert/loki/loki.go @@ -4,35 +4,32 @@ import ( "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/metrics" ) -const msgField = "_msg" - var ( - lokiRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push"}`) + lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) + lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) ) -// RequestHandler processes ElasticSearch insert requests +// RequestHandler processes Loki insert requests +// +// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { - switch path { - case "/api/v1/push": - contentType := r.Header.Get("Content-Type") - lokiRequestsTotal.Inc() - switch contentType { - case "application/x-protobuf": - return handleProtobuf(r, w) - case "application/json", "gzip": - return handleJSON(r, w) - default: - logger.Warnf("unsupported Content-Type=%q for %q request; skipping it", contentType, path) - return false - } - default: + if path != "/api/v1/push" { return false } + contentType := r.Header.Get("Content-Type") + switch contentType { + case "application/json": + lokiRequestsJSONTotal.Inc() + return handleJSON(r, w) + default: + // Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki + lokiRequestsProtobufTotal.Inc() + return handleProtobuf(r, w) + } } func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) { diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 874f92cf4..5ace832b0 100644 --- a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -6,127 +6,185 @@ import ( "math" "net/http" "strconv" + "time" - "github.com/valyala/fastjson" - - "github.com/VictoriaMetrics/metrics" - + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson" ) var ( - rowsIngestedTotalJSON = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="json"}`) + rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) parserPool fastjson.ParserPool ) func handleJSON(r *http.Request, w http.ResponseWriter) bool { - contentType := r.Header.Get("Content-Type") reader := r.Body - if contentType == "gzip" { + if r.Header.Get("Content-Encoding") == "gzip" { zr, err := common.GetGzipReader(reader) if err != nil { - httpserver.Errorf(w, r, "cannot read gzipped request: %s",
err) + httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err) return true } defer common.PutGzipReader(zr) reader = zr } + wcr := writeconcurrencylimiter.GetReader(reader) + data, err := io.ReadAll(wcr) + writeconcurrencylimiter.PutReader(wcr) + if err != nil { + httpserver.Errorf(w, r, "cannot read request body: %s", err) + return true + } + cp, err := getCommonParams(r) if err != nil { - httpserver.Errorf(w, r, "cannot parse request: %s", err) + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) return true } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) - defer logstorage.PutLogRows(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr) - n, err := processJSONRequest(reader, processLogMessage) + n, err := parseJSONRequest(data, processLogMessage) + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) if err != nil { - httpserver.Errorf(w, r, "cannot decode loki request: %s", err) + httpserver.Errorf(w, r, "cannot parse Loki request: %s", err) return true } - rowsIngestedTotalJSON.Add(n) + rowsIngestedJSONTotal.Add(n) return true } -func processJSONRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { - wcr := writeconcurrencylimiter.GetReader(r) - defer writeconcurrencylimiter.PutReader(wcr) - - bytes, err := io.ReadAll(wcr) - if err != nil { - return 0, fmt.Errorf("cannot read request body: %w", err) - } - +func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { p := parserPool.Get() defer parserPool.Put(p) - v, err := p.ParseBytes(bytes) + v, err := p.ParseBytes(data) if err != nil { - return 0, fmt.Errorf("cannot parse request body: %w", err) + return 0, fmt.Errorf("cannot parse JSON request body: %w", err) } + streamsV := v.Get("streams") + if streamsV == nil { + return 0, fmt.Errorf("missing `streams` item in the parsed JSON: %q", v) + } + streams, err := streamsV.Array() + if err != nil { + return 0, fmt.Errorf("`streams` item in the parsed JSON must contain an array; got %q", streamsV) + } + + currentTimestamp := time.Now().UnixNano() var commonFields []logstorage.Field rowsIngested := 0 - for stIdx, st := range v.GetArray("streams") { - // `stream` contains labels for the stream. - // Labels are same for all entries in the stream.
- logFields := st.GetObject("stream") - if logFields == nil { - logger.Warnf("missing streams field from %q", st) - logFields = &fastjson.Object{} - } - commonFields = slicesutil.ResizeNoCopyMayOverallocate(commonFields, logFields.Len()+1) - i := 0 - logFields.Visit(func(k []byte, v *fastjson.Value) { - sfName := bytesutil.ToUnsafeString(k) - sfValue := bytesutil.ToUnsafeString(v.GetStringBytes()) - commonFields[i].Name = sfName - commonFields[i].Value = sfValue - i++ - }) - msgFieldIdx := logFields.Len() - commonFields[msgFieldIdx].Name = msgField - - for idx, v := range st.GetArray("values") { - vs := v.GetArray() - if len(vs) != 2 { - return rowsIngested, fmt.Errorf("unexpected number of values in stream %d line %d: %q; got %d; want %d", stIdx, idx, v, len(vs), 2) - } - - tsString := bytesutil.ToUnsafeString(vs[0].GetStringBytes()) - ts, err := parseLokiTimestamp(tsString) + for _, stream := range streams { + // populate common labels from `stream` dict + commonFields = commonFields[:0] + labelsV := stream.Get("stream") + var labels *fastjson.Object + if labelsV != nil { + o, err := labelsV.Object() if err != nil { - return rowsIngested, fmt.Errorf("cannot parse timestamp in stream %d line %d: %q: %s", stIdx, idx, vs, err) + return rowsIngested, fmt.Errorf("`stream` item in the parsed JSON must contain an object; got %q", labelsV) + } + labels = o + } + labels.Visit(func(k []byte, v *fastjson.Value) { + if err != nil { + return + } + vStr, errLocal := v.StringBytes() + if errLocal != nil { + err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v) + return + } + commonFields = append(commonFields, logstorage.Field{ + Name: bytesutil.ToUnsafeString(k), + Value: bytesutil.ToUnsafeString(vStr), + }) + }) + if err != nil { + return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err) + } + + // populate messages from `values` array + linesV := stream.Get("values") + if linesV == nil { + return rowsIngested, fmt.Errorf("missing `values` item in the parsed JSON %q", stream) + } + lines, err := linesV.Array() + if err != nil { + return rowsIngested, fmt.Errorf("`values` item in the parsed JSON must contain an array; got %q", linesV) + } + + fields := commonFields + for _, line := range lines { + lineA, err := line.Array() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line) + } + if len(lineA) != 2 { + return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d; want 2", line, len(lineA)) } - commonFields[msgFieldIdx].Value = bytesutil.ToUnsafeString(vs[1].GetStringBytes()) - processLogMessage(ts, commonFields) + // parse timestamp + timestamp, err := lineA[0].StringBytes() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected log timestamp type for %q; want string", lineA[0]) + } + ts, err := parseLokiTimestamp(bytesutil.ToUnsafeString(timestamp)) + if err != nil { + return rowsIngested, fmt.Errorf("cannot parse log timestamp %q: %w", timestamp, err) + } + if ts == 0 { + ts = currentTimestamp + } + + // parse log message + msg, err := lineA[1].StringBytes() + if err != nil { + return rowsIngested, fmt.Errorf("unexpected log message type for %q; want string", lineA[1]) + } + + fields = append(fields[:len(commonFields)], logstorage.Field{ + Name: "_msg", + Value: bytesutil.ToUnsafeString(msg), + }) + processLogMessage(ts, fields) - rowsIngested++ } + rowsIngested += len(lines) } return rowsIngested, nil } func parseLokiTimestamp(s string) (int64,
error) { - // Parsing timestamp in nanoseconds + if s == "" { + // Special case - an empty timestamp must be substituted with the current time by the caller. + return 0, nil + } n, err := strconv.ParseInt(s, 10, 64) if err != nil { - return 0, fmt.Errorf("cannot parse timestamp in nanoseconds from %q: %w", s, err) - } - if n > int64(math.MaxInt64) { - return 0, fmt.Errorf("too big timestamp in nanoseconds: %d; mustn't exceed %d", n, math.MaxInt64) + // Fall back to parsing floating-point value + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0, err + } + if f > math.MaxInt64 { + return 0, fmt.Errorf("too big timestamp in nanoseconds: %v; mustn't exceed %v", f, math.MaxInt64) + } + if f < math.MinInt64 { + return 0, fmt.Errorf("too small timestamp in nanoseconds: %v; must be bigger or equal to %v", f, math.MinInt64) + } + n = int64(f) } if n < 0 { - return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; must be bigger than %d", n, 0) + return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; mustn't be negative", n) } return n, nil } diff --git a/app/vlinsert/loki/loki_json_test.go b/app/vlinsert/loki/loki_json_test.go index 5588035c6..93cf8652a 100644 --- a/app/vlinsert/loki/loki_json_test.go +++ b/app/vlinsert/loki/loki_json_test.go @@ -1,99 +1,130 @@ package loki import ( - "reflect" + "fmt" "strings" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -func TestProcessJSONRequest(t *testing.T) { - type item struct { - ts int64 - fields []logstorage.Field - } - - same := func(s string, expected []item) { +func TestParseJSONRequestFailure(t *testing.T) { + f := func(s string) { t.Helper() - r := strings.NewReader(s) - actual := make([]item, 0) - n, err := processJSONRequest(r, func(timestamp int64, fields []logstorage.Field) { - actual = append(actual, item{ - ts: timestamp, - fields: fields, - }) + n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) { + t.Fatalf("unexpected call to parseJSONRequest callback!") }) - - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - if len(actual) != len(expected) || n != len(expected) { - t.Fatalf("unexpected len(actual)=%d; expecting %d", len(actual), len(expected)) - } - - for i, actualItem := range actual { - expectedItem := expected[i] - if actualItem.ts != expectedItem.ts { - t.Fatalf("unexpected timestamp for item #%d; got %d; expecting %d", i, actualItem.ts, expectedItem.ts) - } - if !reflect.DeepEqual(actualItem.fields, expectedItem.fields) { - t.Fatalf("unexpected fields for item #%d; got %v; expecting %v", i, actualItem.fields, expectedItem.fields) - } - } - } - - fail := func(s string) { - t.Helper() - r := strings.NewReader(s) - actual := make([]item, 0) - _, err := processJSONRequest(r, func(timestamp int64, fields []logstorage.Field) { - actual = append(actual, item{ - ts: timestamp, - fields: fields, - }) - }) if err == nil { - t.Fatalf("expected to fail with body: %q", s) + t.Fatalf("expecting non-nil error") + } + if n != 0 { + t.Fatalf("unexpected number of parsed lines: %d; want 0", n) } } + f(``) - same(`{"streams":[{"stream":{"foo":"bar"},"values":[["1577836800000000000","baz"]]}]}`, []item{ - { - ts: 1577836800000000000, - fields: []logstorage.Field{ - { - Name: "foo", - Value: "bar", - }, - { - Name: "_msg", - Value: "baz", - }, - }, - }, - }) + // Invalid json + f(`{}`) + f(`[]`) + f(`"foo"`) + f(`123`) - fail(``) - fail(`{"streams":[{"stream":{"foo" = "bar"},"values":[["1577836800000000000","baz"]]}]}`) -
fail(`{"streams":[{"stream":{"foo": "bar"}`) + // invalid type for `streams` item + f(`{"streams":123}`) + + // Missing `values` item + f(`{"streams":[{}]}`) + + // Invalid type for `values` item + f(`{"streams":[{"values":"foobar"}]}`) + + // Invalid type for `stream` item + f(`{"streams":[{"stream":[],"values":[]}]}`) + + // Invalid type for `values` individual item + f(`{"streams":[{"values":[123]}]}`) + + // Invalid length of `values` individual item + f(`{"streams":[{"values":[[]]}]}`) + f(`{"streams":[{"values":[["123"]]}]}`) + f(`{"streams":[{"values":[["123","456","789"]]}]}`) + + // Invalid type for timestamp inside `values` individual item + f(`{"streams":[{"values":[[123,"456"]}]}`) + + // Invalid type for log message + f(`{"streams":[{"values":[["123",1234]]}]}`) } -func Test_parseLokiTimestamp(t *testing.T) { - f := func(s string, expected int64) { +func TestParseJSONRequestSuccess(t *testing.T) { + f := func(s string, resultExpected string) { t.Helper() - actual, err := parseLokiTimestamp(s) + var lines []string + n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) { + var a []string + for _, f := range fields { + a = append(a, f.String()) + } + line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " ")) + lines = append(lines, line) + }) if err != nil { t.Fatalf("unexpected error: %s", err) } - if actual != expected { - t.Fatalf("unexpected timestamp; got %d; expecting %d", actual, expected) + if n != len(lines) { + t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines)) + } + result := strings.Join(lines, "\n") + if result != resultExpected { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected) } } - f("1687510468000000000", 1687510468000000000) - f("1577836800000000000", 1577836800000000000) + // Empty streams + f(`{"streams":[]}`, ``) + f(`{"streams":[{"values":[]}]}`, ``) + f(`{"streams":[{"stream":{},"values":[]}]}`, ``) + f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``) + + // Empty stream labels + f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + + // Non-empty stream labels + f(`{"streams":[{"stream":{ + "label1": "value1", + "label2": "value2" +},"values":[ + ["1577836800000000001", "foo bar"], + ["1477836900005000002", "abc"], + ["147.78369e9", "foobar"] +]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar" +_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc" +_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`) + + // Multiple streams + f(`{ + "streams": [ + { + "stream": { + "foo": "bar", + "a": "b" + }, + "values": [ + ["1577836800000000001", "foo bar"], + ["1577836900005000002", "abc"] + ] + }, + { + "stream": { + "x": "y" + }, + "values": [ + ["1877836900005000002", "yx"] + ] + } + ] +}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar" +_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc" +_time:1877836900005000002 "x":"y" "_msg":"yx"`) } diff --git a/app/vlinsert/loki/loki_json_timing_test.go b/app/vlinsert/loki/loki_json_timing_test.go index 7da50b2fe..9c51f593a 100644 --- a/app/vlinsert/loki/loki_json_timing_test.go +++ b/app/vlinsert/loki/loki_json_timing_test.go @@ -3,71 +3,76 @@ package loki import ( "fmt" "strconv" - "strings" "testing" "time" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -func BenchmarkProcessJSONRequest(b *testing.B) { +func BenchmarkParseJSONRequest(b *testing.B) { for _, streams := range []int{5, 10} { for _, rows := range []int{100, 1000} { for _, labels := range []int{10, 50} { b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) { - benchmarkProcessJSONRequest(b, streams, rows, labels) + benchmarkParseJSONRequest(b, streams, rows, labels) }) } } } } -func benchmarkProcessJSONRequest(b *testing.B, streams, rows, labels int) { - s := getJSONBody(streams, rows, labels) +func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) { b.ReportAllocs() - b.SetBytes(int64(len(s))) + b.SetBytes(int64(streams * rows)) b.RunParallel(func(pb *testing.PB) { + data := getJSONBody(streams, rows, labels) for pb.Next() { - _, err := processJSONRequest(strings.NewReader(s), func(timestamp int64, fields []logstorage.Field) {}) + _, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {}) if err != nil { - b.Fatalf("unexpected error: %s", err) + panic(fmt.Errorf("unexpected error: %s", err)) } } }) } -func getJSONBody(streams, rows, labels int) string { - body := `{"streams":[` +func getJSONBody(streams, rows, labels int) []byte { + body := append([]byte{}, `{"streams":[`...) now := time.Now().UnixNano() valuePrefix := fmt.Sprintf(`["%d","value_`, now) for i := 0; i < streams; i++ { - body += `{"stream":{` + body = append(body, `{"stream":{`...) for j := 0; j < labels; j++ { - body += `"label_` + strconv.Itoa(j) + `":"value_` + strconv.Itoa(j) + `"` + body = append(body, `"label_`...) + body = strconv.AppendInt(body, int64(j), 10) + body = append(body, `":"value_`...) + body = strconv.AppendInt(body, int64(j), 10) + body = append(body, '"') if j < labels-1 { - body += `,` + body = append(body, ',') } } - body += `}, "values":[` + body = append(body, `}, "values":[`...) for j := 0; j < rows; j++ { - body += valuePrefix + strconv.Itoa(j) + `"]` + body = append(body, valuePrefix...) + body = strconv.AppendInt(body, int64(j), 10) + body = append(body, `"]`...) if j < rows-1 { - body += `,` + body = append(body, ',') } } - body += `]}` + body = append(body, `]}`...) if i < streams-1 { - body += `,` + body = append(body, ',') } } - body += `]}` + body = append(body, `]}`...) 
return body } diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index a55cad138..a75bf93d0 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -4,68 +4,66 @@ import ( "fmt" "io" "net/http" + "strconv" + "strings" "sync" + "time" - "github.com/golang/snappy" - - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/metricsql" - + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" + "github.com/golang/snappy" ) var ( - rowsIngestedTotalProtobuf = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) bytesBufPool bytesutil.ByteBufferPool pushReqsPool sync.Pool ) func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { wcr := writeconcurrencylimiter.GetReader(r.Body) - defer writeconcurrencylimiter.PutReader(wcr) + data, err := io.ReadAll(wcr) + writeconcurrencylimiter.PutReader(wcr) + if err != nil { + httpserver.Errorf(w, r, "cannot read request body: %s", err) + return true + } cp, err := getCommonParams(r) if err != nil { - httpserver.Errorf(w, r, "cannot parse request: %s", err) + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) return true } lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) - defer logstorage.PutLogRows(lr) processLogMessage := cp.GetProcessLogMessageFunc(lr) - n, err := processProtobufRequest(wcr, processLogMessage) + n, err := parseProtobufRequest(data, processLogMessage) + vlstorage.MustAddRows(lr) + logstorage.PutLogRows(lr) if err != nil { - httpserver.Errorf(w, r, "cannot decode loki request: %s", err) + httpserver.Errorf(w, r, "cannot parse Loki request: %s", err) return true } - - rowsIngestedTotalProtobuf.Add(n) - + rowsIngestedProtobufTotal.Add(n) return true } -func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { - wcr := writeconcurrencylimiter.GetReader(r) - defer writeconcurrencylimiter.PutReader(wcr) - - bytes, err := io.ReadAll(wcr) - if err != nil { - return 0, fmt.Errorf("cannot read request body: %s", err) - } - +func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { bb := bytesBufPool.Get() defer bytesBufPool.Put(bb) - bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], bytes) - if err != nil { - return 0, fmt.Errorf("cannot decode snappy from request body: %s", err) - } - req := getPushReq() - defer putPushReq(req) + buf, err := snappy.Decode(bb.B[:cap(bb.B)], data) + if err != nil { + return 0, fmt.Errorf("cannot decode snappy-encoded request body: %w", err) + } + bb.B = buf + + req := getPushRequest() + defer putPushRequest(req) + err = req.Unmarshal(bb.B) if err != nil { return 0, fmt.Errorf("cannot parse request body: %s", err) @@ -73,59 +71,93 @@ func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64, var commonFields []logstorage.Field rowsIngested := 0 - for stIdx, st := range req.Streams { + streams := req.Streams + currentTimestamp := time.Now().UnixNano() + for i := range
streams { + stream := &streams[i] // st.Labels contains labels for the stream. // Labels are same for all entries in the stream. - commonFields, err = parseLogFields(st.Labels, commonFields) + commonFields, err = parsePromLabels(commonFields[:0], stream.Labels) if err != nil { - return rowsIngested, fmt.Errorf("failed to unmarshal labels in stream %d: %q; %s", stIdx, st.Labels, err) + return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %s", stream.Labels, err) } - msgFieldIDx := len(commonFields) - 1 - commonFields[msgFieldIDx].Name = msgField + fields := commonFields - for _, v := range st.Entries { - commonFields[msgFieldIDx].Value = v.Line - processLogMessage(v.Timestamp.UnixNano(), commonFields) - rowsIngested++ + entries := stream.Entries + for j := range entries { + entry := &entries[j] + fields = append(fields[:len(commonFields)], logstorage.Field{ + Name: "_msg", + Value: entry.Line, + }) + ts := entry.Timestamp.UnixNano() + if ts == 0 { + ts = currentTimestamp + } + processLogMessage(ts, fields) } + rowsIngested += len(stream.Entries) } return rowsIngested, nil } -// Parses logs fields s and returns the corresponding log fields. -// Cannot use searchutils.ParseMetricSelector here because its dependencies -// bring flags which clashes with logstorage flags. +// parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result. // -// Loki encodes labels in the PromQL labels format. // See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go -func parseLogFields(s string, dst []logstorage.Field) ([]logstorage.Field, error) { - expr, err := metricsql.Parse(s) - if err != nil { - return nil, err +func parsePromLabels(dst []logstorage.Field, s string) ([]logstorage.Field, error) { + // Make sure s is wrapped into `{...}` + s = strings.TrimSpace(s) + if len(s) < 2 { + return nil, fmt.Errorf("too short string to parse: %q", s) } - - me, ok := expr.(*metricsql.MetricExpr) - if !ok { - return nil, fmt.Errorf("failed to parse stream labels; got %q", expr.AppendString(nil)) + if s[0] != '{' { + return nil, fmt.Errorf("missing `{` at the beginning of %q", s) } - - // Expecting only label filters without MetricsQL "or" operator. - if len(me.LabelFilterss) != 1 { - return nil, fmt.Errorf("unexpected format of log fields; got %q", s) + if s[len(s)-1] != '}' { + return nil, fmt.Errorf("missing `}` at the end of %q", s) } + s = s[1 : len(s)-1] - // Allocate space for labels + msg field. - // Msg field is added by caller. - dst = slicesutil.ResizeNoCopyMayOverallocate(dst, len(me.LabelFilterss[0])) - for i, l := range me.LabelFilterss[0] { - dst[i].Name = l.Label - dst[i].Value = l.Value + for len(s) > 0 { + // Parse label name + n := strings.IndexByte(s, '=') + if n < 0 { + return nil, fmt.Errorf("cannot find `=` char for label value at %s", s) + } + name := s[:n] + s = s[n+1:] + + // Parse label value + qs, err := strconv.QuotedPrefix(s) + if err != nil { + return nil, fmt.Errorf("cannot parse value for label %q at %s: %w", name, s, err) + } + s = s[len(qs):] + value, err := strconv.Unquote(qs) + if err != nil { + return nil, fmt.Errorf("cannot unquote value %q for label %q: %w", qs, name, err) + } + + // Append the found field to dst. 
+ dst = append(dst, logstorage.Field{ + Name: name, + Value: value, + }) + + // Check whether there are other labels remaining + if len(s) == 0 { + break + } + if !strings.HasPrefix(s, ",") { + return nil, fmt.Errorf("missing `,` char at %s", s) + } + s = s[1:] + s = strings.TrimPrefix(s, " ") } - return dst, nil } -func getPushReq() *PushRequest { +func getPushRequest() *PushRequest { v := pushReqsPool.Get() if v == nil { return &PushRequest{} @@ -133,7 +165,7 @@ func getPushReq() *PushRequest { return v.(*PushRequest) } -func putPushReq(reqs *PushRequest) { - reqs.Reset() - pushReqsPool.Put(reqs) +func putPushRequest(req *PushRequest) { + req.Reset() + pushReqsPool.Put(req) } diff --git a/app/vlinsert/loki/loki_protobuf_test.go b/app/vlinsert/loki/loki_protobuf_test.go index e590668ee..f6eb5f0ec 100644 --- a/app/vlinsert/loki/loki_protobuf_test.go +++ b/app/vlinsert/loki/loki_protobuf_test.go @@ -1,50 +1,171 @@ package loki import ( - "bytes" - "strconv" + "fmt" + "strings" "testing" "time" - "github.com/golang/snappy" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/golang/snappy" ) -func TestProcessProtobufRequest(t *testing.T) { - body := getProtobufBody(5, 5, 5) - - reader := bytes.NewReader(body) - _, err := processProtobufRequest(reader, func(timestamp int64, fields []logstorage.Field) {}) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } -} - -func getProtobufBody(streams, rows, labels int) []byte { - var pr PushRequest - - for i := 0; i < streams; i++ { - var st Stream - - st.Labels = `{` - for j := 0; j < labels; j++ { - st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"` - if j < labels-1 { - st.Labels += `,` +func TestParseProtobufRequestSuccess(t *testing.T) { + f := func(s string, resultExpected string) { + t.Helper() + var pr PushRequest + n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) { + msg := "" + for _, f := range fields { + if f.Name == "_msg" { + msg = f.Value + } } + var a []string + for _, f := range fields { + if f.Name == "_msg" { + continue + } + item := fmt.Sprintf("%s=%q", f.Name, f.Value) + a = append(a, item) + } + labels := "{" + strings.Join(a, ", ") + "}" + pr.Streams = append(pr.Streams, Stream{ + Labels: labels, + Entries: []Entry{ + { + Timestamp: time.Unix(0, timestamp), + Line: msg, + }, + }, + }) + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) } - st.Labels += `}` - - for j := 0; j < rows; j++ { - st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)}) + if n != len(pr.Streams) { + t.Fatalf("unexpected number of streams; got %d; want %d", len(pr.Streams), n) } - pr.Streams = append(pr.Streams, st) + data, err := pr.Marshal() + if err != nil { + t.Fatalf("unexpected error when marshaling PushRequest: %s", err) + } + encodedData := snappy.Encode(nil, data) + + var lines []string + n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) { + var a []string + for _, f := range fields { + a = append(a, f.String()) + } + line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " ")) + lines = append(lines, line) + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if n != len(lines) { + t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines)) + } + result := strings.Join(lines, "\n") + if result != resultExpected { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected) + } } - body, _ := pr.Marshal() - 
encodedBody := snappy.Encode(nil, body) + // Empty streams + f(`{"streams":[]}`, ``) + f(`{"streams":[{"values":[]}]}`, ``) + f(`{"streams":[{"stream":{},"values":[]}]}`, ``) + f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``) - return encodedBody + // Empty stream labels + f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`) + + // Non-empty stream labels + f(`{"streams":[{"stream":{ + "label1": "value1", + "label2": "value2" +},"values":[ + ["1577836800000000001", "foo bar"], + ["1477836900005000002", "abc"], + ["147.78369e9", "foobar"] +]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar" +_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc" +_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`) + + // Multiple streams + f(`{ + "streams": [ + { + "stream": { + "foo": "bar", + "a": "b" + }, + "values": [ + ["1577836800000000001", "foo bar"], + ["1577836900005000002", "abc"] + ] + }, + { + "stream": { + "x": "y" + }, + "values": [ + ["1877836900005000002", "yx"] + ] + } + ] +}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar" +_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc" +_time:1877836900005000002 "x":"y" "_msg":"yx"`) +} + +func TestParsePromLabelsSuccess(t *testing.T) { + f := func(s string) { + t.Helper() + fields, err := parsePromLabels(nil, s) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + var a []string + for _, f := range fields { + a = append(a, fmt.Sprintf("%s=%q", f.Name, f.Value)) + } + result := "{" + strings.Join(a, ", ") + "}" + if result != s { + t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, s) + } + } + + f("{}") + f(`{foo="bar"}`) + f(`{foo="bar", baz="x", y="z"}`) + f(`{foo="ba\"r\\z\n", a="", b="\"\\"}`) +} + +func TestParsePromLabelsFailure(t *testing.T) { + f := func(s string) { + t.Helper() + fields, err := parsePromLabels(nil, s) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if len(fields) > 0 { + t.Fatalf("unexpected non-empty fields: %s", fields) + } + } + + f("") + f("{") + f(`{foo}`) + f(`{foo=bar}`) + f(`{foo="bar}`) + f(`{foo="ba\",r}`) + f(`{foo="bar" baz="aa"}`) + f(`foobar`) + f(`foo{bar="baz"}`) } diff --git a/app/vlinsert/loki/loki_protobuf_timing_test.go b/app/vlinsert/loki/loki_protobuf_timing_test.go index 0da5fe741..18f5b89ef 100644 --- a/app/vlinsert/loki/loki_protobuf_timing_test.go +++ b/app/vlinsert/loki/loki_protobuf_timing_test.go @@ -1,35 +1,65 @@ package loki import ( - "bytes" "fmt" + "strconv" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/golang/snappy" ) -func BenchmarkProcessProtobufRequest(b *testing.B) { +func BenchmarkParseProtobufRequest(b *testing.B) { for _, streams := range []int{5, 10} { for _, rows := range []int{100, 1000} { for _, labels := range []int{10, 50} { b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) { - benchmarkProcessProtobufRequest(b, streams, rows, labels) + benchmarkParseProtobufRequest(b, streams, rows, labels) }) } } } } -func benchmarkProcessProtobufRequest(b *testing.B, streams, rows, labels int) { - body := getProtobufBody(streams, rows, labels) +func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) { b.ReportAllocs() - b.SetBytes(int64(len(body))) + 
b.SetBytes(int64(streams * rows)) b.RunParallel(func(pb *testing.PB) { + body := getProtobufBody(streams, rows, labels) for pb.Next() { - _, err := processProtobufRequest(bytes.NewBuffer(body), func(timestamp int64, fields []logstorage.Field) {}) + _, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {}) if err != nil { - b.Fatalf("unexpected error: %s", err) + panic(fmt.Errorf("unexpected error: %s", err)) } } }) } + +func getProtobufBody(streams, rows, labels int) []byte { + var pr PushRequest + + for i := 0; i < streams; i++ { + var st Stream + + st.Labels = `{` + for j := 0; j < labels; j++ { + st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"` + if j < labels-1 { + st.Labels += `,` + } + } + st.Labels += `}` + + for j := 0; j < rows; j++ { + st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)}) + } + + pr.Streams = append(pr.Streams, st) + } + + body, _ := pr.Marshal() + encodedBody := snappy.Encode(nil, body) + + return encodedBody +} diff --git a/deployment/docker/victorialogs/promtail/config.yml b/deployment/docker/victorialogs/promtail/config.yml index 3587c8dae..4a5974a7d 100644 --- a/deployment/docker/victorialogs/promtail/config.yml +++ b/deployment/docker/victorialogs/promtail/config.yml @@ -6,7 +6,7 @@ positions: filename: /tmp/positions.yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid + - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid tenant_id: "0:0" scrape_configs: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index cd9e6c0ad..1e05a2f8f 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -13,7 +13,7 @@ services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:latest + image: docker.io/victoriametrics/victoria-logs:v0.2.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/docs/VictoriaLogs/data-ingestion/Promtail.md b/docs/VictoriaLogs/data-ingestion/Promtail.md index 7aa3850b4..7d72e9a99 100644 --- a/docs/VictoriaLogs/data-ingestion/Promtail.md +++ b/docs/VictoriaLogs/data-ingestion/Promtail.md @@ -1,16 +1,20 @@ # Promtail setup +[Promtail](https://grafana.com/docs/loki/latest/clients/promtail/) is the default log shipper for Grafana Loki. +Promtail can be configured to send the collected logs to VictoriaLogs according to the following docs. + Specify [`clients`](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) section in the configuration file for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/): ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid ``` -Substitute `vlogs:9428` address inside `clients` with the real TCP address of VictoriaLogs. +Substitute `localhost:9428` address inside `clients` with the real TCP address of VictoriaLogs. See [these docs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters) for details on the URL query parameters used here.
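For a quick end-to-end check of the same endpoint without running Promtail, a single entry can be pushed in the Loki JSON format directly. The following is a minimal Go sketch, not part of this change; it assumes VictoriaLogs listens on `localhost:9428`, and the `job`/`instance` label values and message text are placeholders:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"time"
)

func main() {
	// One stream with a single log line in the Loki JSON push format.
	// The timestamp is a decimal string in nanoseconds; with this change,
	// an empty or "0" timestamp is substituted with the current time
	// on the VictoriaLogs side.
	ts := strconv.FormatInt(time.Now().UnixNano(), 10)
	body := fmt.Sprintf(`{"streams":[{"stream":{"job":"app42","instance":"host123"},"values":[[%q,"hello from Go"]]}]}`, ts)

	resp, err := http.Post(
		"http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job",
		"application/json",
		bytes.NewBufferString(body),
	)
	if err != nil {
		log.Fatalf("cannot send push request: %s", err)
	}
	defer resp.Body.Close()
	respBody, _ := io.ReadAll(resp.Body)
	fmt.Printf("status=%d body=%s\n", resp.StatusCode, respBody)
}
```

The pushed line can then be located with `curl http://localhost:9428/select/logsql/query -d 'query=hello'`.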
+There is no need to specify `_msg_field` and `_time_field` query args, since VictoriaLogs automatically extracts log message and timestamp from the ingested Loki data. It is recommended to verify whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields). @@ -19,27 +23,28 @@ and inspecting VictoriaLogs logs then: ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1 ``` If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped during data ingestion, then they can be put into `ignore_fields` [parameter](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters). -For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs: +For example, the following config instructs VictoriaLogs to ignore `filename` and `stream` fields in the ingested logs: ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&ignore_fields=filename,stream ``` By default the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy). -If you need storing logs in other tenant, then It is possible to either use `tenant_id` provided by Loki configuration, or to use `headers` and provide -`AccountID` and `ProjectID` headers. Format for `tenant_id` is `AccountID:ProjectID`. -For example, the following config instructs VictoriaLogs to store logs in the `(AccountID=12, ProjectID=12)` tenant: +If you need to store logs in another tenant, then specify the needed tenant via the `tenant_id` field +in the [Loki client configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients). +The `tenant_id` must have `AccountID:ProjectID` format, where `AccountID` and `ProjectID` are arbitrary uint32 numbers. +For example, the following config instructs VictoriaLogs to store logs in the `(AccountID=12, ProjectID=34)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy): ```yaml clients: - - url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=filename,job,stream,host,app,pid&debug=1 - tenant_id: "12:12" + - url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1 + tenant_id: "12:34" ``` The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 6b02566cd..925106100 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -17,7 +17,7 @@ menu: - Fluentbit. See [how to setup Fluentbit for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Fluentbit.html). - Logstash. See [how to setup Logstash for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Logstash.html). - Vector.
See [how to setup Vector for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Vector.html). -- Promtail. See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Promtail.html). +- Promtail (the default log shipper for Grafana Loki). See [how to setup Promtail for sending logs to VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/Promtail.html). The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). @@ -33,7 +33,7 @@ VictoriaLogs supports the following data ingestion HTTP APIs: - Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api). - JSON stream API aka [ndjson](http://ndjson.org/). See [these docs](#json-stream-api). -- [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-lokiq). See [these docs](#loki-json-api). +- Loki JSON API. See [these docs](#loki-json-api). VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs. @@ -47,12 +47,18 @@ The following command pushes a single log line to VictoriaLogs: ```bash echo '{"create":{}} -{"_msg":"cannot open file","_time":"2023-06-21T04:24:24Z","host.name":"host123"} +{"_msg":"cannot open file","_time":"0","host.name":"host123"} ' | curl -X POST -H 'Content-Type: application/json' --data-binary @- http://localhost:9428/insert/elasticsearch/_bulk ``` It is possible to push thousands of log lines in a single request to this API. +If the [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) is set to `"0"`, +then the current timestamp at the VictoriaLogs side is used for each ingested log line. +Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`. +Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`. +Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`. + See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) for details on fields, which must be present in the ingested log messages. @@ -88,17 +94,18 @@ VictoriaLogs accepts JSON line stream aka [ndjson](http://ndjson.org/) at `http: The following command pushes multiple log lines to VictoriaLogs: ```bash -echo '{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:31:23Z", "stream": "stream1" } -{ "log": { "level": "error", "message": "oh no!" }, "date": "2023-06-20T15:32:10.567Z", "stream": "stream1" } -{ "log": { "level": "info", "message": "hello world" }, "date": "2023-06-20T15:35:11.567890+02:00", "stream": "stream2" } +echo '{ "log": { "level": "info", "message": "hello world" }, "date": "0", "stream": "stream1" } +{ "log": { "level": "error", "message": "oh no!" }, "date": "0", "stream": "stream1" } +{ "log": { "level": "info", "message": "hello world" }, "date": "0", "stream": "stream2" } ' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \ 'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message' ``` It is possible to push an unlimited number of log lines in a single request to this API. -The [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) must be -in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`.
+If the [timestamp field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) is set to `"0"`, +then the current timestamp at the VictoriaLogs side is used for each ingested log line. +Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format. For example, `2023-06-20T15:32:10Z`. Optional fractional part of seconds can be specified after the dot - `2023-06-20T15:32:10.123Z`. Timezone can be specified instead of `Z` suffix - `2023-06-20T15:32:10+02:00`. @@ -134,15 +141,42 @@ See also: ### Loki JSON API -VictoriaLogs accepts logs in [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-lokiq) format at `http://localhost:9428/insert/loki/api/v1/push` endpoint. +VictoriaLogs accepts logs in [Loki JSON API](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki) format at `http://localhost:9428/insert/loki/api/v1/push` endpoint. The following command pushes a single log line to Loki JSON API at VictoriaLogs: ```bash -curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:9428/insert/loki/api/v1/push?_stream_fields=foo" --data-raw \ - '{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}' +curl -H "Content-Type: application/json" -XPOST "http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job" --data-raw \ + '{"streams": [{ "stream": { "instance": "host123", "job": "app42" }, "values": [ [ "0", "foo fizzbuzz bar" ] ] }]}' ``` +It is possible to push thousands of log streams and log lines in a single request to this API. + +The API accepts various HTTP parameters, which can change the data ingestion behavior; see [these docs](#http-parameters) for details. + +The following command verifies that the data has been successfully ingested into VictoriaLogs by [querying](https://docs.victoriametrics.com/VictoriaLogs/querying/) it: + +```bash +curl http://localhost:9428/select/logsql/query -d 'query=fizzbuzz' +``` + +The command should return the following response: + +```bash +{"_msg":"foo fizzbuzz bar","_stream":"{instance=\"host123\",job=\"app42\"}","_time":"2023-07-20T23:01:19.288676497Z"} +``` + +The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), +[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and +[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. +See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. + +See also: + +- [How to debug data ingestion](#troubleshooting). +- [HTTP parameters, which can be passed to the API](#http-parameters). +- [How to query VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/querying.html). + ### HTTP parameters VictoriaLogs accepts the following parameters at [data ingestion HTTP APIs](#http-apis): diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go index 6efbc8a3d..ce759b85a 100644 --- a/lib/logstorage/log_rows.go +++ b/lib/logstorage/log_rows.go @@ -230,6 +230,7 @@ func (lr *LogRows) GetRowString(idx int) string { // GetLogRows returns LogRows from the pool for the given streamFields. // // streamFields is a set of field names, which must be associated with the stream. +// ignoreFields is a set of field names, which must be ignored during data ingestion.
// // Return it back to the pool with PutLogRows() when it is no longer needed. func GetLogRows(streamFields, ignoreFields []string) *LogRows { diff --git a/lib/logstorage/tenant_id.go b/lib/logstorage/tenant_id.go index 49873dcee..1d406b875 100644 --- a/lib/logstorage/tenant_id.go +++ b/lib/logstorage/tenant_id.go @@ -87,7 +87,7 @@ func GetTenantIDFromString(s string) (TenantID, error) { if colon < 0 { account, err := getUint32FromString(s) if err != nil { - return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) + return tenantID, fmt.Errorf("cannot parse accountID from %q: %w", s, err) } tenantID.AccountID = account @@ -96,13 +96,13 @@ account, err := getUint32FromString(s[:colon]) if err != nil { - return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) + return tenantID, fmt.Errorf("cannot parse accountID part from %q: %w", s, err) } tenantID.AccountID = account project, err := getUint32FromString(s[colon+1:]) if err != nil { - return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err) + return tenantID, fmt.Errorf("cannot parse projectID part from %q: %w", s, err) } tenantID.ProjectID = project diff --git a/lib/slicesutil/resize.go b/lib/slicesutil/resize.go deleted file mode 100644 index b650b6ca9..000000000 --- a/lib/slicesutil/resize.go +++ /dev/null @@ -1,20 +0,0 @@ -package slicesutil - -import "math/bits" - -// ResizeNoCopyMayOverallocate resizes dst to minimum n bytes and returns the resized buffer (which may be newly allocated). - -// -// If newly allocated buffer is returned then b contents isn't copied to it. -func ResizeNoCopyMayOverallocate[T any](dst []T, n int) []T { - if n <= cap(dst) { - return dst[:n] - } - nNew := roundToNearestPow2(n) - dstNew := make([]T, nNew) - return dstNew[:n] - } - -func roundToNearestPow2(n int) int { - pow2 := uint8(bits.Len(uint(n - 1))) - return 1 << pow2 -}
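To make the reworded `GetTenantIDFromString` errors above concrete: the function accepts either `accountID` or `accountID:projectID`, and the messages now state which part failed to parse. Below is a minimal self-contained Go sketch of that logic; the `parseTenantID` name and the simplified `TenantID` struct are illustrative only (the real implementation lives in `lib/logstorage/tenant_id.go` and uses `getUint32FromString`):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// TenantID mirrors the (AccountID, ProjectID) pair used by VictoriaLogs multitenancy.
type TenantID struct {
	AccountID uint32
	ProjectID uint32
}

// parseTenantID parses "accountID" or "accountID:projectID" strings,
// reporting which part failed to parse, in the spirit of the reworded
// errors in lib/logstorage/tenant_id.go.
func parseTenantID(s string) (TenantID, error) {
	var tid TenantID
	account, project, hasProject := strings.Cut(s, ":")
	n, err := strconv.ParseUint(account, 10, 32)
	if err != nil {
		return tid, fmt.Errorf("cannot parse accountID part from %q: %w", s, err)
	}
	tid.AccountID = uint32(n)
	if hasProject {
		n, err := strconv.ParseUint(project, 10, 32)
		if err != nil {
			return tid, fmt.Errorf("cannot parse projectID part from %q: %w", s, err)
		}
		tid.ProjectID = uint32(n)
	}
	return tid, nil
}

func main() {
	for _, s := range []string{"0:0", "12:34", "5", "x:1"} {
		tid, err := parseTenantID(s)
		fmt.Printf("%q -> %+v err=%v\n", s, tid, err)
	}
}
```

This matches the `tenant_id: "AccountID:ProjectID"` format described in the Promtail docs above.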