lib/protoparser: remove superflowous memory allocations during protocol parsing

This commit is contained in:
Aliaksandr Valialkin 2022-04-06 14:00:08 +03:00
parent 569b0d444c
commit 5acd70109b
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 112 additions and 73 deletions

View file

@ -45,16 +45,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.cds = cds
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
@ -153,18 +145,32 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
cds []ColumnDescriptor
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.cds = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds)
@ -188,7 +194,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}

View file

@ -324,7 +324,8 @@ func Test_streamContext_Read(t *testing.T) {
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.callback = func(rows []Row) {
uw.ctx = ctx
uw.callback = func(rows []Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
@ -332,8 +333,10 @@ func Test_streamContext_Read(t *testing.T) {
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)

View file

@ -31,16 +31,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
@ -138,16 +130,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}

View file

@ -56,16 +56,8 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(db string, rows []Row) {
if err := callback(db, rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.db = db
uw.tsMultiplier = tsMultiplier
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
@ -165,7 +157,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(db string, rows []Row)
ctx *streamContext
callback func(db string, rows []Row) error
db string
tsMultiplier int64
reqBuf []byte
@ -173,12 +166,25 @@ type unmarshalWork struct {
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.db = ""
uw.tsMultiplier = 0
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(uw.db, rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@ -225,7 +231,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(uw.db, rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}

View file

@ -30,16 +30,8 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
@ -137,16 +129,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@ -175,7 +181,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}

View file

@ -32,16 +32,8 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
for ctx.Read() {
uw := getUnmarshalWork()
uw.errLogger = errLogger
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.defaultTimestamp = defaultTimestamp
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
@ -140,7 +132,8 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
errLogger func(string)
defaultTimestamp int64
reqBuf []byte
@ -148,12 +141,25 @@ type unmarshalWork struct {
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.errLogger = nil
uw.defaultTimestamp = 0
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
if uw.errLogger != nil {
@ -176,7 +182,7 @@ func (uw *unmarshalWork) Unmarshal() {
}
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}

View file

@ -34,16 +34,8 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
@ -141,16 +133,30 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
ctx *streamContext
callback func(rows []Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
@ -159,7 +165,7 @@ func (uw *unmarshalWork) Unmarshal() {
row := &rows[i]
rowsRead.Add(len(row.Timestamps))
}
uw.callback(rows)
uw.runCallback(rows)
putUnmarshalWork(uw)
}