diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index eb1cd8550..f0b314988 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -360,6 +360,10 @@ func registerStorageMetrics(strg *storage.Storage) {
 		return float64(m().AddRowsConcurrencyCurrent)
 	})
 
+	metrics.NewGauge(`vm_search_delays_total`, func() float64 {
+		return float64(m().SearchDelays)
+	})
+
 	metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
 		return float64(m().SlowRowInserts)
 	})
diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md
index e15addbf5..9c991b80f 100644
--- a/docs/Single-server-VictoriaMetrics.md
+++ b/docs/Single-server-VictoriaMetrics.md
@@ -787,6 +787,8 @@ The required resources for query path:
   The higher number of scanned time series and lower `step` argument results in the higher RAM usage.
 
 * CPU cores: a CPU core per 30 millions of scanned data points per second.
+  This means that heavy queries that touch a big number of time series (over 10K) and/or a big number of data points (over 100M)
+  usually require more CPU resources than tiny queries that touch a few time series with a small number of data points.
 
 * Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards
   usually require negligible network bandwidth.
@@ -975,7 +977,7 @@ The most interesting metrics are:
   of tweaking these flag values arises.
 
 * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
-  since the issue could be already fixed there.
+  since the encountered issue could already be fixed there.
 
 * If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
   then it is likely you have too many active time series for the current amount of RAM.
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 9d33d890f..d13c093e3 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -342,6 +342,8 @@ type Metrics struct {
 	AddRowsConcurrencyCapacity uint64
 	AddRowsConcurrencyCurrent  uint64
 
+	SearchDelays uint64
+
 	SlowRowInserts         uint64
 	SlowPerDayIndexInserts uint64
 	SlowMetricNameLoads    uint64
@@ -400,6 +402,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
 	m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
 
+	m.SearchDelays += atomic.LoadUint64(&searchDelays)
+
 	m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
 	m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
 	m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@@ -856,8 +860,26 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
 	return deadline.Sub(t)
 }
 
+var (
+	searchTSIDsCondLock sync.Mutex
+	searchTSIDsCond     = sync.NewCond(&searchTSIDsCondLock)
+
+	searchDelays uint64
+)
+
 // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
 func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
+	// Make sure that there are enough resources for processing the ingested data via Storage.AddRows
+	// before starting the query.
+	// This should prevent data ingestion starvation when processing heavy queries.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 .
+	searchTSIDsCondLock.Lock()
+	for len(addRowsConcurrencyCh) >= cap(addRowsConcurrencyCh) {
+		atomic.AddUint64(&searchDelays, 1)
+		searchTSIDsCond.Wait()
+	}
+	searchTSIDsCondLock.Unlock()
+
 	// Do not cache tfss -> tsids here, since the caching is performed
 	// on idb level.
 	tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
@@ -1069,7 +1091,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 	// goroutines call AddRows.
 	select {
 	case addRowsConcurrencyCh <- struct{}{}:
-		defer func() { <-addRowsConcurrencyCh }()
 	default:
 		// Sleep for a while until giving up
 		atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
@@ -1077,7 +1098,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 		select {
 		case addRowsConcurrencyCh <- struct{}{}:
 			timerpool.Put(t)
-			defer func() { <-addRowsConcurrencyCh }()
 		case <-t.C:
 			timerpool.Put(t)
 			atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
@@ -1093,6 +1113,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 	rr.rows, err = s.add(rr.rows, mrs, precisionBits)
 	putRawRows(rr)
 
+	// Notify blocked goroutines at Storage.searchTSIDs that they may proceed with their work.
+	<-addRowsConcurrencyCh
+	searchTSIDsCond.Signal()
+
 	return err
 }
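
The core of this change is a gate on the query path: `searchTSIDs` now refuses to start while `addRowsConcurrencyCh` is full, and `AddRows` releases its concurrency slot explicitly (instead of via `defer`) and then signals waiting searches. The sketch below is a minimal, self-contained illustration of that pattern and is not the VictoriaMetrics code: `ingestCh`, `search()` and the capacity of 2 are hypothetical stand-ins for `addRowsConcurrencyCh`, `Storage.searchTSIDs` and the real concurrency limit, and it wakes waiters with `Broadcast` while holding the lock so this short-lived demo cannot miss a wakeup.

```go
// Sketch of the "block searches while ingestion is saturated" pattern.
// NOT the VictoriaMetrics code; names and the capacity of 2 are hypothetical.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// ingestCh bounds the number of concurrent ingestion calls,
	// playing the role of addRowsConcurrencyCh.
	ingestCh = make(chan struct{}, 2)

	// Searches wait on searchCond while every ingestion slot is busy.
	searchCondLock sync.Mutex
	searchCond     = sync.NewCond(&searchCondLock)

	// searchDelays counts how often a search had to wait for ingestion,
	// feeding a counter like vm_search_delays_total.
	searchDelays uint64
)

// search blocks while ingestion is saturated, so heavy queries cannot
// starve data ingestion. The loop mirrors the block added to searchTSIDs.
func search() {
	searchCondLock.Lock()
	for len(ingestCh) >= cap(ingestCh) {
		atomic.AddUint64(&searchDelays, 1)
		searchCond.Wait() // releases the lock while waiting, re-locks on wakeup
	}
	searchCondLock.Unlock()
	fmt.Println("search proceeds")
}

func main() {
	// Saturate ingestion: both slots are taken.
	ingestCh <- struct{}{}
	ingestCh <- struct{}{}

	done := make(chan struct{})
	go func() {
		search() // blocks until an ingestion slot frees up
		close(done)
	}()
	time.Sleep(50 * time.Millisecond) // let the search observe the saturation

	// Finish one "AddRows" call: free its slot and wake waiting searches,
	// mirroring the end of Storage.AddRows in the diff.
	<-ingestCh
	searchCondLock.Lock()
	searchCond.Broadcast()
	searchCondLock.Unlock()

	<-done
	fmt.Println("observed search delays:", atomic.LoadUint64(&searchDelays))
}
```

The diff itself signals without taking the lock and wakes at most one waiter per completed `AddRows` call, which is cheap and adequate for a long-running server where ingestion calls keep arriving; the `for` loop around `Wait` re-checks the saturation condition after every wakeup, so early or spurious wakeups are harmless.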