Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00
app/vmstorage: add vm_concurrent_addrows_* metrics for tracking the concurrency of Storage.AddRows calls

Also track the number of rows dropped because the concurrency-limit timeout for Storage.AddRows was exceeded. This number is exposed via `vm_concurrent_addrows_dropped_rows_total`.
Parent: 2d869c6d9b
Commit: f56c1298ad

2 changed files with 47 additions and 6 deletions
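For context, the gauges added in this commit are registered with the github.com/VictoriaMetrics/metrics package that the diff below already uses. The following is a minimal, self-contained sketch (not vmstorage code; the port and the droppedRows variable are illustrative stand-ins) of how such a callback-based gauge ends up on a /metrics endpoint:

package main

import (
    "net/http"
    "sync/atomic"

    "github.com/VictoriaMetrics/metrics"
)

// droppedRows stands in for the counter that Storage maintains internally.
var droppedRows uint64

func main() {
    // Register a callback-based gauge; the callback runs on every scrape,
    // mirroring how the diff below registers vm_concurrent_addrows_dropped_rows_total.
    metrics.NewGauge(`vm_concurrent_addrows_dropped_rows_total`, func() float64 {
        return float64(atomic.LoadUint64(&droppedRows))
    })

    // Expose all registered metrics in Prometheus text exposition format.
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        metrics.WritePrometheus(w, false)
    })
    http.ListenAndServe(":8428", nil) // port is illustrative
}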
@@ -308,6 +308,22 @@ func registerStorageMetrics(strg *storage.Storage) {
     return float64(m().TooSmallTimestampRows)
 })

+metrics.NewGauge(`vm_concurrent_addrows_limit_reached_total`, func() float64 {
+    return float64(m().AddRowsConcurrencyLimitReached)
+})
+metrics.NewGauge(`vm_concurrent_addrows_limit_timeout_total`, func() float64 {
+    return float64(m().AddRowsConcurrencyLimitTimeout)
+})
+metrics.NewGauge(`vm_concurrent_addrows_dropped_rows_total`, func() float64 {
+    return float64(m().AddRowsConcurrencyDroppedRows)
+})
+metrics.NewGauge(`vm_concurrent_addrows_capacity`, func() float64 {
+    return float64(m().AddRowsConcurrencyCapacity)
+})
+metrics.NewGauge(`vm_concurrent_addrows_current`, func() float64 {
+    return float64(m().AddRowsConcurrencyCurrent)
+})
+
 metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
     return float64(tm().BigRowsCount)
 })
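The m() helper used by these gauges is not part of this hunk. It is assumed to be a closure that caches a storage.Metrics snapshot for a short interval so that many gauge callbacks share one UpdateMetrics call. A self-contained sketch of that idea, with a stand-in Metrics type rather than the real registerStorageMetrics code:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Metrics stands in for storage.Metrics; only a field relevant to this commit is shown.
type Metrics struct {
    AddRowsConcurrencyDroppedRows uint64
}

// newCachedAccessor wraps an expensive snapshot function so that many gauge
// callbacks can share one refresh per second. The m() helper above is assumed
// to work along these lines; this is a sketch, not the actual code.
func newCachedAccessor(update func(*Metrics)) func() *Metrics {
    var (
        mu         sync.Mutex
        cached     = &Metrics{}
        lastUpdate time.Time
    )
    return func() *Metrics {
        mu.Lock()
        defer mu.Unlock()
        if time.Since(lastUpdate) < time.Second {
            return cached // gauges scraped together reuse this snapshot
        }
        var mc Metrics
        update(&mc)
        cached = &mc
        lastUpdate = time.Now()
        return cached
    }
}

func main() {
    m := newCachedAccessor(func(mc *Metrics) { mc.AddRowsConcurrencyDroppedRows = 42 })
    fmt.Println(m().AddRowsConcurrencyDroppedRows)
}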
@@ -68,6 +68,10 @@ type Storage struct {
     tooSmallTimestampRows uint64
     tooBigTimestampRows   uint64
+
+    addRowsConcurrencyLimitReached uint64
+    addRowsConcurrencyLimitTimeout uint64
+    addRowsConcurrencyDroppedRows  uint64
 }

 // OpenStorage opens storage on the given path with the given number of retention months.
@@ -277,6 +281,12 @@ type Metrics struct {
     TooSmallTimestampRows uint64
     TooBigTimestampRows   uint64

+    AddRowsConcurrencyLimitReached uint64
+    AddRowsConcurrencyLimitTimeout uint64
+    AddRowsConcurrencyDroppedRows  uint64
+    AddRowsConcurrencyCapacity     uint64
+    AddRowsConcurrencyCurrent      uint64
+
     TSIDCacheSize      uint64
     TSIDCacheSizeBytes uint64
     TSIDCacheRequests  uint64
@@ -317,6 +327,12 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
     m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
     m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)

+    m.AddRowsConcurrencyLimitReached += atomic.LoadUint64(&s.addRowsConcurrencyLimitReached)
+    m.AddRowsConcurrencyLimitTimeout += atomic.LoadUint64(&s.addRowsConcurrencyLimitTimeout)
+    m.AddRowsConcurrencyDroppedRows += atomic.LoadUint64(&s.addRowsConcurrencyDroppedRows)
+    m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
+    m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
+
     var cs fastcache.Stats
     s.tsidCache.UpdateStats(&cs)
     m.TSIDCacheSize += cs.EntriesCount
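UpdateMetrics reads the new counters with atomic loads, while capacity and current usage are derived directly from the limiter channel itself: a buffered channel used as a counting semaphore, where cap() is the concurrency limit and len() is the number of in-flight writers. A tiny standalone sketch of that pattern (identifiers here are illustrative, not the vmstorage names):

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
)

// A buffered channel used as a counting semaphore: its capacity is the
// concurrency limit and its current length is the number of active workers.
var concurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1))

// Counters incremented on the hot path with atomic.AddUint64.
var limitReached, droppedRows uint64

// snapshotMetrics mirrors the UpdateMetrics pattern above: counters are read
// with atomic loads, while capacity/current come from cap() and len() of the
// semaphore channel.
func snapshotMetrics() (reached, dropped, capacity, current uint64) {
    reached = atomic.LoadUint64(&limitReached)
    dropped = atomic.LoadUint64(&droppedRows)
    capacity = uint64(cap(concurrencyCh))
    current = uint64(len(concurrencyCh))
    return
}

func main() {
    concurrencyCh <- struct{}{} // simulate one active writer
    atomic.AddUint64(&limitReached, 1)
    fmt.Println(snapshotMetrics())
}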
@@ -726,15 +742,24 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
     // Limit the number of concurrent goroutines that may add rows to the storage.
     // This should prevent from out of memory errors and CPU trashing when too many
     // goroutines call AddRows.
-    t := timerpool.Get(addRowsTimeout)
     select {
     case addRowsConcurrencyCh <- struct{}{}:
-        timerpool.Put(t)
         defer func() { <-addRowsConcurrencyCh }()
-    case <-t.C:
-        timerpool.Put(t)
-        return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load",
-            len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
+    default:
+        // Sleep for a while until giving up
+        atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
+        t := timerpool.Get(addRowsTimeout)
+        select {
+        case addRowsConcurrencyCh <- struct{}{}:
+            timerpool.Put(t)
+            defer func() { <-addRowsConcurrencyCh }()
+        case <-t.C:
+            timerpool.Put(t)
+            atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
+            atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
+            return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load",
+                len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
+        }
     }

     // Add rows to the storage.
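The rewritten AddRows prologue is a two-stage admission scheme: a non-blocking fast path into the semaphore, and on contention a second attempt bounded by addRowsTimeout, with a counter recording each outcome. A self-contained sketch of the same control flow, using time.NewTimer instead of the internal timerpool and illustrative names:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

var (
    concurrencyCh = make(chan struct{}, 4) // concurrency limit of 4, illustrative
    limitReached  uint64
    limitTimeout  uint64
    droppedRows   uint64
)

const admitTimeout = 30 * time.Second

// admit tries the fast, non-blocking path first; if the limiter is full it
// records the event and waits up to admitTimeout before dropping the batch.
func admit(rows int) (release func(), err error) {
    select {
    case concurrencyCh <- struct{}{}: // fast path: a slot is free
        return func() { <-concurrencyCh }, nil
    default:
    }
    atomic.AddUint64(&limitReached, 1)
    t := time.NewTimer(admitTimeout) // the real code reuses timers via timerpool
    defer t.Stop()
    select {
    case concurrencyCh <- struct{}{}: // a slot freed up before the deadline
        return func() { <-concurrencyCh }, nil
    case <-t.C:
        atomic.AddUint64(&limitTimeout, 1)
        atomic.AddUint64(&droppedRows, uint64(rows))
        return nil, fmt.Errorf("cannot add %d rows in %s: %d concurrent writers busy",
            rows, admitTimeout, cap(concurrencyCh))
    }
}

func main() {
    release, err := admit(100)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer release()
    // ... add rows to the storage here ...
    fmt.Println("admitted")
}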