lib/storage: sync with cluster changes

This commit is contained in:
Aliaksandr Valialkin 2019-11-08 19:37:16 +02:00
parent 63b05c0b9f
commit 9ea549ed24
2 changed files with 46 additions and 37 deletions

View file

@ -68,8 +68,8 @@ type Storage struct {
prevHourMetricIDs atomic.Value
// Pending MetricID values to be added to currHourMetricIDs.
pendingHourMetricIDsLock sync.Mutex
pendingHourMetricIDs *uint64set.Set
pendingHourEntriesLock sync.Mutex
pendingHourEntries *uint64set.Set
stop chan struct{}
@ -125,7 +125,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourEntries = &uint64set.Set{}
// Load indexdb
idbPath := path + "/indexdb"
@ -140,8 +140,10 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
idbCurr.SetExtDB(idbPrev)
s.idbCurr.Store(idbCurr)
// Initialize iidx. hmCurr shouldn't be used till now,
// Initialize iidx. hmCurr and hmPrev shouldn't be used till now,
// so it should be safe initializing it inplace.
hmPrev.iidx = newInmemoryInvertedIndex()
hmPrev.iidx.MustUpdate(s.idb(), hmPrev.m)
hmCurr.iidx = newInmemoryInvertedIndex()
hmCurr.iidx.MustUpdate(s.idb(), hmCurr.m)
@ -520,6 +522,8 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
hour: hour,
}
}
// Unmarshal header
isFull := encoding.UnmarshalUint64(src)
src = src[8:]
hourLoaded := encoding.UnmarshalUint64(src)
@ -530,10 +534,12 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
hour: hour,
}
}
// Unmarshal hm.m
hmLen := encoding.UnmarshalUint64(src)
src = src[8:]
if uint64(len(src)) != 8*hmLen {
logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{
hour: hour,
}
@ -561,8 +567,12 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
if hm.isFull {
isFull = 1
}
// Marshal header
dst = encoding.MarshalUint64(dst, isFull)
dst = encoding.MarshalUint64(dst, hm.hour)
// Marshal hm.m
dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
for _, metricID := range hm.m.AppendTo(nil) {
dst = encoding.MarshalUint64(dst, metricID)
@ -718,8 +728,7 @@ func (mr *MetricRow) String() string {
if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil {
metricName = mn.String()
}
return fmt.Sprintf("MetricName=%s, Timestamp=%d, Value=%f\n",
metricName, mr.Timestamp, mr.Value)
return fmt.Sprintf("MetricName=%s, Timestamp=%d, Value=%f\n", metricName, mr.Timestamp, mr.Value)
}
// Marshal appends marshaled mr to dst and returns the result.
@ -919,9 +928,9 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
// Fast path: the metricID is in the current hour cache.
continue
}
s.pendingHourMetricIDsLock.Lock()
s.pendingHourMetricIDs.Add(metricID)
s.pendingHourMetricIDsLock.Unlock()
s.pendingHourEntriesLock.Lock()
s.pendingHourEntries.Add(metricID)
s.pendingHourEntriesLock.Unlock()
hm.iidx.AddMetricID(idb, metricID)
}
@ -946,17 +955,17 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourMetricIDsLock.Lock()
newMetricIDs := s.pendingHourMetricIDs
s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourMetricIDsLock.Unlock()
s.pendingHourEntriesLock.Lock()
newMetricIDs := s.pendingHourEntries
s.pendingHourEntries = &uint64set.Set{}
s.pendingHourEntriesLock.Unlock()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
if newMetricIDs.Len() == 0 && hm.hour == hour {
// Fast path: nothing to update.
return
}
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
var m *uint64set.Set
var iidx *inmemoryInvertedIndex
isFull := hm.isFull

View file

@ -18,7 +18,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
var s Storage
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourEntries = &uint64set.Set{}
return &s
}
t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) {
@ -53,8 +53,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) {
@ -92,17 +92,17 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
pendingHourMetricIDs := &uint64set.Set{}
pendingHourMetricIDs.Add(343)
pendingHourMetricIDs.Add(32424)
pendingHourMetricIDs.Add(8293432)
s.pendingHourMetricIDs = pendingHourMetricIDs
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
@ -122,8 +122,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
}
if !hmCurr.m.Equal(pendingHourMetricIDs) {
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs)
if !hmCurr.m.Equal(pendingHourEntries) {
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
}
if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
@ -134,17 +134,17 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
pendingHourMetricIDs := &uint64set.Set{}
pendingHourMetricIDs.Add(343)
pendingHourMetricIDs.Add(32424)
pendingHourMetricIDs.Add(8293432)
s.pendingHourMetricIDs = pendingHourMetricIDs
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
@ -166,7 +166,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
// Do not run other checks, since they may fail.
return
}
m := pendingHourMetricIDs.Clone()
m := pendingHourEntries.Clone()
origMetricIDs := hmOrig.m.AppendTo(nil)
for _, metricID := range origMetricIDs {
m.Add(metricID)
@ -184,8 +184,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
}