diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go
index efb77a663..4438fdf06 100644
--- a/app/vminsert/netstorage/insert_ctx.go
+++ b/app/vminsert/netstorage/insert_ctx.go
@@ -122,11 +122,48 @@ func (ctx *InsertCtx) ApplyRelabeling() {
 func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
     ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)
     storageNodeIdx := ctx.GetStorageNodeIdx(at, labels)
-    return ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
+    return ctx.writeDataPointToReplicas(storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
 }
 
 // WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
 func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
+    return ctx.writeDataPointToReplicas(storageNodeIdx, metricNameRaw, timestamp, value)
+}
+
+func (ctx *InsertCtx) writeDataPointToReplicas(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
+    var firstErr error
+    var failsCount int
+    for i := 0; i < replicas; i++ {
+        snIdx := storageNodeIdx + i
+        if snIdx >= len(ctx.snb.sns) {
+            snIdx %= len(ctx.snb.sns)
+        }
+
+        if err := ctx.writeDataPointExt(snIdx, metricNameRaw, timestamp, value); err != nil {
+            if replicas == 1 {
+                return fmt.Errorf("cannot write datapoint: %w", err)
+            }
+            if firstErr == nil {
+                firstErr = err
+            }
+            failsCount++
+            // The data is partially replicated, so just emit a warning and continue
+            // with the next replica. Retrying the failed copy could result in
+            // uncontrolled duplicate data, so it is better to move on.
+            br := &ctx.bufRowss[snIdx]
+            rowsIncompletelyReplicatedTotal.Add(br.rows)
+            incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d, used_nodes=%d for %d bytes with %d rows, "+
+                "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(ctx.snb.sns), len(br.buf), br.rows)
+            continue
+        }
+    }
+    if failsCount == replicas {
+        return fmt.Errorf("cannot write datapoint to any replica: %w", firstErr)
+    }
+    return nil
+}
+
+func (ctx *InsertCtx) writeDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
     br := &ctx.bufRowss[storageNodeIdx]
     snb := ctx.snb
     sn := snb.sns[storageNodeIdx]
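The replica fan-out in writeDataPointToReplicas above writes each row to `replicas` consecutive storage nodes, starting from the hash-derived storageNodeIdx and wrapping around the end of the node list. A minimal standalone sketch of that index selection (replicaNodeIdxs is a hypothetical helper used only for illustration, not part of this change):

package main

import "fmt"

// replicaNodeIdxs returns the node indices that receive copies of a row,
// starting at startIdx and wrapping around nodeCount, mirroring the loop in
// writeDataPointToReplicas above.
func replicaNodeIdxs(startIdx, replicas, nodeCount int) []int {
	idxs := make([]int, 0, replicas)
	for i := 0; i < replicas; i++ {
		idxs = append(idxs, (startIdx+i)%nodeCount)
	}
	return idxs
}

func main() {
	// With 5 storage nodes and -replicationFactor=3, a row hashed to node 4
	// is also copied to nodes 0 and 1.
	fmt.Println(replicaNodeIdxs(4, 3, 5)) // [4 0 1]
}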
diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index aa006512d..ea51289ad 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -45,6 +45,8 @@ var (
         "See also -disableRerouting")
 )
 
+var replicas int
+
 var errStorageReadOnly = errors.New("storage node is read only")
 
 func (sn *storageNode) isReady() bool {
@@ -140,15 +142,6 @@
 func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
-    replicas := *replicationFactor
-    if replicas <= 0 {
-        replicas = 1
-    }
-    sns := snb.sns
-    if replicas > len(sns) {
-        replicas = len(sns)
-    }
-
     sn.readOnlyCheckerWG.Add(1)
     go func() {
         defer sn.readOnlyCheckerWG.Done()
@@ -193,7 +186,7 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
             continue
         }
         // Send br to replicas storage nodes starting from snIdx.
-        for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
+        for !sendBufToSnNonblocking(snb, &br, snIdx) {
             d := timeutil.AddJitterToDuration(time.Millisecond * 200)
             t := timerpool.Get(d)
             select {
@@ -206,50 +199,38 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
                 sn.checkHealth()
             }
         }
+        if sn.isBufferFull.CompareAndSwap(true, false) {
+            logger.Infof("transited node=%s to non-full", sn.dialer.Addr())
+        }
+        br.reset()
     }
 }
 
-func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
-    usedStorageNodes := make(map[*storageNode]struct{}, replicas)
+func sendBufToSnNonblocking(snb *storageNodesBucket, br *bufRows, snIdx int) bool {
     sns := snb.sns
-    for i := 0; i < replicas; i++ {
-        idx := snIdx + i
-        attempts := 0
-        for {
-            attempts++
-            if attempts > len(sns) {
-                if i == 0 {
-                    // The data wasn't replicated at all.
-                    cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
-                        "re-trying to send the data soon", len(br.buf), br.rows)
-                    return false
-                }
-                // The data is partially replicated, so just emit a warning and return true.
-                // We could retry sending the data again, but this may result in uncontrolled duplicate data.
-                // So it is better returning true.
-                rowsIncompletelyReplicatedTotal.Add(br.rows)
-                incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
-                    "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
-                return true
-            }
-            if idx >= len(sns) {
-                idx %= len(sns)
-            }
-            sn := sns[idx]
-            idx++
-            if _, ok := usedStorageNodes[sn]; ok {
-                // The br has been already replicated to sn. Skip it.
-                continue
-            }
-            if !sn.sendBufRowsNonblocking(br) {
-                // Cannot send data to sn. Go to the next sn.
-                continue
-            }
-            // Successfully sent data to sn.
-            usedStorageNodes[sn] = struct{}{}
-            break
+    idx := snIdx
+    attempts := 0
+    for {
+        attempts++
+        if attempts > len(sns) {
+            // The data wasn't replicated at all.
+            cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
+                "re-trying to send the data soon", len(br.buf), br.rows)
+            return false
         }
+        if idx >= len(sns) {
+            idx %= len(sns)
+        }
+        sn := sns[idx]
+        idx++
+
+        if !sn.sendBufRowsNonblocking(br) {
+            // Cannot send data to sn. Go to the next sn.
+            continue
+        }
+        // Successfully sent data to sn.
+        break
     }
     return true
 }
@@ -413,6 +394,9 @@ type storageNode struct {
     // In this case the data is re-routed to the remaining healthy vmstorage nodes.
     isBroken atomic.Bool
 
+    // isBufferFull is set to true if the buffer for the given vmstorage node is full.
+    isBufferFull atomic.Bool
+
     // isReadOnly is set to true if the given vmstorage node is read only
     // In this case the data is re-routed to the remaining healthy vmstorage nodes.
     isReadOnly atomic.Bool
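sendBufToSnNonblocking above probes the nodes in order starting from snIdx, wraps around the node list, and gives up after one full unsuccessful pass so the caller can retry later. A rough sketch of that probe order in isolation (sendToFirstAvailable and trySend are illustrative stand-ins for the loop and for sn.sendBufRowsNonblocking; they are not part of this change):

package main

import "fmt"

// sendToFirstAvailable walks the nodes starting at startIdx, wrapping around,
// and returns the index of the node that accepted the data, or -1 after one
// full unsuccessful pass, mirroring the attempt loop in sendBufToSnNonblocking.
func sendToFirstAvailable(startIdx int, trySend []func() bool) int {
	idx := startIdx
	for attempts := 0; attempts < len(trySend); attempts++ {
		if idx >= len(trySend) {
			idx %= len(trySend)
		}
		if trySend[idx]() {
			return idx
		}
		idx++
	}
	return -1
}

func main() {
	up := func() bool { return true }
	down := func() bool { return false }

	// Node 2 is the hash target but is unavailable, so the data lands on node 3.
	fmt.Println(sendToFirstAvailable(2, []func() bool{up, up, down, up})) // 3
	// All nodes are unavailable: give up so the caller can retry later.
	fmt.Println(sendToFirstAvailable(0, []func() bool{down, down})) // -1
}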
@@ -505,6 +488,13 @@ func setStorageNodesBucket(snb *storageNodesBucket) {
 //
 // Call MustStop when the initialized vmstorage connections are no longer needed.
 func Init(addrs []string, hashSeed uint64) {
+    replicas = *replicationFactor
+    if replicas <= 0 {
+        replicas = 1
+    }
+    if replicas > len(addrs) {
+        replicas = len(addrs)
+    }
     snb := initStorageNodes(addrs, hashSeed)
     setStorageNodesBucket(snb)
 }
@@ -772,7 +762,7 @@ var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*tim
 func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
     dst = dst[:0]
     for i, sn := range snb.sns {
-        if sn == snExtra || !sn.isReady() {
+        if sn == snExtra || !sn.isReady() || sn.isBufferFull.Load() {
             dst = append(dst, i)
         }
     }
@@ -787,10 +777,18 @@
 func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
     sent := false
     sn.brLock.Lock()
-    if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
+    if !sn.isReady() {
+        sn.brLock.Unlock()
+        return sent
+    }
+    if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
         sn.br.buf = append(sn.br.buf, buf...)
         sn.br.rows += rows
         sent = true
+    } else {
+        if sn.isBufferFull.CompareAndSwap(false, true) {
+            logger.Infof("node: %s transited to full", sn.dialer.Addr())
+        }
     }
     sn.brLock.Unlock()
     return sent
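The isBufferFull transitions in trySendBuf and in the run loop above are guarded with CompareAndSwap so the full/non-full log lines fire once per state change instead of on every call. A small sketch of that pattern on its own, assuming only the standard sync/atomic API (setFull and the node name are illustrative, not part of this change):

package main

import (
	"fmt"
	"sync/atomic"
)

// setFull flips the flag and logs only when this call actually performs the
// transition, so repeated calls with the same state stay silent.
func setFull(flag *atomic.Bool, full bool, node string) {
	if flag.CompareAndSwap(!full, full) {
		fmt.Printf("node %s transited to full=%v\n", node, full)
	}
}

func main() {
	var isBufferFull atomic.Bool
	setFull(&isBufferFull, true, "vmstorage-1")  // logs the transition to full
	setFull(&isBufferFull, true, "vmstorage-1")  // already full: no log
	setFull(&isBufferFull, false, "vmstorage-1") // logs the transition back
}

Because getNotReadyStorageNodeIdxs now also treats a full buffer as not ready, rerouting steers new rows away from a saturated node until its buffer drains and the run loop clears the flag again.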