optimize Storage.updatePerDateData()

This commit is contained in:
Aliaksandr Valialkin 2021-02-09 02:51:40 +02:00
parent fda61e8e96
commit d56390b925

View file

@ -1592,6 +1592,12 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
prevTimestamp = r.Timestamp
}
metricID := r.TSID.MetricID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
@ -1620,22 +1626,14 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
}
// Slower path: check global cache for (date, metricID) entry.
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
if s.dateMetricIDCache.Has(date, metricID) {
continue
}
prevDate = date
prevMetricID = metricID
if !s.dateMetricIDCache.Has(date, metricID) {
// Slow path: store the (date, metricID) entry in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
})
}
// Slow path: store the (date, metricID) entry in the indexDB.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
})
}
if len(pendingNextDayMetricIDs) > 0 {
s.pendingNextDayMetricIDsLock.Lock()
@ -1668,22 +1666,9 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
var firstError error
prevMetricID = 0
prevDate = 0
for _, dateMetricID := range pendingDateMetricIDs {
date := dateMetricID.date
metricID := dateMetricID.metricID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
if s.dateMetricIDCache.Has(date, metricID) {
// The metricID has been already added to per-day inverted index.
continue
}
ok, err := is.hasDateMetricID(date, metricID)
if err != nil {
if firstError == nil {
@ -1693,6 +1678,8 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
}
if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
if err := is.storeDateMetricID(date, metricID); err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err)