app/vminsert: moves replication at datapointWrite level

* It allows to tolerate slow storage node for replicationFactor > 1 and disableRerouting=false
* It allows to have partial replication with replicationFactor > and dropSamplesOnOverload=true
* It increases cpu and memory usage by 30-50% per replica

Signed-off-by: f41gh7 <nik@victoriametrics.com>
This commit is contained in:
f41gh7 2024-08-21 15:38:19 +02:00
parent 08cbbf8134
commit 1f80c3ce08
No known key found for this signature in database
GPG key ID: 4558311CF775EC72
2 changed files with 85 additions and 51 deletions

View file

@ -122,11 +122,48 @@ func (ctx *InsertCtx) ApplyRelabeling() {
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)
storageNodeIdx := ctx.GetStorageNodeIdx(at, labels)
return ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
return ctx.writeDataPointToReplicas(storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
}
// WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
return ctx.writeDataPointToReplicas(storageNodeIdx, metricNameRaw, timestamp, value)
}
func (ctx *InsertCtx) writeDataPointToReplicas(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
var firstErr error
var failsCount int
for i := 0; i < replicas; i++ {
snIdx := storageNodeIdx + i
if snIdx >= len(ctx.snb.sns) {
snIdx %= len(ctx.snb.sns)
}
if err := ctx.writeDataPointExt(snIdx, metricNameRaw, timestamp, value); err != nil {
if replicas == 1 {
return fmt.Errorf("cannot write datapoint: %w", err)
}
if firstErr == nil {
firstErr = err
}
failsCount++
// The data is partially replicated, so just emit a warning and return true.
// We could retry sending the data again, but this may result in uncontrolled duplicate data.
// So it is better returning true.
br := &ctx.bufRowss[snIdx]
rowsIncompletelyReplicatedTotal.Add(br.rows)
incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d, used_nodes=%d for %d bytes with %d rows, "+
"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
continue
}
}
if failsCount == replicas {
return fmt.Errorf("cannot write datapoint to any replicas: %w", firstErr)
}
return nil
}
func (ctx *InsertCtx) writeDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
br := &ctx.bufRowss[storageNodeIdx]
snb := ctx.snb
sn := snb.sns[storageNodeIdx]

View file

@ -45,6 +45,8 @@ var (
"See also -disableRerouting")
)
var replicas int
var errStorageReadOnly = errors.New("storage node is read only")
func (sn *storageNode) isReady() bool {
@ -140,15 +142,6 @@ again:
}
func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
sns := snb.sns
if replicas > len(sns) {
replicas = len(sns)
}
sn.readOnlyCheckerWG.Add(1)
go func() {
defer sn.readOnlyCheckerWG.Done()
@ -193,7 +186,7 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
continue
}
// Send br to replicas storage nodes starting from snIdx.
for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
for !sendBufToSnNonblocking(snb, &br, snIdx) {
d := timeutil.AddJitterToDuration(time.Millisecond * 200)
t := timerpool.Get(d)
select {
@ -206,51 +199,39 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
sn.checkHealth()
}
}
if sn.isBufferFull.CompareAndSwap(true, false) {
logger.Infof("transited node=%s to non-full", sn.dialer.Addr())
}
br.reset()
}
}
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
func sendBufToSnNonblocking(snb *storageNodesBucket, br *bufRows, snIdx int) bool {
sns := snb.sns
for i := 0; i < replicas; i++ {
idx := snIdx + i
idx := snIdx
attempts := 0
for {
attempts++
if attempts > len(sns) {
if i == 0 {
// The data wasn't replicated at all.
cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
"re-trying to send the data soon", len(br.buf), br.rows)
return false
}
// The data is partially replicated, so just emit a warning and return true.
// We could retry sending the data again, but this may result in uncontrolled duplicate data.
// So it is better returning true.
rowsIncompletelyReplicatedTotal.Add(br.rows)
incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
return true
}
if idx >= len(sns) {
idx %= len(sns)
}
sn := sns[idx]
idx++
if _, ok := usedStorageNodes[sn]; ok {
// The br has been already replicated to sn. Skip it.
continue
}
if !sn.sendBufRowsNonblocking(br) {
// Cannot send data to sn. Go to the next sn.
continue
}
// Successfully sent data to sn.
usedStorageNodes[sn] = struct{}{}
break
}
}
return true
}
@ -413,6 +394,8 @@ type storageNode struct {
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
isBroken atomic.Bool
isBufferFull atomic.Bool
// isReadOnly is set to true if the given vmstorage node is read only
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
isReadOnly atomic.Bool
@ -505,6 +488,13 @@ func setStorageNodesBucket(snb *storageNodesBucket) {
//
// Call MustStop when the initialized vmstorage connections are no longer needed.
func Init(addrs []string, hashSeed uint64) {
replicas = *replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len(addrs) {
replicas = len(addrs)
}
snb := initStorageNodes(addrs, hashSeed)
setStorageNodesBucket(snb)
}
@ -772,7 +762,7 @@ var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*tim
func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
dst = dst[:0]
for i, sn := range snb.sns {
if sn == snExtra || !sn.isReady() {
if sn == snExtra || !sn.isReady() || sn.isBufferFull.Load() {
dst = append(dst, i)
}
}
@ -787,10 +777,17 @@ func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
sent := false
sn.brLock.Lock()
if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
if !sn.isReady() {
return sent
}
if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows += rows
sent = true
} else {
if sn.isBufferFull.CompareAndSwap(false, true) {
logger.Infof("node: %s transited to full", sn.dialer.Addr())
}
}
sn.brLock.Unlock()
return sent