mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00
app/vminsert/netstorage: send per-storageNode bufs to vmstorage nodes in parallel
This should improve the maximum ingestion throughput.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/175
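Before this change, FlushBufs pushed each per-storageNode buf sequentially, so a single slow vmstorage node delayed the whole flush. The diff below replaces that loop with a scatter/gather pattern: one goroutine per non-empty buf, with results collected over a channel buffered to len(storageNodes). The following standalone Go sketch shows the same pattern; the node and push names are illustrative stand-ins, not the actual vminsert API:

package main

import (
	"fmt"
	"time"
)

type node struct{ addr string }

// push stands in for bufRows.pushTo(sn): it ships one node's buffered
// rows over the network and returns the transport error, if any.
func push(n *node, buf []byte) error {
	time.Sleep(10 * time.Millisecond) // simulated network round-trip
	fmt.Printf("sent %d bytes to %s\n", len(buf), n.addr)
	return nil
}

// flushParallel mirrors the shape of the new FlushBufs: scatter one
// goroutine per non-empty buf, then gather exactly as many results
// as goroutines were started.
func flushParallel(nodes []*node, bufs [][]byte) error {
	// Buffered to len(nodes) so no sender can ever block on the reader.
	resultCh := make(chan error, len(nodes))
	started := 0
	for i, buf := range bufs {
		if len(buf) == 0 {
			continue // nothing pending for this node
		}
		started++
		go func(n *node, buf []byte) {
			resultCh <- push(n, buf)
		}(nodes[i], buf)
	}
	// Drain every result so the channel is empty for the next flush;
	// keep only the last error, as the diff does.
	var lastErr error
	for i := 0; i < started; i++ {
		if err := <-resultCh; err != nil {
			lastErr = err
		}
	}
	return lastErr
}

func main() {
	nodes := []*node{{addr: "vmstorage-1:8400"}, {addr: "vmstorage-2:8400"}}
	bufs := [][]byte{[]byte("rows for node 1"), nil}
	fmt.Println("flush error:", flushParallel(nodes, bufs))
}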
parent 694cc59ed1
commit 2f4c950fe9
1 changed file with 26 additions and 15 deletions
@@ -9,19 +9,24 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	xxhash "github.com/cespare/xxhash/v2"
 	jump "github.com/lithammer/go-jump-consistent-hash"
 )
 
-// InsertCtx is a generic context for inserting data
+// InsertCtx is a generic context for inserting data.
+//
+// InsertCtx.Reset must be called before the first usage.
 type InsertCtx struct {
 	Labels        []prompb.Label
 	MetricNameBuf []byte
 
 	bufRowss  []bufRows
 	labelsBuf []byte
+
+	resultCh chan error
 }
 
 type bufRows struct {
@@ -61,6 +66,11 @@ func (ctx *InsertCtx) Reset() {
 		br.rows = 0
 	}
 	ctx.labelsBuf = ctx.labelsBuf[:0]
+	if ctx.resultCh == nil {
+		ctx.resultCh = make(chan error, len(storageNodes))
+	} else if len(ctx.resultCh) > 0 {
+		logger.Panicf("BUG: ctx.resultCh must be empty on Reset; got %d items", len(ctx.resultCh))
+	}
 }
 
 // AddLabel adds (name, value) label to ctx.Labels.
@@ -95,7 +105,7 @@ func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metr
 	br := &ctx.bufRowss[storageNodeIdx]
 	sn := storageNodes[storageNodeIdx]
 	bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
-	if len(bufNew) >= maxStorageNodeBufSize {
+	if len(bufNew) >= consts.MaxInsertPacketSize {
 		// Send buf to storageNode, since it is too big.
 		if err := br.pushTo(sn); err != nil {
 			return err
@@ -108,28 +118,29 @@ func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metr
 	return nil
 }
 
-var maxStorageNodeBufSize = func() int {
-	n := 1024 * 1024
-	if n > consts.MaxInsertPacketSize {
-		n = consts.MaxInsertPacketSize
-	}
-	return n
-}()
-
 // FlushBufs flushes ctx bufs to remote storage nodes.
 func (ctx *InsertCtx) FlushBufs() error {
-	// Send per-storageNode bufs.
+	// Send per-storageNode bufs in parallel.
+	resultCh := ctx.resultCh
+	resultChLen := 0
 	for i := range ctx.bufRowss {
 		br := &ctx.bufRowss[i]
 		if len(br.buf) == 0 {
 			continue
 		}
-		sn := storageNodes[i]
-		if err := br.pushTo(sn); err != nil {
-			return err
-		}
+		resultChLen++
+		go func(br *bufRows, sn *storageNode) {
+			resultCh <- br.pushTo(sn)
+		}(br, storageNodes[i])
 	}
-	return nil
+	var lastErr error
+	for i := 0; i < resultChLen; i++ {
+		err := <-resultCh
+		if err != nil {
+			lastErr = err
+		}
+	}
+	return lastErr
 }
 
 // GetStorageNodeIdx returns storage node index for the given at and labels.
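Two invariants keep this safe across calls: FlushBufs drains exactly resultChLen results, so ctx.resultCh is empty again when it returns, and Reset panics if that ever stops being true, turning a silent cross-flush mixup into an immediate BUG report. A minimal standalone illustration of the failure mode the Reset check guards against (hypothetical code, not part of the diff):

package main

import "fmt"

func main() {
	// A gather channel from a previous round, drained incompletely.
	resultCh := make(chan error, 2)
	resultCh <- fmt.Errorf("stale error from flush #1")
	resultCh <- nil
	<-resultCh // only one of two results consumed

	// The next round would now read flush #1's leftover result as its own.
	// This mirrors the diff's Reset check:
	//   logger.Panicf("BUG: ctx.resultCh must be empty on Reset; got %d items", ...)
	if n := len(resultCh); n > 0 {
		fmt.Printf("BUG: resultCh must be empty on Reset; got %d items\n", n)
	}
}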