app/vminsert: reduce memory usage for Influx, Graphite and OpenTSDB protocols

Do not buffer per-connection data and just store it as it arrives
This commit is contained in:
Aliaksandr Valialkin 2019-05-28 17:31:35 +03:00
parent a6d02ff275
commit a4ec139a4a
5 changed files with 245 additions and 79 deletions

View file

@ -0,0 +1,60 @@
package common
import (
"bytes"
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// The maximum size of a single line returned by ReadLinesBlock.
const maxLineSize = 256 * 1024
// Default size in bytes of a single block returned by ReadLinesBlock.
const defaultBlockSize = 64 * 1024
// ReadLinesBlock reads a block of lines delimited by '\n' from tailBuf and r into dstBuf.
//
// Trailing chars after the last newline are put into tailBuf.
//
// Returns (dstBuf, tailBuf).
func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
if cap(dstBuf) < defaultBlockSize {
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
}
dstBuf = append(dstBuf[:0], tailBuf...)
again:
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
// Check for error only if zero bytes read from r, i.e. no forward progress made.
// Otherwise process the read data.
if n == 0 {
if err == nil {
return dstBuf, tailBuf, fmt.Errorf("no forward progress made")
}
return dstBuf, tailBuf, err
}
dstBuf = dstBuf[:len(dstBuf)+n]
// Search for the last newline in dstBuf and put the rest into tailBuf.
nn := bytes.LastIndexByte(dstBuf[len(dstBuf)-n:], '\n')
if nn < 0 {
// Didn't found at least a single line.
if len(dstBuf) > maxLineSize {
return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineSize)
}
if cap(dstBuf) < 2*len(dstBuf) {
// Increase dsbBuf capacity, so more data could be read into it.
dstBufLen := len(dstBuf)
dstBuf = bytesutil.Resize(dstBuf, 2*cap(dstBuf))
dstBuf = dstBuf[:dstBufLen]
}
goto again
}
// Found at least a single line. Return it.
nn += len(dstBuf) - n
tailBuf = append(tailBuf[:0], dstBuf[nn+1:]...)
dstBuf = dstBuf[:nn]
return dstBuf, tailBuf, nil
}

View file

@ -0,0 +1,148 @@
package common
import (
"bytes"
"fmt"
"io"
"testing"
)
func TestReadLinesBlockFailure(t *testing.T) {
f := func(s string) {
t.Helper()
r := bytes.NewBufferString(s)
if _, _, err := ReadLinesBlock(r, nil, nil); err == nil {
t.Fatalf("expecting non-nil error")
}
sbr := &singleByteReader{
b: []byte(s),
}
if _, _, err := ReadLinesBlock(sbr, nil, nil); err == nil {
t.Fatalf("expecting non-nil error")
}
fr := &failureReader{}
if _, _, err := ReadLinesBlock(fr, nil, nil); err == nil {
t.Fatalf("expecting non-nil error")
}
}
// empty string
f("")
// no newline in nonempty string
f("foobar")
// too long string
b := make([]byte, maxLineSize+1)
f(string(b))
}
type failureReader struct{}
func (fr *failureReader) Read(p []byte) (int, error) {
return 0, fmt.Errorf("some error")
}
func TestReadLineBlockSuccessSingleByteReader(t *testing.T) {
f := func(s, dstBufExpected, tailBufExpected string) {
t.Helper()
r := &singleByteReader{
b: []byte(s),
}
dstBuf, tailBuf, err := ReadLinesBlock(r, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if string(dstBuf) != dstBufExpected {
t.Fatalf("unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
}
if string(tailBuf) != tailBufExpected {
t.Fatalf("unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
}
// Verify the same with non-empty dstBuf and tailBuf
r = &singleByteReader{
b: []byte(s),
}
dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf[:0])
if err != nil {
t.Fatalf("non-empty bufs: unexpected error: %s", err)
}
if string(dstBuf) != dstBufExpected {
t.Fatalf("non-empty bufs: unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
}
if string(tailBuf) != tailBufExpected {
t.Fatalf("non-empty bufs: unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
}
}
f("\n", "", "")
f("foo\n", "foo", "")
f("\nfoo", "", "")
f("foo\nbar", "foo", "")
f("foo\nbar\nbaz", "foo", "")
// The maximum line size
b := make([]byte, maxLineSize+10)
b[maxLineSize] = '\n'
f(string(b), string(b[:maxLineSize]), "")
}
func TestReadLineBlockSuccessBytesBuffer(t *testing.T) {
f := func(s, dstBufExpected, tailBufExpected string) {
t.Helper()
r := bytes.NewBufferString(s)
dstBuf, tailBuf, err := ReadLinesBlock(r, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if string(dstBuf) != dstBufExpected {
t.Fatalf("unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
}
if string(tailBuf) != tailBufExpected {
t.Fatalf("unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
}
// Verify the same with non-empty dstBuf and tailBuf
r = bytes.NewBufferString(s)
dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf[:0])
if err != nil {
t.Fatalf("non-empty bufs: unexpected error: %s", err)
}
if string(dstBuf) != dstBufExpected {
t.Fatalf("non-empty bufs: unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
}
if string(tailBuf) != tailBufExpected {
t.Fatalf("non-empty bufs: unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
}
}
f("\n", "", "")
f("foo\n", "foo", "")
f("\nfoo", "", "foo")
f("foo\nbar", "foo", "bar")
f("foo\nbar\nbaz", "foo\nbar", "baz")
// The maximum line size
b := make([]byte, maxLineSize+10)
b[maxLineSize] = '\n'
f(string(b), string(b[:maxLineSize]), string(b[maxLineSize+1:]))
}
type singleByteReader struct {
b []byte
}
func (sbr *singleByteReader) Read(p []byte) (int, error) {
if len(sbr.b) == 0 {
return 0, io.EOF
}
n := copy(p, sbr.b[:1])
sbr.b = sbr.b[n:]
if len(sbr.b) == 0 {
return n, io.EOF
}
return n, nil
}

View file

@ -1,7 +1,6 @@
package graphite
import (
"bytes"
"fmt"
"io"
"net"
@ -9,6 +8,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@ -58,8 +58,6 @@ func (ctx *pushCtx) InsertRows(at *auth.Token) error {
return ic.FlushBufs()
}
const maxReadPacketSize = 4 * 1024 * 1024
const flushTimeout = 3 * time.Second
func (ctx *pushCtx) Read(r io.Reader) bool {
@ -74,33 +72,22 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
return false
}
}
lr := io.LimitReader(r, maxReadPacketSize)
ctx.reqBuf.Reset()
ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
// Flush the read data on timeout and try reading again.
ctx.err = nil
} else {
graphiteReadErrors.Inc()
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", err)
if ctx.err != io.EOF {
graphiteReadErrors.Inc()
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", ctx.err)
}
return false
}
} else if n < maxReadPacketSize {
// Mark the end of stream.
ctx.err = io.EOF
}
// Parse all the rows until the last newline in ctx.reqBuf.B
nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
ctx.tailBuf = ctx.tailBuf[:0]
if nn >= 0 {
ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
ctx.reqBuf.B = ctx.reqBuf.B[:nn]
}
if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
graphiteUnmarshalErrors.Inc()
ctx.err = fmt.Errorf("cannot unmarshal graphite plaintext protocol data with size %d: %s", len(ctx.reqBuf.B), err)
ctx.err = fmt.Errorf("cannot unmarshal graphite plaintext protocol data with size %d: %s", len(ctx.reqBuf), err)
return false
}
@ -115,9 +102,8 @@ type pushCtx struct {
Rows Rows
Common netstorage.InsertCtx
reqBuf bytesutil.ByteBuffer
reqBuf []byte
tailBuf []byte
copyBuf [16 * 1024]byte
err error
}
@ -132,7 +118,7 @@ func (ctx *pushCtx) Error() error {
func (ctx *pushCtx) reset() {
ctx.Rows.Reset()
ctx.Common.Reset()
ctx.reqBuf.Reset()
ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0]
ctx.err = nil

View file

@ -1,7 +1,6 @@
package influx
import (
"bytes"
"compress/gzip"
"fmt"
"io"
@ -10,6 +9,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@ -128,36 +128,21 @@ func putGzipReader(zr *gzip.Reader) {
var gzipReaderPool sync.Pool
const maxReadPacketSize = 4 * 1024 * 1024
func (ctx *pushCtx) Read(r io.Reader, tsMultiplier int64) bool {
if ctx.err != nil {
return false
}
lr := io.LimitReader(r, maxReadPacketSize)
ctx.reqBuf.Reset()
ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
if err != nil {
influxReadErrors.Inc()
ctx.err = fmt.Errorf("cannot read influx line protocol data: %s", err)
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
influxReadErrors.Inc()
ctx.err = fmt.Errorf("cannot read influx line protocol data: %s", ctx.err)
}
return false
}
if n < maxReadPacketSize {
// Mark the end of stream.
ctx.err = io.EOF
}
// Parse all the rows until the last newline in ctx.reqBuf.B
nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
ctx.tailBuf = ctx.tailBuf[:0]
if nn >= 0 {
ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
ctx.reqBuf.B = ctx.reqBuf.B[:nn]
}
if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
influxUnmarshalErrors.Inc()
ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf.B), err)
ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf), err)
return false
}
@ -196,9 +181,8 @@ type pushCtx struct {
Rows Rows
Common netstorage.InsertCtx
reqBuf bytesutil.ByteBuffer
reqBuf []byte
tailBuf []byte
copyBuf [16 * 1024]byte
metricGroupBuf []byte
err error
@ -214,7 +198,7 @@ func (ctx *pushCtx) Error() error {
func (ctx *pushCtx) reset() {
ctx.Rows.Reset()
ctx.Common.Reset()
ctx.reqBuf.Reset()
ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0]
ctx.metricGroupBuf = ctx.metricGroupBuf[:0]

View file

@ -1,7 +1,6 @@
package opentsdb
import (
"bytes"
"fmt"
"io"
"net"
@ -9,6 +8,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@ -74,33 +74,22 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
return false
}
}
lr := io.LimitReader(r, maxReadPacketSize)
ctx.reqBuf.Reset()
ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
// Flush the read data on timeout and try reading again.
ctx.err = nil
} else {
opentsdbReadErrors.Inc()
ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", err)
if ctx.err != io.EOF {
opentsdbReadErrors.Inc()
ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", ctx.err)
}
return false
}
} else if n < maxReadPacketSize {
// Mark the end of stream.
ctx.err = io.EOF
}
// Parse all the rows until the last newline in ctx.reqBuf.B
nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
ctx.tailBuf = ctx.tailBuf[:0]
if nn >= 0 {
ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
ctx.reqBuf.B = ctx.reqBuf.B[:nn]
}
if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
opentsdbUnmarshalErrors.Inc()
ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf.B), err)
ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf), err)
return false
}
@ -115,9 +104,8 @@ type pushCtx struct {
Rows Rows
Common netstorage.InsertCtx
reqBuf bytesutil.ByteBuffer
reqBuf []byte
tailBuf []byte
copyBuf [16 * 1024]byte
err error
}
@ -132,7 +120,7 @@ func (ctx *pushCtx) Error() error {
func (ctx *pushCtx) reset() {
ctx.Rows.Reset()
ctx.Common.Reset()
ctx.reqBuf.Reset()
ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0]
ctx.err = nil