From 97f70ccda79668e955645befb581a2922370131c Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 19 Dec 2019 15:12:50 +0200
Subject: [PATCH] lib/storage: optimize bulk import performance when multiple
 data points are inserted for the same time series

This should speed up `/api/v1/import` and make it more scalable on
multi-core systems.
---
 lib/storage/storage.go | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 1f93c4baf2..741c71bea0 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -803,6 +803,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
 	}
 	rows = rows[:rowsLen+len(mrs)]
 	j := 0
+	var (
+		// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
+		prevTSID          TSID
+		prevMetricNameRaw []byte
+	)
 	minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
 	// Return only the last error, since it has no sense in returning all errors.
 	var lastWarn error
@@ -830,9 +835,17 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
 		r.Timestamp = mr.Timestamp
 		r.Value = mr.Value
 		r.PrecisionBits = precisionBits
+		if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
+			// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
+			// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
+			r.TSID = prevTSID
+			continue
+		}
 		if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
 			if !dmis.Has(r.TSID.MetricID) {
 				// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
+				prevTSID = r.TSID
+				prevMetricNameRaw = mr.MetricNameRaw
 				continue
 			}
 		}
@@ -890,6 +903,12 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
 	var date uint64
 	var hour uint64
 	var prevTimestamp int64
+	var (
+		// These vars are used for speeding up bulk imports when multiple adjacent rows
+		// contain the same (metricID, date) pairs.
+		prevMatchedDate     uint64
+		prevMatchedMetricID uint64
+	)
 	idb := s.idb()
 	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	for i := range rows {
@@ -913,8 +932,14 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
 		}

 		// Slower path: check global cache for (date, metricID) entry.
+		if metricID == prevMatchedMetricID && date == prevMatchedDate {
+			// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
+			continue
+		}
 		if s.dateMetricIDCache.Has(date, metricID) {
 			// The metricID has been already added to per-day inverted index.
+			prevMatchedDate = date
+			prevMatchedMetricID = metricID
 			continue
 		}
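
Both halves of the patch apply the same idea: when consecutive rows share a key (the raw metric name in add(), the (date, metricID) pair in updatePerDateData()), remember the result of the previous lookup and skip the cache/index access entirely. The following is a minimal, self-contained Go sketch of that adjacent-duplicate fast path, not the VictoriaMetrics code itself; the names Row, seriesIndex and assignTSIDs are hypothetical, and the real logic lives in lib/storage/storage.go as shown above.

package main

import "fmt"

// Row is a simplified stand-in for a raw ingested sample.
type Row struct {
	MetricNameRaw []byte
	Timestamp     int64
	Value         float64
	TSID          uint64 // simplified: the real TSID is a struct
}

// seriesIndex simulates the metricName -> TSID cache whose lookups
// the fast path tries to avoid for adjacent rows of the same series.
type seriesIndex struct {
	byName  map[string]uint64
	nextID  uint64
	lookups int // counts how many (relatively expensive) lookups were performed
}

func (idx *seriesIndex) getTSID(metricNameRaw []byte) uint64 {
	idx.lookups++
	name := string(metricNameRaw)
	if id, ok := idx.byName[name]; ok {
		return id
	}
	idx.nextID++
	idx.byName[name] = idx.nextID
	return idx.nextID
}

// assignTSIDs fills in r.TSID for every row. Consecutive rows with the same
// MetricNameRaw reuse the previously resolved TSID instead of hitting the index,
// which is the same idea as the prevTSID/prevMetricNameRaw fast path in the patch.
func assignTSIDs(rows []Row, idx *seriesIndex) {
	var prevTSID uint64
	var prevMetricNameRaw []byte
	for i := range rows {
		r := &rows[i]
		if prevMetricNameRaw != nil && string(r.MetricNameRaw) == string(prevMetricNameRaw) {
			// Fast path: same series as the previous row, so the TSID is already known.
			r.TSID = prevTSID
			continue
		}
		r.TSID = idx.getTSID(r.MetricNameRaw)
		prevTSID = r.TSID
		prevMetricNameRaw = r.MetricNameRaw
	}
}

func main() {
	idx := &seriesIndex{byName: make(map[string]uint64)}
	// Bulk imports typically deliver many samples for the same series back to back.
	rows := []Row{
		{MetricNameRaw: []byte(`cpu_usage{host="a"}`), Timestamp: 1, Value: 0.1},
		{MetricNameRaw: []byte(`cpu_usage{host="a"}`), Timestamp: 2, Value: 0.2},
		{MetricNameRaw: []byte(`cpu_usage{host="a"}`), Timestamp: 3, Value: 0.3},
		{MetricNameRaw: []byte(`cpu_usage{host="b"}`), Timestamp: 1, Value: 0.4},
	}
	assignTSIDs(rows, idx)
	fmt.Printf("index lookups: %d (instead of %d)\n", idx.lookups, len(rows))
}

The payoff comes only from ordering: if the imported stream interleaves series, the previous-key check misses every time and the code falls back to the regular cache lookup, so the fast path costs at most one byte-slice comparison per row.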