mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/protoparser/prometheus: fix TestParseStream after 124f78857b
This commit is contained in:
parent
96ee276e6e
commit
7cde336b33
1 changed files with 31 additions and 2 deletions
|
@ -4,22 +4,41 @@ import (
|
|||
"bytes"
|
||||
"compress/gzip"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
)
|
||||
|
||||
func TestParseStream(t *testing.T) {
|
||||
common.StartUnmarshalWorkers()
|
||||
defer common.StopUnmarshalWorkers()
|
||||
|
||||
const defaultTimestamp = 123
|
||||
f := func(s string, rowsExpected []Row) {
|
||||
t.Helper()
|
||||
bb := bytes.NewBufferString(s)
|
||||
var result []Row
|
||||
var lock sync.Mutex
|
||||
doneCh := make(chan struct{})
|
||||
err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error {
|
||||
lock.Lock()
|
||||
result = appendRowCopies(result, rows)
|
||||
if len(result) == len(rowsExpected) {
|
||||
close(doneCh)
|
||||
}
|
||||
lock.Unlock()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error when parsing %q: %s", s, err)
|
||||
}
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
if !reflect.DeepEqual(result, rowsExpected) {
|
||||
t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected)
|
||||
}
|
||||
|
@ -34,19 +53,29 @@ func TestParseStream(t *testing.T) {
|
|||
t.Fatalf("unexpected error when closing gzip writer: %s", err)
|
||||
}
|
||||
result = nil
|
||||
doneCh = make(chan struct{})
|
||||
err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error {
|
||||
lock.Lock()
|
||||
result = appendRowCopies(result, rows)
|
||||
if len(result) == len(rowsExpected) {
|
||||
close(doneCh)
|
||||
}
|
||||
lock.Unlock()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error when parsing compressed %q: %s", s, err)
|
||||
}
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout on compressed stream")
|
||||
}
|
||||
if !reflect.DeepEqual(result, rowsExpected) {
|
||||
t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected)
|
||||
t.Fatalf("unexpected compressed rows parsed; got\n%v\nwant\n%v", result, rowsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("", nil)
|
||||
f("foo 123 456", []Row{{
|
||||
Metric: "foo",
|
||||
Value: 123,
|
||||
|
|
Loading…
Reference in a new issue