Adds read-only mode for vmstorage node (#1680)

* adds read-only mode for vmstorage
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269

* changes order a bit

* moves isFreeDiskLimitReached var to storage struct
renames functions to be consistent
change protoparser api - with optional storage limit check for given openned storage

* renames freeSpaceLimit to ReadOnly
This commit is contained in:
Nikolay 2021-10-08 12:52:56 +03:00 committed by GitHub
parent cbf01f7384
commit a171916ef5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 13 deletions

View file

@ -30,7 +30,7 @@ func InsertHandler(c net.Conn) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(bc, func(rows []storage.MetricRow) error {
return insertRows(rows)
})
}, nil)
})
}

View file

@ -1,6 +1,7 @@
package netstorage
import (
"errors"
"flag"
"fmt"
"io"
@ -32,8 +33,10 @@ var (
disableRerouting = flag.Bool(`disableRerouting`, true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate")
)
func (sn *storageNode) isBroken() bool {
return atomic.LoadUint32(&sn.broken) != 0
var errorStorageReadOnly = errors.New("storage node is read only")
func (sn *storageNode) isNotReady() bool {
return atomic.LoadUint32(&sn.broken) != 0 || atomic.LoadUint32(&sn.isReadOnly) != 0
}
// push pushes buf to sn internal bufs.
@ -60,7 +63,7 @@ again:
return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
default:
}
if sn.isBroken() {
if sn.isNotReady() {
if len(storageNodes) == 1 {
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
sn.brCond.Wait()
@ -110,6 +113,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
replicas = len(storageNodes)
}
sn.startStorageReadOnlyCheck(stopCh)
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
@ -234,7 +238,7 @@ func (sn *storageNode) checkHealth() {
}
func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
if sn.isBroken() {
if sn.isNotReady() {
return false
}
sn.bcLock.Lock()
@ -255,6 +259,11 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
sn.rowsSent.Add(br.rows)
return true
}
if errors.Is(err, errorStorageReadOnly) {
atomic.StoreUint32(&sn.isReadOnly, 1)
sn.brCond.Broadcast()
return false
}
// Couldn't flush buf to sn. Mark sn as broken.
logger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
@ -307,9 +316,19 @@ func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
if _, err := io.ReadFull(bc, sizeBuf.B[:1]); err != nil {
return fmt.Errorf("cannot read `ack` from vmstorage: %w", err)
}
if sizeBuf.B[0] != 1 {
return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want %d", sizeBuf.B[0], 1)
// vmstorage returns ack status response
// 1 - response ok, data written to storage
// 2 - storage is read only
ackResp := sizeBuf.B[0]
switch ackResp {
case 1:
case 2:
return errorStorageReadOnly
default:
return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want 1 or 2", sizeBuf.B[0])
}
return nil
}
@ -343,6 +362,10 @@ type storageNode struct {
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken uint32
// isReadOnly is set to non-zero if the given vmstorage node is read only
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
isReadOnly uint32
// brLock protects br.
brLock sync.Mutex
@ -449,11 +472,15 @@ func InitStorageNodes(addrs []string, seed byte) {
return float64(n)
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 {
if sn.isBroken() {
if atomic.LoadUint32(&sn.broken) != 0 {
return 0
}
return 1
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 {
return float64(atomic.LoadUint32(&sn.isReadOnly))
})
storageNodes = append(storageNodes, sn)
}
@ -548,6 +575,43 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
return true
}
func (sn *storageNode) startStorageReadOnlyCheck(stop <-chan struct{}) {
f := func() {
// fast path
if atomic.LoadUint32(&sn.isReadOnly) == 0 {
return
}
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
return
}
// send nil buff to check ack response from storage
err := sendToConn(sn.bc, nil)
if err == nil {
atomic.StoreUint32(&sn.isReadOnly, 0)
return
}
if !errors.Is(err, errorStorageReadOnly) {
logger.Warnf("cannot check storage readd only status for -storageNode=%q: %s", sn.dialer.Addr(), err)
}
}
storageNodesWG.Add(1)
go func() {
t := time.NewTicker(time.Second * 30)
defer t.Stop()
storageNodesWG.Done()
for {
select {
case <-stop:
return
case <-t.C:
f()
}
}
}()
}
func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode {
sns := getStorageNodesForRerouting(snExclude, true)
if len(sns) == len(storageNodes) {
@ -576,7 +640,7 @@ func getStorageNodesForRerouting(snExclude *storageNode, skipRecentlyReroutedNod
sns := make([]*storageNode, 0, len(storageNodes))
currentTime := fasttime.UnixTimestamp()
for i, sn := range storageNodes {
if sn == snExclude || sn.isBroken() {
if sn == snExclude || sn.isNotReady() {
// Skip snExclude and broken storage nodes.
continue
}

View file

@ -46,6 +46,9 @@ var (
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries")
maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series can be added to the storage during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries")
minFreeDiskSpaceSizeBytes = flagutil.NewBytes("storage.minFreeDiskSpaceSize", 0, "Defines minimum free disk space size for storageDataPath. "+
"If limit is reached, storage becomes read-only and tells vminsert to reroute data for other storage nodes.")
)
func main() {
@ -61,6 +64,7 @@ func main() {
storage.SetFinalMergeDelay(*finalMergeDelay)
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceSizeBytes.N)
logger.Infof("opening storage at %q with -retentionPeriod=%s", *storageDataPath, retentionPeriod)
startTime := time.Now()
@ -269,6 +273,17 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(fs.MustGetFreeSpace(*storageDataPath))
})
metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), func() float64 {
return float64(minFreeDiskSpaceSizeBytes.N)
})
metrics.NewGauge(`vm_storage_read_only`, func() float64 {
if strg.IsReadOnly() {
return 1
}
return 0
})
metrics.NewGauge(`vm_active_merges{type="storage/big"}`, func() float64 {
return float64(tm().ActiveBigMerges)
})

View file

@ -261,7 +261,7 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
})
}, s.storage.IsReadOnly)
}
var vminsertMetricsRead = metrics.NewCounter("vm_vminsert_metrics_read_total")

View file

@ -17,11 +17,13 @@ import (
)
// ParseStream parses data sent from vminsert to bc and calls callback for parsed rows.
// Optional function isReadOnly must return storage writable status
// If it's read only, this status will be propagated to vminsert.
//
// The callback can be called concurrently multiple times for streamed data from req.
//
// callback shouldn't hold block after returning.
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error, isReadOnly func() bool) error {
var wg sync.WaitGroup
var (
callbackErrLock sync.Mutex
@ -44,7 +46,7 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric
}
uw.wg = &wg
var err error
uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc)
uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc, isReadOnly)
if err != nil {
wg.Wait()
if err == io.EOF {
@ -60,7 +62,7 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric
}
// readBlock reads the next data block from vminsert-initiated bc, appends it to dst and returns the result.
func readBlock(dst []byte, bc *handshake.BufferedConn) ([]byte, error) {
func readBlock(dst []byte, bc *handshake.BufferedConn, isReadOnly func() bool) ([]byte, error) {
sizeBuf := sizeBufPool.Get()
defer sizeBufPool.Put(sizeBuf)
sizeBuf.B = bytesutil.Resize(sizeBuf.B, 8)
@ -71,6 +73,20 @@ func readBlock(dst []byte, bc *handshake.BufferedConn) ([]byte, error) {
}
return dst, err
}
if isReadOnly != nil && isReadOnly() {
// send `read only` ack to vminsert node
sizeBuf.B[0] = 2
if _, err := bc.Write(sizeBuf.B[:1]); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot send storage full `ack` to vminsert: %w", err)
}
if err := bc.Flush(); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot flush storage full `ack` to vminsert: %w", err)
}
return dst, nil
}
packetSize := encoding.UnmarshalUint64(sizeBuf.B)
if packetSize > consts.MaxInsertPacketSize {
parseErrors.Inc()

View file

@ -57,6 +57,8 @@ type Storage struct {
hourlySeriesLimitRowsDropped uint64
dailySeriesLimitRowsDropped uint64
isReadOnly uint32
path string
cachePath string
retentionMsecs int64
@ -118,6 +120,7 @@ type Storage struct {
currHourMetricIDsUpdaterWG sync.WaitGroup
nextDayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
freeSpaceWatcherWG sync.WaitGroup
// The snapshotLock prevents from concurrent creation of snapshots,
// since this may result in snapshots without recently added data,
@ -268,6 +271,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
s.startCurrHourMetricIDsUpdater()
s.startNextDayMetricIDsUpdater()
s.startRetentionWatcher()
s.startFreeDiskSpaceWatcher()
return s, nil
}
@ -573,6 +577,53 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.tb.UpdateMetrics(&m.TableMetrics)
}
var (
storageFreeSpaceLimitBytes uint64
)
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
//
// The function must be called before opening or creating any storage.
func SetFreeDiskSpaceLimit(bytes int) {
storageFreeSpaceLimitBytes = uint64(bytes)
}
// IsReadOnly returns information is storage in read only mode
func (s *Storage) IsReadOnly() bool {
return atomic.LoadUint32(&s.isReadOnly) == 1
}
func (s *Storage) startFreeDiskSpaceWatcher() {
f := func() {
freeSpaceBytes := fs.MustGetFreeSpace(s.path)
// not enough free space
if freeSpaceBytes < storageFreeSpaceLimitBytes {
atomic.StoreUint32(&s.isReadOnly, 1)
return
}
atomic.StoreUint32(&s.isReadOnly, 0)
}
f()
s.freeSpaceWatcherWG.Add(1)
go func() {
defer s.freeSpaceWatcherWG.Done()
// zero value disables limit.
if storageFreeSpaceLimitBytes == 0 {
return
}
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
f()
}
}
}()
}
func (s *Storage) startRetentionWatcher() {
s.retentionWatcherWG.Add(1)
go func() {
@ -687,6 +738,7 @@ func (s *Storage) resetAndSaveTSIDCache() {
func (s *Storage) MustClose() {
close(s.stop)
s.freeSpaceWatcherWG.Wait()
s.retentionWatcherWG.Wait()
s.currHourMetricIDsUpdaterWG.Wait()
s.nextDayMetricIDsUpdaterWG.Wait()