diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index f8f9202b4..5b89f01d0 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -9,19 +9,24 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" xxhash "github.com/cespare/xxhash/v2" jump "github.com/lithammer/go-jump-consistent-hash" ) -// InsertCtx is a generic context for inserting data +// InsertCtx is a generic context for inserting data. +// +// InsertCtx.Reset must be called before the first usage. type InsertCtx struct { Labels []prompb.Label MetricNameBuf []byte bufRowss []bufRows labelsBuf []byte + + resultCh chan error } type bufRows struct { @@ -61,6 +66,11 @@ func (ctx *InsertCtx) Reset() { br.rows = 0 } ctx.labelsBuf = ctx.labelsBuf[:0] + if ctx.resultCh == nil { + ctx.resultCh = make(chan error, len(storageNodes)) + } else if len(ctx.resultCh) > 0 { + logger.Panicf("BUG: ctx.resultCh must be empty on Reset; got %d items", len(ctx.resultCh)) + } } // AddLabel adds (name, value) label to ctx.Labels. @@ -95,7 +105,7 @@ func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metr br := &ctx.bufRowss[storageNodeIdx] sn := storageNodes[storageNodeIdx] bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) - if len(bufNew) >= maxStorageNodeBufSize { + if len(bufNew) >= consts.MaxInsertPacketSize { // Send buf to storageNode, since it is too big. if err := br.pushTo(sn); err != nil { return err @@ -108,28 +118,29 @@ func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metr return nil } -var maxStorageNodeBufSize = func() int { - n := 1024 * 1024 - if n > consts.MaxInsertPacketSize { - n = consts.MaxInsertPacketSize - } - return n -}() - // FlushBufs flushes ctx bufs to remote storage nodes. func (ctx *InsertCtx) FlushBufs() error { - // Send per-storageNode bufs. + // Send per-storageNode bufs in parallel. + resultCh := ctx.resultCh + resultChLen := 0 for i := range ctx.bufRowss { br := &ctx.bufRowss[i] if len(br.buf) == 0 { continue } - sn := storageNodes[i] - if err := br.pushTo(sn); err != nil { - return err + resultChLen++ + go func(br *bufRows, sn *storageNode) { + resultCh <- br.pushTo(sn) + }(br, storageNodes[i]) + } + var lastErr error + for i := 0; i < resultChLen; i++ { + err := <-resultCh + if err != nil { + lastErr = err } } - return nil + return lastErr } // GetStorageNodeIdx returns storage node index for the given at and labels.