mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vminsert: reduce memory usage for Influx, Graphite and OpenTSDB protocols
Do not buffer per-connection data and just store it as it arrives
This commit is contained in:
parent
e307a4d92c
commit
968d094524
5 changed files with 242 additions and 79 deletions
60
app/vminsert/common/lines_reader.go
Normal file
60
app/vminsert/common/lines_reader.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
	// maxLineSize is the maximum size of a single line returned by ReadLinesBlock.
	maxLineSize = 256 * 1024

	// defaultBlockSize is the default size in bytes of a single block returned by ReadLinesBlock.
	defaultBlockSize = 64 * 1024
)
|
||||||
|
|
||||||
|
// ReadLinesBlock reads a block of lines delimited by '\n' from tailBuf and r into dstBuf.
|
||||||
|
//
|
||||||
|
// Trailing chars after the last newline are put into tailBuf.
|
||||||
|
//
|
||||||
|
// Returns (dstBuf, tailBuf).
|
||||||
|
func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
|
||||||
|
if cap(dstBuf) < defaultBlockSize {
|
||||||
|
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
|
||||||
|
}
|
||||||
|
dstBuf = append(dstBuf[:0], tailBuf...)
|
||||||
|
again:
|
||||||
|
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
||||||
|
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
||||||
|
// Otherwise process the read data.
|
||||||
|
if n == 0 {
|
||||||
|
if err == nil {
|
||||||
|
return dstBuf, tailBuf, fmt.Errorf("no forward progress made")
|
||||||
|
}
|
||||||
|
return dstBuf, tailBuf, err
|
||||||
|
}
|
||||||
|
dstBuf = dstBuf[:len(dstBuf)+n]
|
||||||
|
|
||||||
|
// Search for the last newline in dstBuf and put the rest into tailBuf.
|
||||||
|
nn := bytes.LastIndexByte(dstBuf[len(dstBuf)-n:], '\n')
|
||||||
|
if nn < 0 {
|
||||||
|
// Didn't found at least a single line.
|
||||||
|
if len(dstBuf) > maxLineSize {
|
||||||
|
return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineSize)
|
||||||
|
}
|
||||||
|
if cap(dstBuf) < 2*len(dstBuf) {
|
||||||
|
// Increase dsbBuf capacity, so more data could be read into it.
|
||||||
|
dstBufLen := len(dstBuf)
|
||||||
|
dstBuf = bytesutil.Resize(dstBuf, 2*cap(dstBuf))
|
||||||
|
dstBuf = dstBuf[:dstBufLen]
|
||||||
|
}
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
|
||||||
|
// Found at least a single line. Return it.
|
||||||
|
nn += len(dstBuf) - n
|
||||||
|
tailBuf = append(tailBuf[:0], dstBuf[nn+1:]...)
|
||||||
|
dstBuf = dstBuf[:nn]
|
||||||
|
return dstBuf, tailBuf, nil
|
||||||
|
}
|
148
app/vminsert/common/lines_reader_test.go
Normal file
148
app/vminsert/common/lines_reader_test.go
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReadLinesBlockFailure(t *testing.T) {
|
||||||
|
f := func(s string) {
|
||||||
|
t.Helper()
|
||||||
|
r := bytes.NewBufferString(s)
|
||||||
|
if _, _, err := ReadLinesBlock(r, nil, nil); err == nil {
|
||||||
|
t.Fatalf("expecting non-nil error")
|
||||||
|
}
|
||||||
|
sbr := &singleByteReader{
|
||||||
|
b: []byte(s),
|
||||||
|
}
|
||||||
|
if _, _, err := ReadLinesBlock(sbr, nil, nil); err == nil {
|
||||||
|
t.Fatalf("expecting non-nil error")
|
||||||
|
}
|
||||||
|
fr := &failureReader{}
|
||||||
|
if _, _, err := ReadLinesBlock(fr, nil, nil); err == nil {
|
||||||
|
t.Fatalf("expecting non-nil error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// empty string
|
||||||
|
f("")
|
||||||
|
|
||||||
|
// no newline in nonempty string
|
||||||
|
f("foobar")
|
||||||
|
|
||||||
|
// too long string
|
||||||
|
b := make([]byte, maxLineSize+1)
|
||||||
|
f(string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
// failureReader is an io.Reader implementation that always fails.
type failureReader struct{}

// Read always returns a non-nil error without reading any bytes.
func (r *failureReader) Read(p []byte) (int, error) {
	return 0, fmt.Errorf("some error")
}
|
||||||
|
|
||||||
|
func TestReadLineBlockSuccessSingleByteReader(t *testing.T) {
|
||||||
|
f := func(s, dstBufExpected, tailBufExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
r := &singleByteReader{
|
||||||
|
b: []byte(s),
|
||||||
|
}
|
||||||
|
dstBuf, tailBuf, err := ReadLinesBlock(r, nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if string(dstBuf) != dstBufExpected {
|
||||||
|
t.Fatalf("unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
|
||||||
|
}
|
||||||
|
if string(tailBuf) != tailBufExpected {
|
||||||
|
t.Fatalf("unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the same with non-empty dstBuf and tailBuf
|
||||||
|
r = &singleByteReader{
|
||||||
|
b: []byte(s),
|
||||||
|
}
|
||||||
|
dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf[:0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("non-empty bufs: unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if string(dstBuf) != dstBufExpected {
|
||||||
|
t.Fatalf("non-empty bufs: unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
|
||||||
|
}
|
||||||
|
if string(tailBuf) != tailBufExpected {
|
||||||
|
t.Fatalf("non-empty bufs: unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f("\n", "", "")
|
||||||
|
f("foo\n", "foo", "")
|
||||||
|
f("\nfoo", "", "")
|
||||||
|
f("foo\nbar", "foo", "")
|
||||||
|
f("foo\nbar\nbaz", "foo", "")
|
||||||
|
|
||||||
|
// The maximum line size
|
||||||
|
b := make([]byte, maxLineSize+10)
|
||||||
|
b[maxLineSize] = '\n'
|
||||||
|
f(string(b), string(b[:maxLineSize]), "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadLineBlockSuccessBytesBuffer(t *testing.T) {
|
||||||
|
f := func(s, dstBufExpected, tailBufExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
r := bytes.NewBufferString(s)
|
||||||
|
dstBuf, tailBuf, err := ReadLinesBlock(r, nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if string(dstBuf) != dstBufExpected {
|
||||||
|
t.Fatalf("unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
|
||||||
|
}
|
||||||
|
if string(tailBuf) != tailBufExpected {
|
||||||
|
t.Fatalf("unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the same with non-empty dstBuf and tailBuf
|
||||||
|
r = bytes.NewBufferString(s)
|
||||||
|
dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf[:0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("non-empty bufs: unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if string(dstBuf) != dstBufExpected {
|
||||||
|
t.Fatalf("non-empty bufs: unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
|
||||||
|
}
|
||||||
|
if string(tailBuf) != tailBufExpected {
|
||||||
|
t.Fatalf("non-empty bufs: unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f("\n", "", "")
|
||||||
|
f("foo\n", "foo", "")
|
||||||
|
f("\nfoo", "", "foo")
|
||||||
|
f("foo\nbar", "foo", "bar")
|
||||||
|
f("foo\nbar\nbaz", "foo\nbar", "baz")
|
||||||
|
|
||||||
|
// The maximum line size
|
||||||
|
b := make([]byte, maxLineSize+10)
|
||||||
|
b[maxLineSize] = '\n'
|
||||||
|
f(string(b), string(b[:maxLineSize]), string(b[maxLineSize+1:]))
|
||||||
|
}
|
||||||
|
|
||||||
|
type singleByteReader struct {
|
||||||
|
b []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sbr *singleByteReader) Read(p []byte) (int, error) {
|
||||||
|
if len(sbr.b) == 0 {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
n := copy(p, sbr.b[:1])
|
||||||
|
sbr.b = sbr.b[n:]
|
||||||
|
if len(sbr.b) == 0 {
|
||||||
|
return n, io.EOF
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
|
@ -1,7 +1,6 @@
|
||||||
package graphite
|
package graphite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
@ -55,8 +54,6 @@ func (ctx *pushCtx) InsertRows() error {
|
||||||
return ic.FlushBufs()
|
return ic.FlushBufs()
|
||||||
}
|
}
|
||||||
|
|
||||||
const maxReadPacketSize = 4 * 1024 * 1024
|
|
||||||
|
|
||||||
const flushTimeout = 3 * time.Second
|
const flushTimeout = 3 * time.Second
|
||||||
|
|
||||||
func (ctx *pushCtx) Read(r io.Reader) bool {
|
func (ctx *pushCtx) Read(r io.Reader) bool {
|
||||||
|
@ -71,33 +68,22 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lr := io.LimitReader(r, maxReadPacketSize)
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||||
ctx.reqBuf.Reset()
|
if ctx.err != nil {
|
||||||
ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
|
if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
|
||||||
n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
|
|
||||||
if err != nil {
|
|
||||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
|
||||||
// Flush the read data on timeout and try reading again.
|
// Flush the read data on timeout and try reading again.
|
||||||
|
ctx.err = nil
|
||||||
} else {
|
} else {
|
||||||
graphiteReadErrors.Inc()
|
if ctx.err != io.EOF {
|
||||||
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", err)
|
graphiteReadErrors.Inc()
|
||||||
|
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", ctx.err)
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
} else if n < maxReadPacketSize {
|
|
||||||
// Mark the end of stream.
|
|
||||||
ctx.err = io.EOF
|
|
||||||
}
|
}
|
||||||
|
if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
|
||||||
// Parse all the rows until the last newline in ctx.reqBuf.B
|
|
||||||
nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
|
|
||||||
ctx.tailBuf = ctx.tailBuf[:0]
|
|
||||||
if nn >= 0 {
|
|
||||||
ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
|
|
||||||
ctx.reqBuf.B = ctx.reqBuf.B[:nn]
|
|
||||||
}
|
|
||||||
if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
|
|
||||||
graphiteUnmarshalErrors.Inc()
|
graphiteUnmarshalErrors.Inc()
|
||||||
ctx.err = fmt.Errorf("cannot unmarshal graphite plaintext protocol data with size %d: %s", len(ctx.reqBuf.B), err)
|
ctx.err = fmt.Errorf("cannot unmarshal graphite plaintext protocol data with size %d: %s", len(ctx.reqBuf), err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,9 +98,8 @@ type pushCtx struct {
|
||||||
Rows Rows
|
Rows Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf bytesutil.ByteBuffer
|
reqBuf []byte
|
||||||
tailBuf []byte
|
tailBuf []byte
|
||||||
copyBuf [16 * 1024]byte
|
|
||||||
|
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
@ -129,7 +114,7 @@ func (ctx *pushCtx) Error() error {
|
||||||
func (ctx *pushCtx) reset() {
|
func (ctx *pushCtx) reset() {
|
||||||
ctx.Rows.Reset()
|
ctx.Rows.Reset()
|
||||||
ctx.Common.Reset(0)
|
ctx.Common.Reset(0)
|
||||||
ctx.reqBuf.Reset()
|
ctx.reqBuf = ctx.reqBuf[:0]
|
||||||
ctx.tailBuf = ctx.tailBuf[:0]
|
ctx.tailBuf = ctx.tailBuf[:0]
|
||||||
|
|
||||||
ctx.err = nil
|
ctx.err = nil
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package influx
|
package influx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -123,36 +122,21 @@ func putGzipReader(zr *gzip.Reader) {
|
||||||
|
|
||||||
var gzipReaderPool sync.Pool
|
var gzipReaderPool sync.Pool
|
||||||
|
|
||||||
const maxReadPacketSize = 4 * 1024 * 1024
|
|
||||||
|
|
||||||
func (ctx *pushCtx) Read(r io.Reader, tsMultiplier int64) bool {
|
func (ctx *pushCtx) Read(r io.Reader, tsMultiplier int64) bool {
|
||||||
if ctx.err != nil {
|
if ctx.err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
lr := io.LimitReader(r, maxReadPacketSize)
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||||
ctx.reqBuf.Reset()
|
if ctx.err != nil {
|
||||||
ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
|
if ctx.err != io.EOF {
|
||||||
n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
|
influxReadErrors.Inc()
|
||||||
if err != nil {
|
ctx.err = fmt.Errorf("cannot read influx line protocol data: %s", ctx.err)
|
||||||
influxReadErrors.Inc()
|
}
|
||||||
ctx.err = fmt.Errorf("cannot read influx line protocol data: %s", err)
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if n < maxReadPacketSize {
|
if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
|
||||||
// Mark the end of stream.
|
|
||||||
ctx.err = io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse all the rows until the last newline in ctx.reqBuf.B
|
|
||||||
nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
|
|
||||||
ctx.tailBuf = ctx.tailBuf[:0]
|
|
||||||
if nn >= 0 {
|
|
||||||
ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
|
|
||||||
ctx.reqBuf.B = ctx.reqBuf.B[:nn]
|
|
||||||
}
|
|
||||||
if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
|
|
||||||
influxUnmarshalErrors.Inc()
|
influxUnmarshalErrors.Inc()
|
||||||
ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf.B), err)
|
ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf), err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,9 +175,8 @@ type pushCtx struct {
|
||||||
Rows Rows
|
Rows Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf bytesutil.ByteBuffer
|
reqBuf []byte
|
||||||
tailBuf []byte
|
tailBuf []byte
|
||||||
copyBuf [16 * 1024]byte
|
|
||||||
metricNameBuf []byte
|
metricNameBuf []byte
|
||||||
metricGroupBuf []byte
|
metricGroupBuf []byte
|
||||||
|
|
||||||
|
@ -211,7 +194,7 @@ func (ctx *pushCtx) reset() {
|
||||||
ctx.Rows.Reset()
|
ctx.Rows.Reset()
|
||||||
ctx.Common.Reset(0)
|
ctx.Common.Reset(0)
|
||||||
|
|
||||||
ctx.reqBuf.Reset()
|
ctx.reqBuf = ctx.reqBuf[:0]
|
||||||
ctx.tailBuf = ctx.tailBuf[:0]
|
ctx.tailBuf = ctx.tailBuf[:0]
|
||||||
ctx.metricNameBuf = ctx.metricNameBuf[:0]
|
ctx.metricNameBuf = ctx.metricNameBuf[:0]
|
||||||
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
|
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package opentsdb
|
package opentsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
@ -71,33 +70,22 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lr := io.LimitReader(r, maxReadPacketSize)
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
||||||
ctx.reqBuf.Reset()
|
if ctx.err != nil {
|
||||||
ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
|
if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
|
||||||
n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
|
|
||||||
if err != nil {
|
|
||||||
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
|
||||||
// Flush the read data on timeout and try reading again.
|
// Flush the read data on timeout and try reading again.
|
||||||
|
ctx.err = nil
|
||||||
} else {
|
} else {
|
||||||
opentsdbReadErrors.Inc()
|
if ctx.err != io.EOF {
|
||||||
ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", err)
|
opentsdbReadErrors.Inc()
|
||||||
|
ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", ctx.err)
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
} else if n < maxReadPacketSize {
|
|
||||||
// Mark the end of stream.
|
|
||||||
ctx.err = io.EOF
|
|
||||||
}
|
}
|
||||||
|
if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
|
||||||
// Parse all the rows until the last newline in ctx.reqBuf.B
|
|
||||||
nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
|
|
||||||
ctx.tailBuf = ctx.tailBuf[:0]
|
|
||||||
if nn >= 0 {
|
|
||||||
ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
|
|
||||||
ctx.reqBuf.B = ctx.reqBuf.B[:nn]
|
|
||||||
}
|
|
||||||
if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
|
|
||||||
opentsdbUnmarshalErrors.Inc()
|
opentsdbUnmarshalErrors.Inc()
|
||||||
ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf.B), err)
|
ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf), err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,9 +100,8 @@ type pushCtx struct {
|
||||||
Rows Rows
|
Rows Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf bytesutil.ByteBuffer
|
reqBuf []byte
|
||||||
tailBuf []byte
|
tailBuf []byte
|
||||||
copyBuf [16 * 1024]byte
|
|
||||||
|
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
@ -129,7 +116,7 @@ func (ctx *pushCtx) Error() error {
|
||||||
func (ctx *pushCtx) reset() {
|
func (ctx *pushCtx) reset() {
|
||||||
ctx.Rows.Reset()
|
ctx.Rows.Reset()
|
||||||
ctx.Common.Reset(0)
|
ctx.Common.Reset(0)
|
||||||
ctx.reqBuf.Reset()
|
ctx.reqBuf = ctx.reqBuf[:0]
|
||||||
ctx.tailBuf = ctx.tailBuf[:0]
|
ctx.tailBuf = ctx.tailBuf[:0]
|
||||||
|
|
||||||
ctx.err = nil
|
ctx.err = nil
|
||||||
|
|
Loading…
Reference in a new issue