app/vminsert/netstorage: tune re-routing algorithm

Do not re-route data to an unavailable storage node. Send it to the remaining storage nodes instead,
even if they cannot keep up with the load. This should spread the load more evenly among the
available storage nodes.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1054
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1165
Aliaksandr Valialkin 2021-06-05 16:16:16 +03:00
parent e7d353ee6a
commit 0d067eb112
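
Editorial illustration (not part of the commit): the Go program below models the node-selection idea described in the commit message, using a hypothetical rerouteTargets helper and a simplified node type rather than the real storageNode code. The target slice always keeps one entry per configured node, so the hash modulus used for re-routing never changes; slots of unavailable nodes are backfilled either with the source node (when it may still accept data) or with the remaining healthy nodes. The actual getStorageNodesMapForRerouting in the diff below differs in detail — for example, it may still fall back to the source node when no other slot can be filled — so treat this only as a sketch of the idea.

package main

import "fmt"

// node is a simplified stand-in for the real storageNode type.
type node struct {
    name   string
    broken bool
}

// rerouteTargets returns one re-routing target per configured node, keeping the
// slice length (and thus the hash modulus) stable. Slots of unavailable nodes are
// backfilled either with the source node (when it is merely slow and may still
// accept data) or with the remaining healthy nodes (when it must be avoided).
func rerouteTargets(nodes []*node, source *node, mayUseSource bool) []*node {
    healthy := make([]*node, 0, len(nodes))
    for _, n := range nodes {
        if n != source && !n.broken {
            healthy = append(healthy, n)
        }
    }
    targets := make([]*node, len(nodes))
    for i, n := range nodes {
        switch {
        case n != source && !n.broken:
            targets[i] = n // a healthy node keeps its own slot
        case mayUseSource || len(healthy) == 0:
            targets[i] = source // fall back to the source node
        default:
            targets[i] = healthy[i%len(healthy)] // spread the load among healthy nodes
        }
    }
    return targets
}

func main() {
    a := &node{name: "a"}
    b := &node{name: "b", broken: true}
    c := &node{name: "c"}
    // Node "a" is the unavailable source and "b" is broken, so every slot maps to "c".
    for _, t := range rerouteTargets([]*node{a, b, c}, a, false) {
        fmt.Println(t.name) // prints "c" three times
    }
}

Keeping the slice length equal to the number of configured nodes is what lets a single h % len(sns) lookup replace the old retry loop in the diff below: the modulus stays fixed, and only the node stored in an unavailable slot changes.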

@@ -66,7 +66,7 @@ again:
 }
 sn.brLock.Unlock()
 // The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true.
-if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
+if err := rerouteRowsMayBlock(sn, false, buf, rows); err != nil {
 return fmt.Errorf("%d rows dropped because the current vmstorage is unavailable and %w", rows, err)
 }
 return nil
@@ -87,7 +87,7 @@ again:
 // The buf contents doesn't fit sn.buf.
 // This means that the current vmstorage is slow or will become broken soon.
 // Spread buf among all the vmstorage nodes.
-if err := rerouteRowsMayBlock(sn, buf, rows); err != nil {
+if err := rerouteRowsMayBlock(sn, true, buf, rows); err != nil {
 return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
 }
 return nil
@@ -458,33 +458,34 @@ func Stop() {
 storageNodesWG.Wait()
 }

-// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes excluding the snExclude.
+// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes.
 //
 // It waits until healthy storage nodes have enough space for the re-routed rows.
 // This guarantees backpressure if the ingestion rate exceeds vmstorage nodes'
 // ingestion rate capacity.
 //
 // It returns non-nil error only if Stop is called.
-func rerouteRowsMayBlock(snExclude *storageNode, buf []byte, rows int) error {
+func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte, rows int) error {
 if len(storageNodes) < 2 {
 logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes))
 }
-var mr storage.MetricRow
-sns := getStorageNodesForRerouting(snExclude)
-if len(sns) == 1 {
-// Fast path: only a single storage node is available for re-routing.
-sn := sns[0]
-if sn.sendBufMayBlock(buf) {
-if sn != snExclude {
-snExclude.rowsReroutedFromHere.Add(rows)
-sn.rowsReroutedToHere.Add(rows)
-}
-return nil
-}
-}
 reroutesTotal.Inc()
-atomic.StoreUint64(&snExclude.lastRerouteTime, fasttime.UnixTimestamp())
+sns := getStorageNodesMapForRerouting(snSource, mayUseSNSource)
+if areStorageNodesEqual(sns) {
+// Fast path - all the storage nodes are the same - send the buf to them.
+sn := sns[0]
+if !sn.sendBufMayBlock(buf) {
+return fmt.Errorf("cannot re-route data because of graceful shutdown")
+}
+if sn != snSource {
+snSource.rowsReroutedFromHere.Add(rows)
+sn.rowsReroutedToHere.Add(rows)
+}
+return nil
+}
+atomic.StoreUint64(&snSource.lastRerouteTime, fasttime.UnixTimestamp())
 src := buf
+var mr storage.MetricRow
 for len(src) > 0 {
 tail, err := mr.UnmarshalX(src)
 if err != nil {
@@ -495,44 +496,19 @@ func rerouteRowsMayBlock(snExclude *storageNode, buf []byte, rows int) error {
 reroutedRowsProcessed.Inc()
 h := xxhash.Sum64(mr.MetricNameRaw)
 mr.ResetX()
-for {
-idx := h % uint64(len(sns))
-sn := sns[idx]
-if sn.sendBufMayBlock(rowBuf) {
-// The row has been successfully re-routed to sn.
-if sn != snExclude {
-snExclude.rowsReroutedFromHere.Inc()
-sn.rowsReroutedToHere.Inc()
-}
-break
-}
-select {
-case <-storageNodesStopCh:
-return fmt.Errorf("cannot re-route %d rows because of graceful shutdown", rows)
-default:
-}
-// Refresh the list of healthy storage nodes, since sn became broken.
-sns = getStorageNodesForRerouting(snExclude)
-}
+idx := h % uint64(len(sns))
+sn := sns[idx]
+if !sn.sendBufMayBlock(rowBuf) {
+return fmt.Errorf("cannot re-route data because of graceful shutdown")
+}
+if sn != snSource {
+snSource.rowsReroutedFromHere.Inc()
+sn.rowsReroutedToHere.Inc()
+}
 }
 return nil
 }
-
-func getStorageNodesForRerouting(snExclude *storageNode) []*storageNode {
-sns := make([]*storageNode, 0, len(storageNodes))
-currentTime := fasttime.UnixTimestamp()
-for _, sn := range storageNodes {
-if sn != snExclude && !sn.isBroken() && currentTime > atomic.LoadUint64(&sn.lastRerouteTime)+5 {
-sns = append(sns, sn)
-}
-}
-if len(sns) == 0 {
-// There are no suitable storage nodes for re-routing. Fall back to snExclude.
-sns = append(sns, snExclude)
-}
-return sns
-}

 func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
 sn.brLock.Lock()
 for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
@@ -542,10 +518,6 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
 return false
 default:
 }
-if sn.isBroken() {
-sn.brLock.Unlock()
-return false
-}
 sn.brCond.Wait()
 }
 sn.br.buf = append(sn.br.buf, buf...)
@@ -554,6 +526,54 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
 return true
 }

+func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode {
+sns := getStorageNodesForRerouting(snExclude, true)
+if len(sns) == len(storageNodes) {
+return sns
+}
+if !mayUseSNExclude {
+sns = getStorageNodesForRerouting(snExclude, false)
+}
+for len(sns) < len(storageNodes) {
+sns = append(sns, snExclude)
+}
+return sns
+}
+
+func areStorageNodesEqual(sns []*storageNode) bool {
+snOrigin := sns[0]
+for _, sn := range sns[1:] {
+if sn != snOrigin {
+return false
+}
+}
+return true
+}
+
+func getStorageNodesForRerouting(snExclude *storageNode, skipRecentlyReroutedNodes bool) []*storageNode {
+sns := make([]*storageNode, 0, len(storageNodes))
+currentTime := fasttime.UnixTimestamp()
+for i, sn := range storageNodes {
+if sn == snExclude || sn.isBroken() {
+// Skip snExclude and broken storage nodes.
+continue
+}
+if skipRecentlyReroutedNodes && currentTime <= atomic.LoadUint64(&sn.lastRerouteTime)+5 {
+// Skip nodes, which were re-routed recently.
+continue
+}
+for len(sns) <= i {
+sns = append(sns, sn)
+}
+}
+if len(sns) > 0 {
+for len(sns) < len(storageNodes) {
+sns = append(sns, sns[0])
+}
+}
+return sns
+}
+
 var (
 maxBufSizePerStorageNode int