app/vminsert: remove useless delays when sending data to vmstorage

This improves the maximum data ingestion performance for cluster VictoriaMetrics

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791
Aliaksandr Valialkin 2020-09-28 21:35:40 +03:00
parent 1481d6d8ff
commit 9d123eb22a
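
The gist of the change, before the diff: each storage-node worker used to sleep on a one-second ticker between flushes even when data was already queued. The reworked loop selects on an already-closed channel whenever the shared buffer is non-empty, which makes the select return immediately; the ticker only paces the idle case. A minimal standalone sketch of that pattern, with illustrative names (sharedBuf, flushLoop) that are not from the commit:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // closedCh never blocks a select: receiving from a closed channel
    // succeeds immediately. A nil channel, by contrast, blocks forever.
    var closedCh = func() chan struct{} {
        ch := make(chan struct{})
        close(ch)
        return ch
    }()

    type sharedBuf struct {
        mu  sync.Mutex
        buf []byte
    }

    // flushLoop drains b every 200ms when idle and immediately when data is pending.
    func flushLoop(b *sharedBuf, stopCh <-chan struct{}) {
        ticker := time.NewTicker(200 * time.Millisecond)
        defer ticker.Stop()
        for {
            b.mu.Lock()
            bufLen := len(b.buf)
            b.mu.Unlock()
            var waitCh <-chan struct{} // nil channel: this case never fires
            if bufLen > 0 {
                waitCh = closedCh // non-empty buffer: do not sleep
            }
            select {
            case <-stopCh:
                return
            case <-ticker.C:
            case <-waitCh:
            }
            b.mu.Lock()
            data := b.buf
            b.buf = nil
            b.mu.Unlock()
            if len(data) > 0 {
                fmt.Printf("flushed %d bytes\n", len(data))
            }
        }
    }

    func main() {
        var b sharedBuf
        stopCh := make(chan struct{})
        go flushLoop(&b, stopCh)
        b.mu.Lock()
        b.buf = append(b.buf, "sample row"...)
        b.mu.Unlock()
        time.Sleep(50 * time.Millisecond) // give the loop a chance to flush
        close(stopCh)
    }

The old code only skipped the sleep once the buffer crossed a quarter of its maximum size; the new code flushes as soon as any data is buffered, removing up to a second of added ingestion latency per batch.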


@@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
@@ -92,7 +93,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
replicas = len(storageNodes)
}
-ticker := time.NewTicker(time.Second)
+ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
@@ -103,8 +104,8 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
bufLen := len(sn.br.buf)
sn.brLock.Unlock()
waitCh = nil
-if len(br.buf) == 0 && bufLen > maxBufSizePerStorageNode/4 {
-// Do not sleep, since sn.br.buf contains enough data to process.
+if bufLen > 0 {
+// Do not sleep if sn.br.buf isn't empty.
waitCh = closedCh
}
select {
@@ -115,33 +116,37 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
case <-ticker.C:
case <-waitCh:
}
-if len(br.buf) == 0 {
-sn.brLock.Lock()
-sn.br, br = br, sn.br
-sn.brLock.Unlock()
-}
+sn.brLock.Lock()
+sn.br, br = br, sn.br
+sn.brLock.Unlock()
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br.buf = append(br.buf[:0:0], br.buf...)
brLastResetTime = currentTime
}
+sn.checkHealth()
if len(br.buf) == 0 {
-// Nothing to send. Just check sn health, so it could be returned to non-broken state.
-sn.checkHealth()
+// Nothing to send.
continue
}
// Send br to replicas storageNodes starting from snIdx.
-if !sendBufToReplicas(&br, snIdx, replicas) {
-// do not reset br in the hope it will be sent next time.
-continue
+for !sendBufToReplicasNonblocking(&br, snIdx, replicas) {
+t := timerpool.Get(200 * time.Millisecond)
+select {
+case <-stopCh:
+timerpool.Put(t)
+return
+case <-t.C:
+timerpool.Put(t)
+sn.checkHealth()
+}
}
br.reset()
}
}
-func sendBufToReplicas(br *bufRows, snIdx, replicas int) bool {
+func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]bool, replicas)
for i := 0; i < replicas; i++ {
idx := snIdx + i
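
For readers unfamiliar with lib/timerpool: it hands out *time.Timer values from a pool so that hot paths like the retry loop above don't allocate a fresh timer per attempt. The control flow is otherwise the plain standard-library pattern sketched below (retryUntilSent and sendAttempt are illustrative names, not from the codebase):

    import "time"

    // retryUntilSent retries sendAttempt every 200ms until it succeeds or
    // stopCh is closed. It returns false only when asked to stop.
    func retryUntilSent(stopCh <-chan struct{}, sendAttempt func() bool) bool {
        for !sendAttempt() {
            t := time.NewTimer(200 * time.Millisecond)
            select {
            case <-stopCh:
                t.Stop()
                return false
            case <-t.C:
            }
        }
        return true
    }

Because the worker now owns the retry loop, a temporarily unreachable vmstorage node delays only its own worker by 200ms per attempt instead of forcing a full one-second ticker cycle.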
@@ -172,7 +177,7 @@ func sendBufToReplicas(br *bufRows, snIdx, replicas int) bool {
// The br has been already replicated to sn. Skip it.
continue
}
-if !sn.sendBufRows(br) {
+if !sn.sendBufRowsNonblocking(br) {
// Cannot send data to sn. Go to the next sn.
continue
}
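
The replication scheme itself is unchanged by the rename: a buffer owned by the worker for node snIdx is written to `replicas` consecutive storage nodes, wrapping around modulo the node count, and nodes that already received the buffer are skipped. An illustrative helper (not in the codebase) that lists the target indexes:

    // replicaIndexes returns the storage node indexes that the buffer of
    // worker snIdx is replicated to: replicas consecutive nodes, wrapping around.
    func replicaIndexes(snIdx, replicas, nodeCount int) []int {
        idxs := make([]int, 0, replicas)
        for i := 0; i < replicas; i++ {
            idxs = append(idxs, (snIdx+i)%nodeCount)
        }
        return idxs
    }

For example, replicaIndexes(2, 2, 3) returns [2 0].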
@@ -188,50 +193,48 @@ func (sn *storageNode) checkHealth() {
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
-if !sn.isBroken() {
-return
-}
if sn.bc != nil {
-logger.Panicf("BUG: sn.bc must be nil when sn is broken; got %p", sn.bc)
+// The sn looks healthy.
+return
}
bc, err := sn.dial()
if err != nil {
logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err)
return
}
logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
sn.bc = bc
atomic.StoreUint32(&sn.broken, 0)
}
-func (sn *storageNode) sendBufRows(br *bufRows) bool {
+func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
if sn.isBroken() {
return false
}
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
-bc, err := sn.dial()
-if err != nil {
-// Mark sn as broken in order to prevent sending additional data to it until it is recovered.
-atomic.StoreUint32(&sn.broken, 1)
-logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err)
-return false
-}
-sn.bc = bc
+// Do not call sn.dial() here in order to prevent long blocking on sn.bcLock.Lock(),
+// which can negatively impact data sending in sendBufToReplicasNonblocking().
+// sn.dial() should be called by sn.checkHealth() on an unsuccessful call to sendBufToReplicasNonblocking().
+return false
}
err := sendToConn(sn.bc, br.buf)
if err == nil {
-// Successfully sent buf to bc. Remove broken flag from sn.
-atomic.StoreUint32(&sn.broken, 0)
+// Successfully sent buf to bc.
sn.rowsSent.Add(br.rows)
return true
}
// Couldn't flush buf to sn. Mark sn as broken.
logger.Warnf("cannot send %d bytes with %d rows to %q: %s; re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
logger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
if err = sn.bc.Close(); err != nil {
logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.bc = nil
-sn.connectionErrors.Inc()
atomic.StoreUint32(&sn.broken, 1)
+sn.connectionErrors.Inc()
return false
}
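
The net effect of this hunk is a division of labor: sendBufRowsNonblocking fails fast and holds bcLock only for the write itself, while checkHealth is the single place that dials. A compact sketch of that handshake under invented names (node, trySend; the real code also reroutes rows to other nodes in the meantime):

    import (
        "net"
        "sync"
        "sync/atomic"
        "time"
    )

    type node struct {
        mu     sync.Mutex
        conn   net.Conn // nil while disconnected
        broken uint32   // accessed atomically
    }

    func (n *node) trySend(buf []byte) bool {
        if atomic.LoadUint32(&n.broken) != 0 {
            return false
        }
        n.mu.Lock()
        defer n.mu.Unlock()
        if n.conn == nil {
            return false // fail fast; reconnecting is checkHealth's job
        }
        if _, err := n.conn.Write(buf); err != nil {
            n.conn.Close()
            n.conn = nil
            atomic.StoreUint32(&n.broken, 1)
            return false
        }
        return true
    }

    func (n *node) checkHealth(addr string) {
        n.mu.Lock()
        defer n.mu.Unlock()
        if n.conn != nil {
            return // looks healthy
        }
        conn, err := net.DialTimeout("tcp", addr, time.Second)
        if err != nil {
            return
        }
        n.conn = conn
        atomic.StoreUint32(&n.broken, 0)
    }

Keeping the potentially slow dial out of the send path means a dead node can no longer stall the lock that every send must take.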
@@ -302,7 +305,7 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
}
func rerouteWorker(stopCh <-chan struct{}) {
-ticker := time.NewTicker(time.Second)
+ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
@@ -313,8 +316,8 @@ func rerouteWorker(stopCh <-chan struct{}) {
bufLen := len(reroutedBR.buf)
reroutedBRLock.Unlock()
waitCh = nil
-if len(br.buf) == 0 && bufLen > reroutedBufMaxSize/4 {
-// Do not sleep if reroutedBR contains enough data to process.
+if bufLen > 0 {
+// Do not sleep if reroutedBR contains data to process.
waitCh = closedCh
}
select {
@@ -325,11 +328,9 @@ func rerouteWorker(stopCh <-chan struct{}) {
case <-ticker.C:
case <-waitCh:
}
-if len(br.buf) == 0 {
-reroutedBRLock.Lock()
-reroutedBR, br = br, reroutedBR
-reroutedBRLock.Unlock()
-}
+reroutedBRLock.Lock()
+reroutedBR, br = br, reroutedBR
+reroutedBRLock.Unlock()
reroutedBRCond.Broadcast()
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
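
The condition above guards the same buffer-shrinking idiom shown earlier in run(): append(br.buf[:0:0], br.buf...). The full slice expression br.buf[:0:0] has zero length and zero capacity, so append is forced to allocate a fresh array sized to the live data, letting the oversized backing array left over from an ingestion spike be garbage-collected. Equivalent standalone form (shrink is an illustrative name):

    // shrink copies b into a new backing array whose capacity matches len(b).
    func shrink(b []byte) []byte {
        return append(b[:0:0], b...)
    }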
@@ -341,16 +342,8 @@ func rerouteWorker(stopCh <-chan struct{}) {
// Nothing to re-route.
continue
}
-sns := getHealthyStorageNodes()
-if len(sns) == 0 {
-// No more vmstorage nodes to write data to.
-rerouteErrors.Inc()
-logger.Errorf("cannot send rerouted rows because all the storage nodes are unhealthy")
-// Do not reset br in the hope it could be sent next time.
-continue
-}
-spreadReroutedBufToStorageNodes(sns, &br)
-// There is no need in br.reset() here, since it is already done in spreadReroutedBufToStorageNodes.
+spreadReroutedBufToStorageNodesBlocking(stopCh, &br)
+br.reset()
}
// Notify all the blocked addToReroutedBufMayBlock callers, so they may finish the work.
reroutedBRCond.Broadcast()
@@ -510,7 +503,7 @@ func addToReroutedBufMayBlock(buf []byte, rows int) error {
if getHealthyStorageNodesCount() == 0 {
rowsLostTotal.Add(rows)
return fmt.Errorf("all the vmstorage nodes are unavailable and reroutedBR has no enough space for storing %d bytes; "+
"only %d out of %d bytes left in reroutedBR",
"only %d free bytes left out of %d bytes in reroutedBR",
len(buf), reroutedBufMaxSize-len(reroutedBR.buf), reroutedBufMaxSize)
}
select {
@@ -520,7 +513,7 @@ func addToReroutedBufMayBlock(buf []byte, rows int) error {
default:
}
-// The reroutedBR.buf has no enough space for len(buf). Wait while the reroutedBR.buf is be sent by rerouteWorker.
+// The reroutedBR.buf has no enough space for len(buf). Wait while the reroutedBR.buf is sent by rerouteWorker.
reroutedBufWaits.Inc()
reroutedBRCond.Wait()
}
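
addToReroutedBufMayBlock applies backpressure with a sync.Cond: producers park in reroutedBRCond.Wait() until rerouteWorker swaps the buffer out and calls Broadcast. A minimal sketch of that producer/consumer contract, with invented names (maxBufSize, addMayBlock, drain):

    import "sync"

    var (
        mu   sync.Mutex
        cond = sync.NewCond(&mu)
        buf  []byte
    )

    const maxBufSize = 1024 // illustrative limit

    // addMayBlock appends p, blocking while the buffer lacks space.
    func addMayBlock(p []byte) {
        mu.Lock()
        defer mu.Unlock()
        for len(buf)+len(p) > maxBufSize {
            cond.Wait() // releases mu while parked, reacquires on wakeup
        }
        buf = append(buf, p...)
    }

    // drain takes the buffered data and wakes all blocked producers.
    func drain() []byte {
        mu.Lock()
        data := buf
        buf = nil
        mu.Unlock()
        cond.Broadcast()
        return data
    }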
@@ -550,14 +543,32 @@ func getHealthyStorageNodes() []*storageNode {
return sns
}
-func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) {
+func getHealthyStorageNodesBlocking(stopCh <-chan struct{}) []*storageNode {
+// Wait for at least a single healthy storage node.
+for {
+sns := getHealthyStorageNodes()
+if len(sns) > 0 {
+return sns
+}
+// There are no healthy storage nodes.
+// Wait for a while until such nodes appear.
+t := timerpool.Get(time.Second)
+select {
+case <-stopCh:
+timerpool.Put(t)
+return nil
+case <-t.C:
+timerpool.Put(t)
+}
+}
+}
+func spreadReroutedBufToStorageNodesBlocking(stopCh <-chan struct{}, br *bufRows) {
var mr storage.MetricRow
rowsProcessed := 0
defer reroutedRowsProcessed.Add(rowsProcessed)
src := br.buf
-dst := br.buf[:0]
-dstRows := 0
for len(src) > 0 {
tail, err := mr.Unmarshal(src)
if err != nil {
@@ -566,49 +577,44 @@ func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) {
rowBuf := src[:len(src)-len(tail)]
src = tail
rowsProcessed++
-idx := uint64(0)
-if len(sns) > 1 {
-h := xxhash.Sum64(mr.MetricNameRaw)
+var h uint64
+if len(storageNodes) > 1 {
// Do not use jump.Hash(h, int32(len(sns))) here,
// since this leads to uneven distribution of rerouted rows among sns -
// they all go to the original or to the next sn.
-idx = h % uint64(len(sns))
+h = xxhash.Sum64(mr.MetricNameRaw)
}
-attempts := 0
for {
-sn := sns[idx]
-idx++
-if idx >= uint64(len(sns)) {
-idx = 0
-}
-attempts++
-if attempts > len(sns) {
-// All the storage nodes are broken.
-// Return the remaining data to br.buf, so it may be processed later.
-dst = append(dst, rowBuf...)
-dst = append(dst, src...)
-br.buf = dst
-br.rows = dstRows + (br.rows - rowsProcessed + 1)
+// Obtain fresh list of healthy storage nodes, since it may change with every iteration.
+sns := getHealthyStorageNodesBlocking(stopCh)
+if len(sns) == 0 {
+// stopCh is notified to stop.
+return
+}
-if sn.isBroken() {
-// The sn is broken. Go to the next one.
-continue
+idx := h % uint64(len(sns))
+sn := sns[idx]
+if sn.sendReroutedRow(rowBuf) {
+// The row has been successfully re-routed to sn.
+break
+}
-if !sn.sendReroutedRow(rowBuf) {
-// The row cannot be re-routed to sn. Return it back to the buf for rerouting.
-// Do not re-route the row to the remaining storage nodes,
-// since this may result in increased resource usage (CPU, memory, disk IO) on these nodes,
-// because they'll have to accept and register new time series (this is resource-intensive operation).
-dst = append(dst, rowBuf...)
-dstRows++
+// The row cannot be re-routed to sn. Wait for a while and try again.
+// Do not re-route the row to the remaining storage nodes,
+// since this may result in increased resource usage (CPU, memory, disk IO) on these nodes,
+// because they'll have to accept and register new time series (this is a resource-intensive operation).
+//
+// Do not skip rowBuf in the hope it may be sent later, since this wastes CPU time for no reason.
+rerouteErrors.Inc()
+t := timerpool.Get(200 * time.Millisecond)
+select {
+case <-stopCh:
+// stopCh is notified to stop.
+timerpool.Put(t)
+return
+case <-t.C:
+timerpool.Put(t)
+}
-break
}
}
-br.buf = dst
-br.rows = dstRows
}
func (sn *storageNode) sendReroutedRow(buf []byte) bool {
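
Row-level rerouting now picks a target by hashing the raw metric name and reducing it modulo the count of currently healthy nodes, so rows for the same series keep landing on the same node as long as the healthy set is stable. A standalone sketch (pickNode is an illustrative name; the jump.Hash caveat restates the comment in the diff above):

    import xxhash "github.com/cespare/xxhash/v2"

    // pickNode maps a metric name to a node index. Plain modulo is used
    // instead of jump.Hash, which distributed rerouted rows poorly here
    // (they all went to the original node or its neighbor).
    func pickNode(metricNameRaw []byte, nodeCount int) int {
        if nodeCount <= 1 {
            return 0
        }
        return int(xxhash.Sum64(metricNameRaw) % uint64(nodeCount))
    }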