From ffa6581c46e9e7b6d181f8501ca4bdf568e34671 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 29 Sep 2020 01:27:36 +0300 Subject: [PATCH] app/vminsert: refresh the list of healthy storage nodes only if the the row cannot be sent to destination storage node Previously the list had been generated for each rerouted row. This could consume additional CPU time during rerouting, which could lead to rerouting slowdown. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791 --- app/vminsert/netstorage/netstorage.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index b5004add8..091c63504 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -576,6 +576,11 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows rowsProcessed := 0 defer reroutedRowsProcessed.Add(rowsProcessed) + sns := getHealthyStorageNodesBlocking(stopCh) + if len(sns) == 0 { + // stopCh is notified to stop. + return + } src := br.buf for len(src) > 0 { tail, err := mr.Unmarshal(src) @@ -593,12 +598,6 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows h = xxhash.Sum64(mr.MetricNameRaw) } for { - // Obtain fresh list of healthy storage nodes, since it may change with every iteration. - sns := getHealthyStorageNodesBlocking(stopCh) - if len(sns) == 0 { - // stopCh is notified to stop. - return - } idx := h % uint64(len(sns)) sn := sns[idx] if sn.sendReroutedRow(rowBuf) { @@ -621,6 +620,12 @@ func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows case <-t.C: timerpool.Put(t) } + // Obtain fresh list of healthy storage nodes after the delay, since it may be already updated. + sns = getHealthyStorageNodesBlocking(stopCh) + if len(sns) == 0 { + // stopCh is notified to stop. + return + } } } }