diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 758c97325..5e1f7308c 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -308,6 +308,22 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().TooSmallTimestampRows) }) + metrics.NewGauge(`vm_concurrent_addrows_limit_reached_total`, func() float64 { + return float64(m().AddRowsConcurrencyLimitReached) + }) + metrics.NewGauge(`vm_concurrent_addrows_limit_timeout_total`, func() float64 { + return float64(m().AddRowsConcurrencyLimitTimeout) + }) + metrics.NewGauge(`vm_concurrent_addrows_dropped_rows_total`, func() float64 { + return float64(m().AddRowsConcurrencyDroppedRows) + }) + metrics.NewGauge(`vm_concurrent_addrows_capacity`, func() float64 { + return float64(m().AddRowsConcurrencyCapacity) + }) + metrics.NewGauge(`vm_concurrent_addrows_current`, func() float64 { + return float64(m().AddRowsConcurrencyCurrent) + }) + metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 { return float64(tm().BigRowsCount) }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ae6ec78c1..28ae1e5a5 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -68,6 +68,10 @@ type Storage struct { tooSmallTimestampRows uint64 tooBigTimestampRows uint64 + + addRowsConcurrencyLimitReached uint64 + addRowsConcurrencyLimitTimeout uint64 + addRowsConcurrencyDroppedRows uint64 } // OpenStorage opens storage on the given path with the given number of retention months. @@ -277,6 +281,12 @@ type Metrics struct { TooSmallTimestampRows uint64 TooBigTimestampRows uint64 + AddRowsConcurrencyLimitReached uint64 + AddRowsConcurrencyLimitTimeout uint64 + AddRowsConcurrencyDroppedRows uint64 + AddRowsConcurrencyCapacity uint64 + AddRowsConcurrencyCurrent uint64 + TSIDCacheSize uint64 TSIDCacheSizeBytes uint64 TSIDCacheRequests uint64 @@ -317,6 +327,12 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows) m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) + m.AddRowsConcurrencyLimitReached += atomic.LoadUint64(&s.addRowsConcurrencyLimitReached) + m.AddRowsConcurrencyLimitTimeout += atomic.LoadUint64(&s.addRowsConcurrencyLimitTimeout) + m.AddRowsConcurrencyDroppedRows += atomic.LoadUint64(&s.addRowsConcurrencyDroppedRows) + m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) + m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) + var cs fastcache.Stats s.tsidCache.UpdateStats(&cs) m.TSIDCacheSize += cs.EntriesCount @@ -726,15 +742,24 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { // Limit the number of concurrent goroutines that may add rows to the storage. // This should prevent from out of memory errors and CPU trashing when too many // goroutines call AddRows. - t := timerpool.Get(addRowsTimeout) select { case addRowsConcurrencyCh <- struct{}{}: - timerpool.Put(t) defer func() { <-addRowsConcurrencyCh }() - case <-t.C: - timerpool.Put(t) - return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load", - len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) + default: + // Sleep for a while until giving up + atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1) + t := timerpool.Get(addRowsTimeout) + select { + case addRowsConcurrencyCh <- struct{}{}: + timerpool.Put(t) + defer func() { <-addRowsConcurrencyCh }() + case <-t.C: + timerpool.Put(t) + atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) + atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) + return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load", + len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) + } } // Add rows to the storage.