mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: optimize data ingestion in the beginning of every hour
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1046
This commit is contained in:
parent
1a6eb0c3cf
commit
2dbb12563b
1 changed files with 14 additions and 6 deletions
|
@ -1670,6 +1670,8 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
|
||||||
projectID uint32
|
projectID uint32
|
||||||
}
|
}
|
||||||
var pendingDateMetricIDs []pendingDateMetricID
|
var pendingDateMetricIDs []pendingDateMetricID
|
||||||
|
var pendingNextDayMetricIDs []uint64
|
||||||
|
var pendingHourEntries []pendingHourMetricIDEntry
|
||||||
for i := range rows {
|
for i := range rows {
|
||||||
r := &rows[i]
|
r := &rows[i]
|
||||||
if r.Timestamp != prevTimestamp {
|
if r.Timestamp != prevTimestamp {
|
||||||
|
@ -1696,20 +1698,16 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
|
||||||
accountID: r.TSID.AccountID,
|
accountID: r.TSID.AccountID,
|
||||||
projectID: r.TSID.ProjectID,
|
projectID: r.TSID.ProjectID,
|
||||||
})
|
})
|
||||||
s.pendingNextDayMetricIDsLock.Lock()
|
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
|
||||||
s.pendingNextDayMetricIDs.Add(metricID)
|
|
||||||
s.pendingNextDayMetricIDsLock.Unlock()
|
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.pendingHourEntriesLock.Lock()
|
|
||||||
e := pendingHourMetricIDEntry{
|
e := pendingHourMetricIDEntry{
|
||||||
AccountID: r.TSID.AccountID,
|
AccountID: r.TSID.AccountID,
|
||||||
ProjectID: r.TSID.ProjectID,
|
ProjectID: r.TSID.ProjectID,
|
||||||
MetricID: metricID,
|
MetricID: metricID,
|
||||||
}
|
}
|
||||||
s.pendingHourEntries = append(s.pendingHourEntries, e)
|
pendingHourEntries = append(pendingHourEntries, e)
|
||||||
s.pendingHourEntriesLock.Unlock()
|
|
||||||
if date == hmPrevDate && hmPrev.m.Has(metricID) {
|
if date == hmPrevDate && hmPrev.m.Has(metricID) {
|
||||||
// The metricID is already registered for the current day on the previous hour.
|
// The metricID is already registered for the current day on the previous hour.
|
||||||
continue
|
continue
|
||||||
|
@ -1736,6 +1734,16 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(pendingNextDayMetricIDs) > 0 {
|
||||||
|
s.pendingNextDayMetricIDsLock.Lock()
|
||||||
|
s.pendingNextDayMetricIDs.AddMulti(pendingNextDayMetricIDs)
|
||||||
|
s.pendingNextDayMetricIDsLock.Unlock()
|
||||||
|
}
|
||||||
|
if len(pendingHourEntries) > 0 {
|
||||||
|
s.pendingHourEntriesLock.Lock()
|
||||||
|
s.pendingHourEntries = append(s.pendingHourEntries, pendingHourEntries...)
|
||||||
|
s.pendingHourEntriesLock.Unlock()
|
||||||
|
}
|
||||||
if len(pendingDateMetricIDs) == 0 {
|
if len(pendingDateMetricIDs) == 0 {
|
||||||
// Fast path - there are no new (date, metricID) entires in rows.
|
// Fast path - there are no new (date, metricID) entires in rows.
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue