app/vminsert: refresh the list of healthy storage nodes only if the the row cannot be sent to destination storage node

Previously the list had been generated for each rerouted row. This could consume additional CPU time during rerouting,
which could lead to rerouting slowdown.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791
This commit is contained in:
Aliaksandr Valialkin 2020-09-29 01:27:36 +03:00
parent 1e7452e501
commit ffa6581c46

View file

@ -576,6 +576,11 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows
rowsProcessed := 0
defer reroutedRowsProcessed.Add(rowsProcessed)
sns := getHealthyStorageNodesBlocking(stopCh)
if len(sns) == 0 {
// stopCh is notified to stop.
return
}
src := br.buf
for len(src) > 0 {
tail, err := mr.Unmarshal(src)
@ -593,12 +598,6 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows
h = xxhash.Sum64(mr.MetricNameRaw)
}
for {
// Obtain fresh list of healthy storage nodes, since it may change with every iteration.
sns := getHealthyStorageNodesBlocking(stopCh)
if len(sns) == 0 {
// stopCh is notified to stop.
return
}
idx := h % uint64(len(sns))
sn := sns[idx]
if sn.sendReroutedRow(rowBuf) {
@ -621,6 +620,12 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows
case <-t.C:
timerpool.Put(t)
}
// Obtain fresh list of healthy storage nodes after the delay, since it may be already updated.
sns = getHealthyStorageNodesBlocking(stopCh)
if len(sns) == 0 {
// stopCh is notified to stop.
return
}
}
}
}