From 9ea549ed24c084702e265367e48ba55c7ed7790a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 8 Nov 2019 19:37:16 +0200 Subject: [PATCH] lib/storage: sync with cluster changes --- lib/storage/storage.go | 39 +++++++++++++++++++------------- lib/storage/storage_test.go | 44 ++++++++++++++++++------------------- 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 3228c0a84e..e14307a64a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -68,8 +68,8 @@ type Storage struct { prevHourMetricIDs atomic.Value // Pending MetricID values to be added to currHourMetricIDs. - pendingHourMetricIDsLock sync.Mutex - pendingHourMetricIDs *uint64set.Set + pendingHourEntriesLock sync.Mutex + pendingHourEntries *uint64set.Set stop chan struct{} @@ -125,7 +125,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) - s.pendingHourMetricIDs = &uint64set.Set{} + s.pendingHourEntries = &uint64set.Set{} // Load indexdb idbPath := path + "/indexdb" @@ -140,8 +140,10 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { idbCurr.SetExtDB(idbPrev) s.idbCurr.Store(idbCurr) - // Initialize iidx. hmCurr shouldn't be used till now, + // Initialize iidx. hmCurr and hmPrev shouldn't be used till now, // so it should be safe initializing it inplace. + hmPrev.iidx = newInmemoryInvertedIndex() + hmPrev.iidx.MustUpdate(s.idb(), hmPrev.m) hmCurr.iidx = newInmemoryInvertedIndex() hmCurr.iidx.MustUpdate(s.idb(), hmCurr.m) @@ -520,6 +522,8 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs hour: hour, } } + + // Unmarshal header isFull := encoding.UnmarshalUint64(src) src = src[8:] hourLoaded := encoding.UnmarshalUint64(src) @@ -530,10 +534,12 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs hour: hour, } } + + // Unmarshal hm.m hmLen := encoding.UnmarshalUint64(src) src = src[8:] if uint64(len(src)) != 8*hmLen { - logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen) + logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes", path, len(src), 8*hmLen) return &hourMetricIDs{ hour: hour, } @@ -561,8 +567,12 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { if hm.isFull { isFull = 1 } + + // Marshal header dst = encoding.MarshalUint64(dst, isFull) dst = encoding.MarshalUint64(dst, hm.hour) + + // Marshal hm.m dst = encoding.MarshalUint64(dst, uint64(hm.m.Len())) for _, metricID := range hm.m.AppendTo(nil) { dst = encoding.MarshalUint64(dst, metricID) @@ -718,8 +728,7 @@ func (mr *MetricRow) String() string { if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil { metricName = mn.String() } - return fmt.Sprintf("MetricName=%s, Timestamp=%d, Value=%f\n", - metricName, mr.Timestamp, mr.Value) + return fmt.Sprintf("MetricName=%s, Timestamp=%d, Value=%f\n", metricName, mr.Timestamp, mr.Value) } // Marshal appends marshaled mr to dst and returns the result. @@ -919,9 +928,9 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error // Fast path: the metricID is in the current hour cache. continue } - s.pendingHourMetricIDsLock.Lock() - s.pendingHourMetricIDs.Add(metricID) - s.pendingHourMetricIDsLock.Unlock() + s.pendingHourEntriesLock.Lock() + s.pendingHourEntries.Add(metricID) + s.pendingHourEntriesLock.Unlock() hm.iidx.AddMetricID(idb, metricID) } @@ -946,17 +955,17 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error func (s *Storage) updateCurrHourMetricIDs() { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) - s.pendingHourMetricIDsLock.Lock() - newMetricIDs := s.pendingHourMetricIDs - s.pendingHourMetricIDs = &uint64set.Set{} - s.pendingHourMetricIDsLock.Unlock() + s.pendingHourEntriesLock.Lock() + newMetricIDs := s.pendingHourEntries + s.pendingHourEntries = &uint64set.Set{} + s.pendingHourEntriesLock.Unlock() hour := uint64(timestampFromTime(time.Now())) / msecPerHour if newMetricIDs.Len() == 0 && hm.hour == hour { // Fast path: nothing to update. return } - // Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs. + // Slow path: hm.m must be updated with non-empty s.pendingHourEntries. var m *uint64set.Set var iidx *inmemoryInvertedIndex isFull := hm.isFull diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 4a339afb76..3b8b9abdf3 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -18,7 +18,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { var s Storage s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) - s.pendingHourMetricIDs = &uint64set.Set{} + s.pendingHourEntries = &uint64set.Set{} return &s } t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) { @@ -53,8 +53,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if s.pendingHourEntries.Len() != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) { @@ -92,17 +92,17 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if s.pendingHourEntries.Len() != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - pendingHourMetricIDs := &uint64set.Set{} - pendingHourMetricIDs.Add(343) - pendingHourMetricIDs.Add(32424) - pendingHourMetricIDs.Add(8293432) - s.pendingHourMetricIDs = pendingHourMetricIDs + pendingHourEntries := &uint64set.Set{} + pendingHourEntries.Add(343) + pendingHourEntries.Add(32424) + pendingHourEntries.Add(8293432) + s.pendingHourEntries = pendingHourEntries hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -122,8 +122,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } } - if !hmCurr.m.Equal(pendingHourMetricIDs) { - t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs) + if !hmCurr.m.Equal(pendingHourEntries) { + t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries) } if !hmCurr.isFull { t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) @@ -134,17 +134,17 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if s.pendingHourEntries.Len() != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - pendingHourMetricIDs := &uint64set.Set{} - pendingHourMetricIDs.Add(343) - pendingHourMetricIDs.Add(32424) - pendingHourMetricIDs.Add(8293432) - s.pendingHourMetricIDs = pendingHourMetricIDs + pendingHourEntries := &uint64set.Set{} + pendingHourEntries.Add(343) + pendingHourEntries.Add(32424) + pendingHourEntries.Add(8293432) + s.pendingHourEntries = pendingHourEntries hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -166,7 +166,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { // Do not run other checks, since they may fail. return } - m := pendingHourMetricIDs.Clone() + m := pendingHourEntries.Clone() origMetricIDs := hmOrig.m.AppendTo(nil) for _, metricID := range origMetricIDs { m.Add(metricID) @@ -184,8 +184,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if s.pendingHourEntries.Len() != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) }