From ac06569c49ebdf251003d651adddc39580ca720d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 10 Jul 2024 02:42:41 +0200 Subject: [PATCH] app/vlinsert/loki: use easyproto instead for parsing Loki protobuf messages --- app/vlinsert/loki/loki_protobuf.go | 52 +- app/vlinsert/loki/loki_protobuf_test.go | 7 +- .../loki/loki_protobuf_timing_test.go | 51 +- app/vlinsert/loki/pb.go | 302 +++++++ app/vlinsert/loki/push_request.pb.go | 801 ------------------ app/vlinsert/loki/push_request.proto | 38 - app/vlinsert/loki/timestamp.go | 110 --- app/vlinsert/loki/types.go | 418 --------- lib/logstorage/stream_tags.go | 6 +- 9 files changed, 382 insertions(+), 1403 deletions(-) create mode 100644 app/vlinsert/loki/pb.go delete mode 100644 app/vlinsert/loki/push_request.pb.go delete mode 100644 app/vlinsert/loki/push_request.proto delete mode 100644 app/vlinsert/loki/timestamp.go delete mode 100644 app/vlinsert/loki/types.go diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index b8c612a26..2c1ac6b39 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -79,12 +79,14 @@ func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int req := getPushRequest() defer putPushRequest(req) - err = req.Unmarshal(bb.B) + err = req.UnmarshalProtobuf(bb.B) if err != nil { return 0, fmt.Errorf("cannot parse request body: %w", err) } - var commonFields []logstorage.Field + fields := getFields() + defer putFields(fields) + rowsIngested := 0 streams := req.Streams currentTimestamp := time.Now().UnixNano() @@ -92,30 +94,60 @@ func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int stream := &streams[i] // st.Labels contains labels for the stream. // Labels are same for all entries in the stream. - commonFields, err = parsePromLabels(commonFields[:0], stream.Labels) + fields.fields, err = parsePromLabels(fields.fields[:0], stream.Labels) if err != nil { return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err) } - fields := commonFields + commonFieldsLen := len(fields.fields) entries := stream.Entries for j := range entries { - entry := &entries[j] - fields = append(fields[:len(commonFields)], logstorage.Field{ + e := &entries[j] + fields.fields = fields.fields[:commonFieldsLen] + + for _, lp := range e.StructuredMetadata { + fields.fields = append(fields.fields, logstorage.Field{ + Name: lp.Name, + Value: lp.Value, + }) + } + + fields.fields = append(fields.fields, logstorage.Field{ Name: "_msg", - Value: entry.Line, + Value: e.Line, }) - ts := entry.Timestamp.UnixNano() + + ts := e.Timestamp.UnixNano() if ts == 0 { ts = currentTimestamp } - lmp.AddRow(ts, fields) + + lmp.AddRow(ts, fields.fields) } rowsIngested += len(stream.Entries) } return rowsIngested, nil } +func getFields() *fields { + v := fieldsPool.Get() + if v == nil { + return &fields{} + } + return v.(*fields) +} + +func putFields(f *fields) { + f.fields = f.fields[:0] + fieldsPool.Put(f) +} + +var fieldsPool sync.Pool + +type fields struct { + fields []logstorage.Field +} + // parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result. // // See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go @@ -181,6 +213,6 @@ func getPushRequest() *PushRequest { } func putPushRequest(req *PushRequest) { - req.Reset() + req.reset() pushReqsPool.Put(req) } diff --git a/app/vlinsert/loki/loki_protobuf_test.go b/app/vlinsert/loki/loki_protobuf_test.go index 0e6424a72..14b3452e6 100644 --- a/app/vlinsert/loki/loki_protobuf_test.go +++ b/app/vlinsert/loki/loki_protobuf_test.go @@ -36,7 +36,7 @@ func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields []logstorage. Entries: []Entry{ { Timestamp: time.Unix(0, timestamp), - Line: msg, + Line: strings.Clone(msg), }, }, }) @@ -58,10 +58,7 @@ func TestParseProtobufRequest_Success(t *testing.T) { t.Fatalf("unexpected number of streams; got %d; want %d", len(tlp.pr.Streams), n) } - data, err := tlp.pr.Marshal() - if err != nil { - t.Fatalf("unexpected error when marshaling PushRequest: %s", err) - } + data := tlp.pr.MarshalProtobuf(nil) encodedData := snappy.Encode(nil, data) tlp2 := &insertutils.TestLogMessageProcessor{} diff --git a/app/vlinsert/loki/loki_protobuf_timing_test.go b/app/vlinsert/loki/loki_protobuf_timing_test.go index 6f873f11d..1143c33c4 100644 --- a/app/vlinsert/loki/loki_protobuf_timing_test.go +++ b/app/vlinsert/loki/loki_protobuf_timing_test.go @@ -9,6 +9,7 @@ import ( "github.com/golang/snappy" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) func BenchmarkParseProtobufRequest(b *testing.B) { @@ -38,29 +39,47 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) { }) } -func getProtobufBody(streams, rows, labels int) []byte { - var pr PushRequest - - for i := 0; i < streams; i++ { - var st Stream - - st.Labels = `{` - for j := 0; j < labels; j++ { - st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"` - if j < labels-1 { - st.Labels += `,` +func getProtobufBody(streamsCount, rowsCount, labelsCount int) []byte { + var b []byte + var entries []Entry + streams := make([]Stream, streamsCount) + for i := range streams { + b = b[:0] + b = append(b, '{') + for j := 0; j < labelsCount; j++ { + b = append(b, "label_"...) + b = strconv.AppendInt(b, int64(j), 10) + b = append(b, `="value_`...) + b = strconv.AppendInt(b, int64(j), 10) + b = append(b, '"') + if j < labelsCount-1 { + b = append(b, ',') } } - st.Labels += `}` + b = append(b, '}') + labels := string(b) - for j := 0; j < rows; j++ { - st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)}) + var rowsBuf []byte + entriesLen := len(entries) + for j := 0; j < rowsCount; j++ { + rowsBufLen := len(rowsBuf) + rowsBuf = append(rowsBuf, "value_"...) + rowsBuf = strconv.AppendInt(rowsBuf, int64(j), 10) + entries = append(entries, Entry{ + Timestamp: time.Now(), + Line: bytesutil.ToUnsafeString(rowsBuf[rowsBufLen:]), + }) } - pr.Streams = append(pr.Streams, st) + st := &streams[i] + st.Labels = labels + st.Entries = entries[entriesLen:] + } + pr := PushRequest{ + Streams: streams, } - body, _ := pr.Marshal() + body := pr.MarshalProtobuf(nil) encodedBody := snappy.Encode(nil, body) return encodedBody diff --git a/app/vlinsert/loki/pb.go b/app/vlinsert/loki/pb.go new file mode 100644 index 000000000..e046ef5ae --- /dev/null +++ b/app/vlinsert/loki/pb.go @@ -0,0 +1,302 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: push_request.proto +// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push_request.proto +// Licensed under the Apache License, Version 2.0 (the "License"); +// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE + +package loki + +import ( + "fmt" + "time" + + "github.com/VictoriaMetrics/easyproto" +) + +var mp easyproto.MarshalerPool + +// PushRequest represents Loki PushRequest +// +// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L14C1-L14C20 +type PushRequest struct { + Streams []Stream + + entriesBuf []Entry + labelPairBuf []LabelPair +} + +func (pr *PushRequest) reset() { + pr.Streams = pr.Streams[:0] + + pr.entriesBuf = pr.entriesBuf[:0] + pr.labelPairBuf = pr.labelPairBuf[:0] +} + +// UnmarshalProtobuf unmarshals pr from protobuf message at src. +// +// pr remains valid until src is modified. +func (pr *PushRequest) UnmarshalProtobuf(src []byte) error { + pr.reset() + var err error + pr.entriesBuf, pr.labelPairBuf, err = pr.unmarshalProtobuf(pr.entriesBuf, pr.labelPairBuf, src) + return err +} + +// MarshalProtobuf marshals r to protobuf message, appends it to dst and returns the result. +func (pr *PushRequest) MarshalProtobuf(dst []byte) []byte { + m := mp.Get() + pr.marshalProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + mp.Put(m) + return dst +} + +func (pr *PushRequest) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, s := range pr.Streams { + s.marshalProtobuf(mm.AppendMessage(1)) + } +} + +func (pr *PushRequest) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []LabelPair, src []byte) ([]Entry, []LabelPair, error) { + // message PushRequest { + // repeated Stream streams = 1; + // } + var err error + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot read next field in PushRequest: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot read Stream data") + } + pr.Streams = append(pr.Streams, Stream{}) + s := &pr.Streams[len(pr.Streams)-1] + entriesBuf, labelPairBuf, err = s.unmarshalProtobuf(entriesBuf, labelPairBuf, data) + if err != nil { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot unmarshal Stream: %w", err) + } + } + } + return entriesBuf, labelPairBuf, nil +} + +// Stream represents Loki stream. +// +// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L23 +type Stream struct { + Labels string + Entries []Entry +} + +func (s *Stream) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendString(1, s.Labels) + for _, e := range s.Entries { + e.marshalProtobuf(mm.AppendMessage(2)) + } +} + +func (s *Stream) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []LabelPair, src []byte) ([]Entry, []LabelPair, error) { + // message Stream { + // string labels = 1; + // repeated Entry entries = 2; + // } + var err error + var fc easyproto.FieldContext + entriesBufLen := len(entriesBuf) + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot read next field in Stream: %w", err) + } + switch fc.FieldNum { + case 1: + labels, ok := fc.String() + if !ok { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot read labels") + } + s.Labels = labels + case 2: + data, ok := fc.MessageData() + if !ok { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot read Entry data") + } + entriesBuf = append(entriesBuf, Entry{}) + e := &entriesBuf[len(entriesBuf)-1] + labelPairBuf, err = e.unmarshalProtobuf(labelPairBuf, data) + if err != nil { + return entriesBuf, labelPairBuf, fmt.Errorf("cannot unmarshal Entry: %w", err) + } + } + } + s.Entries = entriesBuf[entriesBufLen:] + return entriesBuf, labelPairBuf, nil +} + +// Entry represents Loki entry. +// +// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L38 +type Entry struct { + Timestamp time.Time + Line string + StructuredMetadata []LabelPair +} + +func (e *Entry) marshalProtobuf(mm *easyproto.MessageMarshaler) { + marshalTime(mm, 1, e.Timestamp) + mm.AppendString(2, e.Line) + for _, lp := range e.StructuredMetadata { + lp.marshalProtobuf(mm.AppendMessage(3)) + } +} + +func (e *Entry) unmarshalProtobuf(labelPairBuf []LabelPair, src []byte) ([]LabelPair, error) { + // message Entry { + // Timestamp timestamp = 1; + // string line = 2; + // repeated LabelPair structuredMetadata = 3; + // } + var err error + var fc easyproto.FieldContext + labelPairBufLen := len(labelPairBuf) + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return labelPairBuf, fmt.Errorf("cannot read next field in Entry: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return labelPairBuf, fmt.Errorf("cannot read Timestamp data") + } + timestamp, err := unmarshalTime(data) + if err != nil { + return labelPairBuf, fmt.Errorf("cannot unmarshal Timestamp: %w", err) + } + e.Timestamp = timestamp + case 2: + line, ok := fc.String() + if !ok { + return labelPairBuf, fmt.Errorf("cannot read Line") + } + e.Line = line + case 3: + data, ok := fc.MessageData() + if !ok { + return labelPairBuf, fmt.Errorf("cannot read StructuredMetadata") + } + labelPairBuf = append(labelPairBuf, LabelPair{}) + lp := &labelPairBuf[len(labelPairBuf)-1] + if err := lp.unmarshalProtobuf(data); err != nil { + return labelPairBuf, fmt.Errorf("cannot unmarshal StructuredMetadata: %w", err) + } + } + } + e.StructuredMetadata = labelPairBuf[labelPairBufLen:] + return labelPairBuf, nil +} + +// LabelPair represents Loki label pair. +// +// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L33 +type LabelPair struct { + Name string + Value string +} + +func (lp *LabelPair) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendString(1, lp.Name) + mm.AppendString(2, lp.Value) +} + +func (lp *LabelPair) unmarshalProtobuf(src []byte) (err error) { + // message LabelPair { + // string name = 1; + // string value = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in LabelPair: %w", err) + } + switch fc.FieldNum { + case 1: + name, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read name") + } + lp.Name = name + case 2: + value, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal value") + } + lp.Value = value + } + } + return nil +} + +func marshalTime(mm *easyproto.MessageMarshaler, fieldNum uint32, timestamp time.Time) { + nsecs := timestamp.UnixNano() + ts := Timestamp{ + Seconds: nsecs / 1e9, + Nanos: int32(nsecs % 1e9), + } + ts.marshalProtobuf(mm.AppendMessage(fieldNum)) +} + +func unmarshalTime(src []byte) (time.Time, error) { + var ts Timestamp + if err := ts.unmarshalProtobuf(src); err != nil { + return time.Time{}, err + } + timestamp := time.Unix(ts.Seconds, int64(ts.Nanos)).UTC() + return timestamp, nil +} + +// Timestamp is protobuf well-known timestamp type. +type Timestamp struct { + Seconds int64 + Nanos int32 +} + +func (ts *Timestamp) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendInt64(1, ts.Seconds) + mm.AppendInt32(2, ts.Nanos) +} + +func (ts *Timestamp) unmarshalProtobuf(src []byte) (err error) { + // message Timestamp { + // int64 seconds = 1; + // int32 nanos = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in Timestamp: %w", err) + } + switch fc.FieldNum { + case 1: + seconds, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read Seconds") + } + ts.Seconds = seconds + case 2: + nanos, ok := fc.Int32() + if !ok { + return fmt.Errorf("cannot read Nanos") + } + ts.Nanos = nanos + } + } + return nil +} diff --git a/app/vlinsert/loki/push_request.pb.go b/app/vlinsert/loki/push_request.pb.go deleted file mode 100644 index 490b4372b..000000000 --- a/app/vlinsert/loki/push_request.pb.go +++ /dev/null @@ -1,801 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: push_request.proto -// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push_request.proto -// Licensed under the Apache License, Version 2.0 (the "License"); -// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE - -package loki - -import ( - "fmt" - "io" - math_bits "math/bits" - "strings" - "time" - - github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" -) - -type PushRequest struct { - Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"` -} - -func (m *PushRequest) Reset() { *m = PushRequest{} } - -type PushResponse struct { -} - -func (m *PushResponse) Reset() { *m = PushResponse{} } - -type StreamAdapter struct { - Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` - Entries []EntryAdapter `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries"` - // hash contains the original hash of the stream. - Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` -} - -func (m *StreamAdapter) Reset() { *m = StreamAdapter{} } - -func (m *StreamAdapter) GetLabels() string { - if m != nil { - return m.Labels - } - return "" -} - -func (m *StreamAdapter) GetEntries() []EntryAdapter { - if m != nil { - return m.Entries - } - return nil -} - -func (m *StreamAdapter) GetHash() uint64 { - if m != nil { - return m.Hash - } - return 0 -} - -type EntryAdapter struct { - Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` - Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` -} - -func (m *EntryAdapter) Reset() { *m = EntryAdapter{} } - -func (m *EntryAdapter) GetTimestamp() time.Time { - if m != nil { - return m.Timestamp - } - return time.Time{} -} - -func (m *EntryAdapter) GetLine() string { - if m != nil { - return m.Line - } - return "" -} - -func (m *PushRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *PushRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Streams) > 0 { - for iNdEx := len(m.Streams) - 1; iNdEx >= 0; iNdEx-- { - { - size := m.Streams[iNdEx].Size() - i -= size - if _, err := m.Streams[iNdEx].MarshalTo(dAtA[i:]); err != nil { - return 0, err - } - i = encodeVarintPush(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } - } - return len(dAtA) - i, nil -} - -func (m *PushResponse) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *PushResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - return len(dAtA) - i, nil -} - -func (m *StreamAdapter) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *StreamAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Hash != 0 { - i = encodeVarintPush(dAtA, i, uint64(m.Hash)) - i-- - dAtA[i] = 0x18 - } - if len(m.Entries) > 0 { - for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintPush(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if len(m.Labels) > 0 { - i -= len(m.Labels) - copy(dAtA[i:], m.Labels) - i = encodeVarintPush(dAtA, i, uint64(len(m.Labels))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *EntryAdapter) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *EntryAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Line) > 0 { - i -= len(m.Line) - copy(dAtA[i:], m.Line) - i = encodeVarintPush(dAtA, i, uint64(len(m.Line))) - i-- - dAtA[i] = 0x12 - } - n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):]) - if err1 != nil { - return 0, err1 - } - i -= n1 - i = encodeVarintPush(dAtA, i, uint64(n1)) - i-- - dAtA[i] = 0xa - return len(dAtA) - i, nil -} - -func encodeVarintPush(dAtA []byte, offset int, v uint64) int { - offset -= sovPush(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *PushRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.Streams) > 0 { - for _, e := range m.Streams { - l = e.Size() - n += 1 + l + sovPush(uint64(l)) - } - } - return n -} - -func (m *PushResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - return n -} - -func (m *StreamAdapter) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Labels) - if l > 0 { - n += 1 + l + sovPush(uint64(l)) - } - if len(m.Entries) > 0 { - for _, e := range m.Entries { - l = e.Size() - n += 1 + l + sovPush(uint64(l)) - } - } - if m.Hash != 0 { - n += 1 + sovPush(uint64(m.Hash)) - } - return n -} - -func (m *EntryAdapter) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp) - n += 1 + l + sovPush(uint64(l)) - l = len(m.Line) - if l > 0 { - n += 1 + l + sovPush(uint64(l)) - } - return n -} - -func sovPush(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} -func sozPush(x uint64) (n int) { - return sovPush(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (this *PushRequest) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&PushRequest{`, - `Streams:` + fmt.Sprintf("%v", this.Streams) + `,`, - `}`, - }, "") - return s -} -func (m *PushRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: PushRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: PushRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Streams", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Streams = append(m.Streams, Stream{}) - if err := m.Streams[len(m.Streams)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipPush(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *PushResponse) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: PushResponse: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: PushResponse: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - default: - iNdEx = preIndex - skippy, err := skipPush(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *StreamAdapter) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Labels = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Entries = append(m.Entries, EntryAdapter{}) - if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) - } - m.Hash = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Hash |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipPush(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *EntryAdapter) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Line = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipPush(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipPush(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowPush - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowPush - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowPush - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if length < 0 { - return 0, ErrInvalidLengthPush - } - iNdEx += length - if iNdEx < 0 { - return 0, ErrInvalidLengthPush - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowPush - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipPush(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - if iNdEx < 0 { - return 0, ErrInvalidLengthPush - } - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - ErrInvalidLengthPush = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowPush = fmt.Errorf("proto: integer overflow") -) diff --git a/app/vlinsert/loki/push_request.proto b/app/vlinsert/loki/push_request.proto deleted file mode 100644 index b0edbf47a..000000000 --- a/app/vlinsert/loki/push_request.proto +++ /dev/null @@ -1,38 +0,0 @@ -syntax = "proto3"; - -// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push.proto -// Licensed under the Apache License, Version 2.0 (the "License"); -// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE - -package logproto; - -import "gogoproto/gogo.proto"; -import "google/protobuf/timestamp.proto"; - -option go_package = "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"; - -message PushRequest { - repeated StreamAdapter streams = 1 [ - (gogoproto.jsontag) = "streams", - (gogoproto.customtype) = "Stream" - ]; -} - -message StreamAdapter { - string labels = 1 [(gogoproto.jsontag) = "labels"]; - repeated EntryAdapter entries = 2 [ - (gogoproto.nullable) = false, - (gogoproto.jsontag) = "entries" - ]; - // hash contains the original hash of the stream. - uint64 hash = 3 [(gogoproto.jsontag) = "-"]; -} - -message EntryAdapter { - google.protobuf.Timestamp timestamp = 1 [ - (gogoproto.stdtime) = true, - (gogoproto.nullable) = false, - (gogoproto.jsontag) = "ts" - ]; - string line = 2 [(gogoproto.jsontag) = "line"]; -} diff --git a/app/vlinsert/loki/timestamp.go b/app/vlinsert/loki/timestamp.go deleted file mode 100644 index 5891eec14..000000000 --- a/app/vlinsert/loki/timestamp.go +++ /dev/null @@ -1,110 +0,0 @@ -package loki - -// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/timestamp.go -// Licensed under the Apache License, Version 2.0 (the "License"); -// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE - -import ( - "errors" - "strconv" - "time" - - "github.com/gogo/protobuf/types" -) - -const ( - // Seconds field of the earliest valid Timestamp. - // This is time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC).Unix(). - minValidSeconds = -62135596800 - // Seconds field just after the latest valid Timestamp. - // This is time.Date(10000, 1, 1, 0, 0, 0, 0, time.UTC).Unix(). - maxValidSeconds = 253402300800 -) - -// validateTimestamp determines whether a Timestamp is valid. -// A valid timestamp represents a time in the range -// [0001-01-01, 10000-01-01) and has a Nanos field -// in the range [0, 1e9). -// -// If the Timestamp is valid, validateTimestamp returns nil. -// Otherwise, it returns an error that describes -// the problem. -// -// Every valid Timestamp can be represented by a time.Time, but the converse is not true. -func validateTimestamp(ts *types.Timestamp) error { - if ts == nil { - return errors.New("timestamp: nil Timestamp") - } - if ts.Seconds < minValidSeconds { - return errors.New("timestamp: " + formatTimestamp(ts) + " before 0001-01-01") - } - if ts.Seconds >= maxValidSeconds { - return errors.New("timestamp: " + formatTimestamp(ts) + " after 10000-01-01") - } - if ts.Nanos < 0 || ts.Nanos >= 1e9 { - return errors.New("timestamp: " + formatTimestamp(ts) + ": nanos not in range [0, 1e9)") - } - return nil -} - -// formatTimestamp is equivalent to fmt.Sprintf("%#v", ts) -// but avoids the escape incurred by using fmt.Sprintf, eliminating -// unnecessary heap allocations. -func formatTimestamp(ts *types.Timestamp) string { - if ts == nil { - return "nil" - } - - seconds := strconv.FormatInt(ts.Seconds, 10) - nanos := strconv.FormatInt(int64(ts.Nanos), 10) - return "&types.Timestamp{Seconds: " + seconds + ",\nNanos: " + nanos + ",\n}" -} - -func sizeOfStdTime(t time.Time) int { - ts, err := timestampProto(t) - if err != nil { - return 0 - } - return ts.Size() -} - -func stdTimeMarshalTo(t time.Time, data []byte) (int, error) { - ts, err := timestampProto(t) - if err != nil { - return 0, err - } - return ts.MarshalTo(data) -} - -func stdTimeUnmarshal(t *time.Time, data []byte) error { - ts := &types.Timestamp{} - if err := ts.Unmarshal(data); err != nil { - return err - } - tt, err := timestampFromProto(ts) - if err != nil { - return err - } - *t = tt - return nil -} - -func timestampFromProto(ts *types.Timestamp) (time.Time, error) { - // Don't return the zero value on error, because corresponds to a valid - // timestamp. Instead return whatever time.Unix gives us. - var t time.Time - if ts == nil { - t = time.Unix(0, 0).UTC() // treat nil like the empty Timestamp - } else { - t = time.Unix(ts.Seconds, int64(ts.Nanos)).UTC() - } - return t, validateTimestamp(ts) -} - -func timestampProto(t time.Time) (types.Timestamp, error) { - ts := types.Timestamp{ - Seconds: t.Unix(), - Nanos: int32(t.Nanosecond()), - } - return ts, validateTimestamp(&ts) -} diff --git a/app/vlinsert/loki/types.go b/app/vlinsert/loki/types.go deleted file mode 100644 index 7dc1cfe25..000000000 --- a/app/vlinsert/loki/types.go +++ /dev/null @@ -1,418 +0,0 @@ -package loki - -// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/types.go -// Licensed under the Apache License, Version 2.0 (the "License"); -// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE - -import ( - "fmt" - "io" - "time" -) - -// Stream contains a unique labels set as a string and a set of entries for it. -// We are not using the proto generated version but this custom one so that we -// can improve serialization see benchmark. -type Stream struct { - Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` - Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` - Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` -} - -// Entry is a log entry with a timestamp. -type Entry struct { - Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` - Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` -} - -// Marshal implements the proto.Marshaler interface. -func (m *Stream) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -// MarshalTo marshals m to dst. -func (m *Stream) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -// MarshalToSizedBuffer marshals m to the sized buffer. -func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Hash != 0 { - i = encodeVarintPush(dAtA, i, m.Hash) - i-- - dAtA[i] = 0x18 - } - if len(m.Entries) > 0 { - for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintPush(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if len(m.Labels) > 0 { - i -= len(m.Labels) - copy(dAtA[i:], m.Labels) - i = encodeVarintPush(dAtA, i, uint64(len(m.Labels))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -// Marshal implements the proto.Marshaler interface. -func (m *Entry) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -// MarshalTo marshals m to dst. -func (m *Entry) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -// MarshalToSizedBuffer marshals m to the sized buffer. -func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Line) > 0 { - i -= len(m.Line) - copy(dAtA[i:], m.Line) - i = encodeVarintPush(dAtA, i, uint64(len(m.Line))) - i-- - dAtA[i] = 0x12 - } - n7, err7 := stdTimeMarshalTo(m.Timestamp, dAtA[i-sizeOfStdTime(m.Timestamp):]) - if err7 != nil { - return 0, err7 - } - i -= n7 - i = encodeVarintPush(dAtA, i, uint64(n7)) - i-- - dAtA[i] = 0xa - return len(dAtA) - i, nil -} - -// Unmarshal unmarshals the given data into m. -func (m *Stream) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Labels = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Entries = append(m.Entries, Entry{}) - if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) - } - m.Hash = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Hash |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipPush(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} - -// Unmarshal unmarshals the given data into m. -func (m *Entry) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if err := stdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPush - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthPush - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthPush - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Line = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipPush(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPush - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} - -// Size returns the size of the serialized Stream. -func (m *Stream) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Labels) - if l > 0 { - n += 1 + l + sovPush(uint64(l)) - } - if len(m.Entries) > 0 { - for _, e := range m.Entries { - l = e.Size() - n += 1 + l + sovPush(uint64(l)) - } - } - if m.Hash != 0 { - n += 1 + sovPush(m.Hash) - } - return n -} - -// Size returns the size of the serialized Entry -func (m *Entry) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = sizeOfStdTime(m.Timestamp) - n += 1 + l + sovPush(uint64(l)) - l = len(m.Line) - if l > 0 { - n += 1 + l + sovPush(uint64(l)) - } - return n -} diff --git a/lib/logstorage/stream_tags.go b/lib/logstorage/stream_tags.go index 2e946bab6..b5b0b4226 100644 --- a/lib/logstorage/stream_tags.go +++ b/lib/logstorage/stream_tags.go @@ -43,11 +43,7 @@ func (st *StreamTags) Reset() { st.buf = st.buf[:0] tags := st.tags - for i := range tags { - t := &tags[i] - t.Name = nil - t.Value = nil - } + clear(tags) st.tags = tags[:0] }