From 968d094524316a76411a47a24cbb4f0d9ecd7c07 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 28 May 2019 17:31:35 +0300
Subject: [PATCH] app/vminsert: reduce memory usage for Influx, Graphite and
 OpenTSDB protocols

Do not buffer per-connection data; process it as it arrives instead.
---
 app/vminsert/common/lines_reader.go      |  60 +++++++++
 app/vminsert/common/lines_reader_test.go | 148 +++++++++++++++++++++++
 app/vminsert/graphite/request_handler.go |  39 ++----
 app/vminsert/influx/request_handler.go   |  37 ++----
 app/vminsert/opentsdb/request_handler.go |  37 ++----
 5 files changed, 242 insertions(+), 79 deletions(-)
 create mode 100644 app/vminsert/common/lines_reader.go
 create mode 100644 app/vminsert/common/lines_reader_test.go

diff --git a/app/vminsert/common/lines_reader.go b/app/vminsert/common/lines_reader.go
new file mode 100644
index 000000000..f2d7db3f7
--- /dev/null
+++ b/app/vminsert/common/lines_reader.go
@@ -0,0 +1,60 @@
+package common
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+)
+
+// The maximum size of a single line returned by ReadLinesBlock.
+const maxLineSize = 256 * 1024
+
+// Default size in bytes of a single block returned by ReadLinesBlock.
+const defaultBlockSize = 64 * 1024
+
+// ReadLinesBlock reads a block of lines delimited by '\n' from tailBuf and r into dstBuf.
+//
+// Trailing chars after the last newline are put into tailBuf.
+//
+// Returns the updated (dstBuf, tailBuf).
+func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
+	if cap(dstBuf) < defaultBlockSize {
+		dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
+	}
+	dstBuf = append(dstBuf[:0], tailBuf...)
+again:
+	n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
+	// Check the error only if zero bytes were read from r, i.e. no forward progress was made.
+	// Otherwise process the read data.
+	if n == 0 {
+		if err == nil {
+			return dstBuf, tailBuf, fmt.Errorf("no forward progress made")
+		}
+		return dstBuf, tailBuf, err
+	}
+	dstBuf = dstBuf[:len(dstBuf)+n]
+
+	// Search for the last newline in dstBuf and put the rest into tailBuf.
+	nn := bytes.LastIndexByte(dstBuf[len(dstBuf)-n:], '\n')
+	if nn < 0 {
+		// Didn't find even a single line.
+		if len(dstBuf) > maxLineSize {
+			return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineSize)
+		}
+		if cap(dstBuf) < 2*len(dstBuf) {
+			// Increase dstBuf capacity so more data can be read into it.
+			dstBufLen := len(dstBuf)
+			dstBuf = bytesutil.Resize(dstBuf, 2*cap(dstBuf))
+			dstBuf = dstBuf[:dstBufLen]
+		}
+		goto again
+	}
+
+	// Found at least one line. Return it.
+	nn += len(dstBuf) - n
+	tailBuf = append(tailBuf[:0], dstBuf[nn+1:]...)
+	dstBuf = dstBuf[:nn]
+	return dstBuf, tailBuf, nil
+}
diff --git a/app/vminsert/common/lines_reader_test.go b/app/vminsert/common/lines_reader_test.go
new file mode 100644
index 000000000..4f320e215
--- /dev/null
+++ b/app/vminsert/common/lines_reader_test.go
@@ -0,0 +1,148 @@
+package common
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"testing"
+)
+
+func TestReadLinesBlockFailure(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		r := bytes.NewBufferString(s)
+		if _, _, err := ReadLinesBlock(r, nil, nil); err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+		sbr := &singleByteReader{
+			b: []byte(s),
+		}
+		if _, _, err := ReadLinesBlock(sbr, nil, nil); err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+		fr := &failureReader{}
+		if _, _, err := ReadLinesBlock(fr, nil, nil); err == nil {
+			t.Fatalf("expecting non-nil error")
+		}
+	}
+
+	// empty string
+	f("")
+
+	// no newline in nonempty string
+	f("foobar")
+
+	// too long string
+	b := make([]byte, maxLineSize+1)
+	f(string(b))
+}
+
+type failureReader struct{}
+
+func (fr *failureReader) Read(p []byte) (int, error) {
+	return 0, fmt.Errorf("some error")
+}
+
+func TestReadLineBlockSuccessSingleByteReader(t *testing.T) {
+	f := func(s, dstBufExpected, tailBufExpected string) {
+		t.Helper()
+
+		r := &singleByteReader{
+			b: []byte(s),
+		}
+		dstBuf, tailBuf, err := ReadLinesBlock(r, nil, nil)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if string(dstBuf) != dstBufExpected {
+			t.Fatalf("unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
+		}
+		if string(tailBuf) != tailBufExpected {
+			t.Fatalf("unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
+		}
+
+		// Verify the same with non-empty dstBuf and tailBuf
+		r = &singleByteReader{
+			b: []byte(s),
+		}
+		dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf[:0])
+		if err != nil {
+			t.Fatalf("non-empty bufs: unexpected error: %s", err)
+		}
+		if string(dstBuf) != dstBufExpected {
+			t.Fatalf("non-empty bufs: unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
+		}
+		if string(tailBuf) != tailBufExpected {
+			t.Fatalf("non-empty bufs: unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
+		}
+	}
+
+	f("\n", "", "")
+	f("foo\n", "foo", "")
+	f("\nfoo", "", "")
+	f("foo\nbar", "foo", "")
+	f("foo\nbar\nbaz", "foo", "")
+
+	// The maximum line size
+	b := make([]byte, maxLineSize+10)
+	b[maxLineSize] = '\n'
+	f(string(b), string(b[:maxLineSize]), "")
+}
+
+func TestReadLineBlockSuccessBytesBuffer(t *testing.T) {
+	f := func(s, dstBufExpected, tailBufExpected string) {
+		t.Helper()
+
+		r := bytes.NewBufferString(s)
+		dstBuf, tailBuf, err := ReadLinesBlock(r, nil, nil)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if string(dstBuf) != dstBufExpected {
+			t.Fatalf("unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
+		}
+		if string(tailBuf) != tailBufExpected {
+			t.Fatalf("unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
+		}
+
+		// Verify the same with non-empty dstBuf and tailBuf
+		r = bytes.NewBufferString(s)
+		dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf[:0])
+		if err != nil {
+			t.Fatalf("non-empty bufs: unexpected error: %s", err)
+		}
+		if string(dstBuf) != dstBufExpected {
+			t.Fatalf("non-empty bufs: unexpected dstBuf; got %q; want %q; tailBuf=%q", dstBuf, dstBufExpected, tailBuf)
+		}
+		if string(tailBuf) != tailBufExpected {
+			t.Fatalf("non-empty bufs: unexpected tailBuf; got %q; want %q; dstBuf=%q", tailBuf, tailBufExpected, dstBuf)
+		}
+	}
+
+	f("\n", "", "")
+	f("foo\n", "foo", "")
+	f("\nfoo", "", "foo")
+	f("foo\nbar", "foo", "bar")
+	f("foo\nbar\nbaz", "foo\nbar", "baz")
+
+	// The maximum line size
+	b := make([]byte, maxLineSize+10)
+	b[maxLineSize] = '\n'
+	f(string(b), string(b[:maxLineSize]), string(b[maxLineSize+1:]))
+}
+
+type singleByteReader struct {
+	b []byte
+}
+
+func (sbr *singleByteReader) Read(p []byte) (int, error) {
+	if len(sbr.b) == 0 {
+		return 0, io.EOF
+	}
+	n := copy(p, sbr.b[:1])
+	sbr.b = sbr.b[n:]
+	if len(sbr.b) == 0 {
+		return n, io.EOF
+	}
+	return n, nil
+}
diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go
index 7c59d7cb2..67f74cbb5 100644
--- a/app/vminsert/graphite/request_handler.go
+++ b/app/vminsert/graphite/request_handler.go
@@ -1,7 +1,6 @@
 package graphite
 
 import (
-	"bytes"
 	"fmt"
 	"io"
 	"net"
@@ -55,8 +54,6 @@ func (ctx *pushCtx) InsertRows() error {
 	return ic.FlushBufs()
 }
 
-const maxReadPacketSize = 4 * 1024 * 1024
-
 const flushTimeout = 3 * time.Second
 
 func (ctx *pushCtx) Read(r io.Reader) bool {
@@ -71,33 +68,22 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
 			return false
 		}
 	}
-	lr := io.LimitReader(r, maxReadPacketSize)
-	ctx.reqBuf.Reset()
-	ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
-	n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
-	if err != nil {
-		if ne, ok := err.(net.Error); ok && ne.Timeout() {
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	if ctx.err != nil {
+		if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
 			// Flush the read data on timeout and try reading again.
+			ctx.err = nil
 		} else {
-			graphiteReadErrors.Inc()
-			ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", err)
+			if ctx.err != io.EOF {
+				graphiteReadErrors.Inc()
+				ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", ctx.err)
+			}
 			return false
 		}
-	} else if n < maxReadPacketSize {
-		// Mark the end of stream.
-		ctx.err = io.EOF
 	}
-
-	// Parse all the rows until the last newline in ctx.reqBuf.B
-	nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
-	ctx.tailBuf = ctx.tailBuf[:0]
-	if nn >= 0 {
-		ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
-		ctx.reqBuf.B = ctx.reqBuf.B[:nn]
-	}
-	if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
+	if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
 		graphiteUnmarshalErrors.Inc()
-		ctx.err = fmt.Errorf("cannot unmarshal graphite plaintext protocol data with size %d: %s", len(ctx.reqBuf.B), err)
+		ctx.err = fmt.Errorf("cannot unmarshal graphite plaintext protocol data with size %d: %s", len(ctx.reqBuf), err)
 		return false
 	}
 
@@ -112,9 +98,8 @@ type pushCtx struct {
 	Rows   Rows
 	Common common.InsertCtx
 
-	reqBuf  bytesutil.ByteBuffer
+	reqBuf  []byte
 	tailBuf []byte
-	copyBuf [16 * 1024]byte
 
 	err error
 }
@@ -129,7 +114,7 @@ func (ctx *pushCtx) Error() error {
 func (ctx *pushCtx) reset() {
 	ctx.Rows.Reset()
 	ctx.Common.Reset(0)
-	ctx.reqBuf.Reset()
+	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 
 	ctx.err = nil
diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go
index 2c713f66c..2e489e7e2 100644
--- a/app/vminsert/influx/request_handler.go
+++ b/app/vminsert/influx/request_handler.go
@@ -1,7 +1,6 @@
 package influx
 
 import (
-	"bytes"
 	"compress/gzip"
 	"fmt"
 	"io"
@@ -123,36 +122,21 @@ func putGzipReader(zr *gzip.Reader) {
 
 var gzipReaderPool sync.Pool
 
-const maxReadPacketSize = 4 * 1024 * 1024
-
 func (ctx *pushCtx) Read(r io.Reader, tsMultiplier int64) bool {
 	if ctx.err != nil {
 		return false
 	}
-	lr := io.LimitReader(r, maxReadPacketSize)
-	ctx.reqBuf.Reset()
-	ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
-	n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
-	if err != nil {
-		influxReadErrors.Inc()
-		ctx.err = fmt.Errorf("cannot read influx line protocol data: %s", err)
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	if ctx.err != nil {
+		if ctx.err != io.EOF {
+			influxReadErrors.Inc()
+			ctx.err = fmt.Errorf("cannot read influx line protocol data: %s", ctx.err)
+		}
 		return false
 	}
-	if n < maxReadPacketSize {
-		// Mark the end of stream.
-		ctx.err = io.EOF
-	}
-
-	// Parse all the rows until the last newline in ctx.reqBuf.B
-	nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
-	ctx.tailBuf = ctx.tailBuf[:0]
-	if nn >= 0 {
-		ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
-		ctx.reqBuf.B = ctx.reqBuf.B[:nn]
-	}
-	if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
+	if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
 		influxUnmarshalErrors.Inc()
-		ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf.B), err)
+		ctx.err = fmt.Errorf("cannot unmarshal influx line protocol data with size %d: %s", len(ctx.reqBuf), err)
 		return false
 	}
 
@@ -191,9 +175,8 @@ type pushCtx struct {
 	Rows   Rows
 	Common common.InsertCtx
 
-	reqBuf  bytesutil.ByteBuffer
+	reqBuf  []byte
 	tailBuf []byte
-	copyBuf [16 * 1024]byte
 
 	metricNameBuf  []byte
 	metricGroupBuf []byte
@@ -211,7 +194,7 @@ func (ctx *pushCtx) reset() {
 	ctx.Rows.Reset()
 	ctx.Common.Reset(0)
 
-	ctx.reqBuf.Reset()
+	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 	ctx.metricNameBuf = ctx.metricNameBuf[:0]
 	ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go
index eef981a5f..8b9d5b66f 100644
--- a/app/vminsert/opentsdb/request_handler.go
+++ b/app/vminsert/opentsdb/request_handler.go
@@ -1,7 +1,6 @@
 package opentsdb
 
 import (
-	"bytes"
 	"fmt"
 	"io"
 	"net"
@@ -71,33 +70,22 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
 			return false
 		}
 	}
-	lr := io.LimitReader(r, maxReadPacketSize)
-	ctx.reqBuf.Reset()
-	ctx.reqBuf.B = append(ctx.reqBuf.B[:0], ctx.tailBuf...)
-	n, err := io.CopyBuffer(&ctx.reqBuf, lr, ctx.copyBuf[:])
-	if err != nil {
-		if ne, ok := err.(net.Error); ok && ne.Timeout() {
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
+	if ctx.err != nil {
+		if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
 			// Flush the read data on timeout and try reading again.
+			ctx.err = nil
 		} else {
-			opentsdbReadErrors.Inc()
-			ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", err)
+			if ctx.err != io.EOF {
+				opentsdbReadErrors.Inc()
+				ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", ctx.err)
+			}
 			return false
 		}
-	} else if n < maxReadPacketSize {
-		// Mark the end of stream.
-		ctx.err = io.EOF
 	}
-
-	// Parse all the rows until the last newline in ctx.reqBuf.B
-	nn := bytes.LastIndexByte(ctx.reqBuf.B, '\n')
-	ctx.tailBuf = ctx.tailBuf[:0]
-	if nn >= 0 {
-		ctx.tailBuf = append(ctx.tailBuf[:0], ctx.reqBuf.B[nn+1:]...)
-		ctx.reqBuf.B = ctx.reqBuf.B[:nn]
-	}
-	if err = ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf.B)); err != nil {
+	if err := ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)); err != nil {
 		opentsdbUnmarshalErrors.Inc()
-		ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf.B), err)
+		ctx.err = fmt.Errorf("cannot unmarshal OpenTSDB put protocol data with size %d: %s", len(ctx.reqBuf), err)
 		return false
 	}
 
@@ -112,9 +100,8 @@ type pushCtx struct {
 	Rows   Rows
 	Common common.InsertCtx
 
-	reqBuf  bytesutil.ByteBuffer
+	reqBuf  []byte
 	tailBuf []byte
-	copyBuf [16 * 1024]byte
 
 	err error
 }
@@ -129,7 +116,7 @@ func (ctx *pushCtx) Error() error {
 func (ctx *pushCtx) reset() {
 	ctx.Rows.Reset()
 	ctx.Common.Reset(0)
-	ctx.reqBuf.Reset()
+	ctx.reqBuf = ctx.reqBuf[:0]
 	ctx.tailBuf = ctx.tailBuf[:0]
 
 	ctx.err = nil
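
A minimal usage sketch of the new common.ReadLinesBlock helper, not part of the patch: the serveConn helper and processBlock callback below are illustrative names, and the import path assumes the repository's module layout; the real call sites are the Read methods in the graphite, influx and opentsdb request handlers above.

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
)

// serveConn reads newline-delimited data from r in blocks and passes every
// block of complete lines to processBlock. reqBuf and tailBuf are reused
// between iterations, so per-connection memory stays bounded by the block
// size instead of the full payload size.
func serveConn(r io.Reader, processBlock func(block []byte) error) error {
	var (
		reqBuf, tailBuf []byte
		err             error
	)
	for {
		reqBuf, tailBuf, err = common.ReadLinesBlock(r, reqBuf, tailBuf)
		if err != nil {
			if err == io.EOF {
				// End of stream. As in the handlers above, a trailing partial
				// line without a final '\n' is not processed.
				return nil
			}
			return fmt.Errorf("cannot read lines block: %s", err)
		}
		// reqBuf now holds whole lines only; the partial trailing line, if any,
		// has been carried over into tailBuf for the next iteration.
		if err := processBlock(reqBuf); err != nil {
			return err
		}
	}
}

func main() {
	data := bytes.NewBufferString("foo 1 123\nbar 2 456\nbaz 3 789\n")
	err := serveConn(data, func(block []byte) error {
		log.Printf("block: %q", block)
		return nil
	})
	if err != nil {
		log.Fatalf("unexpected error: %s", err)
	}
}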