diff --git a/lib/protoparser/prometheus/streamparser_test.go b/lib/protoparser/prometheus/streamparser_test.go index 6ef7e11f25..09feba6e8b 100644 --- a/lib/protoparser/prometheus/streamparser_test.go +++ b/lib/protoparser/prometheus/streamparser_test.go @@ -4,22 +4,41 @@ import ( "bytes" "compress/gzip" "reflect" + "sync" "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" ) func TestParseStream(t *testing.T) { + common.StartUnmarshalWorkers() + defer common.StopUnmarshalWorkers() + const defaultTimestamp = 123 f := func(s string, rowsExpected []Row) { t.Helper() bb := bytes.NewBufferString(s) var result []Row + var lock sync.Mutex + doneCh := make(chan struct{}) err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error { + lock.Lock() result = appendRowCopies(result, rows) + if len(result) == len(rowsExpected) { + close(doneCh) + } + lock.Unlock() return nil }) if err != nil { t.Fatalf("unexpected error when parsing %q: %s", s, err) } + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatalf("timeout") + } if !reflect.DeepEqual(result, rowsExpected) { t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected) } @@ -34,19 +53,29 @@ func TestParseStream(t *testing.T) { t.Fatalf("unexpected error when closing gzip writer: %s", err) } result = nil + doneCh = make(chan struct{}) err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error { + lock.Lock() result = appendRowCopies(result, rows) + if len(result) == len(rowsExpected) { + close(doneCh) + } + lock.Unlock() return nil }) if err != nil { t.Fatalf("unexpected error when parsing compressed %q: %s", s, err) } + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatalf("timeout on compressed stream") + } if !reflect.DeepEqual(result, rowsExpected) { - t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected) + t.Fatalf("unexpected compressed rows parsed; got\n%v\nwant\n%v", result, rowsExpected) } } - f("", nil) f("foo 123 456", []Row{{ Metric: "foo", Value: 123,