mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: limit the maximum concurrency for data ingestion to GOMAXPROCS
Previously the concurrency has been limited to GOMAXPROCS*2. This had little sense, since every call to Storage.AddRows is bound to CPU, so the maximum ingestion bandwidth is achieved when the number of concurrent calls to Storage.AddRows is limited to the number of CPUs, i.e. to GOMAXPROCS.
This commit is contained in:
parent
929ad74de6
commit
7335743d57
1 changed files with 10 additions and 1 deletions
|
@ -1122,7 +1122,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)
|
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
|
||||||
|
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
|
||||||
|
// goroutines on data ingestion path.
|
||||||
|
addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1))
|
||||||
addRowsTimeout = 30 * time.Second
|
addRowsTimeout = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1183,6 +1186,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||||
}
|
}
|
||||||
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
||||||
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
||||||
|
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
|
||||||
|
// contain MetricName->TSID entries for deleted time series.
|
||||||
|
// See Storage.DeleteMetrics code for details.
|
||||||
prevTSID = r.TSID
|
prevTSID = r.TSID
|
||||||
prevMetricNameRaw = mr.MetricNameRaw
|
prevMetricNameRaw = mr.MetricNameRaw
|
||||||
continue
|
continue
|
||||||
|
@ -1229,6 +1235,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||||
}
|
}
|
||||||
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
||||||
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
||||||
|
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
|
||||||
|
// contain MetricName->TSID entries for deleted time series.
|
||||||
|
// See Storage.DeleteMetrics code for details.
|
||||||
prevTSID = r.TSID
|
prevTSID = r.TSID
|
||||||
prevMetricNameRaw = mr.MetricNameRaw
|
prevMetricNameRaw = mr.MetricNameRaw
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in a new issue