VictoriaMetrics/lib/protoparser/clusternative/streamparser.go
Aliaksandr Valialkin b84aea1e6e lib/protoparser/clusternative: do not pool unmarshalWork structs, since they can occupy big amounts of memory (more than 100MB per each struct)
This should reduce memory usage for vmstorage under high ingestion rate when the vmstorage runs on a system with big number of CPU cores
2021-06-23 15:45:08 +03:00

144 lines
4.7 KiB
Go

package clusternative
import (
"fmt"
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// ParseStream parses data sent from vminsert to bc and calls callback for parsed rows.
//
// The callback can be called concurrently multiple times for streamed data from bc.
//
// callback shouldn't hold rows after returning: the rows slice passed to it is reused
// for subsequent chunks of the same packet.
//
// ParseStream returns nil when vminsert gracefully closes the connection and every
// callback call succeeded. If any callback call failed, the first such error is
// returned once the stream ends. Errors from reading bc or sending `ack` to vminsert
// are returned instead, even if a callback also failed.
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
	var wg sync.WaitGroup
	var (
		callbackErrLock sync.Mutex
		callbackErr     error
	)
	// The wrapper captures only loop-invariant state, so build it once
	// instead of allocating a new closure for every received packet.
	processRows := func(rows []storage.MetricRow) {
		if err := callback(rows); err != nil {
			processErrors.Inc()
			callbackErrLock.Lock()
			if callbackErr == nil {
				callbackErr = fmt.Errorf("error when processing native block: %w", err)
			}
			callbackErrLock.Unlock()
		}
	}
	for {
		reqBuf, err := readBlock(nil, bc)
		if err != nil {
			// Wait for in-flight packets, so callbacks never run after ParseStream returns.
			wg.Wait()
			if err != io.EOF {
				return err
			}
			// Remote end gracefully closed the connection.
			// wg.Wait() above guarantees every write to callbackErr is visible here,
			// since each worker updates it before calling wg.Done().
			return callbackErr
		}
		blocksRead.Inc()
		// Do not use unmarshalWork pool, since every unmarshalWork structure usually occupies
		// big amounts of memory (more than consts.MaxInsertPacketSize bytes).
		// The pool would result in increased memory usage.
		uw := &unmarshalWork{
			wg:       &wg,
			callback: processRows,
			reqBuf:   reqBuf,
		}
		wg.Add(1)
		common.ScheduleUnmarshalWork(uw)
	}
}
// readBlock reads one packet from the vminsert-initiated connection bc,
// appends its payload to dst and returns the extended slice.
//
// A packet is an 8-byte size header (decoded with encoding.UnmarshalUint64)
// followed by that many payload bytes. Sizes above consts.MaxInsertPacketSize
// are rejected. Once the payload is read, a one-byte `ack` is written to bc and
// flushed under a 5-second write deadline.
//
// io.EOF is returned unwrapped when bc hits EOF before any header byte is read,
// which lets callers recognize a graceful close. Every other failure is wrapped
// with context and counted in the matching error metric.
func readBlock(dst []byte, bc *handshake.BufferedConn) ([]byte, error) {
	hdr := sizeBufPool.Get()
	defer sizeBufPool.Put(hdr)

	hdr.B = bytesutil.Resize(hdr.B, 8)
	if _, err := io.ReadFull(bc, hdr.B); err != nil {
		if err == io.EOF {
			// Nothing was read: the connection ended between packets.
			return dst, err
		}
		readErrors.Inc()
		return dst, fmt.Errorf("cannot read packet size: %w", err)
	}

	packetSize := encoding.UnmarshalUint64(hdr.B)
	if packetSize > consts.MaxInsertPacketSize {
		parseErrors.Inc()
		return dst, fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize)
	}

	// Extend dst and read the payload right after its existing contents.
	offset := len(dst)
	dst = bytesutil.Resize(dst, offset+int(packetSize))
	got, err := io.ReadFull(bc, dst[offset:])
	if err != nil {
		readErrors.Inc()
		return dst, fmt.Errorf("cannot read packet with size %d bytes: %w; read only %d bytes", packetSize, err, got)
	}

	// Acknowledge receipt, so vminsert knows the packet has arrived.
	if err = bc.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
		writeErrors.Inc()
		return dst, fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
	}
	hdr.B[0] = 1
	if _, err = bc.Write(hdr.B[:1]); err != nil {
		writeErrors.Inc()
		return dst, fmt.Errorf("cannot send `ack` to vminsert: %w", err)
	}
	if err = bc.Flush(); err != nil {
		writeErrors.Inc()
		return dst, fmt.Errorf("cannot flush `ack` to vminsert: %w", err)
	}
	return dst, nil
}
// sizeBufPool supplies the scratch buffer readBlock uses both for the 8-byte
// packet size header and for the 1-byte `ack` written back to vminsert.
var sizeBufPool bytesutil.ByteBufferPool
// Counters exported via the metrics package for the clusternative protocol parser.
var (
	// readErrors counts failures while reading a packet size or payload from vminsert.
	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="clusternative"}`)

	// writeErrors counts failures while setting the deadline for, writing or flushing the `ack`.
	writeErrors = metrics.NewCounter(`vm_protoparser_write_errors_total{type="clusternative"}`)

	// rowsRead counts rows successfully unmarshaled and handed to the callback.
	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="clusternative"}`)

	// blocksRead counts packets successfully read and acknowledged.
	blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="clusternative"}`)

	// parseErrors counts oversized packets and rows that fail to unmarshal.
	parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="clusternative"}`)

	// processErrors counts failed calls to the ParseStream callback.
	processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="clusternative"}`)
)
// unmarshalWork is a single received packet scheduled for asynchronous decoding
// via common.ScheduleUnmarshalWork.
//
// Instances are allocated per packet rather than pooled, since reqBuf alone may
// hold up to consts.MaxInsertPacketSize bytes.
type unmarshalWork struct {
	// wg is signaled via Done once the packet has been processed.
	wg *sync.WaitGroup

	// callback receives each chunk of decoded rows.
	callback func(rows []storage.MetricRow)

	// reqBuf holds the raw packet payload read from vminsert.
	reqBuf []byte

	// mrs is scratch space reused across chunks of the same packet.
	mrs []storage.MetricRow

	// NOTE(review): lastResetTime is not read or written anywhere in this file;
	// presumably a leftover from the removed unmarshalWork pool — confirm and drop.
	lastResetTime uint64
}
// Unmarshal implements common.UnmarshalWork.
//
// It decodes uw.reqBuf piece by piece, asking storage.UnmarshalMetricRows for at most
// maxRowsPerCallback rows per call, and passes each piece to uw.callback. Capping the
// chunk size bounds memory usage for big packets. Decoding stops at the first
// unmarshal error, which is counted and logged. uw.wg is signaled once processing ends.
func (uw *unmarshalWork) Unmarshal() {
	remaining := uw.reqBuf
	for len(remaining) != 0 {
		rows, rest, err := storage.UnmarshalMetricRows(uw.mrs[:0], remaining, maxRowsPerCallback)
		// Keep the possibly grown slice, so the next chunk reuses its capacity.
		uw.mrs = rows
		if err != nil {
			parseErrors.Inc()
			logger.Errorf("cannot unmarshal MetricRow from clusternative block with size %d (remaining %d bytes): %s", len(remaining), len(rest), err)
			break
		}
		rowsRead.Add(len(rows))
		uw.callback(rows)
		remaining = rest
	}
	uw.wg.Done()
}
// maxRowsPerCallback caps how many rows Unmarshal decodes and hands to the
// callback in one call, limiting memory usage when processing big packets.
const maxRowsPerCallback = 10000