mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vlinsert/loki: use easyproto instead for parsing Loki protobuf messages
This commit is contained in:
parent
00c666a6c3
commit
ac06569c49
9 changed files with 382 additions and 1403 deletions
|
@ -79,12 +79,14 @@ func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int
|
||||||
req := getPushRequest()
|
req := getPushRequest()
|
||||||
defer putPushRequest(req)
|
defer putPushRequest(req)
|
||||||
|
|
||||||
err = req.Unmarshal(bb.B)
|
err = req.UnmarshalProtobuf(bb.B)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("cannot parse request body: %w", err)
|
return 0, fmt.Errorf("cannot parse request body: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var commonFields []logstorage.Field
|
fields := getFields()
|
||||||
|
defer putFields(fields)
|
||||||
|
|
||||||
rowsIngested := 0
|
rowsIngested := 0
|
||||||
streams := req.Streams
|
streams := req.Streams
|
||||||
currentTimestamp := time.Now().UnixNano()
|
currentTimestamp := time.Now().UnixNano()
|
||||||
|
@ -92,30 +94,60 @@ func parseProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int
|
||||||
stream := &streams[i]
|
stream := &streams[i]
|
||||||
// st.Labels contains labels for the stream.
|
// st.Labels contains labels for the stream.
|
||||||
// Labels are same for all entries in the stream.
|
// Labels are same for all entries in the stream.
|
||||||
commonFields, err = parsePromLabels(commonFields[:0], stream.Labels)
|
fields.fields, err = parsePromLabels(fields.fields[:0], stream.Labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err)
|
return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %w", stream.Labels, err)
|
||||||
}
|
}
|
||||||
fields := commonFields
|
commonFieldsLen := len(fields.fields)
|
||||||
|
|
||||||
entries := stream.Entries
|
entries := stream.Entries
|
||||||
for j := range entries {
|
for j := range entries {
|
||||||
entry := &entries[j]
|
e := &entries[j]
|
||||||
fields = append(fields[:len(commonFields)], logstorage.Field{
|
fields.fields = fields.fields[:commonFieldsLen]
|
||||||
|
|
||||||
|
for _, lp := range e.StructuredMetadata {
|
||||||
|
fields.fields = append(fields.fields, logstorage.Field{
|
||||||
|
Name: lp.Name,
|
||||||
|
Value: lp.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fields.fields = append(fields.fields, logstorage.Field{
|
||||||
Name: "_msg",
|
Name: "_msg",
|
||||||
Value: entry.Line,
|
Value: e.Line,
|
||||||
})
|
})
|
||||||
ts := entry.Timestamp.UnixNano()
|
|
||||||
|
ts := e.Timestamp.UnixNano()
|
||||||
if ts == 0 {
|
if ts == 0 {
|
||||||
ts = currentTimestamp
|
ts = currentTimestamp
|
||||||
}
|
}
|
||||||
lmp.AddRow(ts, fields)
|
|
||||||
|
lmp.AddRow(ts, fields.fields)
|
||||||
}
|
}
|
||||||
rowsIngested += len(stream.Entries)
|
rowsIngested += len(stream.Entries)
|
||||||
}
|
}
|
||||||
return rowsIngested, nil
|
return rowsIngested, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getFields() *fields {
|
||||||
|
v := fieldsPool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return &fields{}
|
||||||
|
}
|
||||||
|
return v.(*fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func putFields(f *fields) {
|
||||||
|
f.fields = f.fields[:0]
|
||||||
|
fieldsPool.Put(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
var fieldsPool sync.Pool
|
||||||
|
|
||||||
|
type fields struct {
|
||||||
|
fields []logstorage.Field
|
||||||
|
}
|
||||||
|
|
||||||
// parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result.
|
// parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result.
|
||||||
//
|
//
|
||||||
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
|
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
|
||||||
|
@ -181,6 +213,6 @@ func getPushRequest() *PushRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
func putPushRequest(req *PushRequest) {
|
func putPushRequest(req *PushRequest) {
|
||||||
req.Reset()
|
req.reset()
|
||||||
pushReqsPool.Put(req)
|
pushReqsPool.Put(req)
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields []logstorage.
|
||||||
Entries: []Entry{
|
Entries: []Entry{
|
||||||
{
|
{
|
||||||
Timestamp: time.Unix(0, timestamp),
|
Timestamp: time.Unix(0, timestamp),
|
||||||
Line: msg,
|
Line: strings.Clone(msg),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -58,10 +58,7 @@ func TestParseProtobufRequest_Success(t *testing.T) {
|
||||||
t.Fatalf("unexpected number of streams; got %d; want %d", len(tlp.pr.Streams), n)
|
t.Fatalf("unexpected number of streams; got %d; want %d", len(tlp.pr.Streams), n)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := tlp.pr.Marshal()
|
data := tlp.pr.MarshalProtobuf(nil)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error when marshaling PushRequest: %s", err)
|
|
||||||
}
|
|
||||||
encodedData := snappy.Encode(nil, data)
|
encodedData := snappy.Encode(nil, data)
|
||||||
|
|
||||||
tlp2 := &insertutils.TestLogMessageProcessor{}
|
tlp2 := &insertutils.TestLogMessageProcessor{}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkParseProtobufRequest(b *testing.B) {
|
func BenchmarkParseProtobufRequest(b *testing.B) {
|
||||||
|
@ -38,29 +39,47 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func getProtobufBody(streams, rows, labels int) []byte {
|
func getProtobufBody(streamsCount, rowsCount, labelsCount int) []byte {
|
||||||
var pr PushRequest
|
var b []byte
|
||||||
|
var entries []Entry
|
||||||
for i := 0; i < streams; i++ {
|
streams := make([]Stream, streamsCount)
|
||||||
var st Stream
|
for i := range streams {
|
||||||
|
b = b[:0]
|
||||||
st.Labels = `{`
|
b = append(b, '{')
|
||||||
for j := 0; j < labels; j++ {
|
for j := 0; j < labelsCount; j++ {
|
||||||
st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"`
|
b = append(b, "label_"...)
|
||||||
if j < labels-1 {
|
b = strconv.AppendInt(b, int64(j), 10)
|
||||||
st.Labels += `,`
|
b = append(b, `="value_`...)
|
||||||
|
b = strconv.AppendInt(b, int64(j), 10)
|
||||||
|
b = append(b, '"')
|
||||||
|
if j < labelsCount-1 {
|
||||||
|
b = append(b, ',')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
st.Labels += `}`
|
b = append(b, '}')
|
||||||
|
labels := string(b)
|
||||||
|
|
||||||
for j := 0; j < rows; j++ {
|
var rowsBuf []byte
|
||||||
st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)})
|
entriesLen := len(entries)
|
||||||
|
for j := 0; j < rowsCount; j++ {
|
||||||
|
rowsBufLen := len(rowsBuf)
|
||||||
|
rowsBuf = append(rowsBuf, "value_"...)
|
||||||
|
rowsBuf = strconv.AppendInt(rowsBuf, int64(j), 10)
|
||||||
|
entries = append(entries, Entry{
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Line: bytesutil.ToUnsafeString(rowsBuf[rowsBufLen:]),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pr.Streams = append(pr.Streams, st)
|
st := &streams[i]
|
||||||
|
st.Labels = labels
|
||||||
|
st.Entries = entries[entriesLen:]
|
||||||
|
}
|
||||||
|
pr := PushRequest{
|
||||||
|
Streams: streams,
|
||||||
}
|
}
|
||||||
|
|
||||||
body, _ := pr.Marshal()
|
body := pr.MarshalProtobuf(nil)
|
||||||
encodedBody := snappy.Encode(nil, body)
|
encodedBody := snappy.Encode(nil, body)
|
||||||
|
|
||||||
return encodedBody
|
return encodedBody
|
||||||
|
|
302
app/vlinsert/loki/pb.go
Normal file
302
app/vlinsert/loki/pb.go
Normal file
|
@ -0,0 +1,302 @@
|
||||||
|
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||||
|
// source: push_request.proto
|
||||||
|
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push_request.proto
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
|
||||||
|
|
||||||
|
package loki
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/easyproto"
|
||||||
|
)
|
||||||
|
|
||||||
|
var mp easyproto.MarshalerPool
|
||||||
|
|
||||||
|
// PushRequest represents Loki PushRequest
|
||||||
|
//
|
||||||
|
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L14C1-L14C20
|
||||||
|
type PushRequest struct {
|
||||||
|
Streams []Stream
|
||||||
|
|
||||||
|
entriesBuf []Entry
|
||||||
|
labelPairBuf []LabelPair
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *PushRequest) reset() {
|
||||||
|
pr.Streams = pr.Streams[:0]
|
||||||
|
|
||||||
|
pr.entriesBuf = pr.entriesBuf[:0]
|
||||||
|
pr.labelPairBuf = pr.labelPairBuf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalProtobuf unmarshals pr from protobuf message at src.
|
||||||
|
//
|
||||||
|
// pr remains valid until src is modified.
|
||||||
|
func (pr *PushRequest) UnmarshalProtobuf(src []byte) error {
|
||||||
|
pr.reset()
|
||||||
|
var err error
|
||||||
|
pr.entriesBuf, pr.labelPairBuf, err = pr.unmarshalProtobuf(pr.entriesBuf, pr.labelPairBuf, src)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalProtobuf marshals r to protobuf message, appends it to dst and returns the result.
|
||||||
|
func (pr *PushRequest) MarshalProtobuf(dst []byte) []byte {
|
||||||
|
m := mp.Get()
|
||||||
|
pr.marshalProtobuf(m.MessageMarshaler())
|
||||||
|
dst = m.Marshal(dst)
|
||||||
|
mp.Put(m)
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *PushRequest) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
for _, s := range pr.Streams {
|
||||||
|
s.marshalProtobuf(mm.AppendMessage(1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *PushRequest) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []LabelPair, src []byte) ([]Entry, []LabelPair, error) {
|
||||||
|
// message PushRequest {
|
||||||
|
// repeated Stream streams = 1;
|
||||||
|
// }
|
||||||
|
var err error
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot read next field in PushRequest: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot read Stream data")
|
||||||
|
}
|
||||||
|
pr.Streams = append(pr.Streams, Stream{})
|
||||||
|
s := &pr.Streams[len(pr.Streams)-1]
|
||||||
|
entriesBuf, labelPairBuf, err = s.unmarshalProtobuf(entriesBuf, labelPairBuf, data)
|
||||||
|
if err != nil {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot unmarshal Stream: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return entriesBuf, labelPairBuf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream represents Loki stream.
|
||||||
|
//
|
||||||
|
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L23
|
||||||
|
type Stream struct {
|
||||||
|
Labels string
|
||||||
|
Entries []Entry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stream) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
mm.AppendString(1, s.Labels)
|
||||||
|
for _, e := range s.Entries {
|
||||||
|
e.marshalProtobuf(mm.AppendMessage(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stream) unmarshalProtobuf(entriesBuf []Entry, labelPairBuf []LabelPair, src []byte) ([]Entry, []LabelPair, error) {
|
||||||
|
// message Stream {
|
||||||
|
// string labels = 1;
|
||||||
|
// repeated Entry entries = 2;
|
||||||
|
// }
|
||||||
|
var err error
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
entriesBufLen := len(entriesBuf)
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot read next field in Stream: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
labels, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot read labels")
|
||||||
|
}
|
||||||
|
s.Labels = labels
|
||||||
|
case 2:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot read Entry data")
|
||||||
|
}
|
||||||
|
entriesBuf = append(entriesBuf, Entry{})
|
||||||
|
e := &entriesBuf[len(entriesBuf)-1]
|
||||||
|
labelPairBuf, err = e.unmarshalProtobuf(labelPairBuf, data)
|
||||||
|
if err != nil {
|
||||||
|
return entriesBuf, labelPairBuf, fmt.Errorf("cannot unmarshal Entry: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.Entries = entriesBuf[entriesBufLen:]
|
||||||
|
return entriesBuf, labelPairBuf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry represents Loki entry.
|
||||||
|
//
|
||||||
|
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L38
|
||||||
|
type Entry struct {
|
||||||
|
Timestamp time.Time
|
||||||
|
Line string
|
||||||
|
StructuredMetadata []LabelPair
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Entry) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
marshalTime(mm, 1, e.Timestamp)
|
||||||
|
mm.AppendString(2, e.Line)
|
||||||
|
for _, lp := range e.StructuredMetadata {
|
||||||
|
lp.marshalProtobuf(mm.AppendMessage(3))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Entry) unmarshalProtobuf(labelPairBuf []LabelPair, src []byte) ([]LabelPair, error) {
|
||||||
|
// message Entry {
|
||||||
|
// Timestamp timestamp = 1;
|
||||||
|
// string line = 2;
|
||||||
|
// repeated LabelPair structuredMetadata = 3;
|
||||||
|
// }
|
||||||
|
var err error
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
labelPairBufLen := len(labelPairBuf)
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return labelPairBuf, fmt.Errorf("cannot read next field in Entry: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return labelPairBuf, fmt.Errorf("cannot read Timestamp data")
|
||||||
|
}
|
||||||
|
timestamp, err := unmarshalTime(data)
|
||||||
|
if err != nil {
|
||||||
|
return labelPairBuf, fmt.Errorf("cannot unmarshal Timestamp: %w", err)
|
||||||
|
}
|
||||||
|
e.Timestamp = timestamp
|
||||||
|
case 2:
|
||||||
|
line, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return labelPairBuf, fmt.Errorf("cannot read Line")
|
||||||
|
}
|
||||||
|
e.Line = line
|
||||||
|
case 3:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return labelPairBuf, fmt.Errorf("cannot read StructuredMetadata")
|
||||||
|
}
|
||||||
|
labelPairBuf = append(labelPairBuf, LabelPair{})
|
||||||
|
lp := &labelPairBuf[len(labelPairBuf)-1]
|
||||||
|
if err := lp.unmarshalProtobuf(data); err != nil {
|
||||||
|
return labelPairBuf, fmt.Errorf("cannot unmarshal StructuredMetadata: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.StructuredMetadata = labelPairBuf[labelPairBufLen:]
|
||||||
|
return labelPairBuf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LabelPair represents Loki label pair.
|
||||||
|
//
|
||||||
|
// See https://github.com/grafana/loki/blob/4220737a52da7ab6c9346b12d5a5d7bedbcd641d/pkg/push/push.proto#L33
|
||||||
|
type LabelPair struct {
|
||||||
|
Name string
|
||||||
|
Value string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lp *LabelPair) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
mm.AppendString(1, lp.Name)
|
||||||
|
mm.AppendString(2, lp.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lp *LabelPair) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message LabelPair {
|
||||||
|
// string name = 1;
|
||||||
|
// string value = 2;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in LabelPair: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
name, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read name")
|
||||||
|
}
|
||||||
|
lp.Name = name
|
||||||
|
case 2:
|
||||||
|
value, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot unmarshal value")
|
||||||
|
}
|
||||||
|
lp.Value = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func marshalTime(mm *easyproto.MessageMarshaler, fieldNum uint32, timestamp time.Time) {
|
||||||
|
nsecs := timestamp.UnixNano()
|
||||||
|
ts := Timestamp{
|
||||||
|
Seconds: nsecs / 1e9,
|
||||||
|
Nanos: int32(nsecs % 1e9),
|
||||||
|
}
|
||||||
|
ts.marshalProtobuf(mm.AppendMessage(fieldNum))
|
||||||
|
}
|
||||||
|
|
||||||
|
func unmarshalTime(src []byte) (time.Time, error) {
|
||||||
|
var ts Timestamp
|
||||||
|
if err := ts.unmarshalProtobuf(src); err != nil {
|
||||||
|
return time.Time{}, err
|
||||||
|
}
|
||||||
|
timestamp := time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
|
||||||
|
return timestamp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timestamp is protobuf well-known timestamp type.
|
||||||
|
type Timestamp struct {
|
||||||
|
Seconds int64
|
||||||
|
Nanos int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *Timestamp) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
mm.AppendInt64(1, ts.Seconds)
|
||||||
|
mm.AppendInt32(2, ts.Nanos)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *Timestamp) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message Timestamp {
|
||||||
|
// int64 seconds = 1;
|
||||||
|
// int32 nanos = 2;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in Timestamp: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
seconds, ok := fc.Int64()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Seconds")
|
||||||
|
}
|
||||||
|
ts.Seconds = seconds
|
||||||
|
case 2:
|
||||||
|
nanos, ok := fc.Int32()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Nanos")
|
||||||
|
}
|
||||||
|
ts.Nanos = nanos
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -1,801 +0,0 @@
|
||||||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
|
||||||
// source: push_request.proto
|
|
||||||
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push_request.proto
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
|
|
||||||
|
|
||||||
package loki
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
math_bits "math/bits"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type PushRequest struct {
|
|
||||||
Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushRequest) Reset() { *m = PushRequest{} }
|
|
||||||
|
|
||||||
type PushResponse struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushResponse) Reset() { *m = PushResponse{} }
|
|
||||||
|
|
||||||
type StreamAdapter struct {
|
|
||||||
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
|
|
||||||
Entries []EntryAdapter `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries"`
|
|
||||||
// hash contains the original hash of the stream.
|
|
||||||
Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *StreamAdapter) Reset() { *m = StreamAdapter{} }
|
|
||||||
|
|
||||||
func (m *StreamAdapter) GetLabels() string {
|
|
||||||
if m != nil {
|
|
||||||
return m.Labels
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *StreamAdapter) GetEntries() []EntryAdapter {
|
|
||||||
if m != nil {
|
|
||||||
return m.Entries
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *StreamAdapter) GetHash() uint64 {
|
|
||||||
if m != nil {
|
|
||||||
return m.Hash
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
type EntryAdapter struct {
|
|
||||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
|
|
||||||
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *EntryAdapter) Reset() { *m = EntryAdapter{} }
|
|
||||||
|
|
||||||
func (m *EntryAdapter) GetTimestamp() time.Time {
|
|
||||||
if m != nil {
|
|
||||||
return m.Timestamp
|
|
||||||
}
|
|
||||||
return time.Time{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *EntryAdapter) GetLine() string {
|
|
||||||
if m != nil {
|
|
||||||
return m.Line
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushRequest) Marshal() (dAtA []byte, err error) {
|
|
||||||
size := m.Size()
|
|
||||||
dAtA = make([]byte, size)
|
|
||||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dAtA[:n], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
_ = i
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if len(m.Streams) > 0 {
|
|
||||||
for iNdEx := len(m.Streams) - 1; iNdEx >= 0; iNdEx-- {
|
|
||||||
{
|
|
||||||
size := m.Streams[iNdEx].Size()
|
|
||||||
i -= size
|
|
||||||
if _, err := m.Streams[iNdEx].MarshalTo(dAtA[i:]); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(size))
|
|
||||||
}
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0xa
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushResponse) Marshal() (dAtA []byte, err error) {
|
|
||||||
size := m.Size()
|
|
||||||
dAtA = make([]byte, size)
|
|
||||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dAtA[:n], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
_ = i
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *StreamAdapter) Marshal() (dAtA []byte, err error) {
|
|
||||||
size := m.Size()
|
|
||||||
dAtA = make([]byte, size)
|
|
||||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dAtA[:n], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *StreamAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
_ = i
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if m.Hash != 0 {
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(m.Hash))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x18
|
|
||||||
}
|
|
||||||
if len(m.Entries) > 0 {
|
|
||||||
for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- {
|
|
||||||
{
|
|
||||||
size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
i -= size
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(size))
|
|
||||||
}
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x12
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(m.Labels) > 0 {
|
|
||||||
i -= len(m.Labels)
|
|
||||||
copy(dAtA[i:], m.Labels)
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Labels)))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0xa
|
|
||||||
}
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *EntryAdapter) Marshal() (dAtA []byte, err error) {
|
|
||||||
size := m.Size()
|
|
||||||
dAtA = make([]byte, size)
|
|
||||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dAtA[:n], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *EntryAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
_ = i
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if len(m.Line) > 0 {
|
|
||||||
i -= len(m.Line)
|
|
||||||
copy(dAtA[i:], m.Line)
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Line)))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x12
|
|
||||||
}
|
|
||||||
n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):])
|
|
||||||
if err1 != nil {
|
|
||||||
return 0, err1
|
|
||||||
}
|
|
||||||
i -= n1
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(n1))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0xa
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func encodeVarintPush(dAtA []byte, offset int, v uint64) int {
|
|
||||||
offset -= sovPush(v)
|
|
||||||
base := offset
|
|
||||||
for v >= 1<<7 {
|
|
||||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
|
||||||
v >>= 7
|
|
||||||
offset++
|
|
||||||
}
|
|
||||||
dAtA[offset] = uint8(v)
|
|
||||||
return base
|
|
||||||
}
|
|
||||||
func (m *PushRequest) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if len(m.Streams) > 0 {
|
|
||||||
for _, e := range m.Streams {
|
|
||||||
l = e.Size()
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *PushResponse) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *StreamAdapter) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
l = len(m.Labels)
|
|
||||||
if l > 0 {
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
if len(m.Entries) > 0 {
|
|
||||||
for _, e := range m.Entries {
|
|
||||||
l = e.Size()
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if m.Hash != 0 {
|
|
||||||
n += 1 + sovPush(uint64(m.Hash))
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *EntryAdapter) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
l = len(m.Line)
|
|
||||||
if l > 0 {
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
func sovPush(x uint64) (n int) {
|
|
||||||
return (math_bits.Len64(x|1) + 6) / 7
|
|
||||||
}
|
|
||||||
func sozPush(x uint64) (n int) {
|
|
||||||
return sovPush(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
|
||||||
}
|
|
||||||
func (this *PushRequest) String() string {
|
|
||||||
if this == nil {
|
|
||||||
return "nil"
|
|
||||||
}
|
|
||||||
s := strings.Join([]string{`&PushRequest{`,
|
|
||||||
`Streams:` + fmt.Sprintf("%v", this.Streams) + `,`,
|
|
||||||
`}`,
|
|
||||||
}, "")
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
func (m *PushRequest) Unmarshal(dAtA []byte) error {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
preIndex := iNdEx
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fieldNum := int32(wire >> 3)
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
if wireType == 4 {
|
|
||||||
return fmt.Errorf("proto: PushRequest: wiretype end group for non-group")
|
|
||||||
}
|
|
||||||
if fieldNum <= 0 {
|
|
||||||
return fmt.Errorf("proto: PushRequest: illegal tag %d (wire type %d)", fieldNum, wire)
|
|
||||||
}
|
|
||||||
switch fieldNum {
|
|
||||||
case 1:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Streams", wireType)
|
|
||||||
}
|
|
||||||
var msglen int
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
msglen |= int(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if msglen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + msglen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Streams = append(m.Streams, Stream{})
|
|
||||||
if err := m.Streams[len(m.Streams)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
iNdEx = postIndex
|
|
||||||
default:
|
|
||||||
iNdEx = preIndex
|
|
||||||
skippy, err := skipPush(dAtA[iNdEx:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if skippy < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx += skippy
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if iNdEx > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (m *PushResponse) Unmarshal(dAtA []byte) error {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
preIndex := iNdEx
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fieldNum := int32(wire >> 3)
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
if wireType == 4 {
|
|
||||||
return fmt.Errorf("proto: PushResponse: wiretype end group for non-group")
|
|
||||||
}
|
|
||||||
if fieldNum <= 0 {
|
|
||||||
return fmt.Errorf("proto: PushResponse: illegal tag %d (wire type %d)", fieldNum, wire)
|
|
||||||
}
|
|
||||||
switch fieldNum {
|
|
||||||
default:
|
|
||||||
iNdEx = preIndex
|
|
||||||
skippy, err := skipPush(dAtA[iNdEx:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if skippy < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx += skippy
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if iNdEx > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (m *StreamAdapter) Unmarshal(dAtA []byte) error {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
preIndex := iNdEx
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fieldNum := int32(wire >> 3)
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
if wireType == 4 {
|
|
||||||
return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group")
|
|
||||||
}
|
|
||||||
if fieldNum <= 0 {
|
|
||||||
return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
|
|
||||||
}
|
|
||||||
switch fieldNum {
|
|
||||||
case 1:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
|
|
||||||
}
|
|
||||||
var stringLen uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
stringLen |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
intStringLen := int(stringLen)
|
|
||||||
if intStringLen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + intStringLen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Labels = string(dAtA[iNdEx:postIndex])
|
|
||||||
iNdEx = postIndex
|
|
||||||
case 2:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
|
|
||||||
}
|
|
||||||
var msglen int
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
msglen |= int(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if msglen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + msglen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Entries = append(m.Entries, EntryAdapter{})
|
|
||||||
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
iNdEx = postIndex
|
|
||||||
case 3:
|
|
||||||
if wireType != 0 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType)
|
|
||||||
}
|
|
||||||
m.Hash = 0
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
m.Hash |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
iNdEx = preIndex
|
|
||||||
skippy, err := skipPush(dAtA[iNdEx:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if skippy < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx += skippy
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if iNdEx > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (m *EntryAdapter) Unmarshal(dAtA []byte) error {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
preIndex := iNdEx
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fieldNum := int32(wire >> 3)
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
if wireType == 4 {
|
|
||||||
return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group")
|
|
||||||
}
|
|
||||||
if fieldNum <= 0 {
|
|
||||||
return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
|
|
||||||
}
|
|
||||||
switch fieldNum {
|
|
||||||
case 1:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
|
|
||||||
}
|
|
||||||
var msglen int
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
msglen |= int(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if msglen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + msglen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
iNdEx = postIndex
|
|
||||||
case 2:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
|
|
||||||
}
|
|
||||||
var stringLen uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
stringLen |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
intStringLen := int(stringLen)
|
|
||||||
if intStringLen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + intStringLen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Line = string(dAtA[iNdEx:postIndex])
|
|
||||||
iNdEx = postIndex
|
|
||||||
default:
|
|
||||||
iNdEx = preIndex
|
|
||||||
skippy, err := skipPush(dAtA[iNdEx:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if skippy < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx += skippy
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if iNdEx > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func skipPush(dAtA []byte) (n int, err error) {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return 0, ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return 0, io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= (uint64(b) & 0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
switch wireType {
|
|
||||||
case 0:
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return 0, ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return 0, io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx++
|
|
||||||
if dAtA[iNdEx-1] < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return iNdEx, nil
|
|
||||||
case 1:
|
|
||||||
iNdEx += 8
|
|
||||||
return iNdEx, nil
|
|
||||||
case 2:
|
|
||||||
var length int
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return 0, ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return 0, io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
length |= (int(b) & 0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if length < 0 {
|
|
||||||
return 0, ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
iNdEx += length
|
|
||||||
if iNdEx < 0 {
|
|
||||||
return 0, ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
return iNdEx, nil
|
|
||||||
case 3:
|
|
||||||
for {
|
|
||||||
var innerWire uint64
|
|
||||||
var start int = iNdEx
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return 0, ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return 0, io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
innerWire |= (uint64(b) & 0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
innerWireType := int(innerWire & 0x7)
|
|
||||||
if innerWireType == 4 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
next, err := skipPush(dAtA[start:])
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
iNdEx = start + next
|
|
||||||
if iNdEx < 0 {
|
|
||||||
return 0, ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return iNdEx, nil
|
|
||||||
case 4:
|
|
||||||
return iNdEx, nil
|
|
||||||
case 5:
|
|
||||||
iNdEx += 4
|
|
||||||
return iNdEx, nil
|
|
||||||
default:
|
|
||||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
panic("unreachable")
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrInvalidLengthPush = fmt.Errorf("proto: negative length found during unmarshaling")
|
|
||||||
ErrIntOverflowPush = fmt.Errorf("proto: integer overflow")
|
|
||||||
)
|
|
|
@ -1,38 +0,0 @@
|
||||||
syntax = "proto3";
|
|
||||||
|
|
||||||
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push.proto
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
|
|
||||||
|
|
||||||
package logproto;
|
|
||||||
|
|
||||||
import "gogoproto/gogo.proto";
|
|
||||||
import "google/protobuf/timestamp.proto";
|
|
||||||
|
|
||||||
option go_package = "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki";
|
|
||||||
|
|
||||||
message PushRequest {
|
|
||||||
repeated StreamAdapter streams = 1 [
|
|
||||||
(gogoproto.jsontag) = "streams",
|
|
||||||
(gogoproto.customtype) = "Stream"
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
message StreamAdapter {
|
|
||||||
string labels = 1 [(gogoproto.jsontag) = "labels"];
|
|
||||||
repeated EntryAdapter entries = 2 [
|
|
||||||
(gogoproto.nullable) = false,
|
|
||||||
(gogoproto.jsontag) = "entries"
|
|
||||||
];
|
|
||||||
// hash contains the original hash of the stream.
|
|
||||||
uint64 hash = 3 [(gogoproto.jsontag) = "-"];
|
|
||||||
}
|
|
||||||
|
|
||||||
message EntryAdapter {
|
|
||||||
google.protobuf.Timestamp timestamp = 1 [
|
|
||||||
(gogoproto.stdtime) = true,
|
|
||||||
(gogoproto.nullable) = false,
|
|
||||||
(gogoproto.jsontag) = "ts"
|
|
||||||
];
|
|
||||||
string line = 2 [(gogoproto.jsontag) = "line"];
|
|
||||||
}
|
|
|
@ -1,110 +0,0 @@
|
||||||
package loki
|
|
||||||
|
|
||||||
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/timestamp.go
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gogo/protobuf/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Seconds field of the earliest valid Timestamp.
|
|
||||||
// This is time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
|
|
||||||
minValidSeconds = -62135596800
|
|
||||||
// Seconds field just after the latest valid Timestamp.
|
|
||||||
// This is time.Date(10000, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
|
|
||||||
maxValidSeconds = 253402300800
|
|
||||||
)
|
|
||||||
|
|
||||||
// validateTimestamp determines whether a Timestamp is valid.
|
|
||||||
// A valid timestamp represents a time in the range
|
|
||||||
// [0001-01-01, 10000-01-01) and has a Nanos field
|
|
||||||
// in the range [0, 1e9).
|
|
||||||
//
|
|
||||||
// If the Timestamp is valid, validateTimestamp returns nil.
|
|
||||||
// Otherwise, it returns an error that describes
|
|
||||||
// the problem.
|
|
||||||
//
|
|
||||||
// Every valid Timestamp can be represented by a time.Time, but the converse is not true.
|
|
||||||
func validateTimestamp(ts *types.Timestamp) error {
|
|
||||||
if ts == nil {
|
|
||||||
return errors.New("timestamp: nil Timestamp")
|
|
||||||
}
|
|
||||||
if ts.Seconds < minValidSeconds {
|
|
||||||
return errors.New("timestamp: " + formatTimestamp(ts) + " before 0001-01-01")
|
|
||||||
}
|
|
||||||
if ts.Seconds >= maxValidSeconds {
|
|
||||||
return errors.New("timestamp: " + formatTimestamp(ts) + " after 10000-01-01")
|
|
||||||
}
|
|
||||||
if ts.Nanos < 0 || ts.Nanos >= 1e9 {
|
|
||||||
return errors.New("timestamp: " + formatTimestamp(ts) + ": nanos not in range [0, 1e9)")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// formatTimestamp is equivalent to fmt.Sprintf("%#v", ts)
|
|
||||||
// but avoids the escape incurred by using fmt.Sprintf, eliminating
|
|
||||||
// unnecessary heap allocations.
|
|
||||||
func formatTimestamp(ts *types.Timestamp) string {
|
|
||||||
if ts == nil {
|
|
||||||
return "nil"
|
|
||||||
}
|
|
||||||
|
|
||||||
seconds := strconv.FormatInt(ts.Seconds, 10)
|
|
||||||
nanos := strconv.FormatInt(int64(ts.Nanos), 10)
|
|
||||||
return "&types.Timestamp{Seconds: " + seconds + ",\nNanos: " + nanos + ",\n}"
|
|
||||||
}
|
|
||||||
|
|
||||||
func sizeOfStdTime(t time.Time) int {
|
|
||||||
ts, err := timestampProto(t)
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return ts.Size()
|
|
||||||
}
|
|
||||||
|
|
||||||
func stdTimeMarshalTo(t time.Time, data []byte) (int, error) {
|
|
||||||
ts, err := timestampProto(t)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return ts.MarshalTo(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func stdTimeUnmarshal(t *time.Time, data []byte) error {
|
|
||||||
ts := &types.Timestamp{}
|
|
||||||
if err := ts.Unmarshal(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tt, err := timestampFromProto(ts)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
*t = tt
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func timestampFromProto(ts *types.Timestamp) (time.Time, error) {
|
|
||||||
// Don't return the zero value on error, because corresponds to a valid
|
|
||||||
// timestamp. Instead return whatever time.Unix gives us.
|
|
||||||
var t time.Time
|
|
||||||
if ts == nil {
|
|
||||||
t = time.Unix(0, 0).UTC() // treat nil like the empty Timestamp
|
|
||||||
} else {
|
|
||||||
t = time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
|
|
||||||
}
|
|
||||||
return t, validateTimestamp(ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func timestampProto(t time.Time) (types.Timestamp, error) {
|
|
||||||
ts := types.Timestamp{
|
|
||||||
Seconds: t.Unix(),
|
|
||||||
Nanos: int32(t.Nanosecond()),
|
|
||||||
}
|
|
||||||
return ts, validateTimestamp(&ts)
|
|
||||||
}
|
|
|
@ -1,418 +0,0 @@
|
||||||
package loki
|
|
||||||
|
|
||||||
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/types.go
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Stream contains a unique labels set as a string and a set of entries for it.
|
|
||||||
// We are not using the proto generated version but this custom one so that we
|
|
||||||
// can improve serialization see benchmark.
|
|
||||||
type Stream struct {
|
|
||||||
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
|
|
||||||
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
|
|
||||||
Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Entry is a log entry with a timestamp.
|
|
||||||
type Entry struct {
|
|
||||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
|
|
||||||
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Marshal implements the proto.Marshaler interface.
|
|
||||||
func (m *Stream) Marshal() (dAtA []byte, err error) {
|
|
||||||
size := m.Size()
|
|
||||||
dAtA = make([]byte, size)
|
|
||||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dAtA[:n], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarshalTo marshals m to dst.
|
|
||||||
func (m *Stream) MarshalTo(dAtA []byte) (int, error) {
|
|
||||||
size := m.Size()
|
|
||||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarshalToSizedBuffer marshals m to the sized buffer.
|
|
||||||
func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
_ = i
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if m.Hash != 0 {
|
|
||||||
i = encodeVarintPush(dAtA, i, m.Hash)
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x18
|
|
||||||
}
|
|
||||||
if len(m.Entries) > 0 {
|
|
||||||
for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- {
|
|
||||||
{
|
|
||||||
size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
i -= size
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(size))
|
|
||||||
}
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x12
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(m.Labels) > 0 {
|
|
||||||
i -= len(m.Labels)
|
|
||||||
copy(dAtA[i:], m.Labels)
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Labels)))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0xa
|
|
||||||
}
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Marshal implements the proto.Marshaler interface.
|
|
||||||
func (m *Entry) Marshal() (dAtA []byte, err error) {
|
|
||||||
size := m.Size()
|
|
||||||
dAtA = make([]byte, size)
|
|
||||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dAtA[:n], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarshalTo marshals m to dst.
|
|
||||||
func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
|
|
||||||
size := m.Size()
|
|
||||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarshalToSizedBuffer marshals m to the sized buffer.
|
|
||||||
func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|
||||||
i := len(dAtA)
|
|
||||||
_ = i
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
if len(m.Line) > 0 {
|
|
||||||
i -= len(m.Line)
|
|
||||||
copy(dAtA[i:], m.Line)
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Line)))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0x12
|
|
||||||
}
|
|
||||||
n7, err7 := stdTimeMarshalTo(m.Timestamp, dAtA[i-sizeOfStdTime(m.Timestamp):])
|
|
||||||
if err7 != nil {
|
|
||||||
return 0, err7
|
|
||||||
}
|
|
||||||
i -= n7
|
|
||||||
i = encodeVarintPush(dAtA, i, uint64(n7))
|
|
||||||
i--
|
|
||||||
dAtA[i] = 0xa
|
|
||||||
return len(dAtA) - i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unmarshal unmarshals the given data into m.
|
|
||||||
func (m *Stream) Unmarshal(dAtA []byte) error {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
preIndex := iNdEx
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fieldNum := int32(wire >> 3)
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
if wireType == 4 {
|
|
||||||
return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group")
|
|
||||||
}
|
|
||||||
if fieldNum <= 0 {
|
|
||||||
return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
|
|
||||||
}
|
|
||||||
switch fieldNum {
|
|
||||||
case 1:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
|
|
||||||
}
|
|
||||||
var stringLen uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
stringLen |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
intStringLen := int(stringLen)
|
|
||||||
if intStringLen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + intStringLen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Labels = string(dAtA[iNdEx:postIndex])
|
|
||||||
iNdEx = postIndex
|
|
||||||
case 2:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
|
|
||||||
}
|
|
||||||
var msglen int
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
msglen |= int(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if msglen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + msglen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Entries = append(m.Entries, Entry{})
|
|
||||||
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
iNdEx = postIndex
|
|
||||||
case 3:
|
|
||||||
if wireType != 0 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType)
|
|
||||||
}
|
|
||||||
m.Hash = 0
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
m.Hash |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
iNdEx = preIndex
|
|
||||||
skippy, err := skipPush(dAtA[iNdEx:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if skippy < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx += skippy
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if iNdEx > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unmarshal unmarshals the given data into m.
|
|
||||||
func (m *Entry) Unmarshal(dAtA []byte) error {
|
|
||||||
l := len(dAtA)
|
|
||||||
iNdEx := 0
|
|
||||||
for iNdEx < l {
|
|
||||||
preIndex := iNdEx
|
|
||||||
var wire uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
wire |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fieldNum := int32(wire >> 3)
|
|
||||||
wireType := int(wire & 0x7)
|
|
||||||
if wireType == 4 {
|
|
||||||
return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group")
|
|
||||||
}
|
|
||||||
if fieldNum <= 0 {
|
|
||||||
return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
|
|
||||||
}
|
|
||||||
switch fieldNum {
|
|
||||||
case 1:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
|
|
||||||
}
|
|
||||||
var msglen int
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
msglen |= int(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if msglen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + msglen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
if err := stdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
iNdEx = postIndex
|
|
||||||
case 2:
|
|
||||||
if wireType != 2 {
|
|
||||||
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
|
|
||||||
}
|
|
||||||
var stringLen uint64
|
|
||||||
for shift := uint(0); ; shift += 7 {
|
|
||||||
if shift >= 64 {
|
|
||||||
return ErrIntOverflowPush
|
|
||||||
}
|
|
||||||
if iNdEx >= l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
b := dAtA[iNdEx]
|
|
||||||
iNdEx++
|
|
||||||
stringLen |= uint64(b&0x7F) << shift
|
|
||||||
if b < 0x80 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
intStringLen := int(stringLen)
|
|
||||||
if intStringLen < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
postIndex := iNdEx + intStringLen
|
|
||||||
if postIndex < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if postIndex > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
m.Line = string(dAtA[iNdEx:postIndex])
|
|
||||||
iNdEx = postIndex
|
|
||||||
default:
|
|
||||||
iNdEx = preIndex
|
|
||||||
skippy, err := skipPush(dAtA[iNdEx:])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if skippy < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) < 0 {
|
|
||||||
return ErrInvalidLengthPush
|
|
||||||
}
|
|
||||||
if (iNdEx + skippy) > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
iNdEx += skippy
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if iNdEx > l {
|
|
||||||
return io.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size returns the size of the serialized Stream.
|
|
||||||
func (m *Stream) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
l = len(m.Labels)
|
|
||||||
if l > 0 {
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
if len(m.Entries) > 0 {
|
|
||||||
for _, e := range m.Entries {
|
|
||||||
l = e.Size()
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if m.Hash != 0 {
|
|
||||||
n += 1 + sovPush(m.Hash)
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size returns the size of the serialized Entry
|
|
||||||
func (m *Entry) Size() (n int) {
|
|
||||||
if m == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
var l int
|
|
||||||
_ = l
|
|
||||||
l = sizeOfStdTime(m.Timestamp)
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
l = len(m.Line)
|
|
||||||
if l > 0 {
|
|
||||||
n += 1 + l + sovPush(uint64(l))
|
|
||||||
}
|
|
||||||
return n
|
|
||||||
}
|
|
|
@ -43,11 +43,7 @@ func (st *StreamTags) Reset() {
|
||||||
st.buf = st.buf[:0]
|
st.buf = st.buf[:0]
|
||||||
|
|
||||||
tags := st.tags
|
tags := st.tags
|
||||||
for i := range tags {
|
clear(tags)
|
||||||
t := &tags[i]
|
|
||||||
t.Name = nil
|
|
||||||
t.Value = nil
|
|
||||||
}
|
|
||||||
st.tags = tags[:0]
|
st.tags = tags[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue