lib/storage: slow down concurrent searches when the number of concurrent inserts reaches the limit

This should improve data ingestion performance when heavy searches are executed

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/618
This commit is contained in:
Aliaksandr Valialkin 2020-08-07 08:47:32 +03:00
parent dd1d59f57a
commit 307281e922

View file

@ -1120,11 +1120,17 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
// Sleep for a while until giving up // Sleep for a while until giving up
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1) atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
t := timerpool.Get(addRowsTimeout) t := timerpool.Get(addRowsTimeout)
// Prioritize data ingestion over concurrent searches.
storagepacelimiter.Search.Inc()
select { select {
case addRowsConcurrencyCh <- struct{}{}: case addRowsConcurrencyCh <- struct{}{}:
timerpool.Put(t) timerpool.Put(t)
storagepacelimiter.Search.Dec()
case <-t.C: case <-t.C:
timerpool.Put(t) timerpool.Put(t)
storagepacelimiter.Search.Dec()
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load", return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",