app/vlinsert/loki: follow-up for 3aeb1b96a2

- Disallow more than 3 items in Loki line entry, since it must contain two mandatory entries: timestamp and message,
  plus one optional entry - structured metadata. See https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs

- Update references to structured metadata docs in Loki, in order to simplify further maintenance of the code

- Move the change from bugfix to feature at docs/VictoriaLogs/CHANGELOG.md, since VictoriaLogs never supported
  structured metadata over JSON Loki protocol. The support for structured metadata in protobuf Loki protocol
  has been added in ac06569c49 , which has been included in v0.28.0-victorialogs.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7432

(cherry picked from commit 3d75c39ff4)
This commit is contained in:
Aliaksandr Valialkin 2024-11-06 19:23:35 +01:00 committed by hagen1778
parent 364a2e3e1f
commit 1c7d8adc65
No known key found for this signature in database
GPG key ID: E92986095E0DD614
4 changed files with 13 additions and 16 deletions

View file

@ -108,9 +108,6 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
labels = o
}
labels.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
vStr, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
@ -141,8 +138,8 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
if err != nil {
return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line)
}
if len(lineA) < 2 {
return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2", line, len(lineA))
if len(lineA) < 2 || len(lineA) > 3 {
return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2 or 3", line, len(lineA))
}
// parse timestamp
@ -169,7 +166,7 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
Value: bytesutil.ToUnsafeString(msg),
})
// parse structured metadata
// parse structured metadata - see https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
if len(lineA) > 2 {
structuredMetadata, err := lineA[2].Object()
if err != nil {
@ -177,12 +174,9 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
}
structuredMetadata.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
vStr, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("unexpected structuredMetadata label value type for %q:%q; want string", k, v)
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
return
}
@ -192,7 +186,7 @@ func parseJSONRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, er
})
})
if err != nil {
return rowsIngested, fmt.Errorf("error when parsing line `structuredMetadata` object: %w", err)
return rowsIngested, fmt.Errorf("error when parsing `structuredMetadata` object: %w", err)
}
}
lmp.AddRow(ts, fields)

View file

@ -45,14 +45,17 @@ func TestParseJSONRequest_Failure(t *testing.T) {
// Invalid length of `values` individual item
f(`{"streams":[{"values":[[]]}]}`)
f(`{"streams":[{"values":[["123"]]}]}`)
f(`{"streams":[{"values":[["123","456","789","8123"]]}]}`)
// Invalid type for timestamp inside `values` individual item
f(`{"streams":[{"values":[[123,"456"]}]}`)
// Invalid type for log message
f(`{"streams":[{"values":[["123",1234]]}]}`)
// invalid structured metadata type
f(`{"streams":[{"values":[["1577836800000000001", "foo bar", ["metadata_1", "md_value"]]]}]}`)
// structured metadata with unexpected value type
f(`{"streams":[{"values":[["1577836800000000001", "foo bar", {"metadata_1": 1}]] }]}`)
}

View file

@ -17,7 +17,7 @@ var mp easyproto.MarshalerPool
// PushRequest represents Loki PushRequest
//
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L14C1-L14C20
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L14
type PushRequest struct {
Streams []Stream
@ -87,7 +87,7 @@ func (pr *PushRequest) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []Labe
// Stream represents Loki stream.
//
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L23
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L23
type Stream struct {
Labels string
Entries []Entry
@ -139,7 +139,7 @@ func (s *Stream) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []LabelPair,
// Entry represents Loki entry.
//
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L38
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L38
type Entry struct {
Timestamp time.Time
Line string
@ -203,7 +203,7 @@ func (e *Entry) unmarshalProtobuf(labelPairBuf []LabelPair, src []byte) ([]Label
// LabelPair represents Loki label pair.
//
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L33
// See https://github.com/grafana/loki/blob/ada4b7b8713385fbe9f5984a5a0aaaddf1a7b851/pkg/push/push.proto#L33
type LabelPair struct {
Name string
Value string

View file

@ -15,12 +15,12 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: support [structured metadata](https://grafana.com/docs/loki/latest/get-started/labels/structured-metadata/) when ingesting logs with [Grafana Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431).
* FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins.
* FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`.
* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
* FEATURE: add [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe) for returning various per-block stats. This pipe is useful for debugging.
* BUGFIX: properly parse structured metadata when ingesting logs with [Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). The issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
* BUGFIX: properly sort fields with floating-point numbers by [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). Previously floating-point numbers could be improperly sorted because they were treated as strings, and [natural sorting](https://en.wikipedia.org/wiki/Natural_sort_order) was incorrectly applied to them. For example, `0.123` was treated as bigger than `0.9`.
## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)