app/{vminsert,vmstorage}: follow-up after a171916ef5

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269
Aliaksandr Valialkin 2021-10-08 13:52:56 +03:00
parent a171916ef5
commit 4fddcf4c83
7 changed files with 113 additions and 94 deletions

README.md

@@ -179,6 +179,10 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
It is recommended to set up alerts in [vmalert](https://docs.victoriametrics.com/vmalert.html) or in Prometheus from [this config](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/cluster/deployment/docker/alerts.yml).
## Readonly mode
`vmstorage` nodes automatically switch to readonly mode when the directory pointed to by `-storageDataPath` contains less than `-storage.minFreeDiskSpaceBytes` of free space. `vminsert` nodes stop sending data to such nodes and start re-routing the data to the remaining `vmstorage` nodes.
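The switch is driven by a periodic free-space check against `-storageDataPath`. Below is a minimal Go sketch of such a check, assuming Linux; the `freeSpace` helper, the hard-coded path and the variable names are illustrative, while the real implementation lives inside `vmstorage`:

```go
package main

import (
	"log"
	"sync/atomic"
	"syscall"
	"time"
)

// isReadOnly is set to 1 when the storage must reject new data.
var isReadOnly uint32

// freeSpace returns the number of bytes available to unprivileged users
// on the filesystem containing path (Linux-specific sketch).
func freeSpace(path string) uint64 {
	var st syscall.Statfs_t
	if err := syscall.Statfs(path, &st); err != nil {
		log.Fatalf("cannot stat %q: %s", path, err)
	}
	return st.Bavail * uint64(st.Bsize)
}

func main() {
	const limitBytes = 10e6 // mirrors the -storage.minFreeDiskSpaceBytes default (10MB)
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if freeSpace("vmstorage-data") < limitBytes {
			// Too little free space left; stop accepting new data,
			// so vminsert re-routes it to the remaining vmstorage nodes.
			atomic.StoreUint32(&isReadOnly, 1)
		} else {
			atomic.StoreUint32(&isReadOnly, 0)
		}
	}
}
```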
## URL format
@@ -777,6 +781,9 @@ Below is the output for `/path/to/vmstorage -help`:
The maximum number of unique series can be added to the storage during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries
-storage.maxHourlySeries int
The maximum number of unique series can be added to the storage during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries
-storage.minFreeDiskSpaceBytes size
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 10000000)
-storageDataPath string
Path to storage data (default "vmstorage-data")
-tls
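Note that the `10000000` bytes default for `-storage.minFreeDiskSpaceBytes` above equals 10MB. A hedged usage example with an illustrative data path and a larger threshold:

```console
# Switch vmstorage to readonly mode when less than 1GiB of free space
# remains at the data directory (path and value are illustrative).
/path/to/vmstorage -storageDataPath=/var/lib/vmstorage-data -storage.minFreeDiskSpaceBytes=1GiB
```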

app/vminsert/netstorage/netstorage.go

@@ -33,7 +33,7 @@ var (
disableRerouting = flag.Bool(`disableRerouting`, true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate")
)
var errorStorageReadOnly = errors.New("storage node is read only")
var errStorageReadOnly = errors.New("storage node is read only")
func (sn *storageNode) isNotReady() bool {
return atomic.LoadUint32(&sn.broken) != 0 || atomic.LoadUint32(&sn.isReadOnly) != 0
@@ -113,7 +113,13 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
replicas = len(storageNodes)
}
sn.startStorageReadOnlyCheck(stopCh)
sn.readOnlyCheckerWG.Add(1)
go func() {
defer sn.readOnlyCheckerWG.Done()
sn.readOnlyChecker(stopCh)
}()
defer sn.readOnlyCheckerWG.Wait()
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
@@ -259,9 +265,12 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
sn.rowsSent.Add(br.rows)
return true
}
if errors.Is(err, errorStorageReadOnly) {
if errors.Is(err, errStorageReadOnly) {
// The vmstorage is transitioned to readonly mode.
atomic.StoreUint32(&sn.isReadOnly, 1)
sn.brCond.Broadcast()
// Signal the caller that the data wasn't accepted by the vmstorage,
// so it will be re-routed to the remaining vmstorage nodes.
return false
}
// Couldn't flush buf to sn. Mark sn as broken.
@@ -317,14 +326,13 @@ func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
return fmt.Errorf("cannot read `ack` from vmstorage: %w", err)
}
// vmstorage returns ack status response
// 1 - response ok, data written to storage
// 2 - storage is read only
ackResp := sizeBuf.B[0]
switch ackResp {
case 1:
// ok response, data successfully accepted by vmstorage
case 2:
return errorStorageReadOnly
// vmstorage is in readonly mode
return errStorageReadOnly
default:
return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want 1 or 2", sizeBuf.B[0])
}
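This single ack byte is the only wire-level signal for the readonly state. A short sketch naming the two statuses; the constant names are illustrative, since the code above uses the bare literals:

```go
// Ack statuses sent from vmstorage to vminsert after each data packet.
const (
	ackDataWritten = 1 // data accepted and written to storage
	ackReadOnly    = 2 // vmstorage is in readonly mode; re-route the data
)
```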
@@ -379,6 +387,9 @@ type storageNode struct {
// bcLock protects bc.
bcLock sync.Mutex
// waitGroup for readOnlyChecker
readOnlyCheckerWG sync.WaitGroup
// bc is a single connection to vmstorage for data transfer.
// It must be accessed under bcLock.
bc *handshake.BufferedConn
@@ -478,7 +489,6 @@ func InitStorageNodes(addrs []string, seed byte) {
return 1
})
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 {
return float64(atomic.LoadUint32(&sn.isReadOnly))
})
storageNodes = append(storageNodes, sn)
@@ -575,41 +585,40 @@ func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
return true
}
func (sn *storageNode) startStorageReadOnlyCheck(stop <-chan struct{}) {
f := func() {
// fast path
if atomic.LoadUint32(&sn.isReadOnly) == 0 {
func (sn *storageNode) readOnlyChecker(stop <-chan struct{}) {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
select {
case <-stop:
return
}
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
return
}
// send nil buff to check ack response from storage
err := sendToConn(sn.bc, nil)
if err == nil {
atomic.StoreUint32(&sn.isReadOnly, 0)
return
}
if !errors.Is(err, errorStorageReadOnly) {
logger.Warnf("cannot check storage readd only status for -storageNode=%q: %s", sn.dialer.Addr(), err)
case <-ticker.C:
sn.checkReadOnlyMode()
}
}
storageNodesWG.Add(1)
go func() {
t := time.NewTicker(time.Second * 30)
defer t.Stop()
storageNodesWG.Done()
for {
select {
case <-stop:
return
case <-t.C:
f()
}
}
}()
}
func (sn *storageNode) checkReadOnlyMode() {
if atomic.LoadUint32(&sn.isReadOnly) == 0 {
// fast path - the sn isn't in readonly mode
return
}
// Check whether the storage remains in readonly mode
sn.bcLock.Lock()
defer sn.bcLock.Unlock()
if sn.bc == nil {
return
}
// send nil buf to check ack response from storage
err := sendToConn(sn.bc, nil)
if err == nil {
// The storage switched from readonly to non-readonly mode
atomic.StoreUint32(&sn.isReadOnly, 0)
return
}
if !errors.Is(err, errStorageReadOnly) {
logger.Errorf("cannot check storage readonly mode for -storageNode=%q: %s", sn.dialer.Addr(), err)
}
}
func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode {

app/vmstorage/main.go

@@ -47,8 +47,7 @@ var (
maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series can be added to the storage during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries")
minFreeDiskSpaceSizeBytes = flagutil.NewBytes("storage.minFreeDiskSpaceSize", 0, "Defines minimum free disk space size for storageDataPath. "+
"If limit is reached, storage becomes read-only and tells vminsert to reroute data for other storage nodes.")
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
)
func main() {
@@ -64,7 +63,7 @@
storage.SetFinalMergeDelay(*finalMergeDelay)
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceSizeBytes.N)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
logger.Infof("opening storage at %q with -retentionPeriod=%s", *storageDataPath, retentionPeriod)
startTime := time.Now()
@@ -274,10 +273,9 @@ func registerStorageMetrics(strg *storage.Storage) {
})
metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), func() float64 {
return float64(minFreeDiskSpaceSizeBytes.N)
return float64(minFreeDiskSpaceBytes.N)
})
metrics.NewGauge(`vm_storage_read_only`, func() float64 {
metrics.NewGauge(fmt.Sprintf(`vm_storage_is_read_only{path=%q}`, *storageDataPath), func() float64 {
if strg.IsReadOnly() {
return 1
}

docs/CHANGELOG.md

@@ -8,6 +8,7 @@ sort: 15
* FEATURE: add ability to accept metrics from [DataDog agent](https://docs.datadoghq.com/agent/) and [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). This option simplifies the migration path from DataDog to VictoriaMetrics. See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206).
* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading and writing from/to [Apache Kafka](https://kafka.apache.org/). See [these docs](https://docs.victoriametrics.com/vmagent.html#kafka-integration).
* FEATURE: vmstorage: stop accepting new data if `-storageDataPath` directory contains less than `-storage.minFreeDiskSpaceBytes` of free space. This should prevent from `out of disk space` crashes. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269).
* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues.
* FEATURE: add `rollup_scrape_interval(m[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which returns `min`, `max` and `avg` values for the interval between samples for `m` on the given lookbehind window `d`.
* FEATURE: add `topk_last(k, q)` and `bottomk_last(k, q)` functions to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which return up to `k` time series from `q` with the maximum / minimum last value on the graph.

docs/Cluster-VictoriaMetrics.md

@@ -183,6 +183,10 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
It is recommended to set up alerts in [vmalert](https://docs.victoriametrics.com/vmalert.html) or in Prometheus from [this config](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/cluster/deployment/docker/alerts.yml).
## Readonly mode
`vmstorage` nodes automatically switch to readonly mode when the directory pointed to by `-storageDataPath` contains less than `-storage.minFreeDiskSpaceBytes` of free space. `vminsert` nodes stop sending data to such nodes and start re-routing the data to the remaining `vmstorage` nodes.
## URL format
@@ -781,6 +785,9 @@ Below is the output for `/path/to/vmstorage -help`:
The maximum number of unique series can be added to the storage during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries
-storage.maxHourlySeries int
The maximum number of unique series can be added to the storage during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries
-storage.minFreeDiskSpaceBytes size
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 10000000)
-storageDataPath string
Path to storage data (default "vmstorage-data")
-tls

lib/protoparser/clusternative/streamparser.go

@@ -17,8 +17,8 @@ import (
)
// ParseStream parses data sent from vminsert to bc and calls callback for parsed rows.
// Optional function isReadOnly must return storage writable status
// If it's read only, this status will be propagated to vminsert.
// Optional function isReadOnly must return true if the storage cannot accept new data.
// In this case the data read from bc isn't accepted and the readonly status is sent back to bc.
//
// The callback can be called concurrently multiple times for streamed data from req.
//
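On the vmstorage side the readonly state is plumbed into the parser as the `isReadOnly` callback. A minimal sketch of that wiring, assuming the full `ParseStream` signature matches the excerpt below; `serveVMInsertConn` and the `precisionBits` value are illustrative:

```go
package server

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// serveVMInsertConn parses data blocks from a single vminsert connection
// and stores them, reporting the readonly state back to vminsert.
func serveVMInsertConn(bc *handshake.BufferedConn, strg *storage.Storage) error {
	const precisionBits = 64 // keep full value precision
	return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
		return strg.AddRows(rows, precisionBits)
	}, strg.IsReadOnly)
}
```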
@@ -63,8 +63,8 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric
// readBlock reads the next data block from vminsert-initiated bc, appends it to dst and returns the result.
func readBlock(dst []byte, bc *handshake.BufferedConn, isReadOnly func() bool) ([]byte, error) {
sizeBuf := sizeBufPool.Get()
defer sizeBufPool.Put(sizeBuf)
sizeBuf := auxBufPool.Get()
defer auxBufPool.Put(sizeBuf)
sizeBuf.B = bytesutil.Resize(sizeBuf.B, 8)
if _, err := io.ReadFull(bc, sizeBuf.B); err != nil {
if err != io.EOF {
@@ -73,20 +73,6 @@ func readBlock(dst []byte, bc *handshake.BufferedConn, isReadOnly func() bool) (
}
return dst, err
}
if isReadOnly != nil && isReadOnly() {
// send `read only` ack to vminsert node
sizeBuf.B[0] = 2
if _, err := bc.Write(sizeBuf.B[:1]); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot send storage full `ack` to vminsert: %w", err)
}
if err := bc.Flush(); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot flush storage full `ack` to vminsert: %w", err)
}
return dst, nil
}
packetSize := encoding.UnmarshalUint64(sizeBuf.B)
if packetSize > consts.MaxInsertPacketSize {
parseErrors.Inc()
@@ -98,25 +84,42 @@ func readBlock(dst []byte, bc *handshake.BufferedConn, isReadOnly func() bool) (
readErrors.Inc()
return dst, fmt.Errorf("cannot read packet with size %d bytes: %w; read only %d bytes", packetSize, err, n)
}
// Send `ack` to vminsert that the packet has been received.
deadline := time.Now().Add(5 * time.Second)
if err := bc.SetWriteDeadline(deadline); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
if isReadOnly != nil && isReadOnly() {
// The vmstorage is in readonly mode, so drop the read block of data
// and send `read only` status to vminsert.
dst = dst[:dstLen]
if err := sendAck(bc, 2); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot send readonly status to vminsert: %w", err)
}
return dst, nil
}
sizeBuf.B[0] = 1
if _, err := bc.Write(sizeBuf.B[:1]); err != nil {
// Send `ack` to vminsert that the packet has been received.
if err := sendAck(bc, 1); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot send `ack` to vminsert: %w", err)
}
if err := bc.Flush(); err != nil {
writeErrors.Inc()
return dst, fmt.Errorf("cannot flush `ack` to vminsert: %w", err)
}
return dst, nil
}
var sizeBufPool bytesutil.ByteBufferPool
func sendAck(bc *handshake.BufferedConn, status byte) error {
deadline := time.Now().Add(5 * time.Second)
if err := bc.SetWriteDeadline(deadline); err != nil {
return fmt.Errorf("cannot set write deadline: %w", err)
}
b := auxBufPool.Get()
defer auxBufPool.Put(b)
b.B = append(b.B[:0], status)
if _, err := bc.Write(b.B); err != nil {
return err
}
if err := bc.Flush(); err != nil {
return err
}
return nil
}
var auxBufPool bytesutil.ByteBufferPool
var (
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="clusternative"}`)

lib/storage/storage.go

@@ -120,7 +120,7 @@ type Storage struct {
currHourMetricIDsUpdaterWG sync.WaitGroup
nextDayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
freeSpaceWatcherWG sync.WaitGroup
freeDiskSpaceWatcherWG sync.WaitGroup
// The snapshotLock prevents from concurrent creation of snapshots,
// since this may result in snapshots without recently added data,
@@ -577,17 +577,15 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.tb.UpdateMetrics(&m.TableMetrics)
}
var (
storageFreeSpaceLimitBytes uint64
)
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
//
// The function must be called before opening or creating any storage.
func SetFreeDiskSpaceLimit(bytes int) {
storageFreeSpaceLimitBytes = uint64(bytes)
freeDiskSpaceLimitBytes = uint64(bytes)
}
var freeDiskSpaceLimitBytes uint64
// IsReadOnly returns true if the storage is in readonly mode.
func (s *Storage) IsReadOnly() bool {
return atomic.LoadUint32(&s.isReadOnly) == 1
@@ -596,28 +594,24 @@ func (s *Storage) IsReadOnly() bool {
func (s *Storage) startFreeDiskSpaceWatcher() {
f := func() {
freeSpaceBytes := fs.MustGetFreeSpace(s.path)
// not enough free space
if freeSpaceBytes < storageFreeSpaceLimitBytes {
if freeSpaceBytes < freeDiskSpaceLimitBytes {
// Switch the storage to readonly mode if there is not enough free space left at s.path
atomic.StoreUint32(&s.isReadOnly, 1)
return
}
atomic.StoreUint32(&s.isReadOnly, 0)
}
f()
s.freeSpaceWatcherWG.Add(1)
s.freeDiskSpaceWatcherWG.Add(1)
go func() {
defer s.freeSpaceWatcherWG.Done()
// zero value disables limit.
if storageFreeSpaceLimitBytes == 0 {
return
}
t := time.NewTicker(time.Minute)
defer t.Stop()
defer s.freeDiskSpaceWatcherWG.Done()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
case <-ticker.C:
f()
}
}
@@ -738,7 +732,7 @@ func (s *Storage) resetAndSaveTSIDCache() {
func (s *Storage) MustClose() {
close(s.stop)
s.freeSpaceWatcherWG.Wait()
s.freeDiskSpaceWatcherWG.Wait()
s.retentionWatcherWG.Wait()
s.currHourMetricIDsUpdaterWG.Wait()
s.nextDayMetricIDsUpdaterWG.Wait()