mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmstorage: add vm_concurrent_addrows_*
metrics for tracking concurrency for Storage.AddRows calls
Track also the number of dropped rows due to the exceeded timeout on concurrency limit for Storage.AddRows. This number is tracked in `vm_concurrent_addrows_dropped_rows_total`
This commit is contained in:
parent
5c9e48417a
commit
b8bb74ffc6
2 changed files with 47 additions and 6 deletions
|
@ -365,6 +365,22 @@ func registerStorageMetrics() {
|
|||
return float64(m().TooSmallTimestampRows)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_concurrent_addrows_limit_reached_total`, func() float64 {
|
||||
return float64(m().AddRowsConcurrencyLimitReached)
|
||||
})
|
||||
metrics.NewGauge(`vm_concurrent_addrows_limit_timeout_total`, func() float64 {
|
||||
return float64(m().AddRowsConcurrencyLimitTimeout)
|
||||
})
|
||||
metrics.NewGauge(`vm_concurrent_addrows_dropped_rows_total`, func() float64 {
|
||||
return float64(m().AddRowsConcurrencyDroppedRows)
|
||||
})
|
||||
metrics.NewGauge(`vm_concurrent_addrows_capacity`, func() float64 {
|
||||
return float64(m().AddRowsConcurrencyCapacity)
|
||||
})
|
||||
metrics.NewGauge(`vm_concurrent_addrows_current`, func() float64 {
|
||||
return float64(m().AddRowsConcurrencyCurrent)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
|
||||
return float64(tm().BigRowsCount)
|
||||
})
|
||||
|
|
|
@ -68,6 +68,10 @@ type Storage struct {
|
|||
|
||||
tooSmallTimestampRows uint64
|
||||
tooBigTimestampRows uint64
|
||||
|
||||
addRowsConcurrencyLimitReached uint64
|
||||
addRowsConcurrencyLimitTimeout uint64
|
||||
addRowsConcurrencyDroppedRows uint64
|
||||
}
|
||||
|
||||
// OpenStorage opens storage on the given path with the given number of retention months.
|
||||
|
@ -277,6 +281,12 @@ type Metrics struct {
|
|||
TooSmallTimestampRows uint64
|
||||
TooBigTimestampRows uint64
|
||||
|
||||
AddRowsConcurrencyLimitReached uint64
|
||||
AddRowsConcurrencyLimitTimeout uint64
|
||||
AddRowsConcurrencyDroppedRows uint64
|
||||
AddRowsConcurrencyCapacity uint64
|
||||
AddRowsConcurrencyCurrent uint64
|
||||
|
||||
TSIDCacheSize uint64
|
||||
TSIDCacheSizeBytes uint64
|
||||
TSIDCacheRequests uint64
|
||||
|
@ -317,6 +327,12 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
|
||||
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
|
||||
|
||||
m.AddRowsConcurrencyLimitReached += atomic.LoadUint64(&s.addRowsConcurrencyLimitReached)
|
||||
m.AddRowsConcurrencyLimitTimeout += atomic.LoadUint64(&s.addRowsConcurrencyLimitTimeout)
|
||||
m.AddRowsConcurrencyDroppedRows += atomic.LoadUint64(&s.addRowsConcurrencyDroppedRows)
|
||||
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
|
||||
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
|
||||
|
||||
var cs fastcache.Stats
|
||||
s.tsidCache.UpdateStats(&cs)
|
||||
m.TSIDCacheSize += cs.EntriesCount
|
||||
|
@ -722,15 +738,24 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
|||
// Limit the number of concurrent goroutines that may add rows to the storage.
|
||||
// This should prevent from out of memory errors and CPU trashing when too many
|
||||
// goroutines call AddRows.
|
||||
t := timerpool.Get(addRowsTimeout)
|
||||
select {
|
||||
case addRowsConcurrencyCh <- struct{}{}:
|
||||
timerpool.Put(t)
|
||||
defer func() { <-addRowsConcurrencyCh }()
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load",
|
||||
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
|
||||
default:
|
||||
// Sleep for a while until giving up
|
||||
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
|
||||
t := timerpool.Get(addRowsTimeout)
|
||||
select {
|
||||
case addRowsConcurrencyCh <- struct{}{}:
|
||||
timerpool.Put(t)
|
||||
defer func() { <-addRowsConcurrencyCh }()
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
|
||||
atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
|
||||
return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load",
|
||||
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
|
||||
}
|
||||
}
|
||||
|
||||
// Add rows to the storage.
|
||||
|
|
Loading…
Reference in a new issue