mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vlinsert/loki: follow-up for 3aeb1b96a2
- Disallow more than 3 items in Loki line entry, since it must contain two mandatory entries: timestamp and message,
plus one optional entry - structured metadata. See https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
- Update references to structured metadata docs in Loki, in order to simplify further maintenance of the code
- Move the change from bugfix to feature at docs/VictoriaLogs/CHANGELOG.md, since VictoriaLogs never supported
structured metadata over JSON Loki protocol. The support for structured metadata in protobuf Loki protocol
has been added in ac06569c49
, which has been included in v0.28.0-victorialogs.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7432
This commit is contained in:
parent
42c9183281
commit
3d75c39ff4
4 changed files with 13 additions and 16 deletions
|
@ -108,9 +108,6 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
|
||||||
labels = o
|
labels = o
|
||||||
}
|
}
|
||||||
labels.Visit(func(k []byte, v *fastjson.Value) {
|
labels.Visit(func(k []byte, v *fastjson.Value) {
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
vStr, errLocal := v.StringBytes()
|
vStr, errLocal := v.StringBytes()
|
||||||
if errLocal != nil {
|
if errLocal != nil {
|
||||||
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
|
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
|
||||||
|
@ -141,8 +138,8 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line)
|
return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line)
|
||||||
}
|
}
|
||||||
if len(lineA) < 2 {
|
if len(lineA) < 2 || len(lineA) > 3 {
|
||||||
return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2", line, len(lineA))
|
return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2 or 3", line, len(lineA))
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse timestamp
|
// parse timestamp
|
||||||
|
@ -169,7 +166,7 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
|
||||||
Value: bytesutil.ToUnsafeString(msg),
|
Value: bytesutil.ToUnsafeString(msg),
|
||||||
})
|
})
|
||||||
|
|
||||||
// parse structured metadata
|
// parse structured metadata - see https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
|
||||||
if len(lineA) > 2 {
|
if len(lineA) > 2 {
|
||||||
structuredMetadata, err := lineA[2].Object()
|
structuredMetadata, err := lineA[2].Object()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -177,12 +174,9 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
|
||||||
}
|
}
|
||||||
|
|
||||||
structuredMetadata.Visit(func(k []byte, v *fastjson.Value) {
|
structuredMetadata.Visit(func(k []byte, v *fastjson.Value) {
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
vStr, errLocal := v.StringBytes()
|
vStr, errLocal := v.StringBytes()
|
||||||
if errLocal != nil {
|
if errLocal != nil {
|
||||||
err = fmt.Errorf("unexpected structuredMetadata label value type for %q:%q; want string", k, v)
|
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,7 +186,7 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rowsIngested, fmt.Errorf("error when parsing line `structuredMetadata` object: %w", err)
|
return rowsIngested, fmt.Errorf("error when parsing `structuredMetadata` object: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lmp.AddRow(ts, fields)
|
lmp.AddRow(ts, fields)
|
||||||
|
|
|
@ -45,14 +45,17 @@ func TestParseJSONRequest_Failure(t *testing.T) {
|
||||||
// Invalid length of `values` individual item
|
// Invalid length of `values` individual item
|
||||||
f(`{"streams":[{"values":[[]]}]}`)
|
f(`{"streams":[{"values":[[]]}]}`)
|
||||||
f(`{"streams":[{"values":[["123"]]}]}`)
|
f(`{"streams":[{"values":[["123"]]}]}`)
|
||||||
|
f(`{"streams":[{"values":[["123","456","789","8123"]]}]}`)
|
||||||
|
|
||||||
// Invalid type for timestamp inside `values` individual item
|
// Invalid type for timestamp inside `values` individual item
|
||||||
f(`{"streams":[{"values":[[123,"456"]}]}`)
|
f(`{"streams":[{"values":[[123,"456"]}]}`)
|
||||||
|
|
||||||
// Invalid type for log message
|
// Invalid type for log message
|
||||||
f(`{"streams":[{"values":[["123",1234]]}]}`)
|
f(`{"streams":[{"values":[["123",1234]]}]}`)
|
||||||
|
|
||||||
// invalid structured metadata type
|
// invalid structured metadata type
|
||||||
f(`{"streams":[{"values":[["1577836800000000001", "foo bar", ["metadata_1", "md_value"]]]}]}`)
|
f(`{"streams":[{"values":[["1577836800000000001", "foo bar", ["metadata_1", "md_value"]]]}]}`)
|
||||||
|
|
||||||
// structured metadata with unexpected value type
|
// structured metadata with unexpected value type
|
||||||
f(`{"streams":[{"values":[["1577836800000000001", "foo bar", {"metadata_1": 1}]] }]}`)
|
f(`{"streams":[{"values":[["1577836800000000001", "foo bar", {"metadata_1": 1}]] }]}`)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ var mp easyproto.MarshalerPool
|
||||||
|
|
||||||
// PushRequest represents Loki PushRequest
|
// PushRequest represents Loki PushRequest
|
||||||
//
|
//
|
||||||
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L14C1-L14C20
|
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L14
|
||||||
type PushRequest struct {
|
type PushRequest struct {
|
||||||
Streams []Stream
|
Streams []Stream
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ func (pr *PushRequest) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []Labe
|
||||||
|
|
||||||
// Stream represents Loki stream.
|
// Stream represents Loki stream.
|
||||||
//
|
//
|
||||||
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L23
|
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L23
|
||||||
type Stream struct {
|
type Stream struct {
|
||||||
Labels string
|
Labels string
|
||||||
Entries []Entry
|
Entries []Entry
|
||||||
|
@ -139,7 +139,7 @@ func (s *Stream) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []LabelPair,
|
||||||
|
|
||||||
// Entry represents Loki entry.
|
// Entry represents Loki entry.
|
||||||
//
|
//
|
||||||
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L38
|
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L38
|
||||||
type Entry struct {
|
type Entry struct {
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
Line string
|
Line string
|
||||||
|
@ -203,7 +203,7 @@ func (e *Entry) unmarshalProtobuf(labelPairBuf []LabelPair, src []byte) ([]Label
|
||||||
|
|
||||||
// LabelPair represents Loki label pair.
|
// LabelPair represents Loki label pair.
|
||||||
//
|
//
|
||||||
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L33
|
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L33
|
||||||
type LabelPair struct {
|
type LabelPair struct {
|
||||||
Name string
|
Name string
|
||||||
Value string
|
Value string
|
||||||
|
|
|
@ -15,12 +15,12 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* FEATURE: support [structured metadata](https://grafana.com/docs/loki/latest/get-started/labels/structured-metadata/) when ingesting logs with [Grafana Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431).
|
||||||
* FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins.
|
* FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins.
|
||||||
* FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`.
|
* FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`.
|
||||||
* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
|
* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
|
||||||
* FEATURE: add [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe) for returning various per-block stats. This pipe is useful for debugging.
|
* FEATURE: add [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe) for returning various per-block stats. This pipe is useful for debugging.
|
||||||
|
|
||||||
* BUGFIX: properly parse structured metadata when ingesting logs with [Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). The issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
|
|
||||||
* BUGFIX: properly sort fields with floating-point numbers by [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). Previously floating-point numbers could be improperly sorted because they were treated as strings, and [natural sorting](https://en.wikipedia.org/wiki/Natural_sort_order) was incorrectly applied to them. For example, `0.123` was treated as bigger than `0.9`.
|
* BUGFIX: properly sort fields with floating-point numbers by [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). Previously floating-point numbers could be improperly sorted because they were treated as strings, and [natural sorting](https://en.wikipedia.org/wiki/Natural_sort_order) was incorrectly applied to them. For example, `0.123` was treated as bigger than `0.9`.
|
||||||
|
|
||||||
## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)
|
## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)
|
||||||
|
|
Loading…
Reference in a new issue