From 1f7d9a213a68f0f1d3bcb329e53049b12b3936d3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 26 Jun 2019 02:51:54 +0300 Subject: [PATCH] app/vminsert: fix inifinite loop when reading two lines without newline in the end Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/82 --- app/vminsert/common/lines_reader.go | 3 +- app/vminsert/common/lines_reader_test.go | 71 +++++++++++++++++++++++- app/vminsert/influx/parser_test.go | 26 +++++++++ 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/app/vminsert/common/lines_reader.go b/app/vminsert/common/lines_reader.go index 17c620d42..74973f126 100644 --- a/app/vminsert/common/lines_reader.go +++ b/app/vminsert/common/lines_reader.go @@ -24,6 +24,7 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize) } dstBuf = append(dstBuf[:0], tailBuf...) + tailBuf = tailBuf[:0] again: n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)]) // Check for error only if zero bytes read from r, i.e. no forward progress made. @@ -34,7 +35,7 @@ again: } if err == io.EOF && len(dstBuf) > 0 { // Missing newline in the end of stream. This is OK, - /// so suppress io.EOF for now. It will be returned during the next + // so suppress io.EOF for now. It will be returned during the next // call to ReadLinesBlock. // This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 . return dstBuf, tailBuf, nil diff --git a/app/vminsert/common/lines_reader_test.go b/app/vminsert/common/lines_reader_test.go index d0386fd13..a083322c1 100644 --- a/app/vminsert/common/lines_reader_test.go +++ b/app/vminsert/common/lines_reader_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "reflect" "testing" ) @@ -40,7 +41,73 @@ func (fr *failureReader) Read(p []byte) (int, error) { return 0, fmt.Errorf("some error") } -func TestReadLineBlockSuccessSingleByteReader(t *testing.T) { +func TestReadLinesBlockMultiLinesSingleByteReader(t *testing.T) { + f := func(s string, linesExpected []string) { + t.Helper() + + r := &singleByteReader{ + b: []byte(s), + } + var err error + var dstBuf, tailBuf []byte + var lines []string + for { + dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf) + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("unexpected error in ReadLinesBlock(%q): %s", s, err) + } + lines = append(lines, string(dstBuf)) + } + if !reflect.DeepEqual(lines, linesExpected) { + t.Fatalf("unexpected lines after reading %q: got %q; want %q", s, lines, linesExpected) + } + } + + f("", nil) + f("foo", []string{"foo"}) + f("foo\n", []string{"foo"}) + f("foo\nbar", []string{"foo", "bar"}) + f("\nfoo\nbar", []string{"", "foo", "bar"}) + f("\nfoo\nbar\n", []string{"", "foo", "bar"}) + f("\nfoo\nbar\n\n", []string{"", "foo", "bar", ""}) +} + +func TestReadLinesBlockMultiLinesBytesBuffer(t *testing.T) { + f := func(s string, linesExpected []string) { + t.Helper() + + r := bytes.NewBufferString(s) + var err error + var dstBuf, tailBuf []byte + var lines []string + for { + dstBuf, tailBuf, err = ReadLinesBlock(r, dstBuf, tailBuf) + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("unexpected error in ReadLinesBlock(%q): %s", s, err) + } + lines = append(lines, string(dstBuf)) + } + if !reflect.DeepEqual(lines, linesExpected) { + t.Fatalf("unexpected lines after reading %q: got %q; want %q", s, lines, linesExpected) + } + } + + f("", nil) + f("foo", []string{"foo"}) + f("foo\n", []string{"foo"}) + f("foo\nbar", []string{"foo", "bar"}) + f("\nfoo\nbar", []string{"\nfoo", "bar"}) + f("\nfoo\nbar\n", []string{"\nfoo\nbar"}) + f("\nfoo\nbar\n\n", []string{"\nfoo\nbar\n"}) +} + +func TestReadLinesBlockSuccessSingleByteReader(t *testing.T) { f := func(s, dstBufExpected, tailBufExpected string) { t.Helper() @@ -87,7 +154,7 @@ func TestReadLineBlockSuccessSingleByteReader(t *testing.T) { f(string(b), string(b[:maxLineSize]), "") } -func TestReadLineBlockSuccessBytesBuffer(t *testing.T) { +func TestReadLinesBlockSuccessBytesBuffer(t *testing.T) { f := func(s, dstBufExpected, tailBufExpected string) { t.Helper() diff --git a/app/vminsert/influx/parser_test.go b/app/vminsert/influx/parser_test.go index 3668fd70f..cffdba472 100644 --- a/app/vminsert/influx/parser_test.go +++ b/app/vminsert/influx/parser_test.go @@ -348,4 +348,30 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }, }, }) + // No newline after the second line. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/82 + f("foo,tag=xyz field=1.23 48934\n"+ + "bar x=-1i", &Rows{ + Rows: []Row{ + { + Measurement: "foo", + Tags: []Tag{{ + Key: "tag", + Value: "xyz", + }}, + Fields: []Field{{ + Key: "field", + Value: 1.23, + }}, + Timestamp: 48934, + }, + { + Measurement: "bar", + Fields: []Field{{ + Key: "x", + Value: -1, + }}, + }, + }, + }) }