optimize Storage.updatePerDateData()

This commit is contained in:
Aliaksandr Valialkin 2021-02-09 02:51:40 +02:00
parent ea328b7391
commit 9ed7789fef

View file

@ -1680,6 +1680,12 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
prevTimestamp = r.Timestamp prevTimestamp = r.Timestamp
} }
metricID := r.TSID.MetricID metricID := r.TSID.MetricID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
if hour == hm.hour { if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache. // The r belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) { if hm.m.Has(metricID) {
@ -1715,24 +1721,16 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
} }
// Slower path: check global cache for (date, metricID) entry. // Slower path: check global cache for (date, metricID) entry.
if metricID == prevMetricID && date == prevDate { if s.dateMetricIDCache.Has(date, metricID) {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue continue
} }
prevDate = date // Slow path: store the (date, metricID) entry in the indexDB.
prevMetricID = metricID pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
if !s.dateMetricIDCache.Has(date, metricID) { metricID: metricID,
// Slow path: store the (date, metricID) entry in the indexDB. accountID: r.TSID.AccountID,
// It is OK if the (date, metricID) entry is added multiple times to db projectID: r.TSID.ProjectID,
// by concurrent goroutines. })
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
accountID: r.TSID.AccountID,
projectID: r.TSID.ProjectID,
})
}
} }
if len(pendingNextDayMetricIDs) > 0 { if len(pendingNextDayMetricIDs) > 0 {
s.pendingNextDayMetricIDsLock.Lock() s.pendingNextDayMetricIDsLock.Lock()
@ -1771,22 +1769,9 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
is := idb.getIndexSearch(0, 0, noDeadline) is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is) defer idb.putIndexSearch(is)
var firstError error var firstError error
prevMetricID = 0
prevDate = 0
for _, dateMetricID := range pendingDateMetricIDs { for _, dateMetricID := range pendingDateMetricIDs {
date := dateMetricID.date date := dateMetricID.date
metricID := dateMetricID.metricID metricID := dateMetricID.metricID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
if s.dateMetricIDCache.Has(date, metricID) {
// The metricID has been already added to per-day inverted index.
continue
}
is.accountID = dateMetricID.accountID is.accountID = dateMetricID.accountID
is.projectID = dateMetricID.projectID is.projectID = dateMetricID.projectID
ok, err := is.hasDateMetricID(date, metricID) ok, err := is.hasDateMetricID(date, metricID)
@ -1799,6 +1784,8 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
} }
if !ok { if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there. // The (date, metricID) entry is missing in the indexDB. Add it there.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
if err := is.storeDateMetricID(date, metricID); err != nil { if err := is.storeDateMetricID(date, metricID); err != nil {
if firstError == nil { if firstError == nil {
firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err) firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err)